Wednesday, December 23, 2020

Apache kafka example using JAVA client – Producer and Consumer using JAVA language

In general practise we dont use CLI to send data and to receive data. We use language that belongs to our project for performing this operation. Any way CLI tool is good to do daily routeing like log check, health status, current load etc.

In the below example we will try to send the message/data and consume it using java producer and client. We are going to use the Maven build for the same.

Follow the below step to create the create a maven project in eclipse IDE. you can use IDE that suites you.

Step 1- Create a maven project and update your pom.xml with following dependencies.

First we will create a simple java producer application that will insert data into our topic
We will create a constant file that will store the constant value that we are going to use in our project.

Lets create few of differet Producer that will insert data into our topic
1:- SimpleProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 *
 */
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
 
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 *
 * @author Siddhartha
 *
 */
public class SimpleProducer {
    Logger logger = LoggerFactory.getLogger(SimpleProducer.class.getName());
 
    /**
     * @param args
     */
    public static void main(String[] args) {
        new SimpleProducer().run();
 
    }
 
    public void run(){
        // TODO Auto-generated method stub
        logger.info("Kafka Producer start...");
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
 
         
         
        logger.info("Sending messages...");
        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<Integer, String>(AppConfigs.topicName, i, "Simple Message-" + i));
        }
 
        logger.info("Finished message sending- Closing Kafka Producer.");
        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            // flush data
            producer.flush();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));
        //producer.close();
    }
 
}

here we just are inserting data with message Simple Message- increment in topic.

2-SiddhuSimpleProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 
 
public class SiddhuSimpleProducer {
 
    Logger logger = LoggerFactory.getLogger(SiddhuSimpleProducer.class.getName());
 
    public SiddhuSimpleProducer(){}
 
    public static void main(String[] args) {
        new SiddhuSimpleProducer().run();
    }
 
    public void run(){
 
        logger.info("Setup");
        // create a kafka producer
        KafkaProducer<Integer, String> producer = createKafkaProducer();
 
        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            // flush data
            producer.flush();
            producer.close();
            logger.info("done!");
        }));
 
        // loop to send tweets to kafka
        logger.info("Sending messages...");
        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<Integer, String>(AppConfigs.topicName, i, "Siddhu Simple Producer Message-" + i));
        }
        logger.info("End of application");
    }
 
 
 
    public KafkaProducer<Integer, String> createKafkaProducer(){
        //String bootstrapServers = "127.0.0.1:9092";
 
        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,  AppConfigs.bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // create safe Producer
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // kafka 2.0 >= 1.1 so we can keep this as 5. Use 1 otherwise.
 
        // high throughput producer (at the expense of a bit of latency and CPU usage)
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
 
        // create the producer
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
        return producer;
    }
}

Here we are using proper shutdown hook to close the application along with some of the concept like safe producer and high throughput.

3- SiddhuSimpleProducerWithCallBack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 
 
public class SiddhuSimpleProducerWithCallBack {
 
    Logger logger = LoggerFactory.getLogger(SiddhuSimpleProducerWithCallBack.class.getName());
 
    public SiddhuSimpleProducerWithCallBack(){}
 
    public static void main(String[] args) {
        new SiddhuSimpleProducerWithCallBack().run();
    }
 
