Wednesday, December 23, 2020

Apache Kafka example using the Java client – Producer and Consumer in Java

In general practice we don't use the CLI to send and receive data; we use the language our project is built in for these operations. The CLI tools are still good for daily routine tasks like checking logs, health status, current load, etc.

In the example below we will send messages and consume them using a Java producer and consumer. We are going to use Maven to build the project.

Follow the steps below to create a Maven project in the Eclipse IDE; you can use whichever IDE suits you.

Step 1- Create a Maven project and update your pom.xml with the following dependencies.
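The dependency list is not reproduced in this post, so here is a minimal sketch of the dependencies block. The kafka-clients and slf4j versions are assumptions (roughly current at the time of writing); align them with your broker and logging setup.

<dependencies>
    <!-- Kafka Java client; the version is an assumption, match it to your cluster -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!-- simple SLF4J binding so the logger output appears on the console -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>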

First we will create a simple Java producer application that will insert data into our topic.
We will also create a constants file (AppConfigs) that stores the constant values we are going to use throughout this project.
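A minimal sketch of such a constants class is given below, covering the fields referenced by the examples in this post. All the values are illustrative assumptions - check the Git repository linked at the end for the actual ones.

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

// Illustrative constants class; values below are assumptions, see the Git repository for the real ones.
public class AppConfigs {
    // simple producer examples
    public final static String applicationID = "SimpleProducerApp";
    public final static String bootstrapServers = "localhost:9092";
    public final static String topicName = "siddhu-topic";
    public final static int numEvents = 10;

    // multithreaded producer example
    public final static String thread_applicationID = "ThreadProducerApp";
    public final static String thread_kafkaConfigFileLocation = "kafka.properties";
    public final static String thread_topicName = "siddhu-thread-topic";
    public final static String[] thread_eventFiles = {"data/file1.txt", "data/file2.txt"};

    // transactional producer example
    public final static String transection_applicationID = "TransactionProducerApp";
    public final static String transection_bootstrapServers = "localhost:9092";
    public final static String transection_topicName1 = "siddhu-topic-1";
    public final static String transection_topicName2 = "siddhu-topic-2";
    public final static String transection_transaction_id = "siddhu-transaction-id";
    public final static int transection_numEvents = 5;

    // custom serializer/deserializer example
    public final static String ser_der_applicationID = "SerDerApp";
    public final static String ser_der_bootstrapServers = "localhost:9092";
}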

Let's create a few different producers that will insert data into our topic.
1:- SimpleProducer

/**
 *
 */
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
 
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 *
 * @author Siddhartha
 *
 */
public class SimpleProducer {
    Logger logger = LoggerFactory.getLogger(SimpleProducer.class.getName());
 
    /**
     * @param args
     */
    public static void main(String[] args) {
        new SimpleProducer().run();
 
    }
 
    public void run(){
        logger.info("Kafka Producer start...");
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
 
         
         
        logger.info("Sending messages...");
        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<Integer, String>(AppConfigs.topicName, i, "Simple Message-" + i));
        }
 
        logger.info("Finished message sending- Closing Kafka Producer.");
        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            // flush data
            producer.flush();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));
        //producer.close();
    }
 
}

Here we are simply inserting messages of the form "Simple Message-<n>" into the topic, where n increments on every send.
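To verify that the messages landed, you can read the topic back with the console consumer that ships with Kafka (assuming a local broker on port 9092; substitute your own topic name):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <your-topic-name> --from-beginning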

2:- SiddhuSimpleProducer

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 
 
public class SiddhuSimpleProducer {
 
    Logger logger = LoggerFactory.getLogger(SiddhuSimpleProducer.class.getName());
 
    public SiddhuSimpleProducer(){}
 
    public static void main(String[] args) {
        new SiddhuSimpleProducer().run();
    }
 
    public void run(){
 
        logger.info("Setup");
        // create a kafka producer
        KafkaProducer<Integer, String> producer = createKafkaProducer();
 
        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            // flush data
            producer.flush();
            producer.close();
            logger.info("done!");
        }));
 
        // loop to send messages to kafka
        logger.info("Sending messages...");
        for (int i = 1; i <= AppConfigs.numEvents; i++) {
            producer.send(new ProducerRecord<Integer, String>(AppConfigs.topicName, i, "Siddhu Simple Producer Message-" + i));
        }
        logger.info("End of application");
    }
 
 
 
    public KafkaProducer<Integer, String> createKafkaProducer(){
        //String bootstrapServers = "127.0.0.1:9092";
 
        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,  AppConfigs.bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // create safe Producer
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // with idempotence, up to 5 in-flight requests are safe on Kafka >= 1.1; use 1 for older brokers
 
        // high throughput producer (at the expense of a bit of latency and CPU usage)
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
 
        // create the producer
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
        return producer;
    }
}

Here we use a proper shutdown hook to flush and close the producer when the application stops, along with safe-producer settings (idempotence, acks=all, unlimited retries) and high-throughput settings (snappy compression, linger.ms, a bigger batch size).

3:- SiddhuSimpleProducerWithCallBack

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
 
 
public class SiddhuSimpleProducerWithCallBack {
 
    Logger logger = LoggerFactory.getLogger(SiddhuSimpleProducerWithCallBack.class.getName());
 
    public SiddhuSimpleProducerWithCallBack(){}
 
    public static void main(String[] args) {
        new SiddhuSimpleProducerWithCallBack().run();
    }
 
    public void run(){
 
        logger.info("Setup");
        // create a kafka producer
        KafkaProducer<Integer, String> producer = createKafkaProducer();
 
        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            // flush data
            producer.flush();
            producer.close();
            logger.info("done!");
        }));
 
        // loop to send messages to kafka
        logger.info("Sending messages...");
        /*
         * for (int i = 1; i <= AppConfigs.numEvents; i++) { producer.send(new
         * ProducerRecord<Integer, String>(AppConfigs.topicName, i,
         * "Siddhu Simple Producer Message-" + i)); }
         */
        for (int i=0; i<20; i++ ) {
            // create a producer record
            ProducerRecord<Integer, String> record =
                    new ProducerRecord<Integer, String>(AppConfigs.topicName, i , "Siddhu different key Simple Producer with callback Message -" + i);
 
            // send data - asynchronous
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // executes every time a record is successfully sent or an exception is thrown
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata. \n" +
                                "Topic:" + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Offset: " + recordMetadata.offset() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
 
        }

        /* Synchronous variant - call .get() on the Future returned by send() to block until the broker acknowledges:
        try {
            for (int i=0; i<20; i++ ) {
                // create a producer record
                ProducerRecord<Integer, String> record =
                        new ProducerRecord<Integer, String>(AppConfigs.topicName, "Siddhu Sync Simple Producer with callback Message -" + i);

                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        // executes every time a record is successfully sent or an exception is thrown
                        if (e == null) {
                            logger.info("Received new metadata. \n" +
                                    "Topic:" + recordMetadata.topic() + "\n" +
                                    "Partition: " + recordMetadata.partition() + "\n" +
                                    "Offset: " + recordMetadata.offset() + "\n" +
                                    "Timestamp: " + recordMetadata.timestamp());
                        } else {
                            logger.error("Error while producing", e);
                        }
                    }
                }).get(); // block the .send() to make it synchronous - don't do this in production!
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        */

        logger.info("End of application");
    }
 
 
 
    public KafkaProducer<Integer, String> createKafkaProducer(){
        //String bootstrapServers = "127.0.0.1:9092";
 
        // create Producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,  AppConfigs.bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // create safe Producer
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // with idempotence, up to 5 in-flight requests are safe on Kafka >= 1.1; use 1 for older brokers
 
        // high throughput producer (at the expense of a bit of latency and CPU usage)
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
 
        // create the producer
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
        return producer;
    }
}

Here we send the messages asynchronously by passing a Callback to send(). If you want the send to be synchronous, just call .get() on the Future that send() returns, and you are done.
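As a minimal sketch of that synchronous variant (assuming the same producer and AppConfigs setup as above; the class and method names here are illustrative):

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Minimal sketch: synchronous send by blocking on the Future returned by send().
public class SyncSendSketch {

    public static void sendSync(KafkaProducer<Integer, String> producer, String topic, int key, String value)
            throws InterruptedException, ExecutionException {
        ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, key, value);
        // .get() blocks until the broker acknowledges - throughput suffers, so avoid this in hot paths
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Written to partition " + metadata.partition() + " at offset " + metadata.offset());
    }
}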

4:- Multithreading example - SiddhuKafkaThreadProducer

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
 
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Scanner;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SiddhuKafkaThreadProducer  implements Runnable{ 
     
    private static Logger logger = LoggerFactory.getLogger(SiddhuKafkaThreadProducer.class.getName());
    private String fileLocation;
    private String topicName;
    private KafkaProducer<Integer, String> producer;
 
    SiddhuKafkaThreadProducer(KafkaProducer<Integer, String> producer, String topicName, String fileLocation) {
        this.producer = producer;
        this.topicName = topicName;
        this.fileLocation = fileLocation;
    }
 
    @Override
    public void run() {
        logger.info("Start Processing " + fileLocation);
        File file = new File(fileLocation);
        int counter = 0;
 
        try (Scanner scanner = new Scanner(file)) {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                producer.send(new ProducerRecord<>(topicName, null, line));
                counter++;
            }
            logger.info("Finished Sending " + counter + " messages from " + fileLocation);
        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
 
    }
 
    public static void main(String args[]){ 
         
        Properties props = new Properties();
        try{
            InputStream inputStream = new FileInputStream(AppConfigs.thread_kafkaConfigFileLocation);
            props.load(inputStream);
            props.put(ProducerConfig.CLIENT_ID_CONFIG,AppConfigs.thread_applicationID);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        }catch (IOException e){
            throw new RuntimeException(e);
        }
 
        KafkaProducer<Integer,String> producer = new KafkaProducer<>(props);
        Thread[] objThread = new Thread[AppConfigs.thread_eventFiles.length];
        logger.info("Starting Dispatcher threads...");
        for(int i=0;i<AppConfigs.thread_eventFiles.length;i++){
            objThread[i]=new Thread(new SiddhuKafkaThreadProducer(producer,AppConfigs.thread_topicName,AppConfigs.thread_eventFiles[i]));
            objThread[i].start();
        }
 
        try {
            for (Thread t : objThread) t.join();
        }catch (InterruptedException e){
            logger.error("Main Thread Interrupted");
        }finally {
            producer.close();
            logger.info("Finished Dispatcher Demo");
        }
    }
}
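The thread producer above loads its base configuration from the properties file pointed to by AppConfigs.thread_kafkaConfigFileLocation. A minimal sketch of that file, assuming a local single-broker setup:

# kafka.properties - minimal sketch, assuming a local single-broker setup
bootstrap.servers=localhost:9092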
Many times we have a requirement where we need transaction-like behaviour for atomicity of operations, i.e. either all the messages are written to the topic(s) or, if any one fails, everything is rolled back. This can be done in Kafka using producer.beginTransaction() and producer.commitTransaction().

Make sure that when you use this transaction API you commit or abort the transaction in flight before starting a new transaction on the same producer object; otherwise you will get an error.

Let's have one example of the same.

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SiddhuTransectionProducer {
    private static Logger logger = LoggerFactory.getLogger(SiddhuTransectionProducer.class.getName());
 
    public static void main(String[] args) {
 
        logger.info("Creating Kafka Producer...");
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.transection_applicationID);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.transection_bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, AppConfigs.transection_transaction_id);
 
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
 
        logger.info("Starting First Transaction...");
        producer.beginTransaction();
        try {
            for (int i = 1; i <= AppConfigs.transection_numEvents; i++) {
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName1, i, "Simple Message-T11-" + i));
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName2, i, "Simple Message-T11-" + i));
            }
            logger.info("Committing First Transaction.");
            producer.commitTransaction();
        }catch (Exception e){
            logger.error("Exception in First Transaction. Aborting...");
            producer.abortTransaction();
            producer.close();
            throw new RuntimeException(e);
        }
 
        logger.info("Starting Second Transaction...");
        producer.beginTransaction();
        try {
            for (int i = 1; i <= AppConfigs.transection_numEvents; i++) {
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName1, i, "Simple Message-T22-" + i));
                producer.send(new ProducerRecord<>(AppConfigs.transection_topicName2, i, "Simple Message-T22-" + i));
                throw new Exception(); // deliberately fail so that the second transaction gets aborted
            }
            logger.info("Aborting Second Transaction.");
            //producer.abortTransaction();
            //producer.commitTransaction();
        }catch (Exception e){
            logger.error("Exception in Second Transaction. Aborting...");
            producer.abortTransaction();
            producer.close();
            throw new RuntimeException(e);
        }
 
        logger.info("Finished - Closing Kafka Producer.");
        producer.close();
 
    }
}

Till now we were sending the key and message as plain text, which is serializable out of the box. In a real-world solution we often need to send an actual Java object, serialized, to the broker. Let's see one example of the same.

1:- MySerializerProducer

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Date;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class MySerializerProducer {
 
   public static void main(String[] args) throws Exception{
 
      String topicName = "AddressTopic";
 
      Properties props = new Properties();
      props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.ser_der_applicationID);
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.ser_der_bootstrapServers);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.AddressSerializer");
      
      Producer<String, Address> producer = new KafkaProducer <>(props);
 
 
          Address sp1 = new Address(101,"Society 1.",new Date());
          Address sp2 = new Address(102,"Society 2.",new Date());
 
         producer.send(new ProducerRecord<String,Address>(topicName,"MySup",sp1)).get();
         producer.send(new ProducerRecord<String,Address>(topicName,"MySup",sp2)).get();
           
         producer.close();
 
   }
}

In the above class we have registered our own serializer:
props.put("value.serializer", "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.AddressSerializer");
This serializes the Address object into bytes so it can be transferred over the network.

2:- AddressSerializer

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.nio.ByteBuffer;
import java.util.Map;
 
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
 
public class AddressSerializer implements Serializer<Address> {
    private String encoding = "UTF8";
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }
 
    @Override
    public byte[] serialize(String topic, Address data) {
 
        int sizeOfSocietyName;
        int sizeOfStartDate;
        byte[] serializedSocietyName;
        byte[] serializedStartDate;
 
        try {
            if (data == null)
                return null;
            serializedSocietyName = data.getSocietyName().getBytes(encoding);
            sizeOfSocietyName = serializedSocietyName.length;
            serializedStartDate = data.getStartDate().toString().getBytes(encoding);
            sizeOfStartDate = serializedStartDate.length;
 
            ByteBuffer buf = ByteBuffer.allocate(sizeOfSocietyName+sizeOfStartDate+50);
            buf.putInt(data.getHouseId());
            buf.putInt(sizeOfSocietyName);
            buf.put(serializedSocietyName);
            buf.putInt(sizeOfStartDate);
            buf.put(serializedStartDate);
 
 
            return buf.array();
 
        } catch (Exception e) {
            // keep the original exception as the cause so the failure is debuggable
            throw new SerializationException("Error when serializing Address to byte[]", e);
        }
    }
 
    @Override
    public void close() {
        // nothing to do
    }
}

3:- Address

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Date;
public class Address{
        private int houseId;
        private String societyName;
        private Date startDate;
 
        public Address(int houseId, String societyName, Date startDate){
                this.houseId = houseId;
                this.societyName = societyName;
                this.startDate = startDate;
        }
 
        /**
         * @return the houseId
         */
        public int getHouseId() {
            return houseId;
        }
 
        /**
         * @param houseId the houseId to set
         */
        public void setHouseId(int houseId) {
            this.houseId = houseId;
        }
 
        /**
         * @return the societyName
         */
        public String getSocietyName() {
            return societyName;
        }
 
        /**
         * @param societyName the societyName to set
         */
        public void setSocietyName(String societyName) {
            this.societyName = societyName;
        }
 
        /**
         * @return the startDate
         */
        public Date getStartDate() {
            return startDate;
        }
 
        /**
         * @param startDate the startDate to set
         */
        public void setStartDate(Date startDate) {
            this.startDate = startDate;
        }
 
        @Override
        public String toString() {
            return "Address [houseId=" + houseId + ", societyName=" + societyName + ", startDate=" + startDate + "]";
        }
 
        
}

Note: – For such a configuration we must also write the corresponding deserializer at the consumer end.

Let's write our first consumer.

1:- MySerializerConsumer

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.time.Duration;
import java.util.*;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MySerializerConsumer{

        public static void main(String[] args) throws Exception{

                String topicName = "AddressTopic";
                String groupName = "AddressTopicGroup";

                Properties props = new Properties();
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
                props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.ser_der_applicationID);
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.ser_der_bootstrapServers);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.AddressDeserializer");

                KafkaConsumer<String, Address> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList(topicName));

                while (true){
                        // poll for new records and print each deserialized Address
                        ConsumerRecords<String, Address> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, Address> record : records){
                                System.out.println("House number = " + record.value().getHouseId() + " Society Name = " + record.value().getSocietyName() + " Start Date = " + record.value().getStartDate());
                        }
                }

        }
}

Here we use the Address deserializer class given below to read the serialized Address objects coming from the producer.

2:- AddressDeserializer

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Map;
 
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
 
public class AddressDeserializer implements Deserializer<Address> {
    private String encoding = "UTF8";
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
                //Nothing to configure
        }
 
    @Override
    public Address deserialize(String topic, byte[] data) {
 
        try {
            if (data == null){
                System.out.println("Null recieved at deserialize");
                                return null;
                                }
            ByteBuffer buf = ByteBuffer.wrap(data);
            int id = buf.getInt();
 
            int sizeOfName = buf.getInt();
            byte[] nameBytes = new byte[sizeOfName];
            buf.get(nameBytes);
            String deserializedName = new String(nameBytes, encoding);
 
            int sizeOfDate = buf.getInt();
            byte[] dateBytes = new byte[sizeOfDate];
            buf.get(dateBytes);
            String dateString = new String(dateBytes,encoding);
 
            DateFormat df = new SimpleDateFormat("EEE MMM dd HH:mm:ss Z yyyy", Locale.ENGLISH); // Date.toString() always uses English names
 
            return new Address(id,deserializedName,df.parse(dateString));
 
 
 
        } catch (Exception e) {
            // keep the original exception as the cause so the failure is debuggable
            throw new SerializationException("Error when deserializing byte[] to Address", e);
        }
    }
 
    @Override
    public void close() {
        // nothing to do
    }
}

In the same way we can also plug in our own partitioner class; a sketch is given below.
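This minimal custom-partitioner sketch is illustrative and not part of the repository: it routes records with the same key to the same partition via a hash, and sends keyless records to partition 0.

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Illustrative custom partitioner: the same key always lands on the same partition.
public class SiddhuCustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // keyless records all go to partition 0 in this simple sketch
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    @Override
    public void close() {
        // nothing to do
    }
}

You would register it on the producer with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SiddhuCustomPartitioner.class.getName());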
You can find the running code at the Git location given below:
https://github.com/shdhumale/siddhukafkaexample.git
