실습으로 배우는 선착순 이벤트 시스템이란 것을 인프런에서 시청했다.
kafka를 통해 영감을 받거나 새로운 insight를 얻은 것을 정리하려고 한다.
쿠폰 서비스 개발하기
100개의 쿠폰을 선착순으로 개발할 예정이고 1000명이 해당 쿠폰을 사려고 대기하고 있는 상황을 가정하고 개발을 진행하였습니다.
@Entity
public class Coupon {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
public Coupon() {
}
public Coupon(Long userId) {
this.userId = userId;
}
public Long getId() {
return id;
}
}
package com.example.couponsystem.service;
import com.example.couponsystem.domain.Coupon;
import com.example.couponsystem.producer.CouponCreateProducer;
import com.example.couponsystem.repository.AppliedUserRepository;
import com.example.couponsystem.repository.CouponCountRepository;
import com.example.couponsystem.repository.CouponRepository;
import org.springframework.stereotype.Service;
@Service
public class ApplyService {
private final CouponRepository couponRepository;
private final CouponCountRepository couponCountRepository;
private final CouponCreateProducer couponCreateProducer;
private final AppliedUserRepository appliedUserRepository;
public ApplyService(CouponRepository couponRepository,CouponCountRepository couponCountRepository,CouponCreateProducer couponCreateProducer,AppliedUserRepository appliedUserRepository) {
this.couponRepository = couponRepository;
this.couponCountRepository=couponCountRepository;
this.couponCreateProducer=couponCreateProducer;
this.appliedUserRepository=appliedUserRepository;
}
public void apply(Long userId){
Long apply = appliedUserRepository.add(userId);
if(apply!=1){
return;
}
Long count = couponCountRepository.increment();
if(count>100){
return;
}
couponCreateProducer.create(userId);
}
}
@DisplayName("여러명이 응모를 했을 경우 갯수의 오차가 발생한다.")
@Test
void 여러명응모() throws InterruptedException {
//given
int threadCount=1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
//모든 요청이 끝날떄까지 대기
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
//when
for(int i=0; i<threadCount;i++){
long userId=i;
executorService.submit(()->{
try{
applyService.apply(userId);
}finally{
countDownLatch.countDown();
}
});
}
countDownLatch.await();
Thread.sleep(10000);
long count = couponRepository.count();
//then
assertThat(count).isEqualTo(100);
}
// 해결방안 redisson 을 활용하여 싱글스레드 방식으로 처리
// 하지만 쿠폰발급 전체로직을 싱글스레드로 처리하면 성능은 급격하게 저하됨
// synchroized를 사용하면 서버가 여러대일 경우 다시 레이스 컨디션이 일어난다 (왜지 ? )
// 쿠폰의 갯수만 동기화시켜주면 됨.
// redis에서는 Incr이라는 명령어가 존재하고
// key:value 를 1씩 증가시키는 역할을함.
자바는 멀티스레드기반으로 동작하기때문에 레이스 컨디션이 발생할 수 있다.
레이스 컨디션이란? 두개이상의 스레드가 공유데이터에 access를 하고 동시에 작업을 하려고할 때 발생하는 문제
해결방안 -> redis를 사용하여 싱글스레드 방식으로 처리
redis에서는 incr이라는 명령어가 존재하고 key:value를 1씩 증가시키는 역할을 한다.
이를 통해 데이터의 정합성을 맞출 수 있다.
하지만 이도 문제가 발생한다.
Redis는 인메모리 데이터베이스로서 싱글 스레드 기반으로 동작하기 때문이다. Redis의 incr 명령어는 원자적(atomic)으로 카운터를 증가시키므로 여러 스레드가 동시에 접근하더라도 증가된 값을 정확히 반환한다. 다음 그림을 살펴보자.
시간 | Thread 1 | Redis - count | Thread 2 |
10:00 | start - 10:00 incr -> count end - 10: 02 |
99 | |
10:01 | 99 | wait... wait... start - 10:02 incr count end - 10:03 |
|
10:02 | 100 | ||
10:03 | create -> faile | 100 | |
101 | create -> faile |
즉, 여러 스레드가 동시에 Redis에 접근하더라도 Redis 자체에서 동시성 문제를 해결하여 정확한 쿠폰 발급 개수를 유지할 수 있게 된다.
그렇다면 이러한 로직은 완벽할 것인가?
여전히 발생할 수 있는 문제점이 있다.
첫 번째는 Redis 장애이다. Redis가 장애 상태인 경우에는 카운터를 증가시키지 못하고, 쿠폰 발급이 중단될 수 있다. 이를 해결하기 위해서는 Redis의 가용성과 장애 복구 전략을 고려해서 사용해야 한다. (클러스터 구축 및 aws management cached )
두 번째는 데이터베이스 부하 문제이다. 예를 들어 mysql이 1분에 100개의 insert가 가능하다고 가정해보자. 10:00시에 쿠폰 10,000개 생성 요청이 들어온다면 100분이 걸리게 되고 이후 로직들은 100분 이후에 처리되게 된다.
추후 : jmeter나 ngrinder 를 사용해서 부하테스트를 직접해보자
kafka를 활용한 페이지 독립성 갖추기
먼저 카프카 producer 관련 설정 클래스를 다음과 같이 작성한다.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
Producer 클래스는 다음과 같이 작성한다.
@Component
@RequiredArgsConstructor
public class CouponCreateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void create(Long userId) {
kafkaTemplate.send("coupon create", userId);
}
}
서비스에서 이를 활용해보자. apply 메서드를 다음과 같이 변경한다.
public void apply(Long userId) {
// long count = couponRepository.count();
Long count = couponCountRepository.increment();
log.info("쿠폰 개수: {}", count);
if (count > 100) {
return;
}
// couponRepository.save(new Coupon(userId)); // 직접 쿠폰을 생성하는 로직을 삭제
couponCreateProducer.create(userId);
}
직접 쿠폰을 생성하는 로직을 삭제하고 producer에게 create를 맡긴다.
컨슈머를 실행해보면 정상적으로 데이터를 가져오는 것을 확인할 수 있다.
docker에 kafka를 올렸을 경우 consumer 예제 코드
docker exec -it kafka kafka-console-consumer.sh --topic coupon_create --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"
이제 컨슈머에서 해당 데이터를 읽고 처리하도록 해보자.
producer config를 만든 것처럼 다음과 같이 consumer config를 설정한다.
@Bean
public ConsumerFactory<String, Long> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
consumer의 경우 listener를 함께 구현해주어야 한다.
@Bean
ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
이후 consume 후 로직을 수행할 클래스를 작성한다. listen 하는 메서드를 다음과 같이 작성한다.
@KafkaListener(topics = "coupon create", groupId = "group_1")
public void listener(Long userId){
}
listener는 받는 데이터를 저장해주는 로직을 작성하면 되므로 기존의 쿠폰 서비스의 insert 로직을 가져오면 된다.
@KafkaListener(topics = "coupon create", groupId = "group_1")
public void listener(Long userId){
couponRepository.save(new Coupon(userId));
}
Consumer는 대기하고 있다가 토픽에 데이터가 생성되면 이를 받는다. 그런데 이때 데이터를 처리하는 순서는 producer 쪽에서 데이터를 보내는 시차와 동기화 되지 않으므로, 즉 비동기적으로 처리하게 되므로 기존 테스트 코드를 그대로 사용하면 실패한다.
예를 들어 10:00 테스트케이스가 시작되고 producer가 데이터를 10:01에 보낸다고 하자. 이때 테스트케이스는 producer가 일을 다 하였으므로 10:02에 종료한다. 그러나 Consumer 입장에서는 데이터 수신을 10:01부터 시작하여 10:03에 마치고 모든 처리를 10:05에 마칠 수 있다. 이런 경우 producer 쪽의 테스트는 실패로 완료되는 것이다.
thread를 넉넉하게 10초간 대기시킨다면 통과하는 것을 볼 수 있다.
@Test
public void applyMulti() throws InterruptedException {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
long userId = i;
executorService.submit(() -> {
try {
applyService.apply(userId);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
Thread.sleep(10000);
long count = couponRepository.count();
assertThat(count).isEqualTo(100);
}
이와 같이 Producer가 쿠폰 생성 이벤트를 Kafka에 전송하고, Consumer가 해당 이벤트를 소비하여 실제 쿠폰 생성을 처리한다. 이를 통해 쿠폰 생성 작업을 백그라운드로 처리하여 API에서 직접 쿠폰을 생성할 때에 비해서 처리량을 조절할 수 있다. 따라서 db 생성량에 대한 부하를 해결할 수 있다.
결론: 동시성문제를 해결하기위해 redis와 kafka를 이용할 수 있었다.
하지만 redis를 여러가지 성능문제나 redis가 다운되었을 때의 대처를 미리 잘 해놔야한다.
kafka를 사용하게되면 쿠폰생성을 비동기적으로 처리하고 consumer가 해당 이벤트를 소비하여
쿠폰처리 로직을 수행할 수 있도록할 수 있다. 이경우 api에서 직접 쿠폰을 생성하는
경우보다 처리량을 조절할 수 있어 부하를 관리할 수 있다는 장점이 있다.
추가적으로
혹여나 consumer에서 제대로 쿠폰발급이 이루어지지않을 수도 있을 것이다 그러면 어떻게 할것인가?
@Component
public class CouponCreatedConsumer {
private final CouponRepository couponRepository;
private final FailedEventRepository failedEventRepository;
private final Logger logger= LoggerFactory.getLogger(CouponCreatedConsumer.class);
public CouponCreatedConsumer(CouponRepository couponRepository,FailedEventRepository failedEventRepository) {
this.couponRepository = couponRepository;
this.failedEventRepository=failedEventRepository;
}
@KafkaListener(topics = "coupon_create",groupId = "group_1")
public void listener(Long userId){
try{
couponRepository.save(new Coupon(userId));
}catch (Exception e){
logger.error("failed to created coupon::"+ userId);
failedEventRepository.save(new FailedEvent(userId));
}
}
}
다음로직처럼 오류가 발생했을 때 db나 로그로 따로 빼서 해당하는 userId 및 유저정보를 가지고
해당하는 쿠폰을 다시 재발행해주는 배치프로그램 spring batch 등등에 스케줄러를 이용해서
일정 시간마다 실패한 쿠폰에 대해 재 발급해주는 로직을 구현해보면 좋을 것 같다.
참고 자료
인프런 - 실습으로 배우는 선착순 이벤트 시스템
'MessageBroker' 카테고리의 다른 글
Redis가 싱글스레드임에도 높은 성능을 보장하는이유 (0) | 2024.11.30 |
---|---|
Kafka Connect Source,Sink 통해 다른시스템과 데이터 주고받기 (0) | 2023.06.22 |
Spring Cloud Bus란 무엇인가? (0) | 2023.06.16 |
MOM 미들웨어란 무엇인가? (0) | 2023.06.16 |
RabbitMq 설치하는방법 (0) | 2023.06.16 |