    public void run(){
 
        logger.info("Setup");
        // create a kafka producer
        KafkaProducer<Integer, String> producer = createKafkaProducer();
 
        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            // flush data
            producer.flush();
            producer.close();
            logger.info("done!");
        }));
 
        // loop to send tweets to kafka
        logger.info("Sending messages...");
        /*
         * for (int i = 1; i <= AppConfigs.numEvents; i++) { producer.send(new
         * ProducerRecord<Integer, String>(AppConfigs.topicName, i,
         * "Siddhu Simple Producer Message-" + i)); }
         */
        for (int i=0; i<20; i++ ) {
            // create a producer record
            ProducerRecord<Integer, String> record =
                    new ProducerRecord<Integer, String>(AppConfigs.topicName, i , "Siddhu different key Simple Producer with callback Message -" + i);
 
            // send data - asynchronous
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // executes every time a record is successfully sent or an exception is thrown
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata. \n" +
                                "Topic:" + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Offset: " + recordMetadata.offset() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
 
            /*   try {
            for (int i=0; i<20; i++ ) {
                // create a producer record
                ProducerRecord<Integer, String> record =
                        new ProducerRecord<Integer, String>(AppConfigs.topicName, "Siddhu Sync Simple Producer with callback Message -" + i);
 
                // send data - asynchronous
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        // executes every time a record is successfully sent or an exception is thrown
                        if (e == null) {
                            // the record was successfully sent
                            logger.info("Received new metadata. \n" +
                                    "Topic:" + recordMetadata.topic() + "\n" +
                                    "Partition: " + recordMetadata.partition() + "\n" +
                                    "Offset: " + recordMetadata.offset() + "\n" +
                                    "Timestamp: " + recordMetadata.timestamp());
                        } else {
                            logger.error("Error while producing", e);
                        }
                    }
                }).get(); // block the .send() to make it synchronous - don't do this in production!
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
            logger.info("End of application");
        }
    }
 
 
 
    public KafkaProducer<Integer, String> createKafkaProducer(){
        //String bootstrapServers = "127.0.0.1:9092";
 
        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,  AppConfigs.bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // create safe Producer
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // kafka 2.0 >= 1.1 so we can keep this as 5. Use 1 otherwise.
 
        // high throughput producer (at the expense of a bit of latency and CPU usage)
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
 
        // create the producer
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
        return producer;
    }
}

Here we are sending the message async in nature by implementing Callback. Also if you want this to be sync just add .get() for send() method and we are done with it.

4- Multithreading example SiddhuKafkaThreadProducer.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
 
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Scanner;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SiddhuKafkaThreadProducer  implements Runnable{ 
     
    private static Logger logger = LoggerFactory.getLogger(SiddhuKafkaThreadProducer.class.getName());
    private String fileLocation;
    private String topicName;
    private KafkaProducer<Integer, String> producer;
 
    SiddhuKafkaThreadProducer(KafkaProducer<Integer, String> producer, String topicName, String fileLocation) {
        this.producer = producer;
        this.topicName = topicName;
        this.fileLocation = fileLocation;
    }
 
    @Override
    public void run() {
        logger.info("Start Processing " + fileLocation);
        File file = new File(fileLocation);
        int counter = 0;
 
        try (Scanner scanner = new Scanner(file)) {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                producer.send(new ProducerRecord<>(topicName, null, line));
                counter++;
            }
            logger.info("Finished Sending " + counter + " messages from " + fileLocation);
        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
 
    }
 
    public static void main(String args[]){ 
         
        Properties props = new Properties();
        try{
            InputStream inputStream = new FileInputStream(AppConfigs.thread_kafkaConfigFileLocation);
            props.load(inputStream);
            props.put(ProducerConfig.CLIENT_ID_CONFIG,AppConfigs.thread_applicationID);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        }catch (IOException e){
            throw new RuntimeException(e);
        }
 
        KafkaProducer<Integer,String> producer = new KafkaProducer<>(props);
        Thread[] objThread = new Thread[AppConfigs.thread_eventFiles.length];
        logger.info("Starting Dispatcher threads...");
        for(int i=0;i<AppConfigs.thread_eventFiles.length;i++){
            objThread[i]=new Thread(new SiddhuKafkaThreadProducer(producer,AppConfigs.thread_topicName,AppConfigs.thread_eventFiles[i]));
            objThread[i].start();
        }
 
        try {
            for (Thread t : objThread) t.join();
        }catch (InterruptedException e){
            logger.error("Main Thread Interrupted");
        }finally {
            producer.close();
            logger.info("Finished Dispatcher Demo");
        }
     
    

Many time we have a requirement where in we need to have the Trasection like behaviour for Atomicity of operation. i.e. either all the message is entered in to topic or if any one fail the roll back all. This can be done in Kafka using producer.beginTransaction(); and producer.commitTransaction();

Make sure when you use this trasection api unless and untill you commit or abort the instance that are in transection dont start the new trasection on the same object else you will get error.

Let have one example of the same

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SiddhuTransectionProducer {
    private static Logger logger = LoggerFactory.getLogger(SiddhuTransectionProducer.class.getName());
 
    public static void main(String[] args) {
 
        logger.info("Creating Kafka Producer...");
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.transection_applicationID);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.transection_bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, AppConfigs.transection_transaction_id);
 
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
 
        logger.info("Starting First Transaction...");
        producer.beginTransaction();
        try {
            for (int i = 1; i <= AppConfigs.transection_numEvents; i++) {
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName1, i, "Simple Message-T11-" + i));
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName2, i, "Simple Message-T11-" + i));
            }
            logger.info("Committing First Transaction.");
            producer.commitTransaction();
        }catch (Exception e){
            logger.error("Exception in First Transaction. Aborting...");
            producer.abortTransaction();
            producer.close();
            throw new RuntimeException(e);
        }
 
        logger.info("Starting Second Transaction...");
        producer.beginTransaction();
        try {
            for (int i = 1; i <= AppConfigs.transection_numEvents; i++) {
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName1, i, "Simple Message-T22-" + i));
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName2, i, "Simple Message-T22-" + i));
                throw new Exception();
            }
            logger.info("Aborting Second Transaction.");
            //producer.abortTransaction();
            //producer.commitTransaction();
        }catch (Exception e){
            logger.error("Exception in Second Transaction. Aborting...");
            producer.abortTransaction();
            producer.close();
            throw new RuntimeException(e);
        }
 
        logger.info("Finished - Closing Kafka Producer.");
        producer.close();
 
    }
}

Till now were sending the data i.e. key and message as text which is by serializable. In real world solution we need to send original java object that is serialized to broker. Lets see one example of the same

1 MySerializerProducer: –

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Date;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class MySerializerProducer {
 
   public static void main(String[] args) throws Exception{
 
      String topicName = "AddressTopic";
 
      Properties props = new Properties();
      props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.ser_der_applicationID);
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.ser_der_bootstrapServers);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.AddressSerializer");
      
      Producer<String, Address> producer = new KafkaProducer <>(props);
 
 
          Address sp1 = new Address(101,"Society 1.",new Date());
          Address sp2 = new Address(102,"Society 2.",new Date());
 
         producer.send(new ProducerRecord<String,Address>(topicName,"MySup",sp1)).get();
         producer.send(new ProducerRecord<String,Address>(topicName,"MySup",sp2)).get();
           
         producer.close();
 
   }
}

In the above class we have created our own serializer
props.put(“value.serializer”, “com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.AddressSerializer”);
this will serialize the data that need to be transfer to the internet.

2 :- AddressSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.nio.ByteBuffer;
import java.util.Map;
 
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
 
public class AddressSerializer implements Serializer<Address> {
    private String encoding = "UTF8";
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }
 
    @Override
    public byte[] serialize(String topic, Address data) {
 
        int sizeOfSocietyName;
        int sizeOfStartDate;
        byte[] serializedSocietyName;
        byte[] serializedStartDate;
 
        try {
            if (data == null)
                return null;
            serializedSocietyName = data.getSocietyName().getBytes(encoding);
            sizeOfSocietyName = serializedSocietyName.length;
            serializedStartDate = data.getStartDate().toString().getBytes(encoding);
            sizeOfStartDate = serializedStartDate.length;
 
            ByteBuffer buf = ByteBuffer.allocate(sizeOfSocietyName+sizeOfStartDate+50);
            buf.putInt(data.getHouseId());
            buf.putInt(sizeOfSocietyName);
            buf.put(serializedSocietyName);
            buf.putInt(sizeOfStartDate);
            buf.put(serializedStartDate);
 
 
            return buf.array();
 
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Address to byte[]");
        }
    }
 
    @Override
    public void close() {
        // nothing to do
    }
}

3:- Address

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Date;
public class Address{
        private int houseId;
        private String societyName;
        private Date startDate;
 
        public Address(int houseId, String societyName, Date startDate){
                this.houseId = houseId;
                this.societyName = societyName;
                this.startDate = startDate;
        }
 
        /**
         * @return the houseId
         */
        public int getHouseId() {
            return houseId;
        }
 
        /**
         * @param houseId the houseId to set
         */
        public void setHouseId(int houseId) {
            this.houseId = houseId;
        }
 
        /**
         * @return the societyName
         */
        public String getSocietyName() {
            return societyName;
        }
 
        /**
         * @param societyName the societyName to set
         */
        public void setSocietyName(String societyName) {
            this.societyName = societyName;
        }
 
        /**
         * @return the startDate
         */
        public Date getStartDate() {
            return startDate;
        }
 
        /**
         * @param startDate the startDate to set
         */
        public void setStartDate(Date startDate) {
            this.startDate = startDate;
        }
 
        @Override
        public String toString() {
            return "Address [houseId=" + houseId + ", societyName=" + societyName + ", startDate=" + startDate + "]";
        }
 
        
}

