Example code for Java
Requirements for JSON Producer and Consumer
When using our Event Streaming service with java applications, you will need the kafka-clients
as main dependency. json
and faker
are just small dependencies to execute a minimal
working example for this case. The given code example in this section will guide you towards a Maven project The most recent version can be found
in the Maven Repository or the artifact repository of your choice.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version> <!-- replace with the version you want to use -->
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20230618</version>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
Remember to setup your keystore and truststore according to our documentation
Your first kafka Producer and Consumer
Here is an example code for a SecureKafkaProducer.java
package org.example;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;
import java.util.Properties;
import java.util.UUID;
public class SecureKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafka_bootstrap_servers");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// SSL properties
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "</path/to/truststore.jks>");
props.put("ssl.truststore.password", "<truststore_password>");
props.put("ssl.keystore.location", "</path/to/keystore.jks>");
props.put("ssl.keystore.password", "<keystore_password>");
props.put("ssl.key.password", "<key_password>");
Producer<String, String> producer = new KafkaProducer<>(props);
Faker faker = new Faker();
for (int i = 0; i < 100; i++) {
UUID uuid = UUID.randomUUID();
String name = faker.name().fullName();
String location = faker.address().fullAddress();
JSONObject message = new JSONObject();
message.put("id", uuid.toString());
message.put("name", name);
message.put("location", location);
producer.send(new ProducerRecord<>("your-topic-name", uuid.toString(), message.toString()));
System.out.println("Sent message: " + message);
}
producer.close();
}
}
Analog you can define a SecureKafkaConsumer.java
package org.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;
import java.util.Collections;
import java.util.Properties;
public class SecureKafkaConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_servers>");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "<group_id>"); // Set consumer group
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "<auto_offset_level>");
// SSL properties
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "</path/to/truststore.jks>");
props.put("ssl.truststore.password", "<truststore_password>");
props.put("ssl.keystore.location", "</path/to/keystore.jks>");
props.put("ssl.keystore.password", "<keystore_password>");
props.put("ssl.key.password", "<key_password>");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("your-topic-name"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
JSONObject message = new JSONObject(record.value());
String id = message.getString("id");
String name = message.getString("name");
String location = message.getString("location");
System.out.printf("Received message: id = %s, name = %s, location = %s%n", id, name, location);
}
}
}
}
Requirements for Avro Producer and Consumer
For avro serialized data, we need a couple more requirements. Here is a updated pom.xml
for a Maven project to consume a topic with a schema registry.
In production environment, you should always consider using a schema registry so that your consuming applications know the expected fields.
You can always use the artifact repository of your choice.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<groupId>org.example</groupId>
<artifactId>kafka-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version> <!-- replace with the version you want to use -->
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20230618</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
We included a build step to generate Java classes from .avsc
files, which compile any needed methods for producing and consuming messages.
The User.avsc
for this example looks like this:
{
"namespace": "org.example",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "location", "type": "string"}
]
}
Your Kafka Avro Procuder and Consumer
An example implementation for a SecureKafkaAvroProducer.java
to produce avro serialized data for the given avro schema defined above:
package org.example;
import com.github.javafaker.Faker;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
public class SecureKafkaAvroProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<bootstrap_servers>");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
String topic = "<topic_name>";
// SSL properties for Kafka
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "</path/to/truststore.jks>");
props.put("ssl.truststore.password", "<truststore_password>");
props.put("ssl.keystore.location", "</path/to/keystore.jks>");
props.put("ssl.keystore.password", "<keystore_password>");
props.put("ssl.key.password", "<key_password>");
// Schema Registry location
props.put("schema.registry.url", "<schema_registry_url>");
// SSL properties for Schema Registry
props.put("schema.registry.ssl.keystore.location", "</path/to/keystore.jks>");
props.put("schema.registry.ssl.keystore.password", "<keystore_password>");
props.put("schema.registry.ssl.key.password", "<key_password>");
Producer<String, User> producer = new KafkaProducer<>(props);
Faker faker = new Faker();
for (int i = 0; i < 100; i++) {
User user = new User();
user.setId(UUID.randomUUID().toString());
user.setName(faker.name().fullName());
user.setLocation(faker.address().fullAddress());
producer.send(new ProducerRecord<>(topic, (String) user.getId(), user));
System.out.println("Sent message: " + user);
}
producer.close();
}
}
Analog we can define a SecureKafkaAvroConsumer.java
to consume our sample data.
package org.example;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SecureKafkaAvroConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<bootstrap_servers>");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.setProperty("group.id", "<consumer_id>");
props.setProperty("auto.offset.reset", "<consumer_offset>");
String topic = "<topic_name>";
// SSL properties for Kafka
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "</path/to/truststore.jks>");
props.put("ssl.truststore.password", "<truststore_password>");
props.put("ssl.keystore.location", "</path/to/keystore.jks>");
props.put("ssl.keystore.password", "<keystore_password>");
props.put("ssl.key.password", "<key_password>");
// Schema Registry location
props.put("schema.registry.url", "<schema_registry_url>");
// SSL properties for Schema Registry
props.put("schema.registry.ssl.keystore.location", "</path/to/keystore.jks>");
props.put("schema.registry.ssl.keystore.password", "<keystore_password>");
props.put("schema.registry.ssl.key.password", "<key_password>");
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
System.out.println("Key: " + record.key() + ", Value: " + record.value());
System.out.println("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
}
}