본문 바로가기

Docker,Jenkins

MSA에서 데이터베이스 값 동기화 및 kafka connect docker compose 구성

728x90

일단 각종 용어 정리부터 해보겠습니다.

 

Connector (Debezium)

  • 원본 데이터 베이스에 리플리케이션 클라이언트로 붙어서 사용
  • 데이터 변경을 캡처하고, 변경 메시지를 카프카에 프로듀싱 하는 역할
  • MS-SQL, MySQL, Postgre... 등 여러 데이터베이스와 연동 가능
  • 직렬화, 역직렬화 규격을 사용 (redis 와 똑같음)

ZooKeeper

  • 분산 코디네이션 시스템으로, 카프카 클러스터의 정보를 관리, 리더 선출, 잠금, 동기화를 위해 사용

Connector

  • 카프카 토픽에서 읽어낸 메시지를 처리하여 읽어내는 애플리케이션
  • 주로 데이터를 변형하거나 가공하여 대상 데이터베이스에 적용하는 로직을 수행

Source Connector vs Sink Connector

  • Source Connector: data source에 담긴 데이터를 topic에 담는 역할(Producer)을 하는 connector
  • Sink Connector: topic에 담긴 데이터를 특정 data source로 보내는 역할(Consumer 역할)을 하는 connector

 

왜 kafka connect를 구성했는가 ?

 

만약 apply-service 라는 micro서비스를 2개를 기동시켰다고 가정하자

그럼 클라이어언트의 요청이 로드밸러서에 의해 분산되어 처리가 될 것이고

apply-service의 데이터도 분산되어 저장이 될 것이다 . -> 데이터베이스 값 동기화 문제

 

다음 사진처럼 apply-service 가 2개가 가동이 되어도 하나의 데이터베이스에서 적절한 값이 오고 가야할 것이다.

그래서 order-service에서  Source Connector에 의해 전송할 dto값을 전송하게 되면 topic에 해당하는 데이터가 

직렬화되어 바이트코드로 담기게되고 Sinck Connector는 해당 데이터를 dataSource, database에 전송할 수 있게된다.

또한 kafka는 Message queue방식으로 이루어지기 때문에 데이터를 순차적으로 보낼 수 있다.

 

전송이 된다면 다음과 같은과정으로 전송이 될 것이다.

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-network  # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    networks:
      - kafka-network  # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
      - default
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.0.1
    container_name: kafka-connect
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "answer-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "answer-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "answer-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "answer-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    ports:
      - "8083:8083"
    volumes:
      - ./connectors:/etc/kafka-connect/jars
      - ./config:/etc/kafka-connect/config
    networks:
      - kafka-network  # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
  mysql:
    image: mysql:latest
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: 1248
    ports:
      - "3306:3306"
    networks:
      - kafka-network  # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
networks:
  kafka-network:
    driver: bridge  # 컨테이너를 연결할 커스텀 네트워크 그룹 생성

chat gpt에게 envirment각 주석을 달아달라고 했다.

 

environment:
  ZOOKEEPER_CLIENT_PORT: 2181
  ZOOKEEPER_TICK_TIME: 2000

 

  • ZOOKEEPER_CLIENT_PORT: Zookeeper 클라이언트 연결을 위한 포트 번호를 설정합니다. 여기서는 2181 포트를 사용하고 있으며, 클라이언트는 이 포트를 통해 Zookeeper에 연결합니다.
  • ZOOKEEPER_TICK_TIME: Zookeeper의 하트비트(heartbeat) 주기를 밀리초 단위로 설정합니다. 이 값은 Zookeeper의 내부 타이머 간격을 나타내며, 일반적으로 2000 밀리초(2초)로 설정됩니다.
