In last example we have created producer that uses the JAVA pojo that was created by using JSON schema using jsonschema2pojo.
Now lets create a consumer that will read this data using JSON deserialize.
For this we need below JsonDeserializer
1:- JsonDeserializer:-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; import java.util.Map; public class JsonDeserializer<T> implements Deserializer<T> { private ObjectMapper objectMapper = new ObjectMapper(); private Class<T> className; public JsonDeserializer() { } /** * Set the specific Java Object Class Name * * @param props set specific.class.name to your specific Java Class Name * @param isKey set it to false */ @SuppressWarnings("unchecked") @Override public void configure(Map<String, ?> props, boolean isKey) { className = (Class<T>) props.get("specific.class.name"); } /** * Deserialize to a POJO * * @param topic topic name * @param data message bytes * @return Specific Java Object */ @Override public T deserialize(String topic, byte[] data) { if (data == null) { return null; } try { return objectMapper.readValue(data, className); } catch (Exception e) { throw new SerializationException(e); } } @Override public void close() { //nothing to close } } |
Create SiddhuJsonRecordConsumer
2:- SiddhuJsonRecordConsumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SiddhuJsonRecordConsumer { public static void main(String[] args) throws Exception{ Logger logger = LoggerFactory.getLogger(SiddhuJsonRecordConsumer.class.getName()); Properties props = new Properties(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfigs.json_applicationID); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.json_bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.JsonDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConfigs.json_groupName); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put("specific.class.name", com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord.class); KafkaConsumer<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord> consumer = new KafkaConsumer<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord>(props); consumer.subscribe(Arrays.asList(AppConfigs.json_topicName)); try { while (true){ try { ConsumerRecords<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord> records = consumer.poll(100); for (ConsumerRecord<String, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord> record : records){ logger.info("client Id="+ record.value().getId() + " User Name=" + record.value().getName() + " Society Name=" + record.value().getSocietyname() + " Pin Number" + record.value().getPinnumber().toString() + " House Number=" + record.value().getHousenumber().toString()); } } catch(Exception ex){ ex.printStackTrace(); } } }catch(Exception ex){ ex.printStackTrace(); } finally{ consumer.close(); } } } |
As you can see we can the json object as output.
You can download the code from
No comments:
Post a Comment