The Spring Boot Apache Kafka Example introduces you to the way Spring Boot will auto-configure a Spring Kafka application based on the jar dependencies using default values. But you make sure the spring-kafka-xxx.jar
is on the project classpath. Let’s dig deeper.
Spring Boot Apache Kafka Example
Download and Install Apache Kafka
If you work on Windows platform, please read the post to know the way to download and install Apache Kafka. If you work on other platforms please read the official documentation here. So far, Apache Kafka is started on your local machine.
Project Setup
Spring Kafka
: 2.3.4.RELEASESpring Boot
: 2.2.2.RELEASEApache Kafka
: kafka_2.13-2.4.0Maven
: 3.5
Project Structure
Maven Dependencies
At least, we must have these dependencies: spring-boot-starter, spring-kafka
in pom.xml file. Note, you can change spring-kafka version for your own needs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
<properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <spring-kafka.version>2.3.4.RELEASE</spring-kafka.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> </dependencies> |
Configure Kafka Application with application.yml
Spring Boot takes care of most of the configuration. However, some properties that need to be explicitly set in the application.yml file:
- The
kafka.consumer.auto-offset-reset:earliest
by default, it will start reading from the beginning of the topic and stream all of the existing. - The
kafka.consumer.group-id: employee
you should always configuregroup.id
unless you are using the simple assignment API and you don’t need to store offsets in Kafka.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
spring: kafka: consumer: group-id: employee auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer app: topic: employee: employee.t |
Sending Kafka Messages with Spring Boot
Spring Boot automatically configures and initializes a KafkaTemplate
based on the properties configured in the application.yml
property file.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Service public class MySender { private static final Logger LOG = LoggerFactory.getLogger(MySender.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${app.topic.employee}") private String topic; public void send(String message){ LOG.info("sending message='{}' to topic='{}'", message, topic); kafkaTemplate.send(topic, message); } } |
Receiving Kafka Messages with Spring Boot
By annotating a method with @KafkaListener
annotation Spring Kafka will automatically create a message listener container. Annotation that marks a method to be the target of a Kafka message listener on the specified topics.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Service public class MyReceiver { private static final Logger LOG = LoggerFactory.getLogger(MyReceiver.class); @KafkaListener(topics = "${app.topic.employee}") public void receive(@Payload String message, @Headers MessageHeaders headers) { LOG.info("received message='{}'", message); headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key))); } } |
Spring Boot Application
Now, we create a Spring Boot application to demonstrate the application like below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@SpringBootApplication public class KafkaSpringBootApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(KafkaSpringBootApplication.class, args); } @Autowired private MySender mySender; @Override public void run(String... strings) throws Exception { mySender.send("Spring Kafka and Spring Boot Example"); } } |
Running the Application
Before we make a demo for the Spring Boot Apache Kafka example, Kafka server must start on localhost with port 9092 which is the default configuration of Kafka. Then we run Spring boot application normally and see the output.
1 2 3 4 5 6 7 8 9 10 11 |
2020-02-06 10:25:03.290 INFO 19624 --- [ main] c.j.kafka.KafkaSpringBootApplication : Started KafkaSpringBootApplication in 19.066 seconds (JVM running for 29.925) 2020-02-06 10:25:03.292 INFO 19624 --- [ main] com.javabycode.kafka.MySender : sending message='Spring Kafka and Spring Boot Example' to topic='employee.t' 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : received message='Spring Kafka and Spring Boot Example' 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_offset: 3 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7814863e 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_timestampType: CREATE_TIME 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedMessageKey: null 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedPartitionId: 0 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedTopic: employee.t 2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedTimestamp: 1580959503813 2020-02-06 10:25:35.936 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_groupId: employee |
That’s all about Spring Boot Apache Kafka Example. You should download the complete source code and run on your local machine to get more deeply the way Kafka works.
Download complete source code
spring-kafka-spring-boot-example.zip (209 downloads)References
Apache Kafka Official Website
Spring Kafka Client Compatability
Spring Kafka Documentation
Spring Boot Tutorial for Beginners