SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

上一篇直通车

SpringBoot整合SpringCloudStream3.1+版本Kafka

实现死信队列步骤

  1. 添加死信队列配置文件,添加对应channel
  2. 通道绑定配置对应的channel位置添加重试配置

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

配置文件

Kafka基本配置(application-mq.yml)

server:port: 7105
spring:application:name: betrice-message-queueconfig:import:- classpath:application-bindings.ymlcloud:stream:kafka:binder:brokers: localhost:9092configuration:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer-properties:enable.auto.commit: falsebinders:betrice-kafka:type: kafkaenvironment:spring.kafka:bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

创建死信队列配置文件(application-dql.yml)

在这里插入图片描述

spring:cloud:stream:kafka:bindings:dqlTransfer-in-0:consumer:# When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>.# messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].# By default, a failed record is sent to the same partition number in the DLQ topic as the original record.enableDlq: truedlqName: Evad05-message-dlqkeySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
#              valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerdevalueSerde: com.devilvan.pojo.Evad05MessageSerdeautoCommitOnError: trueautoCommitOffset: true

注意:这里的valueSerde使用了对象类型,需要搭配application/json使用,consumer接收到消息后会转化为json字符串

通道绑定文件添加配置(application-bindings.yml)

channel对应上方配置文件的dqlTransfer-in-0

在这里插入图片描述

spring:cloud:stream:betrice-default-binder: betrice-kafkafunction:# 声明两个channel,transfer接收生产者的消息,处理完后给sinkdefinition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumerbindings:# 添加生产者bindiing,输出到destination对应的topicdqlTransfer-in-0:destination: Evad10binder: ${spring.cloud.stream.betrice-default-binder}group: evad05DlqConsumer # 使用死信队列必须要有groupcontent-type: application/jsonconsumer:maxAttempts: 2 # 当消息消费失败时,尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 消息消费失败后重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s。dqlTransfer-out-0:destination: Evad10binder: ${spring.cloud.stream.betrice-default-binder}content-type: text/plain# 消费死信队列中的消息evad05DlqConsumer-in-0:destination: Evad05-message-dlqbinder: ${spring.cloud.stream.betrice-default-binder}content-type: text/plain

Controller

