Simple Producer-Consumer using SpringBoot and Kafka

A quick tutorial on implementing two dedicated microservices: a Kafka producer and a Kafka consumer, each built as a SpringBoot application.




Architecture



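Requirements
1.     Kafka set-up, run from /Downloads/kafka_2.12-2.3.0/bin:
a.     Start ZooKeeper:
       $ ./zookeeper-server-start.sh ../config/zookeeper.properties
b.     Start the Kafka broker:
       $ ./kafka-server-start.sh ../config/server.properties
c.     Create a topic named test and verify it with the console clients:
       $ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
       i.     $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
              >>HELLO Kafka  <hit enter>
       ii.    $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
              >>HELLO Kafka

The test topic is only a smoke test of the installation. The SpringBoot applications use a separate topic, KAFKA_TEST, which the broker creates on first use when auto.create.topics.enable is left at its default of true.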

2.     Two SpringBoot projects:
a.     kafkaProducer: https://github.com/namitsharma99/kafkaProducer.git
b.     kafkaConsumer: https://github.com/namitsharma99/kafkaConsumer.git



Functionality

Producer


Define the producer properties -

server.port:8087
spring.kafka.producer.bootstrap-servers: localhost:9092
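
The two property lines above use ":" as the key/value separator, once without and once with a following space. Assuming they live in a Java .properties file (the file name is not given here), both forms mean the same as key=value. The sketch below checks this with the JDK's java.util.Properties parser, used only as a stand-in for SpringBoot's own loader; the class name PropertiesSeparatorDemo is illustrative.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class PropertiesSeparatorDemo {

    // Parses .properties-formatted text with the JDK parser.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "server.port:8087\n"
              + "spring.kafka.producer.bootstrap-servers: localhost:9092\n");

        // Only the first ':' on each line separates key from value.
        System.out.println(props.getProperty("server.port"));
        // prints 8087
        System.out.println(props.getProperty("spring.kafka.producer.bootstrap-servers"));
        // prints localhost:9092
    }
}
```

The second colon in localhost:9092 is part of the value, so the host and port reach the Kafka client intact.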

Define the producer application -

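import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

// Entry point of the producer service; scans com.kafka for controllers and services.
@SpringBootApplication
@ComponentScan(basePackages = {"com.kafka"})
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

}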


Define the producer controller -

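import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    ProducerService producerService;

    // Health check: confirms the producer application is running.
    @GetMapping("/kafkaProducerStatus")
    public String status() {
        return "kafkaProducerStatus Up and Running on :: " + new Date();
    }

    // Publishes the value of the "message" query parameter to the Kafka topic.
    @GetMapping("/sendMessage")
    public String trigger(@RequestParam("message") String message) {
        producerService.sendMessage(message);
        return "Message sent to topic - KAFKA_TEST";
    }

}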
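
The /sendMessage endpoint above reads its text from the message query parameter, so anything beyond plain letters and digits should be URL-encoded by the caller. Here is a minimal sketch, assuming the producer runs locally on port 8087 as set in the producer properties. The class and method names are illustrative, and the example only builds the URI; it does not send the request.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class SendMessageUrl {

    // Builds the GET URI for the producer's /sendMessage endpoint.
    static URI build(String baseUrl, String message) {
        // URLEncoder applies form encoding: spaces become '+', '&' becomes %26.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/sendMessage?message=" + encoded);
    }

    public static void main(String[] args) {
        System.out.println(build("http://localhost:8087", "hello"));
        // prints http://localhost:8087/sendMessage?message=hello
        System.out.println(build("http://localhost:8087", "hello kafka & more"));
        // prints http://localhost:8087/sendMessage?message=hello+kafka+%26+more
    }
}
```

Opening the first URI in a browser or any HTTP client while the producer is running should return the "Message sent to topic - KAFKA_TEST" string from the controller.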

Define the producer service -

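import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    // Topic that the consumer application listens on.
    String kafkaTopic = "KAFKA_TEST";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // send() is asynchronous: it returns before the broker acknowledges the record.
    public void sendMessage(String message) {
        kafkaTemplate.send(kafkaTopic, message);
    }

}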




Consumer


Define the consumer properties -

server.port:8088

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
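
The auto-offset-reset setting only matters when the consumer group has no committed offset for a partition. The sketch below is a simplified model of that decision, not Kafka client code: the enum, method, and parameter names are hypothetical, and it leaves out the third policy value, none, under which the consumer fails instead of picking a position.

```java
import java.util.OptionalLong;

final class OffsetResetModel {

    enum ResetPolicy { EARLIEST, LATEST }

    // Returns the offset a consumer would start reading from in this model.
    static long startOffset(OptionalLong committed, ResetPolicy policy,
                            long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is ignored.
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..4, so the next record gets offset 5.
        System.out.println(startOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0, 5));
        // prints 0
        System.out.println(startOffset(OptionalLong.empty(), ResetPolicy.LATEST, 0, 5));
        // prints 5
        System.out.println(startOffset(OptionalLong.of(3), ResetPolicy.EARLIEST, 0, 5));
        // prints 3
    }
}
```

With earliest, a group that has never committed an offset also receives messages that were published before the consumer started, which is convenient when the producer is exercised first.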


Define the consumer application -

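import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

// Entry point of the consumer service; scans com.kafka for controllers and services.
@SpringBootApplication
@ComponentScan(basePackages = {"com.kafka"})
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

}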

Define the consumer controller -

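import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {

    // Injected but not called by the endpoint below.
    @Autowired
    ConsumerService consumerService;

    // Health check: confirms the consumer application is running.
    @GetMapping("/kafkaConsumerStatus")
    public String status() {
        return "kafkaConsumerStatus Up and Running on :: " + new Date();
    }

}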

Define the consumer service -

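import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {

    // Invoked for every record published to KAFKA_TEST; the group id comes from the properties.
    @KafkaListener(topics = "KAFKA_TEST")
    public void listen(@Payload String message) {
        System.out.println("Message received :::: " + message);
    }

}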





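Execution

Producer side: the application log on port 8087, followed by the responses returned by /kafkaProducerStatus and /sendMessage -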

2020-04-06 01:06:54.627  INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-06 01:06:54.627  INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-06 01:06:54.627  INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586115414627
2020-04-06 01:06:54.635  INFO 44146 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: ztj7U2TWQ8ScSXlQCiF3ww
kafkaProducerStatus Up and Running on :: Mon Apr 06 01:06:51 IST 2020
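Message sent to topic - KAFKA_TEST

Consumer side: the startup log on port 8088, the line printed by the listener, and the response returned by /kafkaConsumerStatus -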

2020-04-06 01:07:55.205  INFO 44581 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-06 01:07:55.205  INFO 44581 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-06 01:07:55.205  INFO 44581 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586115475205
2020-04-06 01:07:55.206  INFO 44581 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-1, groupId=group-id] Subscribed to topic(s): KAFKA_TEST
2020-04-06 01:07:55.207  INFO 44581 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-04-06 01:07:55.226  INFO 44581 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-1, groupId=group-id] Cluster ID: ztj7U2TWQ8ScSXlQCiF3ww
2020-04-06 01:07:55.227  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Discovered group coordinator 192.168.0.108:9092 (id: 2147483647 rack: null)
2020-04-06 01:07:55.229  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Revoking previously assigned partitions []
2020-04-06 01:07:55.230  INFO 44581 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-id: partitions revoked: []
2020-04-06 01:07:55.230  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] (Re-)joining group
2020-04-06 01:07:55.236  INFO 44581 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8088 (http) with context path ''
2020-04-06 01:07:55.238  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] (Re-)joining group
2020-04-06 01:07:55.241  INFO 44581 --- [  restartedMain] c.k.c.k.KafkaConsumerApplication         : Started KafkaConsumerApplication in 2.762 seconds (JVM running for 3.174)
2020-04-06 01:07:55.249  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Successfully joined group with generation 69
2020-04-06 01:07:55.253  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Setting newly assigned partitions: KAFKA_TEST-0
2020-04-06 01:07:55.263  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Found no committed offset for partition KAFKA_TEST-0
2020-04-06 01:07:55.277  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-1, groupId=group-id] Resetting offset for partition KAFKA_TEST-0 to offset 0.
2020-04-06 01:07:55.304  INFO 44581 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-id: partitions assigned: [KAFKA_TEST-0]
Message received :::: hello

kafkaConsumerStatus Up and Running on :: Mon Apr 06 01:08:12 IST 2020

