본문 바로가기

기타

server sent event (sse) 사용시 connection pool 부족 문제

728x90

새로고침이되거나 다른페이지로 랜더링이 될 시 sse프로토콜을 재 연결시켜줘야했다 그때마다 백엔드 서버에서

커넥션 pool을 반환을 해주지않아 오류가 발생했다.

 

HikariPool-2 - Pool stats (total=10, active=10, idle=0, waiting=1)

커넥션 풀이 꽉차게되고 더이상 백엔드서버에 아무런 api도 호출이 되지 않는 문제가 발생했다.

 

 

 

결국 chatgpt에게 물어보았지만 제대로 된 답변은 얻기 힘들었고, 혼자 고민을 해보기도 하고 인터넷 서칭도 해봤다.

 

결론적으로 2가지의 해결방안을 찾았다.

해결방안 1. open session in view의 설정을 false로 연결

 

open session in view의 default는 true이다 세션을 요청의 끝까지 유지하게되고 커넥션 풀의 연결을 종료하지않게된다.

결국 frontend에서 connection을 맺고 반환을 해주지 않게 된것이다. 리소스문제가 발생하게되는 것이다.

 

tmi:예전에 김영한님 강의에서도 보았듯. open session in view를 false 로 해두면 영속성컨텍스트의 범위가 controller단에서는

끊어지게 되고, 분리가 잘 된다고 들었던 것 같다.

 

그럼 false로 해둔다면 ? 세션이 controller단에서 닫히므로 커넥션 풀에서 연결을 반환을 할 것 이빈다. 이때 세션이 아직 완전히 종료되지

않은 상태에서 연결을 반환하려고하면 문제가 발생할 수 있다.

 

 

해결방안 2. @Transcation 어노테션을 제거

@RequiredArgsConstructor
@Service
@Transactional(readOnly = true)
@Slf4j
public class NotificationServiceImpl implements NotificationService {

    // 기본 타임아웃 설정
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
    private final RedisOperations<String, NotificationResponseDto> eventRedisOperations;
    private static final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
    private final ObjectMapper objectMapper;
    private final NotificationRepository notificationRepository;
    private final RedisMessageListenerContainer redisMessageListenerContainer;
    private final MemberRepository memberRepository;


    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener
    //실질적으로 알림을 저장하고 redis 채널에 메시지를 publish하는 역할
    public void send(final CreateMessageEvent createMessageEvent) {

        Notification notification = CreateMessageEvent.toEntity(createMessageEvent);
        notificationRepository.save(notification);
        final String id = String.valueOf(notification.getMemberId());
        eventRedisOperations.convertAndSend(getChannelName(id), notification);
    }

    @Override
    @Transactional(readOnly = true)
    public List<NotificationListResponseDto> getNotificationList(String username) {

        Member member = memberRepository.findByUsername(username).orElseThrow(() -> new ApplicationException(ApplicationErrorType.UNAUTHORIZED));

        List<Notification> notificationListEntity = notificationRepository.findAllByMemberId(member.getId());


        List<NotificationListResponseDto> notificationListDto = notificationListEntity.stream().
                map(NotificationListResponseDto::from)
                .collect(Collectors.toList());
        return notificationListDto;
    }

    /**
     * 클라이언트가 구독을 위해 호출하는 메서드.
     *
     * @param memberId - 구독하는 클라이언트의 사용자 아이디.
     * @return SseEmitter - 서버에서 보낸 이벤트 Emitter
     */
    public SseEmitter subscribe(final Long memberId) throws IOException{
        final String id = String.valueOf(memberId);
        final SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
        //초반 연결용 메시지!!
        emitter.send(SseEmitter.event()
                .id(id)
                .name("sse"));
        emitters.add(emitter);
        // MessageListener 익명함수 사용해서 onMessage 구현, Redis에서 새로운 알림이 발생하면 자동적으로 onMessage가 호출
        // 즉 알림을 serialize하고 해당 Client에게 알림을 전송한다.
        final MessageListener messageListener = (message, pattern) -> {
            final NotificationResponseDto notificationResponse = serialize(message);
            sendToClient(emitter, id, notificationResponse);
        };
        this.redisMessageListenerContainer.addMessageListener(messageListener, ChannelTopic.of(getChannelName(id)));
        checkEmitterStatus(emitter, messageListener);
        return emitter;
    }



    private NotificationResponseDto serialize(final Message message) {
        try {
            Notification notification = this.objectMapper.readValue(message.getBody(), Notification.class);
            return NotificationResponseDto.from(notification);
        } catch (IOException e) {
            throw new ApplicationException(ApplicationErrorType.BAD_REQUEST);
        }
    }

    // 클라이언트에게 메시지를 전달하는 부분
    private void sendToClient(final SseEmitter emitter, final String id, final Object data) {
        try {
            emitter.send(SseEmitter.event()
                    .id(id)
                    .name("sse")
                    .data(data));
        } catch (IOException e) {
            emitters.remove(emitter);
            log.error("SSE 연결이 올바르지 않습니다. 해당 memberID={}", id);
        }
    }

    private void checkEmitterStatus(final SseEmitter emitter, final MessageListener messageListener) {
        emitter.onCompletion(() -> {
            log.info("끊어버린다1");
            emitters.remove(emitter);
            this.redisMessageListenerContainer.removeMessageListener(messageListener);
        });
        emitter.onTimeout(() -> {
            log.info("끊어버린다2");
            emitters.remove(emitter);
            this.redisMessageListenerContainer.removeMessageListener(messageListener);
        });
    }

    private void removeEmitter(SseEmitter emitter) {
        emitters.remove(emitter);
    }
    private String getChannelName(final String memberId) {
        return "topics:" + memberId;
    }
}

 

트랜잭션을 전체 클래스에 걸어놨기 때문에 @Transcation을 걸었다는 것은 커넥션풀을 이용해서 db에 접근하는데

이것 때문에 커넥션풀을 계속 차지하고 있어서 문제가 됐었다. 그래서 subscribe하는 곳에는 @Transaction이 아예 일어나지 않도록

설정하는 것이다.

 

 

해결하는데 도움이 많이된코드

logging:
  level:
    com.zaxxer.hikari.HikariConfig: DEBUG
    com.zaxxer.hikari: TRACE

결국 몇의 커넥션이 풀이 active되어있고 , wating중이고, idle 중인지 확인할 수 있는 로그들을 통해

손쉽게 해결할 수 있었다.

 

로그를 하나하나 기록해두는 일이 정말 중요하다고 생각하게 됐다.

728x90