kafka Connect
카프카 커넥트는 ETL에서 E와 L의 역할을 해준다.(Extract,Load)
Source Connector는 데이터 소스에서 카프카로 보내는 역할을 하고, Sink Connector는 카프카에서 다른 목적지로 보내는 역할을 한다.
Kafka Connect를 왜 사용할까?
kafka를 통해서 Msa환경에 분리되어 있는 DB에 동일한 데이터 값을 전달할 수 있게 된다.
예를 들어 OrderService에 각각 다른 DB를 가지고 있고 로드밸런싱이 이루어진다고 가정하면
사용자가 데이터를 추가할 때 해당 Service Port에 들어와 있는 서비스의 DB에만 저장이 될 것이다.
동기화문제가 발생하기 때문에 이러한 단점을 해결하기위해 사용한다.
더 자세한 설명은 밑에 링크에 첨부 하겠습니다.
우리의 최종목표
Kafka Connect 실습해보기
MariaDB 설치
$ brew install mariadb
시작, 종료, 상태확인
$ mysql.server start, mysql.server stop, mysql.server status
접속
$ mysql –uroot
데이터베이스 생성
mysql> create database mydb;
Access denied 발생시)
$ sudo mysql –u root
mysql> use mysql;
mysql> select user, host, plugin FROM mysql.user;
mysql> set password for 'root'@'localhost'=password('test1357’);
mysql> flush privileges;
Windows)
다운로드
mariadb-10.5.8-winx64.zip 파일 다운로드
데이터베이스 초기화
.\bin\mariadb-install-db.exe
--datadir=C:\Work\mariadb-10.5.8-winx64\data
--service=mariaDB
--port=3306
--password=test1357
테이블 생성
create table users(
id int auto_increment primary key,
user_id varchar(20),
pwd varchar(20),
name varchar(20),
created_at datetime default NOW()
);
create table orders (
id int auto_increment primary key,
product_id varchar(20) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
user_id varchar(50) not null,
order_id varchar(50) not null,
created_at datetime default NOW()
);
Kafka Connect 설치
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
tar xvf confluent-community-6.1.0.tar.gz
cd $KAFKA_CONNECT_HOME
Kafka Connect 실행
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
JDBC Connector 설치
- https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
- confluentinc-kafka-connect-jdbc-10.0.1.zip
etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
kafka-connect-jdbc-10.0.02.jar 파일이 필요하므로
예)confluentinc-kafka-connect-jdbc-10.0.2/lib
- plugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]
JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사
cd ~/.m2/repository/org/mariadb 에 있는 jdbc jar 을 kafka-connect-jdbc 를
./share/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar 파일 복사
이제 모든 설정이 완료 됐다.
쥬키퍼실행
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
카프카서버실행
./bin/kafka-server-start.sh ./config/server.properties
카프카커넥트 실행
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
토픽리스트 확인
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Message Producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_users
Message Consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
Localhost:8083/connectors Post로 실행!
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"1234",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
my-source-connect 라는 커넥터가 잘 저장됐고
데이터의 변경사항이 생길 시 my_topic_users 라는 토픽이 생성되고 변경된 DB데이터값이 토픽에 저장된다.
kafka-console-producer에서 데이터 전송->topic에 추가-> mariaDb에 추가
띄워져 있는 서버
Zookeeper Server , Kafak Server, Kafka console consumer, Kafka connect
localhost:8083/connectors. Post 로 Send!
'MessageBroker' 카테고리의 다른 글
Redis가 싱글스레드임에도 높은 성능을 보장하는이유 (0) | 2024.11.30 |
---|---|
실습으로 배우는 선착순 이벤트 시스템 - review (1) | 2023.10.02 |
Spring Cloud Bus란 무엇인가? (0) | 2023.06.16 |
MOM 미들웨어란 무엇인가? (0) | 2023.06.16 |
RabbitMq 설치하는방법 (0) | 2023.06.16 |