SpringBoot 3.1.7 集成Kafka 3.5.0

一、背景

写这边篇文章的目的,是记录我在集成kafka客户端遇到的一些问题,文章会记录整个接入的过程,其中会遇到几个坑,如果需要最终版本,直接看最后一节就行了,感觉Spring-Kafka的文档太少了,如果采用SpringBoot集成的方式接入,一不小可能就会踩坑

二、操作步骤

1 添加依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2 添加配置文件

spring:profiles:active: devapplication:name: goods-centerkafka:bootstrap-servers: 192.168.31.114:9092producer:acks: alltimeout.ms: 5000# 值序列化:使用Jsonvalue-serializer: org.springframework.kafka.support.serializer.JsonSerializerkey-serializer: org.apache.kafka.common.serialization.LongSerializerenable:idempotence: true # 默认为Trueconsumer:group-id: goods-centervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerkey-deserializer: org.apache.kafka.common.serialization.LongDeserializerenable-auto-commit: false # 取消自动提交

3 生产者代码

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class MessageProducer {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate KafkaTemplate<Long, SubmitOrderReq> kafkaTemplate;public void sendOrderMessage(SubmitOrderReq msg) {kafkaTemplate.send(TopicConstants.ORDER_MESSAGE_TOPIC, msg.getOrderId(), msg);logger.info("order-message-topic message sent, orderId: {}", msg.getOrderId());}
}

4 消费者代码

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class MessageListener {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order-message-topic")public void processMessage(Consumer<Long, SubmitOrderReq> consumer, SubmitOrderReq submitOrderReq) {try {logger.info("order-message-topic message received, orderId: {}", submitOrderReq.getOrderId());orderService.submitOrder(submitOrderReq);// 同步提交consumer.commitSync();logger.info("order-message-topic message acked: orderId: {}", submitOrderReq.getOrderId());} catch (DuplicateKeyException dupe) {// 处理异常情况logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);// 重复数据,忽略掉,同步提交consumer.commitSync();} catch (Exception e) {// 处理异常情况logger.error("order-message-topic message processMessage error", e);}}
}

三、开始踩坑了

1 添加信任自己包

Caused by: java.lang.IllegalArgumentException: The class 'com.ychen.goodscenter.vo.req.SubmitOrderReq' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572)at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)... 15 common frames omitted

原因: 因为我在消费消息时用了json序列化,需要给这个序列化,添加信任自己包,不加json序列号会报错

解决方法:添加配置

spring:kafka:consumer:properties:spring.json.trusted.packages: "com.ychen.**"  

解决途径:百度

2 consumer.commitSync(); 无效

问题发现:当我正在批量消费消息时,强制重启应用进程,发现有部分消息丢失了,没有处理

我发了5000个样本请求,最后只生成了4912 个订单(中途强制重启了2次)

问题分析:有2中可能

第一种:之前配置的enable-auto-commit: false  是无效的。

第二种: consumer.commitSync(); 一次将批量拉取的offset提交了

问题排查:

通过在 consumer.commitSync(); 代码之前和之后分别打一个断点,然后发送一批数据

consumer.commitSync(); 之前:

consumer.commitSync(); 之后

结果发生了突变,说明是consumer.commitSync();执行之后引发的offset突变

翻阅源码:

总体而言,通过官方文档和源代码,我们可以确定 commitSync() 提交的是已经成功拉取到的消息的最大 offset,而不是当前正在处理的消息的 offset。

3 缺少AckMode 配置

既然consumer.commitSync();无法在批量处理消息的环境保证消息不丢失,那么需要寻找新的解决方案:

在org.springframework.kafka.annotation.KafkaListener 类的注释上面有写到可以使用org.springframework.kafka.support.Acknowledgment

然后我们消费者的代码改造后为:

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class MessageListener {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order-message-topic")public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {try {logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());orderService.submitOrder(record.value());// 同步提交acknowledgment.acknowledge();logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());} catch (DuplicateKeyException dupe) {// 处理异常情况logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);// 重复数据,忽略掉,同步提交acknowledgment.acknowledge();} catch (Exception e) {// 处理异常情况logger.error("order-message-topic message processMessage error", e);}}
}
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

这里我也有点不懂了,明明已经配置自动提交了,还需要配置 ack-mode: MANUAL,既然他说要那就配置吧

在application.yml 增加配置

