SpringBoot Kafka发送消息与接收消息实例

前言 

Kafka的基本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。 

 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>SpringBootKafka</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringBootKafka</name><description>SpringBootKafka</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- pom.xml --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency></dependencies><repositories><repository><id>central</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public/</url><layout>default</layout><!-- 是否开启发布版构件下载 --><releases><enabled>true</enabled></releases><!-- 是否开启快照版构件下载 --><snapshots><enabled>false</enabled></snapshots></repository></repositories><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 

spring:kafka:bootstrap-servers: 192.168.110.105:9092#streams:#application-id: my-streams-appconsumer:group-id: myGroupIdauto-offset-reset: latestenable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 5

3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果

package com.example.springbootkafka.service;import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;@Slf4j
@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;private final ObjectMapper objectMapper;@Autowiredpublic KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {this.kafkaTemplate = kafkaTemplate;this.objectMapper = objectMapper;}public void sendMessage(String message) {log.info("KafkaProducer message:{}", message);//kafkaTemplate.send("test", message).addCallback();Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});// 使用whenComplete方法completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});/*future.whenComplete((result, ex) -> {if (ex == null) {// 成功发送RecordMetadata metadata = result.getRecordMetadata();System.out.println("Message sent successfully with offset: " + metadata.offset());} else {// 发送失败System.err.println("Failed to send message due to: " + ex.getMessage());}});*/}public void sendUser(User user) throws JsonProcessingException {//final ProducerRecord<String, String> record = createRecord(data);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);//ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", user);String userJson = objectMapper.writeValueAsString(user);ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);/*future.addCallback(success -> System.out.println("Message sent successfully: " + userJson),failure -> System.err.println("Failed to send message: " + failure.getMessage()));*/CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {future.get(); // 等待原始future完成} catch (Exception e) {throw new RuntimeException(e);}});completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.out.println("Error occurred: " + ex.getMessage());// 成功发送} else {System.out.println("Completed successfully");}});}
}

 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息

package com.example.springbootkafka.service;import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class KafkaConsumer {@KafkaListener(topics = "test", groupId = "myGroupId")public void consume(String message) {System.out.println("Received message: " + message);log.info("KafkaConsumer message:{}", message);}
}

5.测试消息的生成与发送

package com.example.springbootkafka.controller;import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class MessageController {private final KafkaProducer producer;@Autowiredpublic MessageController(KafkaProducer producer) {this.producer = producer;}@GetMapping("/send-message")public String sendMessage() {log.info("MessageController sendMessage start!");producer.sendMessage("hello, Kafka!");log.info("MessageController sendMessage end!");return "Message sent successfully.";}@GetMapping("/send")public String sendMessage1() {log.info("MessageController sendMessage1 start!");User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();try {producer.sendUser(user);} catch (JsonProcessingException e) {throw new RuntimeException(e);}log.info("MessageController sendMessage1 end!");return "Message sendMessage1 successfully.";}
}

 6.查看结果:

 

详细代码见https://gitee.com/dylan_2017/springboot-kafka.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/879801.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT打开摄像头采集

QT打开摄像头采集 今天好不容易把opencv的环境装好&#xff0c;然后想学习一下人脸识别的功能&#xff0c;但是在图书馆坐了4个多小时了&#xff0c;屁股疼就先写个摄像头采集的功能&#xff0c;明天继续学习吧&#xff0c;废话不多&#xff0c;嚼个奶片开始发车&#xff01;&…

JVM java主流的追踪式垃圾收集器

目录 前言 分代垃圾收集理论 标记清除算法 标记复制算法 标记整理法 前言 从对象消亡的角度出发, 垃圾回收器可以分为引用计数式垃圾收集和追踪式垃圾收集两大类, 但是java主流的一般是追踪式的垃圾收集器, 因此我们重点讲解. 分代垃圾收集理论 分代收集这种理…

Linux Vim编辑器常用命令

目录 一、命令模式快捷键 二、编辑/输入模式快捷键 三、编辑模式切换到命令模式 四、搜索命令 注&#xff1a;本章内容全部基于Centos7进行操作&#xff0c;查阅本章节内容前请确保您当前所在的Linux系统版本&#xff0c;且具有足够的权限执行操作。 一、命令模式快捷键 二…

企业专用智能云盘 | 帮助企业便捷管控企业文档 | 天锐绿盘云文档安全管理系统

由于当前多数企业内部的办公文件普遍散落于各员工电脑中&#xff0c;导致存在诸多潜在的文档使用风险。为优化团队协作效率&#xff0c;天 锐 绿盘是一款集文档统一管理、高效协同于一体的企业云盘&#xff0c;帮助企业解决文档管理中的诸多难题。 【地址&#xff1a;点击了解天…

【2023工业异常检测文献】SimpleNet

SimpleNet:ASimpleNetworkforImageAnomalyDetectionandLocalization 1、Background 图像异常检测和定位主要任务是识别并定位图像中异常区域。 工业异常检测最大的难题在于异常样本少&#xff0c;一般采用无监督方法&#xff0c;在训练过程中只使用正常样本。 解决工业异常检…