发送消息并将消息引入死信队列

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {@Resource(name = "streamBridgeUtils")private StreamBridge streamBridge;@PostMapping("streamSend")public void streamSend(String topic, String message) {try {streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}@PostMapping("streamSendDql")public void streamSendDql(String topic, String message) {try {streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}@PostMapping("streamSendJsonDql")public void streamSendJsonDql(String topic) {try {Evad05MessageSerde message = new Evad05MessageSerde();message.setData("evad05 test dql");message.setCount(1);streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}
}

Channel

这里使用了transfer通道,消息从Evad10(topic)传来,经过transfer()方法后抛出异常,随后进入对应的死信队列

@Configuration
public class BetriceMqSubChannel {@Beanpublic Function<String, String> dqlTransfer() {return message -> {System.out.println("transfer: " + message);throw new RuntimeException("死信队列测试!");};}@Beanpublic Consumer<String> evad05DlqConsumer() {return message -> {System.out.println("Topic: evad05 Dlq Consumer: " + message);};}
}

将自定义序列化类型转换为JSON消息

步骤

1. 通道绑定文件(application-bindings.yml)的valueSerde属性添加自定义的序列化

在这里插入图片描述

2. BetriceMqController中封装该自定义类型的对象,并作为消息发送

@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {try {Evad05MessageSerde message = new Evad05MessageSerde();message.setData("evad05 test dql");message.setCount(1);streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}
}

3. channel(BetriceMqSubChannel)接收到该消息并反序列化

@Bean
public Consumer<String> evad05DlqConsumer() {return message -> {System.out.println("Topic: evad05 Dlq Consumer: " + JSON.parseObject(message, Evad05MessageSerde.class));};
}

4. 结果

在这里插入图片描述
在这里插入图片描述

参考网址

Kafka 消费端消费重试和死信队列 - Java小强技术博客 (javacui.com)
spring cloud stream kafka rabbit 实现死信队列_spring cloud stream kafka 死信队列_it噩梦的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/6265.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ deque/queue/stack的底层原理

deque容器的存储结构 和 vector 容器采用连续的线性空间不同&#xff0c;deque 容器存储数据的空间是由一段一段等长的连续空间构成&#xff0c;各段空间之间并不一定是连续的&#xff0c;可以位于在内存的不同区域。 deque采用一块所谓的map数组&#xff08;注意&#xff0c…

rabbitmq模块启动报java.net.SocketException: socket closed的解决方法

问题 最近在接手一个项目时&#xff0c;使用的是spring-cloud微服务构架&#xff0c;mq消息消费模块是单独一个模块&#xff0c;但启动这个模块一直报如下错误&#xff1a; java.net.SocketException: socket closed 这个错误是这个模块注册不到nacos报的错&#xff0c;刚开…

day34-Animated Countdown(动画倒计时)

50 天学习 50 个项目 - HTMLCSS and JavaScript day34-Animated Countdown&#xff08;动画倒计时&#xff09; 效果 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport&q…

态势标绘专题介绍

介绍 这个专栏是专门针对基于Cesium来实现态势标绘的专题专栏,专栏主要实现了30余种态势几何形状的标绘和编辑、文本的标绘和编辑、图片的标绘和编辑以及简单模型的标绘,同时支持标绘结果的导出以及导入。包括最终编写成的一个完整的Vue3.2+TS+Cesium1.107.2的标绘组件。专栏…

从用户的角度谈GPT时代技术突破的两大关键逻辑

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

EtherCAT转TCP/IP网关EtherCAT解决方案

你是否曾经为生产管理系统的数据互联互通问题烦恼过&#xff1f;曾经因为协议不同导致通讯问题而感到困惑&#xff1f;现在&#xff0c;我们迎来了突破性的进展&#xff01; 介绍捷米特JM-TCPIP-ECT&#xff0c;一款自主研发的Ethercat从站功能的通讯网关。它能够连接到Etherc…

通过FPGA实现基于RS232串口的指令发送并控制显示器中目标位置

目录 1.算法理论概述 串口通信模块 指令解析模块 位置控制模块 显示器驱动模块 2.部分核心程序 3.算法运行软件版本 4.算法运行效果图预览 5.算法完整程序工程 1.算法理论概述 通过FPGA实现基于RS232串口的指令发送并控制显示器中目标位置是一种常见的应用场景&#x…

Prompt 技巧指南-让 ChatGPT 回答更准确

随着 ChatGPT 等大型语言模型 (LLM)的兴起&#xff0c;人们慢慢发现&#xff0c;怎么样向 LLM 提问、以什么技巧提问&#xff0c;是获得更加准确的回答的关键&#xff0c;也由此产生了提示工程这个全新的领域。 提示工程(prompt engineering)是一门相对较新的领域&#xff0c;用…

java学习003

Java数组 Java 语言中提供的数组是用来存储固定大小的同类型元素&#xff0c;这一点和PHP语言的可变数组长度不同。 声明变量数组 首先必须声明数组变量&#xff0c;才能在程序中使用数组。下面是声明数组变量的语法&#xff1a; dataType[] arrayRefVar; // 首选的方法 或 …

云计算——云计算与虚拟化的关系

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 前言 一.虚拟化 1.什么是虚拟化 2.虚拟化技术作用 二.云计算与虚拟化的关系 三.虚…

华为eNSP:ospf的配置

一、拓扑图 二、路由器的配置 1、路由器依据规划配置接口IP AR1: <Huawei>system-view [Huawei]int g0/0/0 [Huawei-GigabitEthernet0/0/0]ip add 10.10.10.1 24 [Huawei-GigabitEthernet0/0/0]qu AR2: <Huawei>system-view [Huawei]int g0/0/0 [Huawei-Gi…

RabbitMQ消息可靠性问题及解决

说明&#xff1a;在RabbitMQ消息传递过程中&#xff0c;有以下问题&#xff1a; 消息没发到交换机 消息没发到队列 MQ宕机&#xff0c;消息在队列中丢失 消息者接收到消息后&#xff0c;未能正常消费&#xff08;程序报错&#xff09;&#xff0c;此时消息已在队列中移除 …

STM32(HAL库)驱动AD8232心率传感器

目录 1、简介 2、CubeMX初始化配置 2.1 基础配置 2.1.1 SYS配置 2.1.2 RCC配置 2.2 ADC外设配置 2.3 串口外设配置 2.4 GPIO配置 2.5 项目生成 3、KEIL端程序整合 3.1 串口重映射 3.2 ADC数据采集 3.3 主函数代码整合 4 硬件连接 5 效果展示 1、简介 本文通过STM32…

Linux文件处理命令

目录&#xff1a; linux系统与shell环境准备linux常用命令之文件处理Linux系统登录与文件操作 1.linux系统与shell环境准备 Linux 系统简介&#xff1a; Linux 内核最初只是由芬兰人林纳斯托瓦兹&#xff08;Linus Torvalds&#xff09;在赫尔辛基大学上学时出于个人爱好而…

分布式光伏并网防孤岛保护装置AM5SE-IS

分布式光伏并网防孤岛保护装置AM5SE-IS 应用场景 防孤岛原理&#xff1a;防孤岛保护装置检测到并网点有逆功率、频率突变、 等异常数据时&#xff0c;即发生孤岛现象时&#xff0c;装置可配合断路器快速切除并网点&#xff0c;使本站与电网侧快速脱离&#xff0c;保证整个电站…

blender 纹理材质

添加材质纹理需要哪五个节点&#xff1f; 映射节点&#xff1a;调整纹理的位置、大小、缩放&#xff1b; 纹理坐标&#xff1a;怎么映射&#xff0c;以什么方式去映射这张图&#xff0c;换句话说就是如何将 2D 的图片映射到 3D 的图像上&#xff1b;纹理坐标就是以什么坐标方式…

Flutter系列(3):如何将Flutter项目打包成Android安装包

将Flutter项目打包成Android安装包&#xff0c;主要步骤如下&#xff1a; 一、生成key 进入jdk的bin目录下&#xff1a; keytool -genkey -v -keystore D:\key.jks -keyalg RSA -keysize 2048 -validity 10000 -alias key 大概会有密码等参数&#xff0c;根据自身需要&#x…

Mysql数据库

目录 1.数据库 2.数据库分类与常见的数据库 3.SQL 3.1.DDL 数据库操作 表操作 3.2.DML 3.3.DQL 3.4.DCL 管理用户 权限控制 4.Mysql常用的数据类型 1.数据库 数据库:是“按照数据结构来组织、存储和管理数据的仓库”。是一个长期存储在计算机内的、有组织的、可共…

Spring MVC异常处理【单个控制异常处理器、全局异常处理器、自定义异常处理器】

目录 一、单个控制器异常处理 1.1 控制器方法 1.2 编写出错页面 1.3 测试结果 二、全局异常处理 2.1 一个有异常的控制器类 2.2 全局异常处理器类 2.3 测试结果 三、自定义异常处理器 3.1 自定义异常处理器 3.2 测试结果 往期专栏&文章相关导读 1. Maven系列…

只需3步,使用Stable Diffusion无限生产AI数字人视频

效果演示 先看效果&#xff0c;感兴趣的可以继续读下去。 没有找到可以上传视频的地方&#xff0c;大家打开这个网盘链接观看&#xff1a;https://www.aliyundrive.com/s/CRBm5NL3xAE 基本方法 搞一张照片&#xff0c;搞一段语音&#xff0c;合成照片和语音&#xff0c;同…