SpringBoot 3.1.7 集成Kafka 3.5.0

一、背景

写这边篇文章的目的,是记录我在集成kafka客户端遇到的一些问题,文章会记录整个接入的过程,其中会遇到几个坑,如果需要最终版本,直接看最后一节就行了,感觉Spring-Kafka的文档太少了,如果采用SpringBoot集成的方式接入,一不小可能就会踩坑

二、操作步骤

1 添加依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2 添加配置文件

spring:profiles:active: devapplication:name: goods-centerkafka:bootstrap-servers: 192.168.31.114:9092producer:acks: alltimeout.ms: 5000# 值序列化:使用Jsonvalue-serializer: org.springframework.kafka.support.serializer.JsonSerializerkey-serializer: org.apache.kafka.common.serialization.LongSerializerenable:idempotence: true # 默认为Trueconsumer:group-id: goods-centervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerkey-deserializer: org.apache.kafka.common.serialization.LongDeserializerenable-auto-commit: false # 取消自动提交

3 生产者代码

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class MessageProducer {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate KafkaTemplate<Long, SubmitOrderReq> kafkaTemplate;public void sendOrderMessage(SubmitOrderReq msg) {kafkaTemplate.send(TopicConstants.ORDER_MESSAGE_TOPIC, msg.getOrderId(), msg);logger.info("order-message-topic message sent, orderId: {}", msg.getOrderId());}
}

4 消费者代码

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class MessageListener {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order-message-topic")public void processMessage(Consumer<Long, SubmitOrderReq> consumer, SubmitOrderReq submitOrderReq) {try {logger.info("order-message-topic message received, orderId: {}", submitOrderReq.getOrderId());orderService.submitOrder(submitOrderReq);// 同步提交consumer.commitSync();logger.info("order-message-topic message acked: orderId: {}", submitOrderReq.getOrderId());} catch (DuplicateKeyException dupe) {// 处理异常情况logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);// 重复数据,忽略掉,同步提交consumer.commitSync();} catch (Exception e) {// 处理异常情况logger.error("order-message-topic message processMessage error", e);}}
}

三、开始踩坑了

1 添加信任自己包

Caused by: java.lang.IllegalArgumentException: The class 'com.ychen.goodscenter.vo.req.SubmitOrderReq' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572)at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)... 15 common frames omitted

原因: 因为我在消费消息时用了json序列化,需要给这个序列化,添加信任自己包,不加json序列号会报错

解决方法:添加配置

spring:kafka:consumer:properties:spring.json.trusted.packages: "com.ychen.**"  

解决途径:百度

2 consumer.commitSync(); 无效

问题发现:当我正在批量消费消息时,强制重启应用进程,发现有部分消息丢失了,没有处理

我发了5000个样本请求,最后只生成了4912 个订单(中途强制重启了2次)

问题分析:有2中可能

第一种:之前配置的enable-auto-commit: false  是无效的。

第二种: consumer.commitSync(); 一次将批量拉取的offset提交了

问题排查:

通过在 consumer.commitSync(); 代码之前和之后分别打一个断点,然后发送一批数据

consumer.commitSync(); 之前:

consumer.commitSync(); 之后

结果发生了突变,说明是consumer.commitSync();执行之后引发的offset突变

翻阅源码:

总体而言,通过官方文档和源代码,我们可以确定 commitSync() 提交的是已经成功拉取到的消息的最大 offset,而不是当前正在处理的消息的 offset。

3 缺少AckMode 配置

既然consumer.commitSync();无法在批量处理消息的环境保证消息不丢失,那么需要寻找新的解决方案:

在org.springframework.kafka.annotation.KafkaListener 类的注释上面有写到可以使用org.springframework.kafka.support.Acknowledgment

然后我们消费者的代码改造后为:

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class MessageListener {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order-message-topic")public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {try {logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());orderService.submitOrder(record.value());// 同步提交acknowledgment.acknowledge();logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());} catch (DuplicateKeyException dupe) {// 处理异常情况logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);// 重复数据,忽略掉,同步提交acknowledgment.acknowledge();} catch (Exception e) {// 处理异常情况logger.error("order-message-topic message processMessage error", e);}}
}
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

这里我也有点不懂了,明明已经配置自动提交了,还需要配置 ack-mode: MANUAL,既然他说要那就配置吧

在application.yml 增加配置

spring:kafka:listener:ack-mode: MANUAL

现在准备2000个样本,然后让消费者实例强制重启2次,看看数据库的订单数量是否为2000条

