Thursday, December 24, 2020

Consumer API example for Kafka

As we have a producer class, in the same way we also have a consumer class that consumes the messages sent by the producer. You can write a simple consumer class, as given below, that will consume the messages.

Step 1:- Set the properties values.
Step 2:- Create a consumer object using the properties created in step 1.
Step 3:- Subscribe the consumer to the topic from which we want to read messages. You can also use a wildcard pattern here, as shown in the sketch below.
Step 4:- Create a while loop for continuous reading.
Step 5:- Call the poll method with a timeout as an argument and iterate over the returned consumer records.
Step 6:- Finally, close the consumer object.
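
Step 3 mentions wildcard subscription. As a minimal sketch (the pattern "Siddhu.*" and the group name are assumptions for illustration), KafkaConsumer also accepts a regular expression via subscribe(Pattern):

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SiddhuWildcardSubscribeSketch{

    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        //pattern subscription needs a group.id; this group name is only illustrative
        props.put("group.id", "SiddhuWildcardGroup");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //subscribe to every topic whose name starts with "Siddhu"
        consumer.subscribe(Pattern.compile("Siddhu.*"));
        consumer.close();
    }
}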

1- SiddhuSimpleConsumer:-

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
 
public class SiddhuSimpleConsumer{
 
    public static void main(String[] args) throws Exception{
 
        String topicName = "SiddhuTopic";
        String groupName = "SiddhuTopicGroup";
 
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = null;
 
        try {
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));
 
            while (true){
                //poll blocks for up to 100 ms waiting for new records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records){
                    System.out.println("Record Value = " + record.toString());
                }
 
            }
        }catch(Exception ex){
            ex.printStackTrace();
        }finally{
            //guard against the constructor itself having thrown
            if (consumer != null){
                consumer.close();
            }
        }
    }
}

But you may soon find issues with the above class around offsets, i.e. the gap between the current offset and the committed offset. To overcome this we need to make the following changes in the above class.

Add the following line:
props.put("enable.auto.commit", "false");
This turns off auto-commit, so the consumer will commit the offsets manually.
//commit asynchronously after each poll so processing is not blocked
consumer.commitAsync();
//on shutdown or exception, don't wait for the async commit; do a final synchronous commit
consumer.commitSync();
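
commitAsync also accepts a callback if you want to know whether a commit failed. A minimal sketch (the log message is only illustrative):

//the callback receives the offsets we tried to commit and a non-null exception on failure
consumer.commitAsync((offsets, exception) -> {
    if (exception != null){
        System.out.println("Commit failed for offsets " + offsets + ": " + exception);
    }
});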

2- SiddhuSimpleOffsetCommitConsumer:-

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
 
public class SiddhuSimpleOffsetCommitConsumer{
 
    public static void main(String[] args) throws Exception{
 
        String topicName = "SiddhuTopic";
        String groupName = "SiddhuTopicGroup";
 
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //disable auto commit so that we commit the offsets manually
        props.put("enable.auto.commit", "false");
 
        KafkaConsumer<String, String> consumer = null;
 
        try {
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topicName));
 
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records){
                    System.out.println("Record Value = " + record.toString());
                }
                //commit asynchronously after each poll
                consumer.commitAsync();
            }
        }catch(Exception ex){
            ex.printStackTrace();
        }finally{
            if (consumer != null){
                //on the way out don't rely on the async commit; do a final synchronous commit
                consumer.commitSync();
                consumer.close();
            }
        }
    }
}

The above example guarantees that the offsets are committed manually after every poll, and that a final synchronous commit runs even if an exception comes. But sometimes we want to commit the offsets even in the middle of processing: if the consumer gets 10 records, processes 6 of them without committing any offset, and a rebalance happens, then the new consumer instance may get those already-processed records again, as they are still not committed. So there is a chance of duplication. To overcome this we want two things:
1- We should know when the rebalance is happening.
2- Before it happens, we must commit the offsets of the records that are already processed.

For this we need to maintain the offsets of the records processed so far and commit them when the rebalance happens. The sketch below shows the tracking half of this pattern, together with an explicit mid-process commit.
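
A minimal sketch of that tracking (reusing the consumer from SiddhuSimpleOffsetCommitConsumer above, plus imports of java.util.Map, java.util.HashMap, org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata; the commit interval of 5 records is an arbitrary illustration):

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records){
        //process the record, then remember the NEXT offset to read, i.e. offset + 1
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        count++;
        if (count % 5 == 0){
            //commit the tracked offsets in the middle of processing
            consumer.commitSync(currentOffsets);
        }
    }
}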

To know that a rebalance has happened, Kafka gives us two callback methods on the ConsumerRebalanceListener interface.

This can be done using the code below.
1- onPartitionsRevoked:- called when the partitions assigned to you, i.e. the consumer, are about to be taken away. So this is the place where you can commit the offsets of the processed records.
2- onPartitionsAssigned:- called when the rebalance is done and partitions are assigned to you, i.e. the consumer, and you are ready to take the new records.

1- SiddhuConsumerRebalance

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SiddhuConsumerRebalance{
 
    public static void main(String[] args) throws Exception{
        Logger logger = LoggerFactory.getLogger(SiddhuConsumerRebalance.class.getName());
 
        KafkaConsumer<String, String> objConsumer = null;
 
        Properties props = new Properties();
        //note: consumer settings, so we use ConsumerConfig rather than ProducerConfig
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.simple_rebalance_applicationID);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.simple_rebalance_bootstrapServers);
        props.put("group.id", AppConfigs.simple_rebalance_groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //to do the offset commit manually
        props.put("enable.auto.commit", "false");
 
        objConsumer = new KafkaConsumer<>(props);
        //creating an instance of our listener whose callbacks run immediately before and after a rebalance; the consumer instance is provided as an input.
        SiddhuRebalanceListner objRebalanceListner = new SiddhuRebalanceListner(objConsumer);
 
        objConsumer.subscribe(Arrays.asList(AppConfigs.simple_rebalance_topicName), objRebalanceListner);
        try{
            while (true){
                ConsumerRecords<String, String> records = objConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records){
                    //System.out.println("record:-"+ record.toString());
                    //Perform business action here, then track the processed offset
                    objRebalanceListner.addOffset(record.topic(), record.partition(), record.offset());
                }
            }
        }catch(Exception ex){
            logger.info(ex.toString());
            ex.printStackTrace();
        }
        finally{
            logger.info("Closing the consumer");
            objConsumer.close();
        }
    }
 
}

2-SiddhuRebalanceListner

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class SiddhuRebalanceListner implements ConsumerRebalanceListener {
    Logger logger = LoggerFactory.getLogger(SiddhuRebalanceListner.class.getName());
    private KafkaConsumer<String, String> objConsumer;
    private Map<TopicPartition, OffsetAndMetadata> objCurrentOffsets = new HashMap<>();
 
    public SiddhuRebalanceListner(KafkaConsumer<String, String> con){
        this.objConsumer=con;
    }
 
    public void addOffset(String topic, int partition, long offset){
        //commit offset + 1: the committed offset is the position of the NEXT record to read
        objCurrentOffsets.put(new TopicPartition(topic, partition),new OffsetAndMetadata(offset + 1,"Commit"));
    }
 
    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets(){
        return objCurrentOffsets;
    }
 
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.info("Inside onPartitionsAssigned");
        for(TopicPartition partition: partitions)
        {
            logger.info("iterating partition onPartitionsAssigned " + partition.partition()+",");
        }
 
    }
 
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.info("Inside onPartitionsRevoked");
        for(TopicPartition partition: partitions)
        {
            logger.info("iterating partition onPartitionsRevoked " + partition.partition()+",");
        }
 
        for(TopicPartition tp: objCurrentOffsets.keySet())
        {
            logger.info("iterating partition committed " + tp.partition());
        }
 
        //commit everything processed so far before the partitions are taken away
        objConsumer.commitSync(objCurrentOffsets);
        objCurrentOffsets.clear();
    }
}

Now, when you create two partitions, insert records into the topic AppConfigs.simple_rebalance_topicName and start one consumer, we will find that this one consumer listens to both partitions, as no other consumer is present.
Now, if I start another consumer, we will see the following changes:

1- The old consumer's prompt will print logs in this sequence (a rough sketch of the output is shown below):

a:- Partitions revoked:- this will show both partitions, i.e. 0 and 1, revoked from consumer 1.
b:- Partitions committed:- this will show the offsets of both partitions, i.e. 0 and 1, committed by consumer 1.
c:- Partitions assigned:- here we will get only one partition, i.e. 0, assigned, as the new consumer instance has started.

2- The prompt of the newly started second consumer instance:
a:- Partitions assigned:- here we will get only one partition, i.e. 1, assigned.

If we kill one of the consumer instances, Kafka will again do the rebalancing activity and the remaining consumer will get both partitions, i.e. 0 and 1.
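
Based on the logger calls in SiddhuRebalanceListner, the old consumer's log will look roughly like this when the second instance joins (logger prefixes omitted; the exact interleaving may vary):

Inside onPartitionsRevoked
iterating partition onPartitionsRevoked 0,
iterating partition onPartitionsRevoked 1,
iterating partition committed 0
iterating partition committed 1
Inside onPartitionsAssigned
iterating partition onPartitionsAssigned 0,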

Many times we have a situation where we want both of these under our control:
1- Partition management/assignment, i.e. rather than Kafka doing the partition assignment to the different consumers, we do the partition assignment to our consumer ourselves.
2- Offset commits, i.e. we don't want Kafka to do the offset commits; rather we want to take control of the offset commits, as we did in the above example.

This can be done using the code below.

// to inform kafka that we are going to handle the offset commit manually.
props.put("enable.auto.commit", "false");
consumer = new KafkaConsumer<>(props);
TopicPartition p0 = new TopicPartition(topicName, 0);
TopicPartition p1 = new TopicPartition(topicName, 1);
TopicPartition p2 = new TopicPartition(topicName, 2);

consumer.assign(Arrays.asList(p0,p1,p2));
//here we have assigned the partitions to the consumer ourselves
System.out.println("Current position p0=" + consumer.position(p0)
                 + " p1=" + consumer.position(p1)
                 + " p2=" + consumer.position(p2));
//this will set the current offset of each partition back to 0
consumer.seek(p0, 0);
consumer.seek(p1, 0);
consumer.seek(p2, 0);
System.out.println("New positions p0=" + consumer.position(p0)
                 + " p1=" + consumer.position(p1)
                 + " p2=" + consumer.position(p2));