Note: – For such configuration we must also write the derializer at the consumer end.

Let write our first Consumer

1- MySerializerConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
 
public class MySerializerConsumer{
 
        public static void main(String[] args) throws Exception{
 
                String topicName = "AddressTopic";
                String groupName = "AddressTopicGroup";
 
                Properties props = new Properties();
                props.put("group.id", groupName);
                props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.ser_der_applicationID);
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.ser_der_bootstrapServers);
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.AddressDeserializer");
                 
                KafkaConsumer<String, Address> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList(topicName));
 
                while (true){
                        ConsumerRecords<String, Address> records = consumer.poll(100);
                        for (ConsumerRecord<String, Address> record : records){
                                System.out.println("House number= " + String.valueOf(record.value().getHouseId()) + " Society  Name = " + record.value().getSocietyName() + " Start Date = " + record.value().getStartDate().toString());
                        }
                }
 
        }
}

Here we are using deserailizer of address calls given below for reading searlize Address class from producer.

2- AddressDeserializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.function.Supplier;
 
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
 
public class AddressDeserializer implements Deserializer<Address> {
    private String encoding = "UTF8";
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
                //Nothing to configure
        }
 
    @Override
    public Address deserialize(String topic, byte[] data) {
 
        try {
            if (data == null){
                System.out.println("Null recieved at deserialize");
                                return null;
                                }
            ByteBuffer buf = ByteBuffer.wrap(data);
            int id = buf.getInt();
 
            int sizeOfName = buf.getInt();
            byte[] nameBytes = new byte[sizeOfName];
            buf.get(nameBytes);
            String deserializedName = new String(nameBytes, encoding);
 
            int sizeOfDate = buf.getInt();
            byte[] dateBytes = new byte[sizeOfDate];
            buf.get(dateBytes);
            String dateString = new String(dateBytes,encoding);
 
            DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy");
 
            return new Address(id,deserializedName,df.parse(dateString));
 
 
 
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Address");
        }
    }
 
    @Override
    public void close() {
        // nothing to do
    }
}

