Simple Producer-Consumer using SpringBoot and Kafka
A quick tutorial of implementing 2 dedicated micro-services
Architecture
Requirements
1. Kafka set-up: /Downloads/kafka_2.12-2.3.0/bin
a. $ ./zookeeper-server-start.sh ../config/zookeeper.properties
b. $ ./kafka-server-start.sh ../config/server.properties
c. $ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
i. $ kafka-console-producer --broker-list localhost:9092 --topic test
>>HELLO Kafka <hit enter>
ii. $ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
>>HELLO Kafka
2. 2 SpringBoot projects –
a. kafkaProducer
b. kafkaConsumer
Functionality
Producer
Define the producer properties -
server.port:8087
spring.kafka.producer.bootstrap-servers: localhost:9092
Define the producer application -
@SpringBootApplication
@ComponentScan(basePackages={"com.kafka"})
public class KafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}
}
Define the producer controller -
@RestController
public class ProducerController {
@Autowired
ProducerService producerService;
@GetMapping("/kafkaProducerStatus")
public String status() {
return "kafkaProducerStatus Up and Running on :: " + new Date();
}
@GetMapping("/sendMessage")
public String trigger(@RequestParam("message") String message) {
producerService.sendMessage(message);
return "Message sent to topic - KAFKA_TEST";
}
}
Define the producer service -
@Service
public class ProducerService {
String kafkaTopic = "KAFKA_TEST";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send(kafkaTopic, message);
}
}
Consumer
Define the consumer service -
server.port:8088
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
Define the consumer application -
@SpringBootApplication
@ComponentScan(basePackages={"com.kafka"})
public class KafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}
}
Define the consumer controller -
@RestController
public class ConsumerController {
@Autowired
ConsumerService consumerService;
@GetMapping("/kafkaConsumerStatus")
public String status() {
return "kafkaConsumerStatus Up and Running on :: " + new Date();
}
}
Define the consumer service -
@Service
public class ConsumerService {
@KafkaListener(topics = "KAFKA_TEST")
public void listen(@Payload String message) {
System.out.println("Message received :::: " + message);
}
}
Execution
2020-04-06 01:06:54.627 INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-04-06 01:06:54.627 INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-04-06 01:06:54.627 INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586115414627
2020-04-06 01:06:54.635 INFO 44146 --- [ad | producer-2] org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: ztj7U2TWQ8ScSXlQCiF3ww
kafkaProducerStatus Up and Running on :: Mon Apr 06 01:06:51 IST 2020
Message sent to topic - KAFKA_TEST
2020-04-06 01:07:55.205 INFO 44581 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-04-06 01:07:55.205 INFO 44581 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-04-06 01:07:55.205 INFO 44581 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586115475205
2020-04-06 01:07:55.206 INFO 44581 --- [ restartedMain] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-1, groupId=group-id] Subscribed to topic(s): KAFKA_TEST
2020-04-06 01:07:55.207 INFO 44581 --- [ restartedMain] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2020-04-06 01:07:55.226 INFO 44581 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-1, groupId=group-id] Cluster ID: ztj7U2TWQ8ScSXlQCiF3ww
2020-04-06 01:07:55.227 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=group-id] Discovered group coordinator 192.168.0.108:9092 (id: 2147483647 rack: null)
2020-04-06 01:07:55.229 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=group-id] Revoking previously assigned partitions []
2020-04-06 01:07:55.230 INFO 44581 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-id: partitions revoked: []
2020-04-06 01:07:55.230 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=group-id] (Re-)joining group
2020-04-06 01:07:55.236 INFO 44581 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8088 (http) with context path ''
2020-04-06 01:07:55.238 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=group-id] (Re-)joining group
2020-04-06 01:07:55.241 INFO 44581 --- [ restartedMain] c.k.c.k.KafkaConsumerApplication : Started KafkaConsumerApplication in 2.762 seconds (JVM running for 3.174)
2020-04-06 01:07:55.249 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=group-id] Successfully joined group with generation 69
2020-04-06 01:07:55.253 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=group-id] Setting newly assigned partitions: KAFKA_TEST-0
2020-04-06 01:07:55.263 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=group-id] Found no committed offset for partition KAFKA_TEST-0
2020-04-06 01:07:55.277 INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-1, groupId=group-id] Resetting offset for partition KAFKA_TEST-0 to offset 0.
2020-04-06 01:07:55.304 INFO 44581 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-id: partitions assigned: [KAFKA_TEST-0]
Message received :::: hello
kafkaConsumerStatus Up and Running on :: Mon Apr 06 01:08:12 IST 2020