Thursday, December 24, 2020

Avro Schema example for Kafka Producer

In all the previous lectures we discussed sending and consuming data using Kafka as the broker and Java as the programming language.

All the data we sent so far was either a String, an Integer, or a simple single Java class object. But in real-life scenarios this is not the case. You will get messages with a deep hierarchy in the data model, and creating Java data-model classes for all of this by hand is really difficult, as is maintaining serializable and non-serializable versions of each class.

So the best option is to develop a schema for these classes and have some mechanism that generates plain POJO classes that can be easily serialized and deserialized.

There are many such frameworks available; the most popular are:
1- JSON :- http://www.jsonschema2pojo.org/
2- AVRO :- https://avro.apache.org/

We will look at both one by one; first we will talk about Avro. People working with big data concepts might already be aware of it.

The main advantages of using the Avro data format to send and receive data in Kafka are that it is well tested and highly recommended by the community. It sends data in a compact binary format that is not human-readable on the wire, which creates less traffic; and less traffic means less time, lower data consumption, and more speed.

In addition, with Avro's schema-evolution support, even if the schema changes, both old and new Kafka consumers can still consume the data.

So what is the concept?

1- The producer sends the data to the broker using KafkaAvroSerializer.
2- Along with the data, it also registers the schema for the data format with the Schema Registry:
props.put("schema.registry.url", "http://localhost:8081");
3- Finally, the consumer receives this data and deserializes it using the properties below (a minimal consumer sketch follows):

props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");
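
To make step 3 concrete, here is a minimal consumer sketch built around these properties. This is an illustrative assumption, not code from this project: the topic name "siddhu-avro-topic" and the group id are placeholders, and SiddhuUserRecord is the Avro-generated class we will create in the steps below.

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SiddhuAvroRecordConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "siddhu-avro-consumer-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        // With specific.avro.reader=true the deserializer returns the generated
        // SiddhuUserRecord class instead of a GenericRecord.
        props.put("specific.avro.reader", "true");

        try (KafkaConsumer<String, SiddhuUserRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("siddhu-avro-topic"));
            while (true) {
                // Poll the broker and print every Avro record received.
                ConsumerRecords<String, SiddhuUserRecord> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, SiddhuUserRecord> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}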

Now the questions are: how do we create the schema, what should the file extension be, how do we create the POJO class, and so on.

Let's do it practically.

Let's say we have the following data that needs to be sent to the broker by a Kafka producer using Avro.
Step 1:- Create the schema file

1- SiddhuUserRecord.avsc
{
  "type": "record",
  "name": "SiddhuUserRecord",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "username", "type": ["string", "null"]},
    {"name": "useraddress", "type": ["string", "null"]},
    {"name": "language", "type": ["string", "null"], "default": "None"}
  ]
}

(Tip: you can also add a "namespace" field, e.g. "com.siddhu.kafka.example.siddhukafkaexample.simpleproducer"; Avro uses it as the Java package of the generated class.)

2- As we are using a Maven project, add the following lines to the pom.xml:

    <!-- Maven Avro plugin for generating pojo -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.10.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/com/siddhu/kafka/example/siddhukafkaexample/simpleproducer</outputDirectory>
                            <imports>
                                <import>${project.basedir}/src/main/resources/schema/SiddhuUserRecord.avsc</import>
                            </imports>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
             
              <dependencies>
 
        <!-- Avro dependency-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.1</version>
        </dependency>
 
    </dependencies>

This is our whole pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>siddhukafkaexample</groupId>
    <artifactId>siddhukafkaexample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>siddhukafkaexample</name>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>2.5.1</kafka.version>
        <jackson.version>2.12.0</jackson.version>
<!--         <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target> -->
        <java.version>1.8</java.version>
    </properties>
 
    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
 
            <!-- Maven Avro plugin for generating pojo -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.10.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/com/siddhu/kafka/example/siddhukafkaexample/simpleproducer</outputDirectory>
                            <imports>
                                <import>${project.basedir}/src/main/resources/schema/SiddhuUserRecord.avsc</import>
                            </imports>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
 
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
 
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <!-- Avro dependency -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>

Finally, build the project using the Maven command:

mvn clean compile

You will find that a class called SiddhuUserRecord has been generated by Avro, and this class can now be easily serialized.
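
As a quick sanity check, the generated class can be instantiated either with its no-arg constructor and setters (as the producer below does) or with the builder that Avro generates. A small sketch, with example field values:

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

public class SiddhuUserRecordBuilderDemo {

    public static void main(String[] args) {
        // newBuilder() is generated by Avro; build() validates the record
        // against the schema (e.g. the required "id" field must be set).
        SiddhuUserRecord record = SiddhuUserRecord.newBuilder()
                .setId("1")
                .setUsername("siddhuname")
                .setUseraddress("address1")
                .setLanguage("English")
                .build();
        System.out.println(record); // toString() renders the record as JSON-like text
    }
}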

Now let's create our producer, which will send a message to the broker using the Avro-generated class above.

1- SiddhuAvroRecordProducer

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;
import java.util.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SiddhuAvroRecordProducer {
 
    public static void main(String[] args) throws Exception{
        Logger logger = LoggerFactory.getLogger(SiddhuAvroRecordProducer.class.getName());
 
        Properties props = new Properties();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.avro_applicationID);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.avro_bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", AppConfigs.avro_registry_url);
         
 
        Producer<String, SiddhuUserRecord> producer = new KafkaProducer<>(props);
        SiddhuUserRecord cr = new SiddhuUserRecord();
        try{
            cr.setId("1".toString());
            cr.setLanguage("Spanish");
            cr.setUseraddress("address1".toString());
            cr.setUsername("siddhuname".toString());
            producer.send(new ProducerRecord<String, SiddhuUserRecord>(AppConfigs.avro_topicName,cr.getId().toString(),cr)).get();
            logger.info("Message send successfully");
        }
        catch(Exception ex){
             logger.error("Error occurred while sending message");
            ex.printStackTrace();
        }
        finally{
            producer.close();
        }
 
   }
}
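
The producer above references an AppConfigs class that holds its configuration constants; the original listing does not show it. A minimal sketch of what it could look like, with illustrative placeholder values (adjust them to your environment):

package com.siddhu.kafka.example.siddhukafkaexample.simpleproducer;

// Assumed configuration holder; the constant values below are placeholders.
public class AppConfigs {
    public static final String avro_applicationID = "SiddhuAvroRecordProducer";
    public static final String avro_bootstrapServers = "localhost:9092";
    public static final String avro_registry_url = "http://localhost:8081";
    public static final String avro_topicName = "siddhu-avro-topic";
}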

To execute our producer we need to make a few changes in the pom.xml, i.e.

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>5.3.0-ccs</kafka.version>
        <jackson.version>2.12.0</jackson.version>
        <!-- <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> -->
        <java.version>1.8</java.version>
        <confluent.version>5.3.0</confluent.version>
    </properties>
     
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

Our complete pom.xml is

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>siddhukafkaexample</groupId>
    <artifactId>siddhukafkaexample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>siddhukafkaexample</name>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>5.3.0-ccs</kafka.version>
        <jackson.version>2.12.0</jackson.version>
        <!-- <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> -->
        <java.version>1.8</java.version>
        <confluent.version>5.3.0</confluent.version>
    </properties>
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
 
            <!-- Maven Avro plugin for generating pojo -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.10.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/com/siddhu/kafka/example/siddhukafkaexample/simpleproducer</outputDirectory>
                            <imports>
                                <import>${project.basedir}/src/main/resources/schema/SiddhuUserRecord.avsc</import>
                            </imports>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
 
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- Confluent Kafka Avro Serializer -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
 
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <!-- Avro dependency -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.1</version>
        </dependency>
 
 
    </dependencies>
</project>

Finally, have Confluent Kafka set up on your machine so that schema-registry-start.bat is available. Start your Schema Registry, ZooKeeper, and broker servers, then execute the producer; in the topic it creates you will find the message. Check the message using Kafka Tool, a free open-source GUI.

Let's run the above example using the Kafka setup from the confluent.io download for Windows.

You can download the desired version from
https://www.confluent.io/download/

Once the setup is done, run our producer and check with Kafka Tool that the serialized Avro message landed in the topic.

As I am using a Windows machine, at the time this blog was written I am using confluent-community-6.0.1.zip. Unzip it and add the Confluent Kafka bin directory to your PATH environment variable:

C:\confluent-community-6.0.1\confluent-6.0.1\bin\windows

To run the producer you first need to start the Schema Registry. Currently, by default, Confluent ships schema-registry-start only as an .sh script, i.e., for Linux. For Windows, you can download the two files schema-registry-start.bat and schema-registry-run-class.bat into
C:\confluent-community-6.0.1\confluent-6.0.1\bin\windows
from:

https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows

Now run the command below to start the Schema Registry:
C:\confluent-community-6.0.1\confluent-6.0.1\bin\windows\schema-registry-start.bat ..\config\confluent-schema-registry.properties

Finally, run the producer.
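
One way to launch it from the project root is via Maven's exec plugin (a sketch; you can equally run the class directly from your IDE):

mvn compile exec:java -Dexec.mainClass="com.siddhu.kafka.example.siddhukafkaexample.simpleproducer.SiddhuAvroRecordProducer"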

You will be able to see the output in Kafka Tool as shown below.
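
If you prefer the command line over Kafka Tool, Confluent's kafka-avro-console-consumer can decode the Avro messages (a sketch, assuming the default ports and the placeholder topic name used above; on Windows the .bat wrapper may need the same treatment as the registry scripts):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic siddhu-avro-topic --from-beginning --property schema.registry.url=http://localhost:8081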

Download:- Source code: https://github.com/shdhumale/siddhukafkaexample.git
