Monday, December 28, 2020

JSON Schema example for Kafka Producer

Now let's look at a different format that we can use to create the Java POJO class. In the last example we created a schema using the Avro format, then wrote a producer that serialized and sent this Avro data and a consumer that deserialized and consumed it.

For that we used the ready-made Avro serializer and deserializer. Another popular format for sending data is JSON.
The main advantage of this format is that it is plain text, so you can easily debug the data going over the network. Agreed, the payload will be bigger on the wire compared with Avro's compact binary stream, but JSON is so widely accepted in the current market that, when you work with Kafka, you will sooner or later need to fetch and send data in JSON format. So here I am showing you a simple way to create a JSON Schema and then generate a Java POJO class from it. Moreover, JSON Schema supports an extends/inheritance concept for the generated POJOs, which Avro lacks.
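(For instance, jsonschema2pojo lets a schema declare an "extends" entry pointing at another schema through "$ref", and the generated class then extends the class generated from the referenced schema. The example below does not use this; I mention it only as a capability.)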

We will use the same record structure, SiddhuUserRecord, that we used for Avro.

The steps remain the same:
1- Create a schema.
2- Create a POJO class from the schema using the tooling provided by http://www.jsonschema2pojo.org/.
3- Finally, create simple JsonSerializer and JsonDeserializer classes that will be used by the producer and consumer.

So let's first create the JSON Schema.

1- SiddhuUserJsonRecord

{
  "type": "object",
  "javaType": "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuUserJsonRecord",
  "properties": {
    "id": {"type": "string"},
    "name": {"type": "string"},
    "housenumber": {"type": "number"},
    "societyname": {"type": "string"},   
    "pinnumber": {"type": "integer"}   
  }
}
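A quick note on the schema: "javaType" tells jsonschema2pojo the fully qualified name of the class to generate, and by default a JSON Schema "number" maps to a Java Double while "integer" maps to an Integer. That is why, further below, the producer sets housenumber as a Double and pinnumber as an Integer.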

2- Let's now modify our pom.xml to incorporate the plugin that will generate the POJO files for us.

For that we will need to add the following plugin:

<!-- Json Schema to POJO plugin-->
            <plugin>
                <groupId>org.jsonschema2pojo</groupId>
                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
                <version>1.0.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            <includeAdditionalProperties>false</includeAdditionalProperties>
                            <includeHashcodeAndEquals>false</includeHashcodeAndEquals>
                            <generateBuilders>true</generateBuilders>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
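The plugin's generate goal runs as part of the normal build (by default it binds to the generate-sources phase), so the POJO gets regenerated on every compile. Note that the schema file must be saved under the sourceDirectory configured above, i.e. src/main/resources/schema/, for example as SiddhuUserJsonRecord.json (the exact file name here is my assumption).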
            

We also need to add these dependencies:

<!--Apache commons -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.11</version>
        </dependency>
 
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.jsonschema2pojo</groupId>
            <artifactId>jsonschema2pojo-maven-plugin</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.12.0</version>
        </dependency>
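Of these, jackson-databind is the important one for what follows: it provides the ObjectMapper that our custom serializer below uses to turn the POJO into bytes.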

Now run your Maven build with:
mvn clean compile

and you will see that a Java file, SiddhuUserJsonRecord, has been generated in your package. This is the autogenerated POJO class created by the jsonschema2pojo-maven-plugin.
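The generated class should look roughly like the sketch below; this is trimmed for readability, and the exact output depends on the plugin version and configuration (because we set generateBuilders to true, the real file also contains withXxx() builder methods):

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

import com.fasterxml.jackson.annotation.JsonProperty;

// Trimmed sketch of the generated POJO; the real file also carries
// annotations such as @JsonInclude/@JsonPropertyOrder.
public class SiddhuUserJsonRecord {

    @JsonProperty("id")
    private String id;
    @JsonProperty("name")
    private String name;
    @JsonProperty("housenumber")
    private Double housenumber;
    @JsonProperty("societyname")
    private String societyname;
    @JsonProperty("pinnumber")
    private Integer pinnumber;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    // ...the remaining getters and setters follow the same pattern
}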

For JSON we also need to create our own serializer class. For Avro one was available ready-made, but for JSON we have to write it ourselves. So let's create it.

1- JsonSerializer :-

/*
 * Copyright (c) 2018. Prashant Kumar Pandey
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * You may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 */
 
package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
 
import java.util.Map;
 
/**
 * Json Serializer
 */
public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
 
    public JsonSerializer() {
 
    }
 
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        //Nothing to Configure
    }
 
    /**
     * Serialize the given object to JSON bytes
     *
     * @param topic Kafka topic name
     * @param data  the object to serialize
     * @return byte[]
     */
    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
 
    @Override
    public void close() {
 
    }
}
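Step 3 also calls for a JsonDeserializer for the consumer side, which is not listed in this post. A minimal sketch mirroring the serializer above might look like this; note that the config key used to pass in the target class ("value.deserializer.class") is a name chosen for this sketch, not a Kafka built-in:

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Json Deserializer - the consumer-side counterpart of JsonSerializer.
 */
public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> targetType;

    public JsonDeserializer() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // "value.deserializer.class" is a custom key chosen for this sketch;
        // the consumer puts the target Class object into its Properties under it
        targetType = (Class<T>) config.get("value.deserializer.class");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, targetType);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }

    @Override
    public void close() {
    }
}

A consumer would then set value.deserializer to this class and add, for example, props.put("value.deserializer.class", SiddhuUserJsonRecord.class); so the deserializer knows which POJO to build.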

Now let's create a producer that uses this serializer.

2- SiddhuJsonRecordProducer :-

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.Properties;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SiddhuJsonRecordProducer {
 
    public static void main(String[] args) throws Exception{
        Logger logger = LoggerFactory.getLogger(SiddhuJsonRecordProducer.class.getName());
 
 
 
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.json_applicationID);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.json_bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");       
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.JsonSerializer.class);
 
 
        Producer<String, SiddhuUserJsonRecord> producer = new KafkaProducer<>(props);
        SiddhuUserJsonRecord cr = new SiddhuUserJsonRecord();
        try{
            cr.setId("1");
            cr.setName("SiddhuName");
            cr.setHousenumber(601.0);
            cr.setSocietyname("SiddhuSociety");
            cr.setPinnumber(1234);
            // send() returns a Future; calling get() blocks until the broker acknowledges
            producer.send(new ProducerRecord<String, SiddhuUserJsonRecord>(AppConfigs.json_topicName, cr.getId(), cr)).get();
            logger.info("Message sent successfully");
 
        }
 
        catch(Exception ex){
            logger.error("Error occurred while sending the message");
            ex.printStackTrace();
        }
        finally{
            producer.close();
        }
 
    }
}
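The producer above references an AppConfigs constants class that is not listed in this post; a minimal sketch would be as below, where the broker address and topic name are placeholders (the real values live in the downloadable project):

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

// Sketch only: placeholder values for the constants the producer expects.
class AppConfigs {
    static final String json_applicationID = "SiddhuJsonRecordProducer";
    static final String json_bootstrapServers = "localhost:9092"; // assumed broker
    static final String json_topicName = "siddhu-json-topic";     // assumed topic
}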

Now let's run the code, and you will see that we are able to successfully insert the JSON data into the topic.
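Since the payload is plain-text JSON, you can also verify it straight from the command line with Kafka's console consumer (adjust the broker address and topic name to your setup): kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <your-topic> --from-beginning will print the message exactly as it was written.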

Download :- https://github.com/shdhumale/siddhukafkaexample.git