现在正确了,支持系统宕机仍然不丢失消息了

四、最终的配置文件和消费者代码

1 配置文件

spring:profiles:active: devapplication:name: goods-centerkafka:bootstrap-servers: 192.168.31.114:9092listener:ack-mode: MANUALproducer:acks: alltimeout.ms: 5000# 值序列化:使用Jsonvalue-serializer: org.springframework.kafka.support.serializer.JsonSerializerkey-serializer: org.apache.kafka.common.serialization.LongSerializerenable:idempotence: true # 默认为Trueconsumer:properties:spring.json.trusted.packages: "com.ychen.**" # 信任自己包,不加json序列号会报错group-id: goods-centervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerkey-deserializer: org.apache.kafka.common.serialization.LongDeserializerenable-auto-commit: false # 取消自动提交

2 消费者代码

package com.ychen.goodscenter.fafka;import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class MessageListener {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order-message-topic")public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {try {logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());orderService.submitOrder(record.value());// 同步提交acknowledgment.acknowledge();logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());} catch (DuplicateKeyException dupe) {// 处理异常情况logger.error("order-message-topic message error DuplicateKeyException", dupe);// 重复数据,忽略掉,同步提交acknowledgment.acknowledge();} catch (Exception e) {// 处理异常情况logger.error("order-message-topic message error unknown ", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/646159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【github】使用github action 拉取国外docker镜像

使用github action 拉取国外docker镜像 k8s部署经常用到国外镜像&#xff0c;如果本地无法拉取可以考虑使用github action环境 github action的ci服务器在国外&#xff0c;不受中国防火墙影响github action 自带docker命令运行时直接将你仓库代码拉取下来 步骤 你的国内dock…

React16源码: React中的unwindWork的源码实现

unwindWork 1 &#xff09;概述 在 renderRoot 的 throw Exception 里面, 对于被捕获到错误的组件进行了一些处理并且向上去寻找能够处理这些异常的组件&#xff0c;比如说 class component 里面具有getDerivedStateFromError 或者 componentDidCatch 这样的生命周期方法这个c…

QT发生弹出警告窗口

QTC开发程序弹出警告窗口&#xff0c;如上图 实施代码&#xff1a; #include <QMessageBox> int main() {// 在发生错误的地方QMessageBox::critical(nullptr, "错误", "发生了一个错误&#xff0c;请检查您的操作。");}上面的文字可以更改&#x…

【学网攻】 第(5)节 -- Cisco VTP的使用

文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交换机认识及使用【学网攻】 第(3)节 -- 交换机配置聚合端口【学网攻】 第(4)节 -- 交换机划分Vlan 前言 网络已经成为了我们生活中不可或缺的一部分&#xff0c;它连接了世界各地的人们&#xff0c;让信息和资…

社区信息员灾情上报系统-计算机毕业设计源码13263

摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作…

初识Docker(架构、安装Docker)

一、什么是Docker Docker 是一个开源的应用容器引擎&#xff0c;它允许开发者将应用程序及其依赖打包到一个轻量级、可移植的容器中。这些容器可以在不同的计算平台上运行&#xff0c;如Linux和Windows&#xff0c;并且可以实现虚拟化。Docker 的设计目标是提供一种快速且轻量…

【数据类型转换】C语言中的数据类型转换

1.定义 数据类型转换&#xff0c;听这个名字你就懂了&#xff0c;就是将数据从一种类型转换为另一种类型。 2.自动类型转换 自动类型转换就是编译器默默地、隐式地、偷偷地进行的数据类型转换&#xff0c;这种转换不需要程序员干预&#xff0c;会自动发生。比如说&#xff1a…

redis-发布缓存

一.redis的发布订阅 什么 是发布和订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式&#xff1a;发送者 (pub) 发送消息&#xff0c;订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。 Redis的发布和订阅 客户端订阅频道发布的消息 频道发布消息 订阅者就可…

第四篇【传奇开心果短博文系列】Python的OpenCV库技术点案例示例:机器学习

传奇开心短博文系列 系列短博文目录Python的OpenCV库技术点案例示例系列短博文 短博文目录一、项目目标二、OpenCV机器学习介绍三、OpenCV支持向量机示例代码四、OpenCV支持向量机示例代码扩展五、OpenCVK均值聚类示例代码六、OpenCVK均值聚类示例代码扩展七、OpenCV决策树示例…

编译PCL Qt程序

使用PCL的qt程序时&#xff0c;提示不是用QVTK编译的&#xff0c;所以需要在编译VTK时打开Qt的编译选项&#xff08;由于CMakeList比较复杂&#xff0c;使用CMakeGui进行配置&#xff0c;PCL同理&#xff09;&#xff0c;编译VTK完成后&#xff0c;编译PCL也需要配置Qt支持&…

【前端web入门第一天】02 HTML图片标签 超链接标签

文章目录: 1.HTML图片标签 1.1 图像标签-基本使用1.2 图像标签-属性1.3 路径 1.3.1 相对路径 1.3.2 绝对路径 2.超链接标签 3.音频标签 4.视频标签 1.HTML图片标签 1.1 图像标签-基本使用 作用:在网页中插入图片。 <img src"图片的URL">src用于指定图像…

Python + Selenium —— 网页元素定位之Xpath定位!

前面讲的定位方式&#xff0c;都能够很方便的定位到网页元素。但是这些属性并非所有的网页元素都具备&#xff0c;可以这么说&#xff0c;绝大部分情况下都很难保证元素具备这些属性。 也就是很多时候需要使用其他的方式来定位&#xff0c;在 WebDriver 中提供了 Xpath 和 Css…

二叉树堆的应用实例分析:堆排序 | TOP-K问题

&#x1f4f7; 江池俊&#xff1a; 个人主页 &#x1f525;个人专栏&#xff1a; ✅数据结构冒险记 ✅C语言进阶之路 &#x1f305; 有航道的人&#xff0c;再渺小也不会迷途。 文章目录 前言一、堆排序1.1 排序思想1.2 堆排序过程&#xff08;图解&#xff09;1.3 堆排序代…

IP数据云:实战网络安全的得力利器

在当今数字化时代&#xff0c;企业和个人面临着日益复杂和频繁的网络安全威胁。为了应对这些挑战&#xff0c;IP数据云作为一项全面的网络安全解决方案&#xff0c;已经在多个实际案例中展现了其卓越的能力。 1、识别并隔离异常行为 挑战&#xff1a;一家大型金融机构发现其内…

Github 不能访问,提示:port 22: Connection timed out

问题描述 github clone 代码出现错误&#xff1a; $ git clone gitgithub.com:Atlan4/Fnirsi1013D.git Cloning into Fnirsi1013D... ssh: connect to host github.com port 22: Connection timed out fatal: Could not read from remote repository.Please make sure you ha…

Unity 外观模式(实例详解)

文章目录 示例1&#xff1a;初始化游戏场景中的多个子系统示例2&#xff1a;管理音频播放示例3&#xff1a;场景加载流程示例4&#xff1a;UI管理器示例5&#xff1a;网络服务通信 在Unity中使用外观模式&#xff08;Facade&#xff09;时&#xff0c;主要目的是为了简化复杂子…

Webpack5 基本使用 - 1

Webpack 是什么 webpack 的核心目的是打包&#xff0c;即把源代码一个一个的 js 文件&#xff0c;打包汇总为一个总文件 bundle.js。 基本配置包括mode指定打包模式&#xff0c;entry指定打包入口&#xff0c;output指定打包输出目录。 另外&#xff0c;由于 webpack默认只能打…

6轴机器人运动正解-逆解控制【1】——三种控制位姿的方式

概览&#xff1a; 通过运动学正解控制机器人运动通过运动学逆解控制机器人运动一个简单的物体搬运&#xff08;沿轨迹运动&#xff09; 后续会陆续更新&#xff08;本例仅供学习交流用&#xff09; 一、6轴机器人 二、运动正解控制 通过修改各个轴的角度&#xff0c;实现末…

宠物互联网医院系统:数字化呵护你爱宠的新时代

宠物互联网医院系统正在为宠物主人提供一种前所未有的数字化健康护理体验。通过结合创新技术&#xff0c;这一系统旨在让宠物医疗变得更加便捷、智能和个性化。让我们深入探讨宠物互联网医院系统的技术核心&#xff0c;以及如何应用代码为你的爱宠提供最佳关怀。 1. 远程医疗…

linux clickhouse 安装

1、官网下载clickhouse安装包 下载地址&#xff0c; clickhouse分lts和stable版本&#xff0c;lts是长期版本&#xff0c;一般选择安装lts版本。 其中clickhouse-server是clickhouse服务&#xff0c;就是用来访问数据存储数据&#xff0c;clickhouse-client是用来通过命令访问数…