Wednesday, December 30, 2020

Apache Kafka Connect concept in Windows

In all the past examples we wrote the producer and consumer by hand to send data to and consume data from a topic. But what if someone has already developed this part for us? Code for a producer that fetches data from an external system and inserts it into a topic is called the Kafka Connect Source API. In the same way, code that reads data from a Kafka topic and inserts it into an external system using the built-in Kafka consumer API is called the Kafka Connect Sink API.

In this example we will use the below model of execution:

Source connect :-
MySQL DB –> Apache Kafka Connect source API –> Apache Kafka Topic

Sink connect :-
Apache Kafka Topic –> Apache Kafka Connect sink API –> MySQL DB

Let's first concentrate on the Source connect :-
MySQL DB –> Apache Kafka Connect source API –> Apache Kafka Topic

As we are using a MySQL DB, one option is debezium/debezium-connector-mysql

i.e. https://www.confluent.io/hub/debezium/debezium-connector-mysql

We can either download the zip that contains all the jar files required to connect to MySQL, or we can use the Maven repository.

We already have MySQL installed on our Windows machine, and we have a table called siddhu with two columns, as shown below.

(Screenshot: the siddhu table with its two columns, idSiddhu and name)

Now let's configure the connector. Although the Debezium connector mentioned above is one option, the steps below use the Confluent JDBC source connector (io.confluent.connect.jdbc.JdbcSourceConnector) together with the MySQL JDBC driver jar, so no pom.xml change is needed for this part.

Prerequisites:-
1- We assume you already have MySQL installed on your system.
2- MySQL has a schema called test and a table named siddhu with two columns, idSiddhu and name.
3- You have Apache Kafka installed on your system. We are using Confluent Apache Kafka for the same.
As our system is Windows, please follow the below steps to use Apache Kafka Connect on Windows.

  1. Place the MySQL connector jar in the following location. If you don't have a folder named kafka-connect-jdbc, then create it:
    C:\confluent-community-6.0.1\confluent-6.0.1\share\java\kafka-connect-jdbc\mysql-connector-java-5.1.38-bin.jar
  2. Create a folder kafka-connect-jdbc inside the etc folder of Kafka and create a property file quickstart-mysql.properties at
    C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka-connect-jdbc\quickstart-mysql.properties
    with the following content:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# connection string for the local test schema (user root / password root)
connection.url=jdbc:mysql://localhost:3306/test?user=root&password=root
# detect new rows using the auto-increment column idSiddhu
mode=incrementing
incrementing.column.name=idSiddhu
# the topic is named <topic.prefix> + <table name>
topic.prefix=my-replicated-topic-siddhu

Follow the below steps religiously.
Steps:

  1. Start zookeeper
    zookeeper-server-start.bat \etc\kafka\zookeeper.properties
  2. Start Kafka Broker
    kafka-server-start.bat \etc\kafka\server.properties
  3. Start Mysql Source
    connect-standalone.bat C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka\connect-standalone.properties C:\confluent-community-6.0.1\confluent-6.0.1\etc\kafka-connect-jdbc\quickstart-mysql.properties

Start inserting data into your siddhu table in the test schema.

You will be able to see that a topic named my-replicated-topic-siddhusiddhu has been created with the data in JSON format, as shown below.

As we have given topic.prefix=my-replicated-topic-siddhu, the new topic is created using this value as a prefix to the name of the table into which we inserted the data.
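To verify this end to end, below is a minimal consumer sketch that prints the records from the new topic (the broker address localhost:9092 and the group id are assumptions for a local setup; the bundled kafka-console-consumer.bat tool would work just as well).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SiddhuConnectTopicViewer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "siddhu-connect-viewer");   // assumed group id
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read the topic from the beginning

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // topic name = topic.prefix + table name
            consumer.subscribe(Collections.singletonList("my-replicated-topic-siddhusiddhu"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value()); // JSON message written by the JDBC source connector
                }
            }
        }
    }
}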


Now insert a few more rows into the table and you will see corresponding messages appear in the Kafka topic. Note that with mode=incrementing the connector only picks up newly inserted rows with a higher idSiddhu value; to capture updates you would use the timestamp or timestamp+incrementing mode, and to capture deletes you would need a CDC connector such as Debezium.

Tuesday, December 29, 2020

Kafka Streams with a simple example

Till now, we have handwritten the code for the producer and consumer to send and receive data. There are many occasions when we need to perform some additional operation on the data that is present in a partition, e.g. find the messages for invoices created on a particular date or having an amount greater than a particular figure. Agreed, we could write our own Java class, fetch the values from the topic and recalculate them, but don't you think it would be really great if we could get this filtering ready-made? For this purpose we have Apache Kafka Streams. It helps in performing operations like filtering, calculating, formatting, bifurcating, cosmetic changes to the message, etc.

For Apache Kafka Streams it is mandatory that the data comes from a topic, i.e. we cannot feed data from the outside world directly into a Kafka Streams application.

Let's take one example.

Step 1:- First we need to add a few Maven dependencies for the same in our pom.xml.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.7.0</version>
</dependency>

We will also use the Gson library to parse the data, along with the Dropwizard metrics-core library:

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>4.1.16</version>
</dependency>

Step 2:-

Let's create a Java class. In this class we will have the following four steps:
1- Create properties
2- Create a topology
3- Build the topology
4- Start the streams application

1:- SiddhuStreamsFilter

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import com.google.gson.JsonParser;

public class SiddhuStreamsFilter {

    public static void main(String[] args) {
        // create properties
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.stream_bootstrapServers);
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.stream_applicationID);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // create a topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // input topic
        KStream<String, String> inputTopic = streamsBuilder.stream(AppConfigs.stream_topicName);

        // keep only the records whose pinnumber is greater than 100
        KStream<String, String> filteredStream = inputTopic.filter(
                (k, data) -> extractPinNumber(data) > 100
        );

        // write the filtered records to the output topic
        filteredStream.to(AppConfigs.stream_new_topicName);

        // build the topology
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        // start our streams application
        kafkaStreams.start();

        // close the streams application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

    private static Integer extractPinNumber(String data) {
        // parse the JSON value with Gson and read the pinnumber field
        try {
            return JsonParser.parseString(data)
                    .getAsJsonObject()
                    .get("pinnumber")
                    .getAsInt();
        } catch (NullPointerException e) {
            // record has no pinnumber field, treat it as 0 so it is filtered out
            return 0;
        }
    }
}
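Note that the class above reads its settings from an AppConfigs holder that is not shown in this post. A minimal sketch could look like the following; the broker address, application id and input topic name are assumptions for a local setup, while the output topic matches the SiddhuNewJsonTopic name used below.

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

// Minimal sketch of the configuration holder referenced by SiddhuStreamsFilter.
// The values are placeholders for a local setup; adjust them to your environment.
public class AppConfigs {
    public static final String stream_bootstrapServers = "localhost:9092";        // assumed local broker
    public static final String stream_applicationID    = "siddhu-streams-filter"; // assumed application id
    public static final String stream_topicName        = "SiddhuJsonTopic";       // assumed input topic
    public static final String stream_new_topicName    = "SiddhuNewJsonTopic";    // output topic used below
}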

Let's insert some records into our topic. We are inserting two records, one having a pinnumber greater than 100 and the other less than 100, as shown below.
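For reference, a minimal producer sketch that publishes these two test records could look like the following (the JSON payloads are assumptions; any JSON value containing a pinnumber field will work with the filter above).

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SiddhuStreamsTestProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.stream_bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // one record above the filter threshold and one below it
            producer.send(new ProducerRecord<>(AppConfigs.stream_topicName, "{\"name\":\"siddhu\",\"pinnumber\":150}"));
            producer.send(new ProducerRecord<>(AppConfigs.stream_topicName, "{\"name\":\"test\",\"pinnumber\":50}"));
            producer.flush(); // make sure both records are sent before the producer is closed
        }
    }
}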

Now let's run our class. As written, it will traverse all the JSON records and check the pinnumber; if it is greater than 100, the record is written to a new topic named SiddhuNewJsonTopic.

Download the code from https://github.com/shdhumale/siddhukafkaexample.git