Monday, December 28, 2020

Avro Schema example for Kafka consumer

In the past we have seen how to create an Avro schema and generate a Java POJO from it. Additionally, we have also sent data in Avro format, along with its Avro schema, to the broker. For that we had to start the Schema Registry on the server.
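(As a reminder of that step, the Schema Registry bundled with the Confluent platform is started with its own script; the properties path below is the platform default and may differ on your installation.)

# start the Schema Registry with its default properties file (path assumed)
bin/schema-registry-start etc/schema-registry/schema-registry.properties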

We also confirmed that the message was sent to the topic/partition using Kafka Tool.

Now let us see how to write the consumer that deserializes the Avro format data from the broker and prints it to the console.

1:- SiddhuAvroRecordConsumer:-

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SiddhuAvroRecordConsumer {

    public static void main(String[] args) throws Exception {

        Logger logger = LoggerFactory.getLogger(SiddhuAvroRecordConsumer.class.getName());

        // Consumer configuration: KafkaAvroDeserializer needs the Schema Registry URL,
        // and specific.avro.reader=true makes it return the generated SiddhuUserRecord
        // class instead of a GenericRecord.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.avro_bootstrapServers);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.avro_applicationID);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfigs.avro_groupName);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", AppConfigs.avro_registry_url);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("specific.avro.reader", "true");

        KafkaConsumer<String, SiddhuUserRecord> consumer = new KafkaConsumer<String, SiddhuUserRecord>(props);
        consumer.subscribe(Arrays.asList(AppConfigs.avro_topicName));

        try {
            while (true) {
                try {
                    // Poll the broker and print every deserialized SiddhuUserRecord.
                    ConsumerRecords<String, SiddhuUserRecord> records = consumer.poll(100);
                    for (ConsumerRecord<String, SiddhuUserRecord> record : records) {
                        logger.info("client Id=" + record.value().getId()
                                + " Language=" + record.value().getLanguage()
                                + " User Name=" + record.value().getUsername()
                                + " User Address=" + record.value().getUseraddress());
                    }
                }
                /* Optional workaround: skip a record that fails Avro deserialization by
                   seeking past the offset reported in the exception message.
                catch (SerializationException e) {
                    String s = e.getMessage().split("Error deserializing key/value for partition ")[1]
                            .split(". If needed, please seek past the record to continue consumption.")[0];
                    String topics = s.split("-")[0];
                    int offset = Integer.valueOf(s.split("offset ")[1]);
                    int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

                    TopicPartition topicPartition = new TopicPartition(topics, partition);
                    logger.info("Skipping " + AppConfigs.avro_topicName + "-" + partition + " offset " + offset);
                    consumer.seek(topicPartition, offset + 1);
                }
                */
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
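The AppConfigs class referenced above only holds the constants used by the producer and consumer; it is not shown in the listing. A minimal sketch, assuming the local two-broker setup and Schema Registry from the earlier posts; the client-id, group and topic strings here are placeholders, so use the values from the downloaded project:

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

// Hypothetical sketch of the constants used by SiddhuAvroRecordConsumer.
// The values below are assumptions for a local setup, not copied from the project.
public class AppConfigs {
    public static final String avro_applicationID = "SiddhuAvroRecordConsumer";         // client id (placeholder)
    public static final String avro_bootstrapServers = "localhost:9092,localhost:9093"; // local brokers (assumed)
    public static final String avro_groupName = "SiddhuAvroTopic-groupname";            // consumer group (assumed)
    public static final String avro_registry_url = "http://localhost:8081";             // Schema Registry (assumed)
    public static final String avro_topicName = "SiddhuAvroTopic";                      // topic name (assumed)
}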

Note:- If you get this error

Could not find class specified in writer’s schema whilst finding reader’s schema for a Specific Record.

Make sure to recheck these things:
1- Use the same Schema Registry URL for both the producer and the consumer, i.e. in our case we used
props.put("schema.registry.url", AppConfigs.avro_registry_url);
2- Make sure that the Avro-generated class is created in the proper package. In my case I had originally used the schema below
{
  "type": "record",
  "name": "SiddhuUserRecord",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "username", "type": ["string", "null"]},
    {"name": "useraddress", "type": ["string", "null"]},
    {"name": "language", "type": ["string", "null"], "default": "None"}
  ]
}
without a namespace and then manually copied the generated file into my desired package; this was giving me the error.

So I changed the namespace so that the generated class goes into the proper package:

{
  "namespace": "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer",
  "type": "record",
  "name": "SiddhuUserRecord",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "username", "type": ["string", "null"]},
    {"name": "useraddress", "type": ["string", "null"]},
    {"name": "language", "type": ["string", "null"], "default": "None"}
  ]
}
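With the namespace in place, the class generated by the Avro Maven plugin carries the matching package declaration, so it no longer has to be copied around by hand. Roughly, the generated file starts like this (abbreviated sketch, not the full generated code):

// Abbreviated sketch of the generated specific record; the real file is much longer.
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

public class SiddhuUserRecord extends org.apache.avro.specific.SpecificRecordBase
        implements org.apache.avro.specific.SpecificRecord {
    // Fields id, username, useraddress and language plus their getters/setters
    // are generated from the schema above.
}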

I also changed the pom.xml

from
${project.basedir}/src/main/java/com/siddhu/kafka/example/siddhukafkaexample/simpleproducer

to

${project.basedir}/src/main/java/
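This path is presumably the outputDirectory of the avro-maven-plugin, pointed at the source root so that the namespace creates the package folders during code generation. A rough sketch of that plugin section (the version and sourceDirectory below are assumptions, not copied from the project's pom.xml):

<!-- Sketch of the avro-maven-plugin section; version and sourceDirectory are assumed. -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.1</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <!-- generate into the source root so the namespace creates the package folders -->
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>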

And then I was able to consume the data after it was deserialized from the Avro format.


Download:- https://github.com/shdhumale/siddhukafkaexample.git
