SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

上一篇直通车

SpringBoot整合SpringCloudStream3.1+版本Kafka

实现死信队列步骤

  1. 添加死信队列配置文件,添加对应channel
  2. 通道绑定配置对应的channel位置添加重试配置

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

配置文件

Kafka基本配置(application-mq.yml)

server:port: 7105
spring:application:name: betrice-message-queueconfig:import:- classpath:application-bindings.ymlcloud:stream:kafka:binder:brokers: localhost:9092configuration:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer-properties:enable.auto.commit: falsebinders:betrice-kafka:type: kafkaenvironment:spring.kafka:bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

创建死信队列配置文件(application-dql.yml)

在这里插入图片描述

spring:cloud:stream:kafka:bindings:dqlTransfer-in-0:consumer:# When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>.# messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].# By default, a failed record is sent to the same partition number in the DLQ topic as the original record.enableDlq: truedlqName: Evad05-message-dlqkeySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
#              valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerdevalueSerde: com.devilvan.pojo.Evad05MessageSerdeautoCommitOnError: trueautoCommitOffset: true

注意:这里的valueSerde使用了对象类型,需要搭配application/json使用,consumer接收到消息后会转化为json字符串

通道绑定文件添加配置(application-bindings.yml)

channel对应上方配置文件的dqlTransfer-in-0

在这里插入图片描述

spring:cloud:stream:betrice-default-binder: betrice-kafkafunction:# 声明两个channel,transfer接收生产者的消息,处理完后给sinkdefinition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumerbindings:# 添加生产者bindiing,输出到destination对应的topicdqlTransfer-in-0:destination: Evad10binder: ${spring.cloud.stream.betrice-default-binder}group: evad05DlqConsumer # 使用死信队列必须要有groupcontent-type: application/jsonconsumer:maxAttempts: 2 # 当消息消费失败时,尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 消息消费失败后重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s。dqlTransfer-out-0:destination: Evad10binder: ${spring.cloud.stream.betrice-default-binder}content-type: text/plain# 消费死信队列中的消息evad05DlqConsumer-in-0:destination: Evad05-message-dlqbinder: ${spring.cloud.stream.betrice-default-binder}content-type: text/plain

Controller