In same way we can have our own partitioner class also.
You can find the running code at belwo given Git location
https://github.com/shdhumale/siddhukafkaexample.git

Wednesday, December 16, 2020

Apache KAFKA CLI

 To understand and use initial level Apache kafka we need to install it on our machine. As i am using window i had downloaded kafka_2.13-2.6.0.tgz the same from the belwo site.

https://kafka.apache.org/downloads

extract it and set the required environment variable so that you can directly use it when you open any cmd prompt.
i.e.
KAFKA_HOME C:\kafka_2.13-2.6.0\bin

However you can also take he commercial version from confluent.io or also used the managed service from cloud system like confluent, amazone etc.
benefit of using commercial version is that they provided a bunch of tool and support that is needed to you. most likely in prod env we are going to use the confluent.io.Additional for managed approach from cloude we dont need to think of anything all infra needs are taken care by them we only get the instance and we are good to go with it.

Now there is sequence of step that we need to for starting our Apache cluster in local machine.

Step 1 :-
‘- Zookeper starting

Zookeper is bundled by default with Apache kafka it is used to monitor kafka. To understand it visit this site.
https://zookeeper.apache.org/index.html. Zookeeper keeps track of status of the Kafka cluster nodes and it also keeps track of Kafka topics, partitions etc. Zookeeper it self is allowing multiple clients to perform simultaneous reads and writes and acts as a shared configuration service within the system.

C:\kafka_2.13-2.6.0\bin\windows>zookeeper-server-start.bat ../../config/zookeeper.properties

