Simple Producer-Consumer using SpringBoot and Kafka
A quick tutorial of implementing 2 dedicated micro-services
Architecture
Requirements                                                                                                                                                                                                                                                                           
1.     Kafka set-up:   /Downloads/kafka_2.12-2.3.0/bin
a.     $ ./zookeeper-server-start.sh ../config/zookeeper.properties
b.     $ ./kafka-server-start.sh ../config/server.properties
c.     $ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
                                               i.     $ kafka-console-producer --broker-list localhost:9092 --topic test
>>HELLO Kafka  <hit enter>
                                             ii.     $ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
>>HELLO Kafka
2.     2 SpringBoot projects –
a.     kafkaProducer
b.     kafkaConsumer
Functionality
Producer
Define the producer properties -
server.port:8087
spring.kafka.producer.bootstrap-servers: localhost:9092
Define the producer application -
@SpringBootApplication
@ComponentScan(basePackages={"com.kafka"})
public class KafkaProducerApplication {
       public static void main(String[] args) {
              SpringApplication.run(KafkaProducerApplication.class, args);
       }
}
Define the producer controller -
@RestController
public class ProducerController {
       @Autowired
       ProducerService producerService;
       @GetMapping("/kafkaProducerStatus")
       public String status() {
              return "kafkaProducerStatus Up and Running on :: " + new Date();
       }
       @GetMapping("/sendMessage")
       public String trigger(@RequestParam("message") String message) {
              producerService.sendMessage(message);
              return "Message sent to topic - KAFKA_TEST";
       }
}
Define the producer service -
@Service
public class ProducerService {
       String kafkaTopic = "KAFKA_TEST";
       @Autowired
       private KafkaTemplate<String, String> kafkaTemplate;
       public void sendMessage(String message) {
              kafkaTemplate.send(kafkaTopic, message);
       }
}
Consumer
Define the consumer service -
server.port:8088
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
Define the consumer application -
@SpringBootApplication
@ComponentScan(basePackages={"com.kafka"})
public class KafkaConsumerApplication {
       public static void main(String[] args) {
              SpringApplication.run(KafkaConsumerApplication.class, args);
       }
}
Define the consumer controller -
@RestController
public class ConsumerController {
       @Autowired
       ConsumerService consumerService;
       @GetMapping("/kafkaConsumerStatus")
       public String status() {
              return "kafkaConsumerStatus Up and Running on :: " + new Date();
       }
}
Define the consumer service -
@Service
public class ConsumerService {
       @KafkaListener(topics = "KAFKA_TEST")
       public void listen(@Payload String message) {
              System.out.println("Message received :::: " + message);
       }
}
Execution
2020-04-06 01:06:54.627  INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-06 01:06:54.627  INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-06 01:06:54.627  INFO 44146 --- [nio-8087-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586115414627
2020-04-06 01:06:54.635  INFO 44146 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: ztj7U2TWQ8ScSXlQCiF3ww
kafkaProducerStatus Up and Running on :: Mon Apr 06 01:06:51 IST 2020
Message sent to topic - KAFKA_TEST
2020-04-06 01:07:55.205  INFO 44581 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-06 01:07:55.205  INFO 44581 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-06 01:07:55.205  INFO 44581 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586115475205
2020-04-06 01:07:55.206  INFO 44581 --- [  restartedMain] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-1, groupId=group-id] Subscribed to topic(s): KAFKA_TEST
2020-04-06 01:07:55.207  INFO 44581 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-04-06 01:07:55.226  INFO 44581 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-1, groupId=group-id] Cluster ID: ztj7U2TWQ8ScSXlQCiF3ww
2020-04-06 01:07:55.227  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Discovered group coordinator 192.168.0.108:9092 (id: 2147483647 rack: null)
2020-04-06 01:07:55.229  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Revoking previously assigned partitions []
2020-04-06 01:07:55.230  INFO 44581 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-id: partitions revoked: []
2020-04-06 01:07:55.230  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] (Re-)joining group
2020-04-06 01:07:55.236  INFO 44581 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8088 (http) with context path ''
2020-04-06 01:07:55.238  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] (Re-)joining group
2020-04-06 01:07:55.241  INFO 44581 --- [  restartedMain] c.k.c.k.KafkaConsumerApplication         : Started KafkaConsumerApplication in 2.762 seconds (JVM running for 3.174)
2020-04-06 01:07:55.249  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Successfully joined group with generation 69
2020-04-06 01:07:55.253  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Setting newly assigned partitions: KAFKA_TEST-0
2020-04-06 01:07:55.263  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=group-id] Found no committed offset for partition KAFKA_TEST-0
2020-04-06 01:07:55.277  INFO 44581 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-1, groupId=group-id] Resetting offset for partition KAFKA_TEST-0 to offset 0.
2020-04-06 01:07:55.304  INFO 44581 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : group-id: partitions assigned: [KAFKA_TEST-0]
Message received :::: hello
kafkaConsumerStatus Up and Running on :: Mon Apr 06 01:08:12 IST 2020
