非阻塞重试与 Spring Kafka 的集成测试

        如何为启用重试和死信发布的消费者的 Spring Kafka 实现编写集成测试。

Kafka 非阻塞重试

Kafka 中的非阻塞重试是通过为主主题配置重试主题来完成的。如果需要,还可以配置其他死信主题。如果所有重试均已用尽,事件将转发至 DLT。公共领域提供了大量资源来了解技术细节。 

要测试什么?

在代码中为重试机制编写集成测试时,这可能是一项具有挑战性的工作。 

  • 如何测试该事件是否已重试所需的次数? 
  • 如何测试仅在发生某些异常时才执行重试,而对于其他异常则不执行重试?
  • 如果上次重试中异常已解决,如何测试是否未进行另一次重试?
  • 在(n-1)次重试尝试失败后,如何测试重试中的第n次尝试是否成功?
  • 当所有重试尝试都用完后,如何测试事件是否已发送到死信队列?

让我们看一些代码。您可以找到很多很好的文章,展示如何使用 Spring Kafka 设置非阻塞重试。下面给出了一种这样的实现。这是使用Spring-Kafka 的@RetryableTopic@DltHandler  注释来完成的。

设置可重试消费者

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {private final CustomEventHandler handler;@RetryableTopic(attempts = "${retry.attempts}",backoff = @Backoff(delayExpression = "${retry.delay}",multiplierExpression = "${retry.delay.multiplier}"),topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,dltStrategy = FAIL_ON_ERROR,autoStartDltHandler = "true",autoCreateTopics = "false",include = {CustomRetryableException.class})@KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {try {log.info("Received event on topic {}", topic);handler.handleEvent(event);} catch (Exception e) {log.error("Error occurred while processing event", e);throw e;}}@DltHandlerpublic void listenOnDlt(@Payload CustomEvent event) {log.error("Received event on dlt.");handler.handleEventFromDlt(event);}}

如果您注意到上面的代码片段,include参数包含CustomRetryableException.class. 这告诉使用者仅在该方法抛出 CustomRetryableException 时才重试CustomEventHandler#handleEvent。您可以根据需要添加任意数量。还有一个排除参数,但一次可以使用其中任何一个参数。

${retry.attempts}在发布到 DLT 之前,事件处理应重试最多次数。

设置测试基础设施

要编写集成测试,您需要确保拥有一个正常运行的 Kafka 代理(首选嵌入式)和一个功能齐全的发布者。让我们设置我们的基础设施:

@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,brokerProperties = {"listeners=" + "${kafka.broker.listeners}", "port=" + "${kafka.broker.port}"},controlledShutdown = true,topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {@Autowiredprivate KafkaTemplate<String, CustomEvent> testKafkaTemplate;// tests}

** 配置是从 application-test.yml 文件导入的。

使用嵌入式 kafka 代理时,重要的是要提及要创建的主题。它们不会自动创建。在本例中,我们创建四个主题,即 

"test", "test-retry-0", "test-retry-1", "test-dlt"

我们已将最大重试尝试次数设置为 3 次。每个主题对应于每次重试尝试。因此,如果 3 次重试都用尽,则应将事件转发到 DLT。

测试用例

如果第一次尝试消费成功,则不应重试。

CustomEventHandler#handleEvent这可以通过该方法仅被调用一次的事实来测试。还可以添加对 Log 语句的进一步测试。

    @Testvoid test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}

如果引发不可重试的异常,则不应重试。

在这种情况下,该CustomEventHandler#handleEvent方法应该只调用一次:

    @Testvoid test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}

如果抛出 a,则重试配置的最大次数RetryableException,并在重试用完后将其发布到死信主题。

在这种情况下,该CustomEventHandler#handleEvent方法应被调用三次(maxRetries)次,并且CustomEventHandler#handleEventFromDlt该方法应被调用一次。

    @Testvoid test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));}

**在验证阶段添加了相当大的超时,以便在测试完成之前可以考虑指数退避延迟。这很重要,如果设置不当可能会导致断言失败。

应该重试直到RetryableException解决,并且如果引发不可重试的异常或消费最终成功,则不应继续重试。

测试已设置为RetryableException先抛出 a 然后再抛出 a NonRetryable exception,以便重试一次。

    @Testvoid test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}@Testvoid test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,InterruptedException {CustomEvent event = new CustomEvent("Hello");// GIVENdoThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));// WHENtestKafkaTemplate.send("test", event).get();// THENverify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));}

结论

因此,您可以看到集成测试是策略、超时、延迟和验证的混合和匹配,以确保 Kafka 事件驱动架构的重试机制万无一失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/47673.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

go 指针

我们知道go中除了map 切片等类型都是进行值传递的&#xff0c;也就是copy一份&#xff0c;不会修改原有数据 普通指针 package mainimport "fmt"func main() {var a int 1var ip *intip &afmt.Printf("ip: %v\n", ip)fmt.Printf("ip: %v\n&qu…

数据库SQL语句使用

-- 查询所有数据库 show databases; -- 创建数据库&#xff0c;数据库名为mydatabase create database mydatabase; -- 如果没有名为 mydatabase的数据库则创建&#xff0c;有就不创建 create database if not exists mydatabase; -- 如果没有名为 mydatabase的数据库则创建…

ffmpeg 子进程从内存读取文件、提取图片到内存

除了网络、文件io&#xff0c;由python或java或go或c等语言开启的ffmpeg子进程还支持pipe&#xff0c;可以从stdin读入数据&#xff0c;输出转化后的图像到stdout。无需编译 ffmpeg&#xff0c;直接调用 ffmpeg.exe不香么&#xff01; “从内存读”可用于边下载边转码&#xf…

使用lodash的throttle函数会触发两次

当使用lodash的throttle函数时会触发两次&#xff0c;分别在最开始和最后。 严格来说不算是bug&#xff0c;因为官方文档写的很清楚。throttle函数其实有三个参数&#xff1a; _.throttle(func, [wait0], [options]) func: 要节流的函数 wait: 等待时间 options: 选项 op…

【Java面试题】线程创建的三种方式及区别?

三种线程创建方式 继承Thread类&#xff0c;子类重写run&#xff08;&#xff09;方法&#xff0c;调用子类的strat&#xff08;&#xff09;启动线程。实现Runnable接口&#xff0c;实现run&#xff08;&#xff09;方法&#xff0c;调用对象start&#xff08;&#xff09;启…

【回味“经典”】DFS基础训练(N皇后,装载问题)

这篇文章是一年前写的 走进“深度搜索基础训练“&#xff0c;踏入c算法殿堂&#xff08;一&#xff09;和 走进“深度搜索基础训练“&#xff0c;踏入c算法殿堂&#xff08;二&#xff09;的重编版。 希望以此&#xff0c;唤起对那位故人的回忆。 小航走入赛场&#xff0c;比赛…

常见的网络设备有哪些?分别有什么作用?

个人主页&#xff1a;insist--个人主页​​​​​​ 本文专栏&#xff1a;网络基础——带你走进网络世界 本专栏会持续更新网络基础知识&#xff0c;希望大家多多支持&#xff0c;让我们一起探索这个神奇而广阔的网络世界。 目录 一、网络设备的概述 二、常见的网络设备 1、…

eslintignore无效解决办法

项目的根目录下新建.eslintignore&#xff0c;但是无论怎么配置&#xff0c;该文件总是无法生效。本想解决不生效的问题&#xff0c;但是一直无法解决&#xff0c;于是换了一种解决问题的思路。 方法一&#xff1a; 在需要进行忽略的文件顶部加上 /* eslint-disable */这样e…

JVS低代码中表单引擎与逻辑引擎是如何联合调用外部API的?

在企业项目中&#xff0c;常常出现需要给外部系统提供一个api &#xff0c;让外部系统触发调用&#xff0c;本系统直接数据入库&#xff0c;那么我们来看看jvs的表单引擎与逻辑引擎联合实现这个功能&#xff0c;先看实现效果&#xff1a; 配置步骤&#xff1a; 一、配置列表页…

【机密计算实践】支持 Intel SGX 的 LibOS 项目介绍(一)

一、LibOS 库操作系统(Library Operating System,简称 LibOS)是根据某类应用的特殊需求,由某一高级编程语言将原本属于操作系统内核的某些资源管理功能,如文件磁盘 I/O、网络通信等,按照模块化的要求,以库的形式提供给应用程序的特殊操作系统。 它能代替操作系统内核合…

springboot(JavaCV )实现视频截取第N帧并保存图片

springboot&#xff08;JavaCV &#xff09;实现视频截取第N帧并保存图片 现在视频网站展示列表都是用img标签展示的&#xff0c;动图用的是gif&#xff0c;但是我们上传视频时并没有视屏封面&#xff0c;就这需要上传到服务器时自动生成封面并保存 本博客使用jar包的方式实现…

如何成功开展跨境电子商务?快速入门!

随着全球化的推进和互联网技术的发展&#xff0c;跨境电子商务已经成为许多企业追求新市场和实现增长的重要途径。然而&#xff0c;要在这个竞争激烈的领域中脱颖而出并取得成功并非易事。本文将介绍三个可行的策略&#xff0c;以帮助企业成功开展跨境电子商务。 第一策略&…

浙大数据结构第八周之08-图8 How Long Does It Take

前置知识&#xff1a; 拓扑排序&#xff1a; /* 邻接表存储 - 拓扑排序算法 */bool TopSort( LGraph Graph, Vertex TopOrder[] ) { /* 对Graph进行拓扑排序, TopOrder[]顺序存储排序后的顶点下标 */int Indegree[MaxVertexNum], cnt;Vertex V;PtrToAdjVNode W;Queue Q Cre…

网络面试题(172.22.141.231/26,该IP位于哪个网段? 该网段拥有多少可用IP地址?广播地址是多少?)

此题面试中常被问到&#xff0c;一定要会172.22.141.231/26&#xff0c;该IP位于哪个网段&#xff1f; 该网段拥有多少可用IP地址&#xff1f;广播地址是多少&#xff1f; 解题思路&#xff1a; 网络地址&#xff1a;172.22.141.192 10101100.00010110.10001101.11000000 广播…

【后端速成 Vue】第一个 Vue 程序

1、为什么要学习 Vue&#xff1f; 为什么使用 Vue? 回想之前&#xff0c;前后端交互的时候&#xff0c;前端收到后端响应的数据&#xff0c;接着将数据渲染到页面上&#xff0c;之前使用的是 JavaScript 或者 基于 JavaScript 的 Jquery&#xff0c;但是这两个用起来还是不太…

uni-app 打包生成签名Sha1

Android平台打包发布apk应用&#xff0c;需要使用数字证书&#xff08;.keystore文件&#xff09;进行签名&#xff0c;用于表明开发者身份。 可以使用JRE环境中的keytool命令生成。以下是windows平台生成证书的方法&#xff1a; 安装JRE环境&#xff08;推荐使用JRE8环境&am…

yolov8模型转onnx模型 和 tensorRT 模型

转onnx模型 在 安装好 pip install onnxruntime-gpu pip install onnx onnxconverter-common 出现 No module named cpuinfo 错误&#xff0c;通过安装&#xff1a; pip install py-cpuinfo 解决该问题。 import sys # 即 ultralytics文件夹 所在绝对路径 sys.path.app…

STM32 GPIO复习

GPIO General Purpose Input Output&#xff0c;即通用输入输出端口&#xff0c;简称GPIO。 负责采集外部器件的信息或控制外部器件工作&#xff0c;即输入输出。 不同型号&#xff0c;IO口数量可能不一样&#xff0c;可通过选型手册快速查询。 能快速翻转&#xff0c;每次翻…

Crimson:高性能,高扩展的新一代 Ceph OSD

背景 随着物理硬件的不断发展&#xff0c;存储软件所使用的硬件的情况也一直在不断变化。 一方面&#xff0c;内存和 IO 技术一直在快速发展&#xff0c;硬件的性能在极速增加。在最初设计 Ceph 的时候&#xff0c;通常情况下&#xff0c;Ceph 都是被部署到机械硬盘上&#x…