Spring Kafka SendTo Example introduces you to the way Spring Kafka forwards messages using the @SendTo annotation. Starting with version 2.0, if you also annotate a @KafkaListener method with @SendTo and the method returns a result, that result is forwarded to the topic specified by @SendTo. Let’s dig deeper.
Spring Kafka SendTo Example
Project Setup
- Spring Kafka: 2.3.4.RELEASE
- Spring Boot: 2.2.2.RELEASE
- Apache Kafka: kafka_2.13-2.4.0
- Maven: 3.5
Project Structure
Maven Dependencies
At a minimum, the pom.xml file needs two dependencies: spring-boot-starter and spring-kafka. You can change the spring-kafka version to suit your needs.
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.3.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
    </dependencies>
Configure Kafka Application with application.yml
Spring Boot takes care of most of the configuration. However, some properties need to be set explicitly in the application.yml file:
- spring.kafka.consumer.auto-offset-reset: earliest makes a consumer group that has no committed offset start reading from the beginning of the topic, so it receives all existing messages. Kafka's own default is latest.
- spring.kafka.consumer.group-id: first sets the consumer group. You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka.
- The serializers and deserializers are set to the Double variants because this example sends numbers.
    spring:
      kafka:
        consumer:
          group-id: first
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer
          value-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.DoubleSerializer
          value-serializer: org.apache.kafka.common.serialization.DoubleSerializer
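Spring Boot translates the spring.kafka.consumer.* keys into the property names the Kafka client itself understands. As a rough sketch of that mapping, the consumer settings above could be written as raw client properties like this. The class ConsumerProps and its method are hypothetical names used only for illustration, and the localhost:9092 broker address is an assumption: it is the value Spring Boot falls back to when none is configured, not something set in the file above.

```java
import java.util.Properties;

// Hypothetical helper: mirrors the consumer section of application.yml
// using the raw Kafka client property names. Not part of the example project.
class ConsumerProps {

    static Properties consumerProperties() {
        Properties props = new Properties();
        // Assumed broker address; Spring Boot defaults to localhost:9092 when none is set.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", "first");
        // spring.kafka.consumer.auto-offset-reset
        props.setProperty("auto.offset.reset", "earliest");
        // spring.kafka.consumer.key-deserializer
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.DoubleDeserializer");
        // spring.kafka.consumer.value-deserializer
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.DoubleDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each raw property so the mapping can be compared with application.yml.
        consumerProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Seeing the raw names can help when reading the Kafka documentation, which refers to group.id and auto.offset.reset rather than the Spring Boot spellings.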
Sending Kafka Messages with Spring Boot
Spring Boot automatically configures and initializes a KafkaTemplate
based on the properties configured in the application.yml
property file. We are sending a number to the topic first.t
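    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    @Service
    public class Sender {

        private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

        // Key type is Double to match the DoubleSerializer configured for keys; no key is sent here.
        @Autowired
        private KafkaTemplate<Double, Double> kafkaTemplate;

        public void send(Double number) {
            LOG.info("sending number='{}' to topic='{}'", number, "first.t");
            // Sends the value without a key to the topic first.t.
            kafkaTemplate.send("first.t", number);
        }
    }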
Forwarding Listener Results using @SendTo
Starting with version 2.0, if you also annotate a @KafkaListener with a @SendTo annotation and the method invocation returns a result, the result is forwarded to the topic specified by the @SendTo annotation.
The @SendTo value can have several forms:
- @SendTo("someTopic") routes to the literal topic.
- @SendTo("#{someExpression}") routes to the topic determined by evaluating the expression once during application context initialization.
- @SendTo("!{someExpression}") routes to the topic determined by evaluating the expression at runtime. The #root object for the evaluation has 3 properties:
  - request: the inbound ConsumerRecord (or ConsumerRecords object for a batch listener)
  - source: the org.springframework.messaging.Message<?> converted from the request
  - result: the method return result
The result of the expression evaluation must be a String representing the topic name. The Receiver class below uses the literal form, @SendTo("second.t").
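The three forms differ mainly in when the topic name is computed. The following plain-Java model is only a sketch of that timing difference and does not use Spring Kafka or SpEL: the class TopicResolution, its Root type, and the method names are all hypothetical, and Root is a simplified stand-in for the real #root object (it carries only the inbound topic and the return value).

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model of the three @SendTo forms; none of these types are Spring Kafka APIs.
class TopicResolution {

    // Simplified stand-in for the #root object: inbound topic plus the listener's return value.
    static final class Root {
        final String requestTopic;
        final Object result;

        Root(String requestTopic, Object result) {
            this.requestTopic = requestTopic;
            this.result = result;
        }
    }

    // Models @SendTo("someTopic"): always the same literal topic.
    static Function<Root, String> literal(String topic) {
        return root -> topic;
    }

    // Models @SendTo("#{...}"): the expression runs once, when the resolver is created.
    static Function<Root, String> atInitialization(Supplier<String> expression) {
        String topic = expression.get();
        return root -> topic;
    }

    // Models @SendTo("!{...}"): the expression runs for every record and can inspect the root.
    static Function<Root, String> atRuntime(Function<Root, String> expression) {
        return expression;
    }

    public static void main(String[] args) {
        Root root = new Root("first.t", 125.0);
        System.out.println(literal("second.t").apply(root));
        System.out.println(atInitialization(() -> "second" + ".t").apply(root));
        System.out.println(atRuntime(r -> r.requestTopic + ".reply").apply(root));
    }
}
```

In this model only the runtime form can derive the destination from the record being processed, which mirrors why the runtime expression is the one that receives the request, source and result properties.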
Receiving Kafka Messages with Spring Boot
Annotating a method with @KafkaListener marks it as the target of a Kafka message listener on the specified topics, here first.t. Spring Kafka automatically creates a message listener container for it.
The calculate method consumes a number from the first.t topic, raises it to the power of 3, and returns the result, which @SendTo forwards to the topic second.t. Finally, the result method consumes that value from the second.t topic. Let’s have a look at the Receiver below.
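    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Service;

    @Service
    public class Receiver {

        private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

        // Consumes from first.t; the return value is forwarded to second.t.
        @SendTo("second.t")
        @KafkaListener(topics = "first.t")
        public Double calculate(Double number) {
            LOG.info("calculating base power of exponent from='{}'", number);
            return Math.pow(number, 3.0); // set exponent = 3.0
        }

        // Consumes the forwarded result from second.t.
        @KafkaListener(topics = "second.t")
        public void result(Double number) {
            LOG.info("received base power of exponent ='{}'", number);
        }
    }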
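To follow the data flow without a running broker, the same two-step pipeline can be imitated in memory. This is only a sketch under stated assumptions: the class InMemoryForwarding and its send, poll and forwardOne methods are hypothetical stand-ins for topics and the listener container, not Spring Kafka APIs. The calculate logic is the same as in the Receiver.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory stand-in for the two topics; no Kafka or Spring involved.
class InMemoryForwarding {

    // Each "topic" is a simple FIFO queue keyed by its name.
    private final Map<String, Queue<Double>> topics = new HashMap<>();

    void send(String topic, Double value) {
        topics.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(value);
    }

    // Returns the next value on the topic, or null if the topic is empty or unknown.
    Double poll(String topic) {
        Queue<Double> queue = topics.get(topic);
        return queue == null ? null : queue.poll();
    }

    // Same computation as Receiver.calculate: raise the number to the power of 3.
    static Double calculate(Double number) {
        return Math.pow(number, 3.0);
    }

    // Consumes one value from first.t and forwards the return value to second.t,
    // imitating what @SendTo("second.t") does for the listener method.
    void forwardOne() {
        Double number = poll("first.t");
        if (number != null) {
            send("second.t", calculate(number));
        }
    }

    public static void main(String[] args) {
        InMemoryForwarding broker = new InMemoryForwarding();
        broker.send("first.t", 5.0);
        broker.forwardOne();
        System.out.println(broker.poll("second.t")); // prints 125.0
    }
}
```

The point of the sketch is that the listener method itself never names a producer: it only returns a value, and the forwarding step is handled around it.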
Spring Boot Application
Now we create a Spring Boot application that sends the number 5.0 through the Sender when it starts.
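    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class MySpringKafkaApplication implements CommandLineRunner {

        public static void main(String[] args) {
            SpringApplication.run(MySpringKafkaApplication.class, args);
        }

        @Autowired
        private Sender sender;

        // Runs once after the application context has started.
        @Override
        public void run(String... strings) throws Exception {
            sender.send(5.0);
        }
    }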
Running the Application
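Before running the Spring Kafka SendTo Example, a Kafka server must be running on localhost at port 9092, which is the default Kafka configuration. Then run the Spring Boot application as usual and check the output. For the input 5.0, the Receiver should log the result 125.0. Note that the captured log below comes from a run against the topic employee.t with the classes MySender and MyReceiver, so its class names and messages differ from the code in this article.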
    2020-02-06 10:25:03.290 INFO 19624 --- [ main] c.j.kafka.KafkaSpringBootApplication : Started KafkaSpringBootApplication in 19.066 seconds (JVM running for 29.925)
    2020-02-06 10:25:03.292 INFO 19624 --- [ main] com.javabycode.kafka.MySender : sending message='Spring Kafka and Spring Boot Example' to topic='employee.t'
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : received message='Spring Kafka and Spring Boot Example'
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_offset: 3
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7814863e
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_timestampType: CREATE_TIME
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedMessageKey: null
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedPartitionId: 0
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedTopic: employee.t
    2020-02-06 10:25:35.935 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_receivedTimestamp: 1580959503813
    2020-02-06 10:25:35.936 INFO 19624 --- [ntainer#0-0-C-1] com.javabycode.kafka.MyReceiver : kafka_groupId: employee
That’s all about the Spring Kafka SendTo Example. You can download the complete source code and run it on your local machine to get a deeper understanding of how Kafka works.
Download complete source code
spring-kafka-sendto-example.zip
References
Apache Kafka Official Website
Forwarding Listener Results using @SendTo
Spring Kafka Documentation
Spring Boot Tutorial for Beginners