environment:
  KAFKA_BROKER_ID: 1
  KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
  KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  • KAFKA_BROKER_ID: Kafka 브로커의 고유 ID를 설정합니다. 여기서는 1로 설정되어 있습니다.
  • KAFKA_ZOOKEEPER_CONNECT: Kafka 브로커가 연결할 Zookeeper 서버의 주소와 포트를 설정합니다. 이 경우, "zookeeper:2181"로 설정되어 있으며, Zookeeper 컨테이너의 이름과 포트를 지정합니다.
  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Kafka 브로커의 각 리스너(Listener)의 보안 프로토콜을 설정합니다. 여기서는 "PLAINTEXT" 리스너가 "PLAINTEXT" 프로토콜을 사용하도록 설정되어 있습니다.
  • KAFKA_ADVERTISED_LISTENERS: Kafka 브로커가 외부에서 접속할 때 사용할 주소를 설정합니다. 이 경우, "PLAINTEXT://localhost:9092"와 "PLAINTEXT_INTERNAL://kafka:29092"로 설정되어 있으며, 외부 클라이언트 및 내부 컨테이너 간의 통신을 지원합니다.
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: Kafka 오프셋 토픽의 복제 팩터(Replication Factor)를 설정합니다. 이 값은 오프셋 토픽의 데이터 복제 수를 나타냅니다. 여기서는 1로 설정되어 있으므로 데이터의 단일 복제만 사용합니다.
  • KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: Kafka 트랜잭션 상태 로그의 최소 ISR(In-Sync Replica) 수를 설정합니다. 이 값은 ISR에 포함된 최소 브로커 수를 나타냅니다. 여기서는 1로 설정되어 있으므로 하나의 브로커에서도 트랜잭션 상태 로그를 사용할 수 있습니다.
  • KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: Kafka 트랜잭션 상태 로그의 복제 팩터를 설정합니다. 값은 트랜잭션 상태 로그의 데이터 복제 수를 나타냅니다. 여기서는 1 설정되어 있으므로 데이터의 단일 복제만 사용합니다.
environment:
  CONNECT_BOOTSTRAP_SERVERS: kafka:29092
  CONNECT_REST_PORT: 8083
  CONNECT_GROUP_ID: "answer-avro"
  CONNECT_CONFIG_STORAGE_TOPIC: "answer-avro-config"
  CONNECT_OFFSET_STORAGE_TOPIC: "answer-avro-offsets"
  CONNECT_STATUS_STORAGE_TOPIC: "answer-avro-status"
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
  CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
  CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
  CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
  • CONNECT_BOOTSTRAP_SERVERS: Kafka Connect가 부트스트랩할 Kafka 브로커의 서버 주소를 설정합니다. 여기서는 "kafka:29092"로 설정되어 있으며, Kafka 컨테이너의 이름과 포트를 지정합니다.
  • CONNECT_REST_PORT: Kafka Connect REST API가 사용할 포트 번호를 설정합니다. 여기서는 8083 포트로 설정되어 있습니다.
  • CONNECT_GROUP_ID: Kafka Connect 그룹 ID를 설정합니다. 여기서는 "answer-avro"로 설정되어 있습니다.
  • CONNECT_CONFIG_STORAGE_TOPIC: Kafka Connect의 설정 저장소 토픽 이름을 설정합니다.
  • CONNECT_OFFSET_STORAGE_TOPIC: Kafka Connect의 오프셋 저장소 토픽 이름을 설정합니다.
  • CONNECT_STATUS_STORAGE_TOPIC: Kafka Connect의 상태 저장소 토픽 이름을 설정합니다.
  • CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 설정 저장소 토픽의 복제 팩터를 설정합니다.
  • CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 오프셋 저장소 토픽의 복제 팩터를 설정합니다.
  • CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 상태 저장소 토픽의 복제 팩터를 설정합니다.
  • CONNECT_KEY_CONVERTER, CONNECT_VALUE_CONVERTER, CONNECT_INTERNAL_KEY_CONVERTER, CONNECT_INTERNAL_VALUE_CONVERTER: Kafka Connect에서 사용할 데이터 변환기(컨버터)를 설정합니다. 여기서는 JSON 컨버터를 사용하도록 설정되어 있습니다.
  • CONNECT_REST_ADVERTISED_HOST_NAME: Kafka Connect REST API의 외부에서 접속할 때 사용할 호스트 이름을 설정합니다.
  • CONNECT_LOG4J_ROOT_LOGLEVEL: 로깅 레벨을 설정합니다. 여기서는 DEBUG로 설정되어 있으므로 디버그 레벨의 로그를 기록합니다.
  • CONNECT_PLUGIN_PATH: Kafka Connect 플러그인이 위치하는 경로를 설정합니다.

 

 

이미지 출처: https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%ED%81%B4%EB%9D%BC%EC%9A%B0%EB%93%9C-%EB%A7%88%EC%9D%B4%ED%81%AC%EB%A1%9C%EC%84%9C%EB%B9%84%EC%8A%A4/dashboard

 

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) - 인프런 | 강의

Spring framework의 Spring Cloud 제품군을 이용하여 마이크로서비스 애플리케이션을 개발해 보는 과정입니다. Cloud Native Application으로써의 Spring Cloud를 어떻게 사용하는지, 구성을 어떻게 하는지에 대해

www.inflearn.com

 

728x90