728x90
일단 각종 용어 정리부터 해보겠습니다.
Connector (Debezium)
- 원본 데이터 베이스에 리플리케이션 클라이언트로 붙어서 사용
- 데이터 변경을 캡처하고, 변경 메시지를 카프카에 프로듀싱 하는 역할
- MS-SQL, MySQL, Postgre... 등 여러 데이터베이스와 연동 가능
- 직렬화, 역직렬화 규격을 사용 (redis 와 똑같음)
ZooKeeper
- 분산 코디네이션 시스템으로, 카프카 클러스터의 정보를 관리, 리더 선출, 잠금, 동기화를 위해 사용
Connector
- 카프카 토픽에서 읽어낸 메시지를 처리하여 읽어내는 애플리케이션
- 주로 데이터를 변형하거나 가공하여 대상 데이터베이스에 적용하는 로직을 수행
Source Connector vs Sink Connector
- Source Connector: data source에 담긴 데이터를 topic에 담는 역할(Producer)을 하는 connector
- Sink Connector: topic에 담긴 데이터를 특정 data source로 보내는 역할(Consumer 역할)을 하는 connector
왜 kafka connect를 구성했는가 ?
만약 apply-service 라는 micro서비스를 2개를 기동시켰다고 가정하자
그럼 클라이어언트의 요청이 로드밸러서에 의해 분산되어 처리가 될 것이고
apply-service의 데이터도 분산되어 저장이 될 것이다 . -> 데이터베이스 값 동기화 문제
다음 사진처럼 apply-service 가 2개가 가동이 되어도 하나의 데이터베이스에서 적절한 값이 오고 가야할 것이다.
그래서 order-service에서 Source Connector에 의해 전송할 dto값을 전송하게 되면 topic에 해당하는 데이터가
직렬화되어 바이트코드로 담기게되고 Sinck Connector는 해당 데이터를 dataSource, database에 전송할 수 있게된다.
또한 kafka는 Message queue방식으로 이루어지기 때문에 데이터를 순차적으로 보낼 수 있다.
전송이 된다면 다음과 같은과정으로 전송이 될 것이다.
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka-network # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.0.1
container_name: kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
- kafka-network # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
- default
kafka-connect:
image: confluentinc/cp-kafka-connect:7.0.1
container_name: kafka-connect
depends_on:
- kafka
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:29092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "answer-avro"
CONNECT_CONFIG_STORAGE_TOPIC: "answer-avro-config"
CONNECT_OFFSET_STORAGE_TOPIC: "answer-avro-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "answer-avro-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
ports:
- "8083:8083"
volumes:
- ./connectors:/etc/kafka-connect/jars
- ./config:/etc/kafka-connect/config
networks:
- kafka-network # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
mysql:
image: mysql:latest
container_name: mysql
environment:
MYSQL_ROOT_PASSWORD: 1248
ports:
- "3306:3306"
networks:
- kafka-network # Kafka와 관련된 컨테이너와 같은 네트워크 그룹에 속하도록 설정
networks:
kafka-network:
driver: bridge # 컨테이너를 연결할 커스텀 네트워크 그룹 생성
chat gpt에게 envirment각 주석을 달아달라고 했다.
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
- ZOOKEEPER_CLIENT_PORT: Zookeeper 클라이언트 연결을 위한 포트 번호를 설정합니다. 여기서는 2181 포트를 사용하고 있으며, 클라이언트는 이 포트를 통해 Zookeeper에 연결합니다.
- ZOOKEEPER_TICK_TIME: Zookeeper의 하트비트(heartbeat) 주기를 밀리초 단위로 설정합니다. 이 값은 Zookeeper의 내부 타이머 간격을 나타내며, 일반적으로 2000 밀리초(2초)로 설정됩니다.
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
- KAFKA_BROKER_ID: Kafka 브로커의 고유 ID를 설정합니다. 여기서는 1로 설정되어 있습니다.
- KAFKA_ZOOKEEPER_CONNECT: Kafka 브로커가 연결할 Zookeeper 서버의 주소와 포트를 설정합니다. 이 경우, "zookeeper:2181"로 설정되어 있으며, Zookeeper 컨테이너의 이름과 포트를 지정합니다.
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Kafka 브로커의 각 리스너(Listener)의 보안 프로토콜을 설정합니다. 여기서는 "PLAINTEXT" 리스너가 "PLAINTEXT" 프로토콜을 사용하도록 설정되어 있습니다.
- KAFKA_ADVERTISED_LISTENERS: Kafka 브로커가 외부에서 접속할 때 사용할 주소를 설정합니다. 이 경우, "PLAINTEXT://localhost:9092"와 "PLAINTEXT_INTERNAL://kafka:29092"로 설정되어 있으며, 외부 클라이언트 및 내부 컨테이너 간의 통신을 지원합니다.
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: Kafka 오프셋 토픽의 복제 팩터(Replication Factor)를 설정합니다. 이 값은 오프셋 토픽의 데이터 복제 수를 나타냅니다. 여기서는 1로 설정되어 있으므로 데이터의 단일 복제만 사용합니다.
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: Kafka 트랜잭션 상태 로그의 최소 ISR(In-Sync Replica) 수를 설정합니다. 이 값은 ISR에 포함된 최소 브로커 수를 나타냅니다. 여기서는 1로 설정되어 있으므로 하나의 브로커에서도 트랜잭션 상태 로그를 사용할 수 있습니다.
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: Kafka 트랜잭션 상태 로그의 복제 팩터를 설정합니다. 이 값은 트랜잭션 상태 로그의 데이터 복제 수를 나타냅니다. 여기서는 1로 설정되어 있으므로 데이터의 단일 복제만 사용합니다.
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:29092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "answer-avro"
CONNECT_CONFIG_STORAGE_TOPIC: "answer-avro-config"
CONNECT_OFFSET_STORAGE_TOPIC: "answer-avro-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "answer-avro-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
- CONNECT_BOOTSTRAP_SERVERS: Kafka Connect가 부트스트랩할 Kafka 브로커의 서버 주소를 설정합니다. 여기서는 "kafka:29092"로 설정되어 있으며, Kafka 컨테이너의 이름과 포트를 지정합니다.
- CONNECT_REST_PORT: Kafka Connect REST API가 사용할 포트 번호를 설정합니다. 여기서는 8083 포트로 설정되어 있습니다.
- CONNECT_GROUP_ID: Kafka Connect 그룹 ID를 설정합니다. 여기서는 "answer-avro"로 설정되어 있습니다.
- CONNECT_CONFIG_STORAGE_TOPIC: Kafka Connect의 설정 저장소 토픽 이름을 설정합니다.
- CONNECT_OFFSET_STORAGE_TOPIC: Kafka Connect의 오프셋 저장소 토픽 이름을 설정합니다.
- CONNECT_STATUS_STORAGE_TOPIC: Kafka Connect의 상태 저장소 토픽 이름을 설정합니다.
- CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 설정 저장소 토픽의 복제 팩터를 설정합니다.
- CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 오프셋 저장소 토픽의 복제 팩터를 설정합니다.
- CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 상태 저장소 토픽의 복제 팩터를 설정합니다.
- CONNECT_KEY_CONVERTER, CONNECT_VALUE_CONVERTER, CONNECT_INTERNAL_KEY_CONVERTER, CONNECT_INTERNAL_VALUE_CONVERTER: Kafka Connect에서 사용할 데이터 변환기(컨버터)를 설정합니다. 여기서는 JSON 컨버터를 사용하도록 설정되어 있습니다.
- CONNECT_REST_ADVERTISED_HOST_NAME: Kafka Connect REST API의 외부에서 접속할 때 사용할 호스트 이름을 설정합니다.
- CONNECT_LOG4J_ROOT_LOGLEVEL: 로깅 레벨을 설정합니다. 여기서는 DEBUG로 설정되어 있으므로 디버그 레벨의 로그를 기록합니다.
- CONNECT_PLUGIN_PATH: Kafka Connect 플러그인이 위치하는 경로를 설정합니다.
728x90
'Docker,Jenkins' 카테고리의 다른 글
kafka 실행시켜보기 (feat. docker) (0) | 2023.09.23 |
---|---|
[jenkins] docker build 후 amazon ECR로 이미지 업로드하기 (0) | 2023.07.25 |
JenkinsFile로 pipeline 구축 (1) | 2023.07.11 |
Docker와 springboot,mysql Docker Compose로 올려서 사용하기 (0) | 2023.04.02 |
Container Volume 이란 ? (0) | 2023.03.31 |