Contents
- I. Add dependencies
- II. Spring Boot producer
- III. Spring Boot consumer
I. Add dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
II. Spring Boot producer
1. Edit the Spring Boot core configuration file application.properties and add the producer settings
spring.application.name=atguigu_springboot_kafka
# Kafka cluster addresses
spring.kafka.bootstrap-servers=192.168.239.11:9092,192.168.239.12:9092
# Key and value serializers
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2. Create a controller that receives data from the browser and writes it to the specified topic
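import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    KafkaTemplate<String, String> kafka;

    @RequestMapping("/atguigu")
    public String data(String msg) {
        // Send the message to the "first" topic through Kafka
        kafka.send("first", msg);
        return "ok";
    }
}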
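To exercise the endpoint without a browser, a small client can build the request URL and call it. The following is only a sketch: the class name EndpointClient, the host localhost, and the port 8080 (Spring Boot's default) are assumptions that do not appear in the original project. It relies on the controller binding the `msg` query parameter to its method argument.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, not part of the original project.
class EndpointClient {

    // Assumption: the application runs locally on Spring Boot's default port 8080.
    static final String BASE_URL = "http://localhost:8080/atguigu";

    // Builds the request URI, URL-encoding the message so that spaces and
    // non-ASCII characters survive the query string.
    static URI buildUri(String msg) {
        String encoded = URLEncoder.encode(msg, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "?msg=" + encoded);
    }

    // Sends a GET request to the controller and returns the response body.
    static String send(String msg) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildUri(msg)).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Only prints the URI; call send(...) once the application is running.
        System.out.println(buildUri("hello kafka"));
    }
}
```

With the application and the Kafka cluster running, `EndpointClient.send("hello kafka")` should return the string the controller responds with, and the message should land in the `first` topic.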
III. Spring Boot consumer
1. Edit the Spring Boot core configuration file application.properties
# Kafka cluster addresses
spring.kafka.bootstrap-servers=192.168.239.11:9092,192.168.239.12:9092
# Key and value serializers
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Key and value deserializers
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group
spring.kafka.consumer.group-id=atguigu
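One optional setting can help when trying this out. Kafka's consumer default for auto.offset.reset is latest, so a brand-new consumer group does not see messages that were sent before it first connected. A minimal sketch, assuming you want a new group to read the topic from the beginning (this line is not part of the original configuration):

```properties
# Assumption: start from the earliest offset when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
```

Keep the comment on its own line: in a properties file, a `#` that follows a value on the same line becomes part of that value.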
2. Create a class that consumes data from the specified Kafka topic
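import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {

    // Invoked for each message received from the "first" topic
    @KafkaListener(topics = "first")
    public void consumerTopic(String msg) {
        System.out.println("Received message: " + msg);
    }
}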