TCP客户端编码和解码处理:发送和接收指定编码消息

文章目录 引言基于Netty实现TCP客户端Netty发送GBK编码指令Netty接收GBK编码基于Channel发送指令基于ChannelHandlerContext发送指令:建立连接时发送登陆指令开启日志,查看报文信息基于ChannelInboundHandlerAdapter进行业务逻辑处理原生API实现TCP客户端基于DataOutputStrea…

AI预测福彩3D采取888=3策略+和值012路或胆码测试9月19日新模型预测第92弹

经过90多期的测试&#xff0c;当然有很多彩友也一直在观察我每天发的预测结果&#xff0c;得到了一个非常有价值的信息&#xff0c;那就是9码定位的命中率非常高&#xff0c;90多期一共只错了10次&#xff0c;这给喜欢打私房菜的朋友提供了极高价值的预测结果~当然了&#xff0…

C语言中的assert断言

Assert断言 断言是程序中处理异常的一种高级形式。可以在任何时候启用和禁用断言验证&#xff0c;因此可以在测试时启用断言&#xff0c;而在部署时禁用断言。同样&#xff0c;程序投入运行后&#xff0c;最终用户在遇到问题时可以重新启用断言。 用法&#xff1a; #…

23.面试题02.07链表相交

public class Solution {public ListNode getIntersectionNode(ListNode headA, ListNode headB) {ListNode apheadA;ListNode bpheadB;int lenA0,lenB0;//求两个链表长度while(ap!null){apap.next;lenA;}while(bp!null){bpbp.next;lenB;}apheadA;bpheadB;int len0;//用来计算让…

Msf之Python分离免杀

Msf之Python分离免杀 ——XyLin. 成果展示&#xff1a; VT查杀率:8/73 (virustotal.com) 火绒和360可以过掉&#xff0c;但Windows Defender点开就寄掉了 提示&#xff1a;我用360测的时候&#xff0c;免杀过了&#xff0c;但360同时也申报了&#xff0c;估计要不了多久就寄…

【C++ Primer Plus习题】6.9

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: #include <iostream> #include <string> #include <…

2-99 基于matlab多尺度形态学提取眼前节组织

基于matlab多尺度形态学提取眼前节组织&#xff0c;通过应用不同尺度的结构元素进行边缘检测&#xff0c;再通过加权融合的思想来整合检测到的边缘&#xff0c;降低图像噪声的影响&#xff0c;提高边缘检测的精度。程序已调通&#xff0c;可直接运行。 下载源程序请点链接&…

OpenAI o1解决了「Quiet-STaR」的挑战吗?

随着OpenAI o1近期的发布&#xff0c;业界讨论o1关联论文最多之一可能是早前这篇斯坦福大学和Notbad AI Inc的研究人员开发的Quiet-STaR&#xff0c;即让AI学会先安静的“思考”再“说话” &#xff0c;回想自己一年前对于这一领域的思考和探索&#xff0c;当初也将这篇论文进行…

Electron 图标修改

目录 1. 图片基本要求 2. 在main.js中配置icon 位置 ​3. 在package.json 中配置icon 位置 4. 问题&#xff1a;左上角图片 开发环境下显示&#xff0c;生产环境下不显示 1. 图片基本要求 图片格式为ico&#xff0c;图片像素像素为256*256&#xff1b; 将ico文件放在pub…

C++编译环境(IDE)推荐及安装

IDE是什么 嗨嗨嗨&#xff0c;我又来水博文了 今天来给大家推荐几款好用的IDE IDE是集成开发环境&#xff08;Integrated Development Environment&#xff09;的缩写&#xff0c;是一种软件应用程序&#xff0c;提供了用于软件开发的各种工具和功能&#xff0c;包括代码编辑…

人工智能安全治理新篇章:《2024人工智能安全治理框架1.0版》深度解读@附20页PDF文件下载

在数字化浪潮席卷全球的今天&#xff0c;人工智能&#xff08;AI&#xff09;技术正以前所未有的速度融入我们的日常生活&#xff0c;从智能助手到自动驾驶&#xff0c;从医疗诊断到金融风控&#xff0c;AI的身影无处不在。然而&#xff0c;技术的双刃剑特性也让我们不得不面对…

螺栓与散装物体检测系统源码分享

螺栓与散装物体检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Comput…

数据结构-3.3.栈的链式存储实现

一.链栈的定义&#xff1a; 二.总结&#xff1a;

Solidwork角度尺寸标注

效果如下&#xff1a; 首先&#xff0c; 先选第一条边 然后选第二条边&#xff0c;即可

18.DHT11编程案例

温湿度传感器 产品概述 DHT11数字温湿度传感器是一款含有已校准数字信号输出的温湿度复合传感器&#xff0c;应用领域&#xff1a;暖通 空调&#xff1b;汽车&#xff1b;消费品&#xff1b;气象站&#xff1b;湿度调节器&#xff1b;除湿器&#xff1b;家电&#xff1b;医疗…