본문 바로가기

MessageBroker

Kafka Connect Source,Sink 통해 다른시스템과 데이터 주고받기

728x90

 

 

kafka Connect

카프카 커넥트는 ETL에서 E와 L의 역할을 해준다.(Extract,Load)

Source Connector는 데이터 소스에서 카프카로 보내는 역할을 하고, Sink Connector는 카프카에서 다른 목적지로 보내는 역할을 한다.

 

 

Kafka Connect를 왜 사용할까?

 

kafka를 통해서 Msa환경에 분리되어 있는 DB에 동일한 데이터 값을 전달할 수 있게 된다.

예를 들어 OrderService에 각각 다른 DB를 가지고 있고 로드밸런싱이 이루어진다고 가정하면

사용자가 데이터를 추가할 때 해당 Service Port에 들어와 있는 서비스의 DB에만 저장이 될 것이다.

동기화문제가 발생하기 때문에 이러한 단점을 해결하기위해 사용한다.

더 자세한 설명은 밑에 링크에 첨부 하겠습니다.

https://minwoo-it-factory.tistory.com/entry/Msa-%EB%A1%9C%EB%93%9C%EB%B0%B8%EB%9F%B0%EC%8B%B1-%EB%8F%99%EA%B8%B0%ED%99%94-%EB%AC%B8%EC%A0%9C-%ED%95%B4%EA%B2%B0%EB%B0%A9%EC%95%88

 

Msa 로드밸런싱 동기화 문제 해결방안

사이트 규모가 증가함에 따라서 과부하를 처리 하는 방식 중 하나를 로드 밸런싱이라 합니다. 그리고 로드 밸런싱은 N개의 다른포트에 똑같은 서비스 ( ex : 주문 서비스 ) 를 기동시켜 교차하며

minwoo-it-factory.tistory.com

우리의 최종목표

 

 

 

Kafka Connect 실습해보기

 

MariaDB 설치

$ brew install mariadb

시작, 종료, 상태확인

$ mysql.server start, mysql.server stop, mysql.server status

접속

$ mysql –uroot

데이터베이스 생성

mysql> create database mydb;

Access denied 발생시) 

$ sudo mysql –u root

mysql> use mysql;

mysql> select user, host, plugin FROM mysql.user;

mysql> set password for 'root'@'localhost'=password('test1357’);

mysql> flush privileges;

Windows)

다운로드

mariadb-10.5.8-winx64.zip 파일 다운로드

데이터베이스 초기화

.\bin\mariadb-install-db.exe 

    --datadir=C:\Work\mariadb-10.5.8-winx64\data 

    --service=mariaDB 

    --port=3306 

    --password=test1357

테이블 생성

create table users(

    id int auto_increment primary key,

    user_id varchar(20),

    pwd varchar(20),

    name varchar(20),

    created_at datetime default NOW()

);

create table orders (

    id int auto_increment primary key,

    product_id varchar(20) not null,

    qty int default 0,

    unit_price int default 0,

    total_price int default 0,

    user_id varchar(50) not null,

    order_id varchar(50) not null,

    created_at datetime default NOW()

);

 

 

Kafka Connect 설치

curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz

curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz

tar xvf confluent-community-6.1.0.tar.gz

cd  $KAFKA_CONNECT_HOME

 

Kafka Connect 실행

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

JDBC Connector 설치

- https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

- confluentinc-kafka-connect-jdbc-10.0.1.zip 

 

etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가

kafka-connect-jdbc-10.0.02.jar  파일이 필요하므로

예)confluentinc-kafka-connect-jdbc-10.0.2/lib

- plugin.path=[confluentinc-kafka-connect-jdbc-10.0.1 폴더]

 

JdbcSourceConnector에서 MariaDB 사용하기 위해 mariadb 드라이버 복사

cd ~/.m2/repository/org/mariadb 에 있는 jdbc jar 을 kafka-connect-jdbc 를

./share/java/kafka/ 폴더에 mariadb-java-client-2.7.2.jar  파일 복사

 

이제 모든 설정이 완료 됐다.

 

쥬키퍼실행

 

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

 

카프카서버실행

 

./bin/kafka-server-start.sh ./config/server.properties

 

카프카커넥트 실행

 

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

 

토픽리스트 확인

 

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

 

Message Producer

 

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_users

 

Message Consumer

 

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

 

Localhost:8083/connectors  Post로 실행! 

 

{

 

"name" : "my-source-connect",

 

"config" : {

 

"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",

 

"connection.url":"jdbc:mysql://localhost:3306/mydb",

 

"connection.user":"root",

 

"connection.password":"1234",

 

"mode": "incrementing",

 

"incrementing.column.name" : "id",

 

"table.whitelist":"users",

 

"topic.prefix" : "my_topic_",

 

"tasks.max" : "1"

 

}

 

}

 

my-source-connect 라는 커넥터가 잘 저장됐고

데이터의 변경사항이 생길 시 my_topic_users 라는 토픽이 생성되고 변경된 DB데이터값이 토픽에 저장된다.

 

kafka-console-producer에서 데이터 전송->topic에 추가-> mariaDb에 추가

 

띄워져 있는 서버

 

Zookeeper Server , Kafak Server, Kafka console consumer, Kafka connect 

 

 

localhost:8083/connectors. Post 로 Send!

{

"name":"my-sink-connect",

"config":{

"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",

"connection.url":"jdbc:mysql://localhost:3306/mydb",

"connection.user":"root",

"connection.password":"1234",

"auto.create":"true",

"auto.evolve":"true",

"delete.enabled":"false",

"tasks.max":"1",

"topics":"my_topic_users"

}

}

 

 

 

출처:https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%ED%81%B4%EB%9D%BC%EC%9A%B0%EB%93%9C-%EB%A7%88%EC%9D%B4%ED%81%AC%EB%A1%9C%EC%84%9C%EB%B9%84%EC%8A%A4/dashboard

728x90