一丶IDEA创建一个空项目
二丶添加相关依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.12</version></dependency></dependencies>
三丶编写简单生产者
/*** 简单的生产者消费者* @param message*/@GetMapping("/kafka/normal/message")public void sendNormalMessage(@RequestParam("message") String message) {log.info("======================="+message);kafkaTemplate.send("sb_topic", 0, System.currentTimeMillis(), "key1", message);}
四丶编写简单消费者
@Component
public class KafkaConsumer {//监听消费//@KafkaListener(topics = {"sb_topic"})@KafkaListener(topics = {"sb_topic","callbackOne_topic"}, groupId = "testGroup")public void onNormalMessage(ConsumerRecords<String,Object> records) {for (ConsumerRecord<String, Object> record : records) {System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());}}}
这里有个坑,ConsumerRecord如果不加s会报错,我之间在借鉴他人代码的时候出现的,不知道是不是版本问题。我也刚用kafka,正在研究哈哈,见谅见谅;
postman请求:
成功:
结尾:目前只是一个简单的demo,后续我在完善,我也正在学习这玩意儿,哈哈,喜欢的朋友点个赞收藏吧;