1.新建项目
用的idea是20222.1.3版本,没有Spring Initializr 插件,不能直接创建springboot项目
可以在以下网址创建项目,下载后解压,然后用idea打开项目即可
1.1 在 https://start.spring.io/ 上创建项目
1.2上传到linux,解压
unzip Spring_Boot-kafka.zip
1.3 idea打开项目(文件-打开- Spring-Boot-kafka)
2.添加依赖(pox.xml)
(也可以选择修改对应的jdk版本,创建项目时勾选的是22)
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
3.springboot生产者集成Kafka
3.1生产者api
新建controller包ProducerController类
package com.lir.Spring.Boot_kafka.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;//导入注解@RestControllerpublic class ProducerController {//自动注入内容到kafka模板@AutowiredKafkaTemplate<String,String> kafka;//导入注解,使外部能访问方法@RequestMapping("/ljr")//接收外部数据public String data(String msg){//通过kafka发送出去kafka.send("customers",msg);return "发送成功";}}
3.2修改application.properties文件
#连接kafka集群spring.kafka.bootstrap-servers=node1:9092,node2:9092# key value 序列化spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3.3运行SpringBootKafkaApplication
SpringBootKafkaApplication这个程序在创建项目时自动生成
打开网页输入地址node1:8080/ljr?msg="lisiguang" /ljr?msg均是ProducerController指定的
输入后网页返回发送成功
Kafka consumer 消费到信息
可见spring boot生成者集成Kafka成功
4.spring boot消费者集成Kafka
4.1消费者api
package com.lir.Spring.Boot_kafka.controller;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.KafkaListener;//指定这个类为configuration类,以让KafkaListener能扫描到生效@Configurationpublic class MyConsumer {//监听主题@KafkaListener(topics = "customers")public void consumerTopic(String msg){System.out.println("接收信息:" + msg);}}
4.2修改application.properties文件
# key value 反序列化spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 消费者组idspring.kafka.consumer.group-id=sbk
4.3运行SpringBootKafkaApplication
打开网页输入地址node1:8080/ljr?msg="lisi"
kafka生产信息
控制台有输出,spring boot消费者集成Kafka成功。