SpringBoot整合Kafka(二)

Posted by Kaka Blog on March 28, 2019

环境准备

  • 启动zk,kafka_1.0.1
  • 创建一个Topic

具体实现

1、添加pom依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2、修改配置文件

spring.kafka.bootstrap-servers=192.168.241.140:9092,192.168.241.141:9092,192.168.241.142:9092
spring.kafka.consumer.group-id=test-consumer-group
  • consumer.group-id必须配置,可以看kafka的consumer.propertis

3、新建生产者类

@RestController
public class KafkaProducerController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("send")
    public String send(String msg) {
        kafkaTemplate.send("topic_1", msg);
        return "success";
    }
}

4、新建消费者类

@Component
public class KafkaConsumerController {
    @KafkaListener(topics = "topic_1")
    public void listen(ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

测试

运行项目, 在浏览器输入:http://localhost:8080/send?msg=hello22,正常可以看到控制台输出:

topic = topic_1, offset = 3, value = hellofang 
topic = topic_1, offset = 4, value = hello22 

FAQ

  • 启动提示Marking the coordinator localhost:9092:在hosts文件添加主机名IP映射。

参考