Step 2:-
‘- kafka broker
Then we will start the broker using below command.

C:\kafka_2.13-2.6.0\bin\windows>kafka-server-start.bat C:\kafka_2.13-2.6.0\config\server.properties

Note:- We can create many broker as we want only the changes we want to in the C:\kafka_2.13-2.6.0\config\server.properties are

1- broker.id=0
2- log.dirs=/tmp/kafka-logs
3- listeners=PLAINTEXT://:9092

Step 3:-

‘- Create Topics
We will create a topic where we are going to store our message send from the producer.

C:\kafka_2.13-2.6.0\bin\windows>kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test-siddhu

Ste 4:-
‘- consumer

We will create a consumer who will send a message to above created topics.

C:\kafka_2.13-2.6.0\bin\windows>kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic test-siddhu –from-beginning

Step 5:-
‘- producer

We will create a producer which will read the data entered in to the Topic by Producer.

C:\kafka_2.13-2.6.0\bin\windows>kafka-console-producer.bat –broker-list localhost:9092 –topic test-siddhu

Step 6:-
‘- Check topic is created and list of the topics.

C:\kafka_2.13-2.6.0\bin\windows>kafka-topics.bat –list –zookeeper localhost:2181
test-siddhu

‘- Topic descriptions
C:\kafka_2.13-2.6.0\bin\windows>kafka-topics.bat –describe -zookeeper localhost:2181 –topic test-siddhu
Topic: test-siddhu PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test-siddhu Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Now lets send the data from the producer and check it is coming to consumer. We had created/setup the apache kafka with one producer and one consumer




Now lets send the data from the producer and check it is coming to consumer. We had created/setup the apache kafka with one producer and one consumer





For this we will three broker lets create it. This can be done simply by coping the C:\kafka_2.13-2.6.0\config\server.properties to differ name properties file i.e. C:\kafka_2.13-2.6.0\config\server1.properties and C:\kafka_2.13-2.6.0\config\server2.properties

Additional we also need to change the following below given three things in each properties files

server.properties- Will keep its value default
server1.properties-
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs1
server2.properties
broker.id=2
port=9094
log.dirs=/tmp/kafka-logs2

Now start zookeeper using command

C:\kafka_2.13-2.6.0\bin\windows>zookeeper-server-start.bat ../../config/zookeeper.properties

Start three broker using command
C:\kafka_2.13-2.6.0\bin\windows>kafka-server-start.bat C:\kafka_2.13-2.6.0\config\server.properties
C:\kafka_2.13-2.6.0\bin\windows>kafka-server-start.bat C:\kafka_2.13-2.6.0\config\server1.properties
C:\kafka_2.13-2.6.0\bin\windows>kafka-server-start.bat C:\kafka_2.13-2.6.0\config\server2.properties

Now create a topic that will have 3 partition using below command
C:\kafka_2.13-2.6.0\bin\windows>kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test-consumer-group

Test Topic is created using below command
C:\kafka_2.13-2.6.0\bin\windows>kafka-topics.bat --describe -zookeeper localhost:2181 --topic test-consumer-group




Now lets create a two consumer with same consumer group for that execute below command twice

C:\kafka_2.13-2.6.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-consumer-group --from-beginning --group Group1

Now lets produce the data using below command 

C:\kafka_2.13-2.6.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test-consumer-group < C:\to_delete\Apache_Kafka\data.xlsx

Once you hit the above command our data will be inserted into the two consumer which indicate that work is shared with both client/consumer as they belog to same consumer group.




You can also check the log folder that will give you which data is moved which consumner.






Tuesday, December 15, 2020

Apache Kafka as a Messaging bus.

 What is Apache Kafka

As per the defination on site it “open-source distributed event streaming platform used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.”
This means it act a messaging bus in mid of the two application so that one application generate the data and other consume this generated data either as a live streaming or as batch stream. This application can be either DB, Web any S/W applications.

How it work?
It act as a middle men work. it sits between producer application and consumer application and decoupled them. There are bright chances that they even might not be knowing each other.

Where it come from and why is the need of this system?

