Today, I introduce a Spring Boot Kafka Batch Listener Example and demonstrate how to implement Kafka producer with consumer Batch Listener. I make sure that this tutorial is useful with your real projects. Let’ do together.
Spring Boot Kafka Batch Listener Example
Project Setup
Spring Kafka
: 2.3.4.RELEASESpring Boot
: 2.2.2.RELEASEApache Kafka
: kafka_2.13-2.4.0Maven
: 3.5
Project Structure
Maven Dependencies
At least, we must have these dependencies: spring-boot-starter, spring-kafka
in pom.xml file. Note, you can change spring-kafka version for your own needs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
<properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring-kafka.version>2.3.4.RELEASE</spring-kafka.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> </dependencies> |
Configure Spring Kafka Application with application.yml
Some properties that need to be explicitly set in the application.yml file, e.g host, port, topic name:
1 2 3 4 5 6 7 |
spring: kafka: servers: localhost:9092 app: topic: staff: staff.t |
Create Kafka Producer and Consumer
If you want to understand deeply how to create Producer and Consumer with configuration, please the post Spring Boot Kafka Producer Consumer Configuration or You can also create Spring Boot Kafka Producer and Consumer without configuration, let check out the post Spring Boot Apache Kafka Example. Here we reuse Sender and SenderConfig from the post Spring Boot Kafka Producer Consumer Configuration.
Configuring a Batch Listener
We maybe need to do these steps:
- Enable the Batch Listener property of
ConcurrentKafkaListenerContainerFactory
using the methodsetBatchListener(true)
. - Create a BatchErrorHandler using the method
setBatchErrorHandler(new BatchLoggingErrorHandler())
. You may create your own Batch Error Handler. - Set an upper limit for the batch size by setting the
ConsumerConfig.MAX_POLL_RECORDS_CONFIG
to a value that you need. Here we are setting that value to 4.
You can have a look in the full configuration of ReceiverConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
@EnableKafka @Configuration public class ReceiverConfig { @Value("${spring.kafka.servers}") private String bootstrapServers; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4"); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); factory.getContainerProperties(); factory.setBatchErrorHandler(new BatchLoggingErrorHandler()); return factory; } } |
Receive Kafka Messages using a Batch Listener
We are receiving batch messages so the method receive()
must accept a list of messages, and certainly, this method can accept a list of headers and a list of offsets. Let’s have a look Receiver
below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@Service public class Receiver { private static final Logger LOG = LoggerFactory.getLogger(Receiver.class); @KafkaListener(id = "batch-listener", topics = "${app.topic.batch}") public void receive(@Payload List<String> messages, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { LOG.info("starting to consume batch messages"); // Sender that have sent a total of 9 messages, then it will be consumed by three batches for (int i = 0; i < messages.size(); i++) { LOG.info("received message='{}' with partition-offset='{}'", messages.get(i), partitions.get(i) + "-" + offsets.get(i)); } LOG.info("all messages consumed"); } } |
Spring Boot Application
To demo our Spring Boot Kafka Batch Listener Example then we create a simple Spring Boot Application below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@SpringBootApplication public class MySpringKafkaApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(MySpringKafkaApplication.class, args); } @Autowired private Sender sender; @Override public void run(String... strings) throws Exception { //send all 9 messages for (int i = 0; i < 9; i++){ sender.send("message-" + i); } } } |
Running Spring Boot Application
Running Spring Boot Application normally and check out the output. Just remind that Sender
that have sent a total of 9
messages, then Receiver
will be consumed by three batches. You will see that in the output below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.2.2.RELEASE) 2020-02-11 14:59:45.573 INFO 23204 --- [ main] c.j.kafka.MySpringKafkaApplication : Starting MySpringKafkaApplication on DESKTOP-BP6EDPD with PID 23204 2020-02-11 14:59:45.575 DEBUG 23204 --- [ main] c.j.kafka.MySpringKafkaApplication : Running with Spring Boot v2.2.2.RELEASE, Spring v5.2.2.RELEASE 2020-02-11 14:59:45.576 INFO 23204 --- [ main] c.j.kafka.MySpringKafkaApplication : No active profile set, falling back to default profiles: default 2020-02-11 14:59:46.649 INFO 23204 --- [ main] c.j.kafka.MySpringKafkaApplication : Started MySpringKafkaApplication in 1.501 seconds (JVM running for 5.504) 2020-02-11 14:59:46.650 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-0' to topic='batch.t' 2020-02-11 14:59:46.891 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-1' to topic='batch.t' 2020-02-11 14:59:46.891 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-2' to topic='batch.t' 2020-02-11 14:59:46.892 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-3' to topic='batch.t' 2020-02-11 14:59:46.892 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-4' to topic='batch.t' 2020-02-11 14:59:46.892 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-5' to topic='batch.t' 2020-02-11 14:59:46.893 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-6' to topic='batch.t' 2020-02-11 14:59:46.893 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-7' to topic='batch.t' 2020-02-11 14:59:46.894 INFO 23204 --- [ main] com.javabycode.kafka.producer.Sender : sending message='message-8' to topic='batch.t' 2020-02-11 14:59:47.022 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : starting to consume batch messages 2020-02-11 14:59:47.023 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-0' with partition-offset='0-20' 2020-02-11 14:59:47.023 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-1' with partition-offset='0-21' 2020-02-11 14:59:47.023 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-2' with partition-offset='0-22' 2020-02-11 14:59:47.023 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-3' with partition-offset='0-23' 2020-02-11 14:59:47.023 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : all messages consumed 2020-02-11 14:59:47.029 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : starting to consume batch messages 2020-02-11 14:59:47.029 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-4' with partition-offset='0-24' 2020-02-11 14:59:47.029 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-5' with partition-offset='0-25' 2020-02-11 14:59:47.029 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-6' with partition-offset='0-26' 2020-02-11 14:59:47.029 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-7' with partition-offset='0-27' 2020-02-11 14:59:47.029 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : all messages consumed 2020-02-11 14:59:47.036 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : starting to consume batch messages 2020-02-11 14:59:47.036 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : received message='message-8' with partition-offset='0-28' 2020-02-11 14:59:47.036 INFO 23204 --- [-listener-0-C-1] com.javabycode.kafka.consumer.Receiver : all messages consumed |
That’s all about Spring Boot Kafka Batch Listener Example. Now you can try to do your own practices and don’t forget to download the complete source code of Spring Boot Kafka Batch Listener Example below.
Download the complete source code
spring-kafka-batchlistener-example.zip (379 downloads)References
Spring Boot Kafka Producer Consumer Configuration
Spring Boot Apache Kafka Example
Spring Kafka Client Compatability