发送消息并将消息引入死信队列

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {@Resource(name = "streamBridgeUtils")private StreamBridge streamBridge;@PostMapping("streamSend")public void streamSend(String topic, String message) {try {streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}@PostMapping("streamSendDql")public void streamSendDql(String topic, String message) {try {streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}@PostMapping("streamSendJsonDql")public void streamSendJsonDql(String topic) {try {Evad05MessageSerde message = new Evad05MessageSerde();message.setData("evad05 test dql");message.setCount(1);streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}
}

Channel

这里使用了transfer通道,消息从Evad10(topic)传来,经过transfer()方法后抛出异常,随后进入对应的死信队列

@Configuration
public class BetriceMqSubChannel {@Beanpublic Function<String, String> dqlTransfer() {return message -> {System.out.println("transfer: " + message);throw new RuntimeException("死信队列测试!");};}@Beanpublic Consumer<String> evad05DlqConsumer() {return message -> {System.out.println("Topic: evad05 Dlq Consumer: " + message);};}
}

将自定义序列化类型转换为JSON消息

步骤

1. 通道绑定文件(application-bindings.yml)的valueSerde属性添加自定义的序列化

在这里插入图片描述

2. BetriceMqController中封装该自定义类型的对象,并作为消息发送

@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {try {Evad05MessageSerde message = new Evad05MessageSerde();message.setData("evad05 test dql");message.setCount(1);streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}
}

3. channel(BetriceMqSubChannel)接收到该消息并反序列化

@Bean
public Consumer<String> evad05DlqConsumer() {return message -> {System.out.println("Topic: evad05 Dlq Consumer: " + JSON.parseObject(message, Evad05MessageSerde.class));};
}

4. 结果

在这里插入图片描述
在这里插入图片描述

参考网址

Kafka 消费端消费重试和死信队列 - Java小强技术博客 (javacui.com)
spring cloud stream kafka rabbit 实现死信队列_spring cloud stream kafka 死信队列_it噩梦的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/6265.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ deque/queue/stack的底层原理

deque容器的存储结构 和 vector 容器采用连续的线性空间不同&#xff0c;deque 容器存储数据的空间是由一段一段等长的连续空间构成&#xff0c;各段空间之间并不一定是连续的&#xff0c;可以位于在内存的不同区域。 deque采用一块所谓的map数组&#xff08;注意&#xff0c…

LeetCode 0874. 模拟行走机器人:哈希表模拟

【LetMeFly】874.模拟行走机器人&#xff1a;哈希表模拟 力扣题目链接&#xff1a;https://leetcode.cn/problems/walking-robot-simulation/ 机器人在一个无限大小的 XY 网格平面上行走&#xff0c;从点 (0, 0) 处开始出发&#xff0c;面向北方。该机器人可以接收以下三种类…

rabbitmq模块启动报java.net.SocketException: socket closed的解决方法

问题 最近在接手一个项目时&#xff0c;使用的是spring-cloud微服务构架&#xff0c;mq消息消费模块是单独一个模块&#xff0c;但启动这个模块一直报如下错误&#xff1a; java.net.SocketException: socket closed 这个错误是这个模块注册不到nacos报的错&#xff0c;刚开…

day34-Animated Countdown(动画倒计时)

50 天学习 50 个项目 - HTMLCSS and JavaScript day34-Animated Countdown&#xff08;动画倒计时&#xff09; 效果 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport&q…

态势标绘专题介绍

介绍 这个专栏是专门针对基于Cesium来实现态势标绘的专题专栏,专栏主要实现了30余种态势几何形状的标绘和编辑、文本的标绘和编辑、图片的标绘和编辑以及简单模型的标绘,同时支持标绘结果的导出以及导入。包括最终编写成的一个完整的Vue3.2+TS+Cesium1.107.2的标绘组件。专栏…

C#仿热血江湖

目录 1 GClass10 1.1 定义属性 1.2 int method 1.3 method 1.4 Byte method GClass0 定义属性private byte[] byte_0; private byte[] byte_1;

线性代数——线性方程组

文章目录 版权声明补充知识求和公式的性质常用希腊字符读音 线性方程组有解判定定理齐次线性方程组的基础解系非齐次线性方程组解的结构 版权声明 本文大部分内容皆来自李永乐老师考研教材和视频课。 补充知识 求和公式的性质 ∑ i 1 n k a i k ∑ i 1 n a i \sum_{i1}^n…

从用户的角度谈GPT时代技术突破的两大关键逻辑

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

EtherCAT转TCP/IP网关EtherCAT解决方案

你是否曾经为生产管理系统的数据互联互通问题烦恼过&#xff1f;曾经因为协议不同导致通讯问题而感到困惑&#xff1f;现在&#xff0c;我们迎来了突破性的进展&#xff01; 介绍捷米特JM-TCPIP-ECT&#xff0c;一款自主研发的Ethercat从站功能的通讯网关。它能够连接到Etherc…

macOS Ventura sublime无法加载Package Control

突然发现我的sublime text 的package control不起作用了&#xff0c;设置也变成灰色的了。 本以为是st出问题了&#xff0c;从官网重新下载&#xff0c;然后点菜单命令中的install package control&#xff0c;还是一样的不起作用。 启动st后&#xff0c;用ctrl~ 打开st的conso…

通过FPGA实现基于RS232串口的指令发送并控制显示器中目标位置

目录 1.算法理论概述 串口通信模块 指令解析模块 位置控制模块 显示器驱动模块 2.部分核心程序 3.算法运行软件版本 4.算法运行效果图预览 5.算法完整程序工程 1.算法理论概述 通过FPGA实现基于RS232串口的指令发送并控制显示器中目标位置是一种常见的应用场景&#x…

Prompt 技巧指南-让 ChatGPT 回答更准确

随着 ChatGPT 等大型语言模型 (LLM)的兴起&#xff0c;人们慢慢发现&#xff0c;怎么样向 LLM 提问、以什么技巧提问&#xff0c;是获得更加准确的回答的关键&#xff0c;也由此产生了提示工程这个全新的领域。 提示工程(prompt engineering)是一门相对较新的领域&#xff0c;用…

Golang中函数和方法的区别

在Golang中&#xff0c;函数和方法之间有一些区别。 函数是一段独立的代码块&#xff0c;可以接收输入参数并返回结果。它可以在任何地方被调用&#xff0c;而不依赖于任何特定的结构或类型。 方法是与特定类型关联的函数。它是类型的一部分&#xff0c;可以通过该类型的实例…

java学习003

Java数组 Java 语言中提供的数组是用来存储固定大小的同类型元素&#xff0c;这一点和PHP语言的可变数组长度不同。 声明变量数组 首先必须声明数组变量&#xff0c;才能在程序中使用数组。下面是声明数组变量的语法&#xff1a; dataType[] arrayRefVar; // 首选的方法 或 …

C++(13):拷贝控制

一个类通过定义五种特殊的成员函数来控制这些操作&#xff0c;包括&#xff1a;拷贝构造函数(copy constructor)、拷贝赋值运算符&#xff08;copy-assignment operator)、移动构造函数&#xff08;moveconstructor)、移动赋值运算符&#xff08;move-assignment operator)和析…

云计算——云计算与虚拟化的关系

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 前言 一.虚拟化 1.什么是虚拟化 2.虚拟化技术作用 二.云计算与虚拟化的关系 三.虚…

华为eNSP:ospf的配置

一、拓扑图 二、路由器的配置 1、路由器依据规划配置接口IP AR1: <Huawei>system-view [Huawei]int g0/0/0 [Huawei-GigabitEthernet0/0/0]ip add 10.10.10.1 24 [Huawei-GigabitEthernet0/0/0]qu AR2: <Huawei>system-view [Huawei]int g0/0/0 [Huawei-Gi…

RabbitMQ消息可靠性问题及解决

说明&#xff1a;在RabbitMQ消息传递过程中&#xff0c;有以下问题&#xff1a; 消息没发到交换机 消息没发到队列 MQ宕机&#xff0c;消息在队列中丢失 消息者接收到消息后&#xff0c;未能正常消费&#xff08;程序报错&#xff09;&#xff0c;此时消息已在队列中移除 …

STM32(HAL库)驱动AD8232心率传感器

目录 1、简介 2、CubeMX初始化配置 2.1 基础配置 2.1.1 SYS配置 2.1.2 RCC配置 2.2 ADC外设配置 2.3 串口外设配置 2.4 GPIO配置 2.5 项目生成 3、KEIL端程序整合 3.1 串口重映射 3.2 ADC数据采集 3.3 主函数代码整合 4 硬件连接 5 效果展示 1、简介 本文通过STM32…

springboot整合redis

先在配置文件中加好配置 然后引入相关依赖 然后 写好如下配置bean即可整合redis 1.加配&#xff08;yml格式&#xff09; redis: client-type: jedis host: 101.227.52.230 password: 892660rG port: 6379 jedis: pool: # 连接池最大连接数&#xff08;使用负值表示没有限制&a…