실습으로 배우는 선착순 이벤트 시스템이란 것을 인프런에서 시청했다.
kafka를 통해 영감을 받거나 새로운 insight를 얻은 것을 정리하려고 한다.
실습으로 배우는 선착순 이벤트 시스템 - 인프런 | 강의
선착순 이벤트 시스템을 구현할 때 어떤 문제가 발생할 수 있고 어떻게 해결할 수 있는지 배워봅니다., - 강의 소개 | 인프런
www.inflearn.com
쿠폰 서비스 개발하기
100개의 쿠폰을 선착순으로 개발할 예정이고 1000명이 해당 쿠폰을 사려고 대기하고 있는 상황을 가정하고 개발을 진행하였습니다.
@Entity
public class Coupon {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
public Coupon() {
}
public Coupon(Long userId) {
this.userId = userId;
}
public Long getId() {
return id;
}
}
package com.example.couponsystem.service;
import com.example.couponsystem.domain.Coupon;
import com.example.couponsystem.producer.CouponCreateProducer;
import com.example.couponsystem.repository.AppliedUserRepository;
import com.example.couponsystem.repository.CouponCountRepository;
import com.example.couponsystem.repository.CouponRepository;
import org.springframework.stereotype.Service;
@Service
public class ApplyService {
private final CouponRepository couponRepository;
private final CouponCountRepository couponCountRepository;
private final CouponCreateProducer couponCreateProducer;
private final AppliedUserRepository appliedUserRepository;
public ApplyService(CouponRepository couponRepository,CouponCountRepository couponCountRepository,CouponCreateProducer couponCreateProducer,AppliedUserRepository appliedUserRepository) {
this.couponRepository = couponRepository;
this.couponCountRepository=couponCountRepository;
this.couponCreateProducer=couponCreateProducer;
this.appliedUserRepository=appliedUserRepository;
}
public void apply(Long userId){
Long apply = appliedUserRepository.add(userId);
if(apply!=1){
return;
}
Long count = couponCountRepository.increment();
if(count>100){
return;
}
couponCreateProducer.create(userId);
}
}
@DisplayName("여러명이 응모를 했을 경우 갯수의 오차가 발생한다.")
@Test
void 여러명응모() throws InterruptedException {
//given
int threadCount=1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
//모든 요청이 끝날떄까지 대기
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
//when
for(int i=0; i<threadCount;i++){
long userId=i;
executorService.submit(()->{
try{
applyService.apply(userId);
}finally{
countDownLatch.countDown();
}
});
}
countDownLatch.await();
Thread.sleep(10000);
long count = couponRepository.count();
//then
assertThat(count).isEqualTo(100);
}
// 해결방안 redisson 을 활용하여 싱글스레드 방식으로 처리
// 하지만 쿠폰발급 전체로직을 싱글스레드로 처리하면 성능은 급격하게 저하됨
// synchroized를 사용하면 서버가 여러대일 경우 다시 레이스 컨디션이 일어난다 (왜지 ? )
// 쿠폰의 갯수만 동기화시켜주면 됨.
// redis에서는 Incr이라는 명령어가 존재하고
// key:value 를 1씩 증가시키는 역할을함.
자바는 멀티스레드기반으로 동작하기때문에 레이스 컨디션이 발생할 수 있다.
레이스 컨디션이란? 두개이상의 스레드가 공유데이터에 access를 하고 동시에 작업을 하려고할 때 발생하는 문제
해결방안 -> redis를 사용하여 싱글스레드 방식으로 처리
redis에서는 incr이라는 명령어가 존재하고 key:value를 1씩 증가시키는 역할을 한다.
이를 통해 데이터의 정합성을 맞출 수 있다.
하지만 이도 문제가 발생한다.
Redis는 인메모리 데이터베이스로서 싱글 스레드 기반으로 동작하기 때문이다. Redis의 incr 명령어는 원자적(atomic)으로 카운터를 증가시키므로 여러 스레드가 동시에 접근하더라도 증가된 값을 정확히 반환한다. 다음 그림을 살펴보자.
시간 | Thread 1 | Redis - count | Thread 2 |
10:00 | start - 10:00 incr -> count end - 10: 02 |
99 | |
10:01 | 99 | wait... wait... start - 10:02 incr count end - 10:03 |
|
10:02 | 100 | ||
10:03 | create -> faile | 100 | |
101 | create -> faile |
즉, 여러 스레드가 동시에 Redis에 접근하더라도 Redis 자체에서 동시성 문제를 해결하여 정확한 쿠폰 발급 개수를 유지할 수 있게 된다.
그렇다면 이러한 로직은 완벽할 것인가?
여전히 발생할 수 있는 문제점이 있다.
첫 번째는 Redis 장애이다. Redis가 장애 상태인 경우에는 카운터를 증가시키지 못하고, 쿠폰 발급이 중단될 수 있다. 이를 해결하기 위해서는 Redis의 가용성과 장애 복구 전략을 고려해서 사용해야 한다. (클러스터 구축 및 aws management cached )
두 번째는 데이터베이스 부하 문제이다. 예를 들어 mysql이 1분에 100개의 insert가 가능하다고 가정해보자. 10:00시에 쿠폰 10,000개 생성 요청이 들어온다면 100분이 걸리게 되고 이후 로직들은 100분 이후에 처리되게 된다.
추후 : jmeter나 ngrinder 를 사용해서 부하테스트를 직접해보자
kafka를 활용한 페이지 독립성 갖추기
먼저 카프카 producer 관련 설정 클래스를 다음과 같이 작성한다.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Long> producerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Long> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
Producer 클래스는 다음과 같이 작성한다.
@Component
@RequiredArgsConstructor
public class CouponCreateProducer {
private final KafkaTemplate<String, Long> kafkaTemplate;
public void create(Long userId) {
kafkaTemplate.send("coupon create", userId);
}
}
서비스에서 이를 활용해보자. apply 메서드를 다음과 같이 변경한다.
public void apply(Long userId) {
// long count = couponRepository.count();
Long count = couponCountRepository.increment();
log.info("쿠폰 개수: {}", count);
if (count > 100) {
return;
}
// couponRepository.save(new Coupon(userId)); // 직접 쿠폰을 생성하는 로직을 삭제
couponCreateProducer.create(userId);
}
직접 쿠폰을 생성하는 로직을 삭제하고 producer에게 create를 맡긴다.
컨슈머를 실행해보면 정상적으로 데이터를 가져오는 것을 확인할 수 있다.
docker에 kafka를 올렸을 경우 consumer 예제 코드
docker exec -it kafka kafka-console-consumer.sh --topic coupon_create --bootstrap-server localhost:9092 --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"
이제 컨슈머에서 해당 데이터를 읽고 처리하도록 해보자.
producer config를 만든 것처럼 다음과 같이 consumer config를 설정한다.
@Bean
public ConsumerFactory<String, Long> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
consumer의 경우 listener를 함께 구현해주어야 한다.
@Bean
ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Long> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
이후 consume 후 로직을 수행할 클래스를 작성한다. listen 하는 메서드를 다음과 같이 작성한다.
@KafkaListener(topics = "coupon create", groupId = "group_1")
public void listener(Long userId){
}
listener는 받는 데이터를 저장해주는 로직을 작성하면 되므로 기존의 쿠폰 서비스의 insert 로직을 가져오면 된다.
@KafkaListener(topics = "coupon create", groupId = "group_1")
public void listener(Long userId){
couponRepository.save(new Coupon(userId));
}
Consumer는 대기하고 있다가 토픽에 데이터가 생성되면 이를 받는다. 그런데 이때 데이터를 처리하는 순서는 producer 쪽에서 데이터를 보내는 시차와 동기화 되지 않으므로, 즉 비동기적으로 처리하게 되므로 기존 테스트 코드를 그대로 사용하면 실패한다.
예를 들어 10:00 테스트케이스가 시작되고 producer가 데이터를 10:01에 보낸다고 하자. 이때 테스트케이스는 producer가 일을 다 하였으므로 10:02에 종료한다. 그러나 Consumer 입장에서는 데이터 수신을 10:01부터 시작하여 10:03에 마치고 모든 처리를 10:05에 마칠 수 있다. 이런 경우 producer 쪽의 테스트는 실패로 완료되는 것이다.
thread를 넉넉하게 10초간 대기시킨다면 통과하는 것을 볼 수 있다.
@Test
public void applyMulti() throws InterruptedException {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
long userId = i;
executorService.submit(() -> {
try {
applyService.apply(userId);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
Thread.sleep(10000);
long count = couponRepository.count();
assertThat(count).isEqualTo(100);
}
이와 같이 Producer가 쿠폰 생성 이벤트를 Kafka에 전송하고, Consumer가 해당 이벤트를 소비하여 실제 쿠폰 생성을 처리한다. 이를 통해 쿠폰 생성 작업을 백그라운드로 처리하여 API에서 직접 쿠폰을 생성할 때에 비해서 처리량을 조절할 수 있다. 따라서 db 생성량에 대한 부하를 해결할 수 있다.
결론: 동시성문제를 해결하기위해 redis와 kafka를 이용할 수 있었다.
하지만 redis를 여러가지 성능문제나 redis가 다운되었을 때의 대처를 미리 잘 해놔야한다.
kafka를 사용하게되면 쿠폰생성을 비동기적으로 처리하고 consumer가 해당 이벤트를 소비하여
쿠폰처리 로직을 수행할 수 있도록할 수 있다. 이경우 api에서 직접 쿠폰을 생성하는
경우보다 처리량을 조절할 수 있어 부하를 관리할 수 있다는 장점이 있다.
추가적으로
혹여나 consumer에서 제대로 쿠폰발급이 이루어지지않을 수도 있을 것이다 그러면 어떻게 할것인가?
@Component
public class CouponCreatedConsumer {
private final CouponRepository couponRepository;
private final FailedEventRepository failedEventRepository;
private final Logger logger= LoggerFactory.getLogger(CouponCreatedConsumer.class);
public CouponCreatedConsumer(CouponRepository couponRepository,FailedEventRepository failedEventRepository) {
this.couponRepository = couponRepository;
this.failedEventRepository=failedEventRepository;
}
@KafkaListener(topics = "coupon_create",groupId = "group_1")
public void listener(Long userId){
try{
couponRepository.save(new Coupon(userId));
}catch (Exception e){
logger.error("failed to created coupon::"+ userId);
failedEventRepository.save(new FailedEvent(userId));
}
}
}
다음로직처럼 오류가 발생했을 때 db나 로그로 따로 빼서 해당하는 userId 및 유저정보를 가지고
해당하는 쿠폰을 다시 재발행해주는 배치프로그램 spring batch 등등에 스케줄러를 이용해서
일정 시간마다 실패한 쿠폰에 대해 재 발급해주는 로직을 구현해보면 좋을 것 같다.
참고 자료
인프런 - 실습으로 배우는 선착순 이벤트 시스템
'MessageBroker' 카테고리의 다른 글
Redis가 싱글스레드임에도 높은 성능을 보장하는이유 (0) | 2024.11.30 |
---|---|
Kafka Connect Source,Sink 통해 다른시스템과 데이터 주고받기 (0) | 2023.06.22 |
Spring Cloud Bus란 무엇인가? (0) | 2023.06.16 |
MOM 미들웨어란 무엇인가? (0) | 2023.06.16 |
RabbitMq 설치하는방법 (0) | 2023.06.16 |