Monday, December 28, 2020

JSON Schema example for Kafka consumer

 In the last example we created a producer that uses a Java POJO generated from a JSON schema with jsonschema2pojo.

Now let's create a consumer that reads this data using a JSON deserializer.

For this we need the JsonDeserializer below.

1:- JsonDeserializer:-

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
 
import java.util.Map;
 

public class JsonDeserializer<T> implements Deserializer<T> {
    private ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> className;
 
    public JsonDeserializer() {
 
    }
 
    /**
     * Set the specific Java Object Class Name
     *
     * @param props set specific.class.name to your specific Java Class Name
     * @param isKey set it to false
     */
    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        className = (Class<T>) props.get("specific.class.name");
    }
 
    /**
     * Deserialize to a POJO
     *
     * @param topic topic name
     * @param data  message bytes
     * @return Specific Java Object
     */
    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, className);
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }
 
    @Override
    public void close() {
        //nothing to close
    }
}

Create SiddhuJsonRecordConsumer

2:- SiddhuJsonRecordConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SiddhuJsonRecordConsumer {
 
    public static void main(String[] args) throws Exception{
        Logger logger = LoggerFactory.getLogger(SiddhuJsonRecordConsumer.class.getName());
 
        Properties props = new Properties();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.json_applicationID);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.json_bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");       
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfigs.json_groupName);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put("specific.class.name", com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord.class);
         
         
        KafkaConsumer<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord> consumer = new KafkaConsumer<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord>(props);
        consumer.subscribe(Arrays.asList(AppConfigs.json_topicName));
        try {  
            while (true){
 
                try {
                    ConsumerRecords<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord> records = consumer.poll(100);
                    for (ConsumerRecord<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord> record : records){
                        logger.info("client Id="+ record.value().getId()
                                + " User Name=" + record.value().getName()
                                + " Society Name=" + record.value().getSocietyname()                           
                                + " Pin Number" + record.value().getPinnumber().toString()
                                + " House Number=" + record.value().getHousenumber().toString());
                    }
                }
                catch(Exception ex){
                    ex.printStackTrace();
                }
            }
 
 
        }catch(Exception ex){
            ex.printStackTrace();
        }
        finally{
            consumer.close();
        }
 
    }
}

As you can see, we get the JSON object as output.

You can download the code from

https://github.com/shdhumale/siddhukafkaexample.git

No comments: