Kafka 的延迟队列、死信队列和重试队列

总结一下实现的方法:
1、延迟队列,首先kafka是没有延迟队列的,那要实现延迟队列的话,就得使用其他方法。在发送消息的时候加上时间戳,再在时间戳上面加上延迟时间。消费的时候判断一下,有没有到达延迟时间,如果没有到达的话,重新入队,或启用定时线程处理。
2、重试队列,使用@RetryableTopic注解
3、死信队列,使用@DltHandler 或 @KafkaListener监听死信队列

代码非完整代码,仅供参考

1. 添加依赖

确保你的 pom.xml 文件中包含 Spring Kafka 的依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency>
</dependencies>

2. 配置 Kafka

application.properties 文件中配置 Kafka 的连接信息和消费者的基本配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.group-id=my-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false

3. 创建 Kafka 生产者

创建一个 Kafka 生产者服务,用于发送消息到指定的 Topic:

package com.example.demo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.time.Instant;
import java.util.Date;@Service
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;/*** 发送延迟消息到指定的 Topic。* @param topic 目标 Topic 名称(延迟队列为delay-topic,其他为my-topic)* @param message 要发送的消息内容* @param delay 延迟时间(毫秒)*/public void sendDelayedMessage(String topic, String message, long delay) {long timestamp = Instant.now().toEpochMilli() + delay;kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));}
}

4. 创建 Kafka 消费者

4.1 消费延迟队列的消费者
@Service
public class KafkaConsumerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@KafkaListener(topics = "delay-topic", groupId = "delay-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {long currentTimestamp = System.currentTimeMillis();long messageTimestamp = record.timestamp();// 检查是否到达延迟时间if (currentTimestamp < messageTimestamp) {// 未到达延迟时间,重新发送到延迟队列long remainingDelay = messageTimestamp - currentTimestamp;sendDelayedMessage(record.topic(), record.value(), remainingDelay);} else {// 到达延迟时间,处理消息System.out.println("Processing message: " + record.value());}// 确认消息已处理acknowledgment.acknowledge();}private void sendDelayedMessage(String topic, String message, long delay) {long timestamp = System.currentTimeMillis() + delay;kafkaTemplate.send(new ProducerRecord<>(topic, null, new Date(timestamp), null, message));}
}
4.2 消费重试队列,失败放入死信队列

创建一个 Kafka 消费者服务,用于监听指定的 Topic 并处理消息。使用 @KafkaListener 注解来指定监听的 Topic,并手动提交偏移量。

package com.example.demo;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.retry.annotation.Backoff;import java.time.Instant;@Service
public class KafkaConsumer {/*** 监听指定的 Topic 并处理消息。* 使用 @RetryableTopic 注解实现重试机制,最多尝试 3 次,每次重试间隔 2 秒,最大延迟 60 秒。* 如果所有重试都失败,消息将发送到死信队列。** @param record 消费的消息记录* @param acknowledgment 用于手动提交偏移量*/@RetryableTopic(attempts = "3", // 最大重试次数backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000), // 重试间隔和最大延迟dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR, // 失败后发送到死信队列autoCreateTopics = "true" // 自动创建重试和死信队列主题)@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {try {System.out.println("Received message: " + record.value());// 模拟异常if (shouldFail()) {throw new RuntimeException("Simulated failure");}acknowledgment.acknowledge(); // 提交偏移量} catch (Exception e) {throw e; // 抛出异常,触发重试机制}}/*** 模拟处理失败的条件。* @return 是否模拟失败*/private boolean shouldFail() {// 模拟处理失败的条件return true;}/*** @DltHandler 注解标记的方法用于处理死信队列中的消息。* 当消息在重试队列中多次重试失败后,会被发送到死信队列。* @DltHandler 注解的方法会监听死信队列,并对其中的消息进行处理。* @DltHandler 它与 @RetryableTopic 注解结合使用,用于处理重试失败后的死信消息。* * 处理死信队列中的消息。* @param record 死信队列中的消息记录*/@DltHandlerpublic void dltListen(ConsumerRecord<String, String> record) {String topic = record.topic(); // 获取死信队列的主题名称System.out.println("Received message in DLT: " + record.value());System.out.println("Topic: " + topic); // 打印主题名称// 处理死信消息, 可以在这里添加对死信消息的处理逻辑}
}

5. 配置 Kafka 消费者工厂

在 Spring Boot 中,可以通过配置 ConcurrentKafkaListenerContainerFactory 来设置重试机制和死信队列处理策略。
@RetryableTopic 和 SeekToCurrentErrorHandler 的配置不会同时生效。Spring Kafka 会优先处理 @RetryableTopic 注解的配置,因为它是一个更高级的抽象,专门用于处理重试和死信队列的逻辑。
为了避免配置冲突,建议选择一种方式来实现重试和死信队列的逻辑。如果你选择使用 @RetryableTopic,则不需要再配置 SeekToCurrentErrorHandler,即这里就可以跳过。

package com.example.demo;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory,KafkaTemplate<Object, Object> template) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);// 设置错误处理器,最多重试 3 次,失败后发送到死信队列factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));return factory;}
}

6. 创建死信队列消费者

创建一个消费者来监听死信队列主题,对死信消息进行后续处理(配置了@DltHandler 可以不用 @KafkaListener)。

package com.example.demo;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class DltConsumer {/*** 监听死信队列主题并处理消息。* @param record 死信队列中的消息记录*/@KafkaListener(topics = "my-topic.DLT", groupId = "dlt-group")public void listen(ConsumerRecord<String, String> record) {System.out.println("Received message in DLT: " + record.value());// 可以在这里添加对死信消息的处理逻辑}
}

6.1 @DltHandler 与 @KafkaListener 的区别和适用场景
6.1.1 @DltHandler 的特点
与重试机制紧密结合:@DltHandler 注解的方法与 @RetryableTopic 注解的重试机制紧密结合,自动处理重试失败的消息。
自动发送到死信队列:当消息在重试队列中多次重试失败后,Spring Kafka 会自动将消息发送到死信队列。
简化代码:使用 @DltHandler 注解可以简化代码,减少手动处理死信消息的逻辑。
6.1.2 @KafkaListener 的特点
通用性:@KafkaListener 注解适用于任何 Kafka 主题,包括死信队列主题。
灵活性:可以用于监听任何主题,而不仅仅是死信队列。这使得它更加灵活,可以用于多种场景。
手动处理:需要手动配置死信队列主题,并在代码中显式处理死信消息。
6.2. @DltHandler 与 @KafkaListener 总结
**使用 @DltHandler:**如果你需要与 Spring Kafka 的重试机制紧密结合,并且希望自动处理重试失败的消息,使用 @DltHandler 是一个更简洁和方便的选择。
**使用 @KafkaListener:**如果你需要监听多个主题,或者需要更灵活地处理死信消息,使用 @KafkaListener 是一个更好的选择。
注意:如果 @KafkaListener 监听了死信队列的主题(例如 my-topic.DLT),那么当消息被发送到死信队列时,@KafkaListener 会先捕获并处理这些消息。这可能导致 @DltHandler 方法无法接收到死信队列中的消息。因此,两个最好不要一块用。

7. 启动类

创建一个 Spring Boot 应用程序的启动类:

package com.example.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaDemoApplication {public static void main(String[] args) {SpringApplication.run(KafkaDemoApplication.class, args);}
}

总结

通过以上步骤,你可以在 Spring Boot 中实现 Kafka 的延迟队列、死信队列和重试队列。这些功能可以确保消息处理的可靠性和健壮性,避免消息丢失或重复处理。希望这些示例能帮助你更好地理解和使用 Kafka 的高级特性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/73640.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DCAT模型:双交叉注意力革新医学影像诊断,AUC 99.75%

一、研究背景&#xff1a;医学影像诊断的挑战 在医学影像领域&#xff08;如X光、OCT&#xff09;&#xff0c;精准分类疾病直接影响患者治疗决策。传统深度学习模型存在两大痛点&#xff1a; 1.过度自信&#xff1a;即使图像模糊或存在噪声&#xff0c;模型仍可能给出高…

2.2.2 Spark单机版环境

本文介绍了如何搭建和使用Spark单机版环境。首先&#xff0c;确保安装配置好JDK&#xff0c;然后从群共享下载Spark安装包并上传至云主机的/opt目录。接着&#xff0c;解压到/usr/local目录并配置环境变量&#xff0c;通过spark-submit --version验证安装成功。在使用Spark单机…

AI音乐的革命:迈向格莱美级别的艺术表现力

摘要 近期&#xff0c;AI技术在音乐领域的突破性进展令人瞩目。这项新技术赋予了AI格莱美级别的歌唱能力&#xff0c;使其不仅能够进行写作和绘画创作&#xff0c;还能以接近人类的艺术表现力演绎音乐作品。这一成就标志着AI在艺术领域的技术进步达到了新的高度&#xff0c;为…

SAP消息号类型(E/I/W)的定制

比如这样的M8088的标准的消息号&#xff0c;希望变更消息类型&#xff0c;查询之后&#xff0c;网上提供的消息&#xff0c;都是SE91,OMRM&#xff0c;OBA5之类的消息。事实上&#xff0c;SE91是不能变更消息类型的。 而在OMRM界面&#xff0c;只看到有限的几个消息号。 原来&a…

wazuh安全管理工具

Wazuh 通过监控操作系统和应用程序层面的终端设备&#xff0c;增强您基础设施的安全可见性。其核心功能涵盖日志分析、文件完整性监控、入侵检测以及合规性监控。 一、介绍 1. 核心功能 1.1 主机入侵检测&#xff08;HIDS&#xff09; 文件完整性监控&#xff08;FIM&#…

SAP-ABAP:OData 协议深度解析:架构、实践与最佳应用

OData 协议深度解析:架构、实践与最佳应用 一、协议基础与核心特性 协议定义与目标 定位:基于REST的开放数据协议,标准化数据访问接口,由OASIS组织维护,最新版本为OData v4.01。设计哲学:通过统一资源标识符(URI)和HTTP方法抽象数据操作,降低异构系统集成复杂度。核心…

MATLAB 控制系统设计与仿真 - 29

用极点配置设计伺服系统 方法1-前馈修正 对于一个可控的系统&#xff0c;我们知道可以用极点配置来得到系统的动态响应指标&#xff0c;但是系统有时会存在较大的静态误差。 例如&#xff1a; 系统的状态矩阵如下&#xff0c;试求取其阶跃响应。 MATLAB 代码如下&#xff1…

编译原理——自底向上语法优先分析

文章目录 自底向上优先分析概述一、自底向上优先分析概述二、简单优先分析法&#xff08;一&#xff09;优先关系定义&#xff08;二&#xff09;简单优先文法的定义&#xff08;三&#xff09;简单优先分析法的操作步骤 三、算法优先分析法&#xff08;一&#xff09;直观算符…

Opencv计算机视觉编程攻略-第四节 图直方图统计像素

Opencv计算机视觉编程攻略-第四节 图直方图统计像素 1.计算图像直方图2.基于查找表修改图像3.直方图均衡化4.直方图反向投影进行内容查找5.用均值平移法查找目标6.比较直方图搜索相似图像7.用积分图统计图像 1.计算图像直方图 图像统计直方图的概念 图像统计直方图是一种用于描…

5、vim编辑和shell编程【超详细】

一、vim 1、了解 Vim (Vi IMproved) 是一款功能强大的文本编辑器。 正常模式&#xff1a;vim 文件&#xff0c;刚打开的样子vim模式&#xff1a;输入文本的地方命令模式&#xff1a;输入 :wq等等的位置&#xff0c;可以对文本进行一些操作&#xff0c;比如&#xff1a;保存文…

《Robust Synthetic-to-Real Transfer for Stereo Matching》

论文地址&#xff1a;https://arxiv.org/pdf/2403.07705 源码地址&#xff1a;https://github.com/jiaw-z/DKT-Stereo 概述 通过在合成数据上预训练的模型在未见领域上表现出强大的鲁棒性。然而&#xff0c;在现实世界场景中对这些模型进行微调时&#xff0c;其领域泛化能力可…

蓝桥杯第10届 后缀表达式

题目描述 给定 N 个加号、M 个减号以及 NM1 个整数 A1,A2,⋅⋅⋅,ANM1​&#xff0c;小明想知道在所有由这N 个加号、M 个减号以及 NM1 个整数凑出的合法的 后缀表达式中&#xff0c;结果最大的是哪一个&#xff1f; 请你输出这个最大的结果。 例如使用 1 2 3 -&#xff0c…

C++前缀和

个人主页&#xff1a;[PingdiGuo_guo] 收录专栏&#xff1a;[C干货专栏] 大家好&#xff0c;今天我们来了解一下C的一个重要概念&#xff1a;前缀和 目录 1.什么是前缀和 2.前缀和的用法 1.前缀和的定义 2.预处理前缀和数组 3.查询区间和 4.数组中某个区间的和是否为特定…

uni app跨端开发遇到的问题

技术栈 uni app&#xff0c;vue3&#xff0c;uview puls&#xff0c;map… nvue 因为项目中有地图&#xff0c;要使用到map标签&#xff0c;所以考虑用原生nvue开发&#xff0c;它是有痛点的&#xff0c;首先浏览器不支持&#xff0c;我是要开发ios和Android&#xff0c;所以…

SQL注入操作

sql注入 一&#xff0c;SQL注入分类按照注入的网页功能类型分类按照注入点值的属性分类基于从服务器返回内容按照注入的程度和顺序 一&#xff0c;SQL注入分类 按照注入的网页功能类型分类 登录注入cms注入 cms逻辑&#xff1a;index.php首页展示内容&#xff0c;具有文章列表…

微信 MMTLS 协议详解(五):加密实现

常用的解密算法&#xff0c;对称非对称 加密&#xff0c;密钥协商&#xff0c; 带消息认证的加解密 #生成RSA 密钥对 void GenerateRsaKeypair(std::string& public_key,std::string& private_key) {RSA* rsa RSA_new();BIGNUM* bn BN_new();// 生成 RSA 密钥对BN_s…

ROS melodic 安装 python3 cv_bridge

有时候&#xff0c;我们需要处理这些兼容性问题。此处列举我的过程&#xff0c;以供参考 mkdir -p my_ws_py39/src cd my_ws_py39 catkin_make_isolated-DPYTHON_EXECUTABLE/usr/bin/python3 \-DPYTHON_INCLUDE_DIR/usr/include/python3.8 \-DPYTHON_LIBRARY/usr/lib/x86_64-l…

深入学习:SpringQuartz的配置方式!

全文目录&#xff1a; 开篇语前言摘要概述1. 基于 XML 的传统配置配置步骤1.1 Maven 依赖1.2 XML 配置文件1.3 实现 Job 类 2. 基于 Java Config 的现代配置方式配置步骤2.1 Maven 依赖2.2 配置类2.3 实现 Job 类 3. 动态任务调度动态添加任务动态删除任务 4. Quartz 持久化配置…

ClickHouse与TiDB实操对比:从入门到实战的深度剖析

ClickHouse与TiDB实操对比&#xff1a;从入门到实战的深度剖析 宝子们&#xff0c;在当今数据驱动的时代&#xff0c;选择合适的数据库对于处理海量数据和支撑业务发展至关重要。ClickHouse和TiDB作为两款备受关注的数据库&#xff0c;各自有着独特的优势和适用场景。今天&…

element-ui messageBox 组件源码分享

messageBox 弹框组件源码分享&#xff0c;主要从以下两个方面&#xff1a; 1、messageBox 组件页面结构。 2、messageBox 组件属性。 一、组件页面结构。 二、组件属性。 2.1 title 标题&#xff0c;类型为 string&#xff0c;无默认值。 2.2 message 消息正文内容&#xf…