spring:kafka:listener:ack-mode: MANUAL

现在准备2000个样本,然后让消费者实例强制重启2次,看看数据库的订单数量是否为2000条

现在正确了,支持系统宕机仍然不丢失消息了

四、最终的配置文件和消费者代码

1 配置文件

spring:profiles:active: devapplication:name: goods-centerkafka:bootstrap-servers: 192.168.31.114:9092listener:ack-mode: MANUALproducer:acks: alltimeout.ms: 5000# 值序列化:使用Jsonvalue-serializer: org.springframework.kafka.support.serializer.JsonSerializerkey-serializer: org.apache.kafka.common.serialization.LongSerializerenable:idempotence: true # 默认为Trueconsumer:properties:spring.json.trusted.packages: "com.ychen.**" # 信任自己包,不加json序列号会报错group-id: goods-centervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerkey-deserializer: org.apache.kafka.common.serialization.LongDeserializerenable-auto-commit: false # 取消自动提交

2 消费者代码

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class MessageListener {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order-message-topic")public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {try {logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());orderService.submitOrder(record.value());// 同步提交acknowledgment.acknowledge();logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());} catch (DuplicateKeyException dupe) {// 处理异常情况logger.error("order-message-topic message error DuplicateKeyException", dupe);// 重复数据,忽略掉,同步提交acknowledgment.acknowledge();} catch (Exception e) {// 处理异常情况logger.error("order-message-topic message error unknown ", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/646159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【github】使用github action 拉取国外docker镜像

使用github action 拉取国外docker镜像 k8s部署经常用到国外镜像&#xff0c;如果本地无法拉取可以考虑使用github action环境 github action的ci服务器在国外&#xff0c;不受中国防火墙影响github action 自带docker命令运行时直接将你仓库代码拉取下来 步骤 你的国内dock…

React16源码: React中的unwindWork的源码实现

unwindWork 1 &#xff09;概述 在 renderRoot 的 throw Exception 里面, 对于被捕获到错误的组件进行了一些处理并且向上去寻找能够处理这些异常的组件&#xff0c;比如说 class component 里面具有getDerivedStateFromError 或者 componentDidCatch 这样的生命周期方法这个c…

前端学习-0125

工具 VscodeChrome mac-vs code实用快捷键 格式&#xff1a;shiftoptionf快速移动一行&#xff1a;option上下箭快速复制一行&#xff1a;shiftoption上下箭快速保存&#xff1a;commands文件内查找&#xff1a;commandf文件内快速替换&#xff1a;optioncommandf HTML5 标…

QT发生弹出警告窗口

QTC开发程序弹出警告窗口&#xff0c;如上图 实施代码&#xff1a; #include <QMessageBox> int main() {// 在发生错误的地方QMessageBox::critical(nullptr, "错误", "发生了一个错误&#xff0c;请检查您的操作。");}上面的文字可以更改&#x…

【学网攻】 第(5)节 -- Cisco VTP的使用

文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节 -- 交换机划分Vlan 前言 网络已经成为了我们生活中不可或缺的一部分&#xff0c;它连接了世界各地的人们&#xff0c;让信息和资…

前端语音识别(webkitSpeechRecognition)

前端语音识别(webkitSpeechRecognition)-CSDN博客 Excerpt 文章浏览阅读1.8k次,点赞4次,收藏4次。浏览器实现语音转文字_webkitspeechrecognition webkitSpeechRecognition(语音识别) <span class="token comment">// 创建一个webkitSpeechRecognition实…

【VTKExamples::PolyData】第十七期 GreedyTerrainDecimation

很高兴在雪易的CSDN遇见你 VTK技术爱好者 QQ:870202403 前言 本文分享VTK样例GreedyTerrainDecimation,并解析接口vtkGreedyTerrainDecimation,希望对各位小伙伴有所帮助! 感谢各位小伙伴的点赞+关注,小易会继续努力分享,一起进步! 你的点赞就是我的动力(^U^)ノ…

社区信息员灾情上报系统-计算机毕业设计源码13263

摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作…

初识Docker(架构、安装Docker)

一、什么是Docker Docker 是一个开源的应用容器引擎&#xff0c;它允许开发者将应用程序及其依赖打包到一个轻量级、可移植的容器中。这些容器可以在不同的计算平台上运行&#xff0c;如Linux和Windows&#xff0c;并且可以实现虚拟化。Docker 的设计目标是提供一种快速且轻量…

【数据类型转换】C语言中的数据类型转换

1.定义 数据类型转换&#xff0c;听这个名字你就懂了&#xff0c;就是将数据从一种类型转换为另一种类型。 2.自动类型转换 自动类型转换就是编译器默默地、隐式地、偷偷地进行的数据类型转换&#xff0c;这种转换不需要程序员干预&#xff0c;会自动发生。比如说&#xff1a…

Spring-注解开发

一、引子 我们在前面向读者介绍了Spring的配置文件&#xff0c;可以在配置文件中声明Bean&#xff0c;把对象的创建权交给Spring容器&#xff0c;并且实际演示了如何配置一个经典的Bean——数据源的配置。经过实际的上手我们会发现&#xff1a;xml的配置显得有些繁重&#xff…

redis-发布缓存

一.redis的发布订阅 什么 是发布和订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式&#xff1a;发送者 (pub) 发送消息&#xff0c;订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。 Redis的发布和订阅 客户端订阅频道发布的消息 频道发布消息 订阅者就可…

研究性学习:社会关注的热点问题研究

1. 课题名称 社会关注的热点问题研究 2. 起止时间 起始时间:2024年1月25日 结束时间:2024年2月20日 3. 项目组成员 组长:刘明组员:赵丽、陈鑫校内指导教师:王老师校外指导教师:社会工作者李教授4. 组员分工情况 搜集整理资料: 刘明:负责搜集西安市健康上网问题的资…

第四篇【传奇开心果短博文系列】Python的OpenCV库技术点案例示例:机器学习

传奇开心短博文系列 系列短博文目录Python的OpenCV库技术点案例示例系列短博文 短博文目录一、项目目标二、OpenCV机器学习介绍三、OpenCV支持向量机示例代码四、OpenCV支持向量机示例代码扩展五、OpenCVK均值聚类示例代码六、OpenCVK均值聚类示例代码扩展七、OpenCV决策树示例…

编译PCL Qt程序

使用PCL的qt程序时&#xff0c;提示不是用QVTK编译的&#xff0c;所以需要在编译VTK时打开Qt的编译选项&#xff08;由于CMakeList比较复杂&#xff0c;使用CMakeGui进行配置&#xff0c;PCL同理&#xff09;&#xff0c;编译VTK完成后&#xff0c;编译PCL也需要配置Qt支持&…

异构计算/高性能计算算子库与软件栈

上交超算平台 openblas Armadillo LAPACK 清华大学翟季冬 http://pacman.cs.tsinghua.edu.cn/~zjd/ 澎峰科技 湖南大学李肯立

【前端web入门第一天】02 HTML图片标签 超链接标签

文章目录: 1.HTML图片标签 1.1 图像标签-基本使用1.2 图像标签-属性1.3 路径 1.3.1 相对路径 1.3.2 绝对路径 2.超链接标签 3.音频标签 4.视频标签 1.HTML图片标签 1.1 图像标签-基本使用 作用:在网页中插入图片。 <img src"图片的URL">src用于指定图像…

Python + Selenium —— 网页元素定位之Xpath定位!

前面讲的定位方式&#xff0c;都能够很方便的定位到网页元素。但是这些属性并非所有的网页元素都具备&#xff0c;可以这么说&#xff0c;绝大部分情况下都很难保证元素具备这些属性。 也就是很多时候需要使用其他的方式来定位&#xff0c;在 WebDriver 中提供了 Xpath 和 Css…

二叉树堆的应用实例分析:堆排序 | TOP-K问题

&#x1f4f7; 江池俊&#xff1a; 个人主页 &#x1f525;个人专栏&#xff1a; ✅数据结构冒险记 ✅C语言进阶之路 &#x1f305; 有航道的人&#xff0c;再渺小也不会迷途。 文章目录 前言一、堆排序1.1 排序思想1.2 堆排序过程&#xff08;图解&#xff09;1.3 堆排序代…

Linux CentOS安装Subversion(SVN)(svn常用命令)

1、确保您具有root权限或具有sudo特权 2、打开终端并运行以下命令以更新系统软件包&#xff1a; sudo yum update 3、使用以下命令安装Subversion软件包&#xff1a; sudo yum install subversion 安装过程中&#xff0c;系统会提示您确认安装&#xff0c;按下"y&quo…