Wednesday, December 30, 2020

Apache Kafka Connect concepts on Windows

In all the past examples we wrote the producer and consumer by hand to send data to and consume data from a topic. But what if someone has already developed this part for us? Code that fetches data from an external system and inserts it into a topic is called the Kafka Connect Source API; likewise, code that reads data from a Kafka topic and inserts it into an external system using the built-in Kafka consumer API is called the Kafka Connect Sink API.

In this example we will try to understand and use the model of execution below.

Source connect:
MySQL DB –> Apache Kafka Connect Source API –> Apache Kafka Topic

Sink connect:
Apache Kafka Topic –> Apache Kafka Connect Sink API –> MySQL DB
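
For completeness, a sink connector is configured the same way as a source connector, just pointing in the other direction. Below is a minimal sketch of a JDBC sink configuration; we only implement the source side in this post, so the property values here are illustrative:

name=test-sink-mysql-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# topic(s) to read records from
topics=my-replicated-topic-siddhusiddhu
connection.url=jdbc:mysql://localhost:3306/test?user=root&password=root
# create the target table automatically if it does not exist
auto.create=true
insert.mode=insert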

Let's first concentrate on the source connect:
MySQL DB –> Apache Kafka Connect Source API –> Apache Kafka Topic

As we are using a MySQL DB, one ready-made option is debezium/debezium-connector-mysql,

i.e. https://www.confluent.io/hub/debezium/debezium-connector-mysql

In this post, however, the configuration below uses the Confluent JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector) from the kafka-connect-jdbc plugin together with the MySQL JDBC driver. We can either download the zip that contains all the jar files required to connect to MySQL, or pull the driver from the Maven repository.
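
If you go the Maven route, the driver coordinates look roughly like this (version 5.1.38 matches the jar used in the steps below):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>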

We already have MySQL installed on our Windows machine, and we have a table called siddhu with two columns, as shown below.

[Image: the siddhu table with its two columns, idSiddhu and name]
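
If you want to recreate the table yourself, a minimal definition could look like the following; the column types are an assumption (the post only gives us the names idSiddhu and name), but the incrementing mode used later does need a strictly increasing numeric id:

CREATE TABLE test.siddhu (
    idSiddhu INT NOT NULL AUTO_INCREMENT PRIMARY KEY,  -- strictly increasing id (assumed type)
    name VARCHAR(100)                                  -- assumed type and length
);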

Now let's wire the MySQL JDBC driver and the connector configuration into our Confluent installation.

Prerequisites:
1- We assume you already have MySQL installed on your system.
2- MySQL has a schema called test and a table named siddhu with two columns, idSiddhu and name.
3- You have Apache Kafka installed on your system. We are using the Confluent distribution of Apache Kafka.
As our system is Windows, please follow the steps below to use Apache Kafka Connect on Windows.

  1. Place the MySQL connector jar in the following location. If you don't have a folder named kafka-connect-jdbc, create it:
    C:\confluent-community-6.0.1\confluent-6.0.1\share\java\kafka-connect-jdbc\mysql-connector-java-5.1.38-bin.jar
  2. Create a folder kafka-connect-jdbc inside the etc folder of Kafka and create a properties file quickstart-mysql.properties at
    C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka-connect-jdbc\quickstart-mysql.properties
    with the following content:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# JDBC connection string for the local test schema (root/root credentials)
connection.url=jdbc:mysql://localhost:3306/test?user=root&password=root
# detect new rows via a strictly increasing column
mode=incrementing
incrementing.column.name=idSiddhu
# topic name = topic.prefix + table name
topic.prefix=my-replicated-topic-siddhu

Follow the steps below carefully.
Steps:

  1. Start Zookeeper
    zookeeper-server-start.bat C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka\zookeeper.properties
  2. Start the Kafka broker
    kafka-server-start.bat C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka\server.properties
  3. Start the MySQL source connector (the worker config comes first, the connector config second)
    connect-standalone.bat C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka\connect-standalone.properties C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka-connect-jdbc\quickstart-mysql.properties
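
Once all three are up, you can sanity-check the broker and list the existing topics with the standard Kafka CLI:

kafka-topics.bat --bootstrap-server localhost:9092 --list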

Now start inserting data into your siddhu table in the test schema.
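
For example, assuming the table definition sketched earlier (idSiddhu is auto-generated):

INSERT INTO test.siddhu (name) VALUES ('siddhu');
INSERT INTO test.siddhu (name) VALUES ('kafka-connect');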

You will be able to see that a topic named my-replicated-topic-siddhusiddhu has been created, with the data in JSON format, as shown below.
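
You can watch the messages arrive with the console consumer that ships with Kafka:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic-siddhusiddhu --from-beginning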

As we set topic.prefix=my-replicated-topic-siddhu, the new topic is created by appending the name of the table in which we inserted the data to this prefix (my-replicated-topic-siddhu + siddhu).
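
With the default JsonConverter and schemas enabled, each row arrives as a record shaped roughly like this (the values are illustrative):

{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int32", "optional": false, "field": "idSiddhu"},
      {"type": "string", "optional": true, "field": "name"}
    ],
    "optional": false,
    "name": "siddhu"
  },
  "payload": {"idSiddhu": 1, "name": "siddhu"}
}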


Now insert more rows into the table and you will find a corresponding message appear in the Kafka topic. Note that with mode=incrementing only newly inserted rows (with a higher idSiddhu) are picked up; updates and deletes are not captured. To capture those you would need the timestamp+incrementing mode or a CDC connector such as Debezium.