It is developed by Linkedin and open source in 2011. The main reason why this approach was needed is because lots of data was flowing from the Producer i.e. web, mobile, other application and this data was expected to be consume by other system like analytical, run time suggestion etc. Storing the data in DB and asking them to collect from it is a very slow process and also not user-friendly. As the new system added in to the ecosystem more complexity is added i.e. protocol they used https, http, FTP, SFTP, JDBC, TCP/IP etc and even the data that is flow between them is of different type i.e. some use json other uses xml, avro etc. To handle this we required a system that work on highthrough put and handle this efficietly with out any overhead.

Where we can apply this solution?
This can be applied in the situation given below

1- Breaking long running process so that system does not get hault on that movement

2- When large number of application need to be intracted with each other

3- If our echo system is bigger and need frequent addition/update of system to fullfill the purpose.

4- It we need high throughput with large amount of processing data

5- If we need large number of system with different data consumtion to talk each other.

6- It we want fault tolerace, self healing system

7- Data integration 

8- Microservice architecture for stream processing

9- Real time data processing data warehousing

and the list is end less usecase and possibilities.

Evalution of Kafka:-
In the begining the Kafka was developed in linkedin only for caterring the below integration of system

for this they had developer
1- Kakfa Broker :- the heart of Kakfa where it sit and perform all the operation of taking the data from the producer. Storing it to its HD i.e. using topic , partition etc and allowing the consumer to consume it as on when needed.
2- JAVA api i.e. producer and client api. :- Producer JAVA API uses to send data from the producer application to Broker and Consumer JAVA API use to consume the data from Broker to client for further respective process.

Further in due course of time its need and requirement changes and Kafka develop itself to further concept like
3- Kakfa Connect :- Use to provide ready made connect for producer and consumer so that we did not required code to writes.
4- Kafka Stream :- Use to process the data when they are in topic before seding it to consumer i.e.transfomation, filteration, complex logic etc.
5- KSQL :- This the latest form in whihc Kafka is preparing to declare it as a perstance system and can be quired with SQL structure.

So the Eco system of the Apache Kafka consist of Kafka broker, client API [producer and consumer], connect, streams, ksql

Now let’s discuss the core concept of Apache Kafka. These are some of the terminology that we are going to use though out the Kakfa family discussion.
‘-producer :- Apps used to produce the data
‘-consumer:- Apps used to consume the data
‘-broker:- Act as a middle man that facilitate the work of producer and consumer to use their data.
‘-cluster:- As it is a distributed system for high throughput we need the broker to run in cluster environment. Where in we have many machines in which we have individual topic run in a cluster environment where they know each other and can talk with each other.
‘-topic :- This is the place where message or data is stored in Kafka. You can consider this as an Table in DBMS.
‘-partition :- As the topic can be huge in nature and size for scalability and performance and security it is divided into part and each part is know as partition. This partition is stored in broker.
‘-offsets :- This is an autogenerated and read-only number provided to partition when the data is stored. It is like pk in RDDMS.
‘-consumer groups:- to increase the performance of the consumner they are tag in single group depending on their name so that they can share the work given to them. We can have max 500 consumer in a group.
‘-Replication factor :- for security purpose the data in the partition is replicated using this replication factor if it is2 then we had one original partition and duplicate.
‘-Partition factors :- This will indicate how many partitions can be done for a single topic. General practice is if you have broker less than 12 always 2* no of broker= number of partitions.
‘-Leader-follower :- Now as we have replication factor which duplicate the partition we must declare one of the partition as leader which will be used for update and read i.e. take the request from producer and also consumer for reading and other as read only for fault tolerance concept.

It is also good to see the competitor in the market for Apache KAFKA i.e. RabbitMQ and ActiveMQ. The reason why Apache Kafka has better handle
1- They work on pub sub method
2- They can store the data/message
3- IT can also behave as a queue if required.
4- IT is fault tolerance, high throughput and easy to maintain
5- Open source with lots of tools.
6- Provide large number of API client in different Language like java, python, node etc.
7- Easy learning curve and large follower groups.
8- highly configurable and easy to manage and understand this configuration.

In next series we will try to use the core concept of Apache Kafka programmatically using java api i.e. broker, producer and consumer API and also CLI tool.