Today I introduce a Spring Boot Kafka JSON Serializer example and show how to send a Java object to Apache Kafka as JSON and receive it back as a Java object, using Spring Kafka and Spring Boot. The tutorial is built around a complete, runnable project. Let's get started.
Spring Boot Kafka Json Serializer Example
Project Setup
Spring Kafka: 2.3.4.RELEASE
Spring Boot: 2.2.2.RELEASE
Apache Kafka: kafka_2.13-2.4.0
Maven: 3.5
Project Structure
Maven Dependencies
At a minimum, the pom.xml file needs these dependencies: spring-boot-starter, spring-kafka, and jackson-databind. Note that you can change the spring-kafka version to suit your needs.
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring-kafka.version>2.3.4.RELEASE</spring-kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>${spring-kafka.version}</version>
    </dependency>
    <!-- json support -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
Configure Spring Kafka Application with application.yml
Some properties need to be set explicitly in the application.yml file, e.g. the broker host and port and the topic name:
spring:
  kafka:
    servers: localhost:9092
app:
  topic:
    staff: staff.t
Simple POJO to Serialize/Deserialize
We use a simple POJO class named Staff. Its objects are serialized to JSON when sent and deserialized back into Java objects when received.
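package com.javabycode.kafka;

import java.time.LocalDateTime;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

public class Staff {

    private String name;
    private String country;

    // Use the custom LocalDateTimeSerializer/LocalDateTimeDeserializer from this package
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    private LocalDateTime birthday;

    // No-arg constructor, required by Jackson for deserialization
    public Staff() {
    }

    public Staff(String name, String country, LocalDateTime birthday) {
        this.name = name;
        this.country = country;
        this.birthday = birthday;
    }

    // getter/setter
}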
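The listing above elides the accessors, and the log output later in this post prints Staff objects in a `Staff{...}` form. The following is only a sketch of what those elided members might look like. The `toString` format is an assumption inferred from that log output, and the Jackson annotations are left out here so the sketch compiles with the JDK alone.

```java
import java.time.LocalDateTime;

// Sketch of the members hidden behind "// getter/setter".
// Assumption: the toString format is inferred from the log output, not taken from the project source.
// The @JsonSerialize/@JsonDeserialize annotations are omitted so this compiles without Jackson.
final class Staff {

    private String name;
    private String country;
    private LocalDateTime birthday;

    public Staff() {
    }

    public Staff(String name, String country, LocalDateTime birthday) {
        this.name = name;
        this.country = country;
        this.birthday = birthday;
    }

    // Jackson discovers properties through these JavaBean-style accessors.
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }

    public LocalDateTime getBirthday() { return birthday; }
    public void setBirthday(LocalDateTime birthday) { this.birthday = birthday; }

    @Override
    public String toString() {
        // LocalDateTime.toString() uses the ISO-8601 form, not the MM/dd/yyyy pattern.
        return "Staff{name='" + name + "', country='" + country
                + "', birthday='" + birthday + "'}";
    }

    public static void main(String[] args) {
        Staff staff = new Staff("JavabyCode", "United State",
                LocalDateTime.of(2018, 7, 15, 0, 0));
        System.out.println(staff);
    }
}
```

Because `toString` relies on `LocalDateTime.toString()`, the birthday appears in ISO form in application logs, while the JSON sent to Kafka uses the custom date pattern.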
Using Jackson to Serialize/Deserialize
Here we create two classes that customize serialization and deserialization of the birthday field.
// Imports used by the two classes below (each class lives in its own file)
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// convert LocalDateTime to string in format MM/dd/yyyy HH:mm:ss
public class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {

    private static final Logger LOG = LoggerFactory.getLogger(LocalDateTimeSerializer.class);

    @Override
    public void serialize(LocalDateTime value, JsonGenerator generator, SerializerProvider provider) throws IOException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss");
        String format = formatter.format(value);
        LOG.info("Serialized Date='{}'", format);
        generator.writeString(format);
    }
}

// and

// convert string in format MM/dd/yyyy HH:mm:ss to LocalDateTime
public class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {

    private static final Logger LOG = LoggerFactory.getLogger(LocalDateTimeDeserializer.class);

    @Override
    public LocalDateTime deserialize(JsonParser parser, DeserializationContext context) throws IOException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss");
        LOG.info("Deserialized Date='{}'", parser.getText());
        return LocalDateTime.parse(parser.getText(), formatter);
    }
}
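Both classes above depend on the same `MM/dd/yyyy HH:mm:ss` pattern, so a value written by the serializer can be read back by the deserializer. Here is a minimal JDK-only sketch of that round trip, with no Jackson or Kafka involved. The class and method names (`BirthdayFormatDemo`, `toJsonText`, `fromJsonText`) are hypothetical and are not part of the project.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that mirrors the formatting logic of the two classes above.
final class BirthdayFormatDemo {

    // Same pattern as in LocalDateTimeSerializer and LocalDateTimeDeserializer.
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss");

    // What the serializer writes into the JSON document.
    static String toJsonText(LocalDateTime value) {
        return FORMATTER.format(value);
    }

    // What the deserializer reconstructs from the JSON text.
    static LocalDateTime fromJsonText(String text) {
        return LocalDateTime.parse(text, FORMATTER);
    }

    public static void main(String[] args) {
        LocalDateTime birthday = LocalDateTime.of(2018, 7, 15, 0, 0, 0);
        String text = toJsonText(birthday);
        System.out.println(text);                                // 07/15/2018 00:00:00
        System.out.println(fromJsonText(text).equals(birthday)); // true
    }
}
```

Note that the pattern has no fractional-second field, so a value carrying nanoseconds would lose them on the way through.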
Create Kafka Producer and Consumer
For a deeper look at how to configure a Producer and Consumer, please see the post Spring Boot Kafka Producer Consumer Configuration. You can also create a Spring Boot Kafka Producer and Consumer without explicit configuration; check out the post Spring Boot Apache Kafka Example. Here I only show the Java source code for demonstration purposes.
Sending Kafka Messages to Topic
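import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import com.javabycode.kafka.Staff;

@Service
public class StaffSender {

    private static final Logger LOG = LoggerFactory.getLogger(StaffSender.class);

    @Autowired
    private KafkaTemplate<String, Staff> kafkaTemplate;

    // topic name read from application.yml
    @Value("${app.topic.staff}")
    private String topic;

    public void send(Staff data) {
        LOG.info("sending JSON data='{}' to topic='{}'", data, topic);

        // create a message object carrying the Staff payload and the target topic header
        Message<Staff> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .build();

        kafkaTemplate.send(message);
    }
}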
Here is the Spring Boot Kafka Producer configuration.
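import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
// Spring Kafka's JsonSerializer, not Jackson's class of the same name
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.javabycode.kafka.Staff;

@Configuration
public class StaffSenderConfig {

    @Value("${spring.kafka.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Staff> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Staff> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}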
Receiving Kafka Messages from Topic
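import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import com.javabycode.kafka.Staff;

@Service
public class StaffReceiver {

    private static final Logger LOG = LoggerFactory.getLogger(StaffReceiver.class);

    @KafkaListener(topics = "${app.topic.staff}")
    public void receive(@Payload Staff data, @Headers MessageHeaders headers) {
        LOG.info("received JSON data='{}'", data);

        // log every message header (offset, partition, topic, timestamp, ...)
        headers.keySet().forEach(key -> {
            LOG.info("{}: {}", key, headers.get(key));
        });
    }
}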
Here is the Spring Boot Kafka Consumer configuration:
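import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// Spring Kafka's JsonDeserializer, not Jackson's class of the same name
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.javabycode.kafka.Staff;

@Configuration
@EnableKafka
public class StaffReceiverConfig {

    @Value("${spring.kafka.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Staff> consumerFactory() {
        // the deserializer instances passed here are the ones the factory uses
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(Staff.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Staff> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Staff> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}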
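The `ConsumerConfig` constants used in the configuration above are names for Kafka's plain string property keys. As a rough sketch, the same settings can be written with the literal keys in an ordinary map. The class `ConsumerPropsSketch` and its `consumerProps` method are hypothetical names for illustration, and nothing here connects to a broker.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: the consumer settings expressed with literal Kafka property keys.
final class ConsumerPropsSketch {

    static Map<String, Object> consumerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG (class given by name here)
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG (class given by name here)
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // ConsumerConfig.GROUP_ID_CONFIG
        props.put("group.id", "json");
        // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings in key order for readability.
        new TreeMap<>(consumerProps("localhost:9092"))
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With `auto.offset.reset` set to `earliest`, a consumer group that has no committed offset yet starts reading from the beginning of the topic instead of only seeing new messages.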
Spring Boot Application
To demo our Spring Boot Kafka JSON Serializer example, we create the simple Spring Boot application below.
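import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MySpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(MySpringKafkaApplication.class, args);
    }

    @Autowired
    private StaffSender sender;

    // runs once after startup and sends a single Staff message
    @Override
    public void run(String... strings) throws Exception {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy HH:mm:ss");
        Staff staff = new Staff("JavabyCode", "United State", LocalDateTime.parse("07/15/2018 00:00:00", formatter));
        sender.send(staff);
    }
}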
Running Spring Boot Application
Run the Spring Boot application as usual. It prints output like the following:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.2.RELEASE)

2020-02-10 15:49:14.524 INFO 20136 --- [ main] c.j.kafka.MySpringKafkaApplication : Starting MySpringKafkaApplication on DESKTOP-BP6EDPD with PID 20136 (D:\Projects\spring-kafka-json-serializer-deserializer-example\target\classes started by Admin in D:\Projects\spring-kafka-json-serializer-deserializer-example)
2020-02-10 15:49:14.527 DEBUG 20136 --- [ main] c.j.kafka.MySpringKafkaApplication : Running with Spring Boot v2.2.2.RELEASE, Spring v5.2.2.RELEASE
2020-02-10 15:49:14.527 INFO 20136 --- [ main] c.j.kafka.MySpringKafkaApplication : No active profile set, falling back to default profiles: default
2020-02-10 15:49:15.688 INFO 20136 --- [ main] c.j.kafka.MySpringKafkaApplication : Started MySpringKafkaApplication in 1.63 seconds (JVM running for 5.287)
2020-02-10 15:49:15.694 INFO 20136 --- [ main] c.javabycode.kafka.producer.StaffSender : sending JSON data='Staff{name='JavabyCode', country='United State', birthday='2018-07-15T00:00'}' to topic='staff.t'
2020-02-10 15:49:15.894 INFO 20136 --- [ main] c.j.kafka.LocalDateTimeSerializer : Serialized Date='07/15/2018 00:00:00'
2020-02-10 15:49:15.947 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.LocalDateTimeDeserializer : Deserialized Date='07/15/2018 00:00:00'
2020-02-10 15:49:15.955 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : received JSON data='Staff{name='JavabyCode', country='United State', birthday='2018-07-15T00:00'}'
2020-02-10 15:49:15.955 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_offset: 12
2020-02-10 15:49:15.956 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@13d9e0a5
2020-02-10 15:49:15.956 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_timestampType: CREATE_TIME
2020-02-10 15:49:15.956 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_receivedMessageKey: null
2020-02-10 15:49:15.956 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_receivedPartitionId: 0
2020-02-10 15:49:15.956 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_receivedTopic: staff.t
2020-02-10 15:49:15.956 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_receivedTimestamp: 1581324555901
2020-02-10 15:49:15.956 INFO 20136 --- [ntainer#0-0-C-1] c.j.kafka.consumer.StaffReceiver : kafka_groupId: json
That's all for the Spring Boot Kafka JSON Serializer example. Now you can try it out yourself, and don't forget to download the complete source code below.
Download the complete source code
spring-kafka-json-serializer-deserializer-example.zip
References
Spring Boot Kafka Producer Consumer Configuration
Spring Boot Apache Kafka Example
Spring Kafka Serialize Deserialize Documentation