详解kafka消息发送重试机制的案例

在 Kafka 生产者中实现消息发送的重试机制,可以通过配置 KafkaProducer 的相关属性来实现。以下是一些关键的配置项:

retries:设置生产者发送失败后重试的次数。

retry.backoff.ms:设置生产者在重试前等待的时间。

buffer.memory:设置生产者在内存中缓存数据的最大值,如果达到这个值,生产者会拒绝接受新的消息,直到当前缓存的消息被发送出去。

batch.size:设置生产者在发送批次中可以包含的最大消息数。

linger.ms:设置生产者在发送批次之前等待更多消息的最大时间。

max.in.flight.requests.per.connection:设置每个连接最多数未完成的请求

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 配置生产者属性Properties props = new Properties();props.put("bootstrap.servers", "4.5.8.4:9092");props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());props.put("retries", 5); // 设置重试次数props.put("retry.backoff.ms", 100); // 设置重试间隔props.put("buffer.memory", 33554432); // 设置缓冲区大小props.put("batch.size", 16384); // 设置批次大小props.put("linger.ms", 1); // 设置等待时间props.put("max.in.flight.requests.per.connection", 5); // 设置最大在途请求数// 创建生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 1000000; i++) {String key = "案例1=====" + i;System.out.println("key:"+key);String value = "Spring AI Alibaba 实现了与阿里云通义模型的完整适配,接下来,我们将学习如何使用 spring ai alibaba 开发一个基于通义模型服务的智能聊天应用" + i;ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);producer.send(record, (metadata, exception) -> {if (exception != null) {// 处理消息发送失败的情况System.err.println("发送消息失败:" + exception.getMessage());} else {// 处理消息发送成功的情况System.out.println("消息发送成功,偏移量:" + metadata.offset());}});}// 关闭生产者producer.close();}
}

在这个示例中,我们设置了重试次数、重试间隔、缓冲区大小、批次大小、等待时间和最大在途请求数。此外,我们还为 send 方法提供了一个回调函数,用于处理消息发送成功或失败的情况。这样,当消息发送失败时,生产者会自动重试,直到达到配置的重试次数。如果所有重试都失败,回调函数会收到异常通知,你可以在回调中实现进一步的错误处理逻辑。

🔍 如何配置Kafka生产者的重试策略?

其实上面也有说,再次总结下

要配置 Kafka 生产者的重试策略,你可以按照以下步骤进行:

  1. 设置重试次数

    • 通过设置 retries 属性来指定生产者在遇到错误时重试发送消息的次数。例如,设置 retries 为 3 表示生产者会尝试最多 3 次发送消息。
  2. 设置重试间隔

    • 使用 retry.backoff.ms 属性来配置重试之间的时间间隔。这个设置可以防止生产者在连续的短时间内发送大量重试请求,给 Kafka 集群或网络造成压力。
  3. 确保消息幂等性

    • 设置 enable.idempotencetrue 以确保生产者发送消息的逻辑是幂等的,即使消息被重复发送也不会影响系统状态。
  4. 配置确认策略

    • 通过 acks 属性来确保消息被所有副本确认。例如,设置 acks 为 “all” 可以确保消息被所有副本确认后才认为是成功发送。
  5. 异步发送与回调

    • 使用异步发送消息,并在回调中处理发送失败的情况。在回调中对异常进行分类处理,对于可恢复的错误进行重试,对于不可恢复的错误进行日志记录或报警。
  6. 错误处理与日志记录

    • 在回调函数中捕获并处理异常,同时记录详细的错误日志,便于问题排查和监控。
  7. 监控与告警

    • 对生产者的关键性能指标进行监控,如发送延迟、吞吐量等。当指标出现异常时,及时触发告警通知相关人员处理。
  8. 合理配置重试机制

    • 根据业务需求合理配置重试次数和重试间隔,以减少因网络波动或 Kafka 集群短暂不可用导致的消息丢失风险。
  9. 设置最大在途请求

    • 通过 max.in.flight.requests.per.connection 属性限制每个连接最多数未完成的请求,这有助于控制内存使用和重试的并发量。
  10. 配置超时时间

    • Kafka 2.4 版本引入了 delivery.timeout.ms 参数,它设置了发送记录和接收确认之间的超时时间。这个参数与 retries 结合使用,可以提供更灵活的重试控制。

通过上述配置,你可以为 Kafka 生产者设置一个健壮的重试策略,以确保在面对网络问题或 Kafka 集群短暂不可用时,消息能够被可靠地发送。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886035.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

51单片机应用开发(进阶)---定时器应用(电子时钟)

实现目标 1、巩固定时器的配置流程&#xff1b; 2、掌握按键、数码管与定时器配合使用&#xff1b; 3、功能1&#xff1a;&#xff08;1&#xff09;简单显示时间。显示格式&#xff1a;88-88-88&#xff08;时-分-秒&#xff09; 4、功能2&#xff1a;&#xff08;1&#…

FPGA实现PCIE采集电脑端视频转SFP光口万兆UDP输出,基于XDMA+GTX架构,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的PCIE方案10G Ethernet Subsystem实现万兆以太网物理层方案 3、PCIE基础知识扫描4、工程详细设计方案工程设计原理框图电脑端视频PCIE视频采集QT上位机XDMA配置及使用XDMA中断模块FDMA图像缓存UDP视频组包发送UDP协议栈MAC…

使用 unicorn 和 capstone 库来模拟 ARM Thumb 指令的执行(一)

import binascii import unicorn import capstonedef printArm32Regs(mu):for i in range(66,78):print("R%d,value:%x"%(i-66,mu.reg_read(i)))def testhumb():CODE b\x1C\x00\x0A\x46\x1E\x00"""MOV R3, R0 的机器码&#xff1a;0x1C 0x00&#xf…

git重置的四种类型(Git Reset)

git区域概念 1.工作区:IDEA中红色显示文件为工作区中的文件 (还未使用git add命令加入暂存区) 2.暂存区:IDEA中绿色(本次还未提交的新增的文件显示为绿色)或者蓝色(本次修改的之前版本提交的文件但本次还未提交的文件显示为蓝色)显示的文件为暂存区中的文件&#xff08;使用了…

第三十一天|贪心算法| 56. 合并区间,738.单调递增的数字 , 968.监控二叉树

目录 56. 合并区间 方法1&#xff1a;fff 看方法2&#xff1a;fff优化版 方法3&#xff1a; 738.单调递增的数字 968.监控二叉树&#xff08;贪心二叉树&#xff09; 56. 合并区间 判断重叠区间问题&#xff0c;与452和435是一个套路 方法1&#xff1a;fff 看方法2&am…

LeetCode 热题100(八)【二叉树】(3)

目录 8.11二叉树展开为链表&#xff08;中等&#xff09; 8.12从前序与中序遍历序列构造二叉树&#xff08;中等&#xff09; 8.13路径总和III&#xff08;中等&#xff09; 8.14二叉树的最近公共祖先&#xff08;中等&#xff09; 8.15二叉树中的最大路径和&#xff08;困…

AutoSAR CP DoIP规范导读

主要功能和用途 诊断通信协议实现 遵循标准&#xff1a;遵循ISO 13400 - 2标准&#xff0c;实现了诊断通信在IP网络上的传输协议和网络层服务&#xff0c;包括数据封装、传输、路由等功能。 多种消息支持 车辆识别与公告&#xff1a;能够进行车辆识别请求和响应&#xff0c;…

Simulink中Matlab function使用全局变量

目录 一. 引言二. 普通Matlab function使用全局变量三. Simulink中的Matlab function使用全局变量四. 如何利用Matlab function的全局变量施加随机噪声 一. 引言 最近发现了之前仿真中的一个问题&#xff0c;记录一下备忘。 Matlab function中有时候需要用到全局变量&#xf…

屏幕缩放后截屏图片尺寸数字偏大导致前端DOM尺寸设置失真问题

如果显示器的尺寸缩放&#xff0c;而不是100%的话&#xff0c;利用截屏软件截取屏幕中的区域&#xff0c;截取时读取到的区域尺寸&#xff0c;就会失真&#xff1b;如果使用这个尺寸去设置网页中的DOM&#xff0c;则Dom的尺寸也会跟着失真。 比如&#xff0c; 如果使用失真…

蓝桥杯每日真题 - 第7天

题目&#xff1a;&#xff08;爬山&#xff09; 题目描述&#xff08;X届 C&C B组X题&#xff09; 解题思路&#xff1a; 前缀和构造&#xff1a;为了高效地计算子数组的和&#xff0c;我们可以先构造前缀和数组 a&#xff0c;其中 a[i] 表示从第 1 个元素到第 i 个元素的…

给阿里云OSS绑定域名并启用SSL

为什么要这么做&#xff1f; 问题描述&#xff1a; 当用户通过 OSS 域名访问文件时&#xff0c;OSS 会在响应头中增加 Content-Disposition: attachment 和 x-oss-force-download: true&#xff0c;导致文件被强制下载而不是预览。这个问题特别影响在 2022/10/09 之后新开通 OS…

电脑浏览器打不开网页怎么办 浏览器无法访问网页解决方法

我们在使用电脑的时候&#xff0c;使用浏览器是经常的&#xff0c;很多用户在点开浏览器时&#xff0c;却遇到浏览器无法访问网页的情况。那么电脑浏览器打不开网页是什么原因呢&#xff1f;今天小编就给大家分享几个常见的原因和具体的解决方法&#xff0c;希望能对大家有所帮…

(干货)Jenkins使用kubernetes插件连接k8s的认证方式

#Kubernetes插件简介 Kubernetes 插件的目的是能够使用 Kubernetes 配合&#xff0c;实现动态配置 Jenkins 代理&#xff08;使用 Kubernetes 调度机制来优化负载&#xff09;&#xff0c;在执行 Jenkins Job 构建时&#xff0c;Jenkins Master 会在 kubernetes 中创建一个 Sla…

C语言 | Leetcode C语言题解之第556题下一个更大元素III

题目&#xff1a; 题解&#xff1a; int nextGreaterElement(int n){int x n, cnt 1;for (; x > 10 && x / 10 % 10 > x % 10; x / 10) {cnt;}x / 10;if (x 0) {return -1;}int targetDigit x % 10;int x2 n, cnt2 0;for (; x2 % 10 < targetDigit; x2…

TDesign了解及使用

文章目录 1、概述2、快速开始2.1使用 npm 安装2.2通过 浏览器引入 安装2.3、使用 3、简单案例3.1 路由创建3.2、 页面创建3.3、 Table组件3.4、序号展示3.5、 图片展示及预览3.6、 性别字段处理 1、概述 TDesign 是腾讯推出的设计系统&#xff0c;旨在提供一致的设计语言和视觉…

计算机网络(11)和流量控制补充

这一篇对数据链路层中的和流量控制进行详细学习 流量控制&#xff08;Flow Control&#xff09;是计算机网络中确保数据流平稳传输的技术&#xff0c;旨在防止数据发送方发送过多数据&#xff0c;导致接收方的缓冲区溢出&#xff0c;进而造成数据丢失或传输失败。流量控制通常…

可扩展架构与分层架构

可扩展架构 1 概述 软件系统与硬件/建筑系统最大的区别就是可以迭代升级和扩展&#xff0c;一个硬件生产出来后就不会进行改变&#xff0c;除非拿去售后维修&#xff0c;一个建筑完工后也不会改变其整体的结构&#xff0c;除非被破坏后进行修复和重铸 可以发现如果硬件/建筑不…

MyBatis从入门到进阶

目录 MyBatis入门1、创建项目、数据准备2、数据库配置3、编写持久层代码单元测试打印日志 基本操作查询数据插入数据删除数据更新数据 MyBatis - xml插入数据更新数据删除数据查询数据#{}与${}SQL注入排序like查询 MyBatis进阶if标签trim标签where标签set标签foreach标签sql标签…

TensorFlow 2.0 环境配置

官方文档&#xff1a;CUDA Installation Guide for Windows 官方文档有坑&#xff0c;windows的安装指南直接复制了linux的指南内容&#xff1a;忽略这些离谱的信息即可。 可以从官方文档知悉&#xff0c;cuda依赖特定版本的C编译器。但是我懒得为了一个编译器就下载整个visua…

浅谈:基于三维场景的视频融合方法

视频融合技术的出现可以追溯到 1996 年 , Paul Debevec等 提出了与视点相关的纹理混合方法 。 也就是说 &#xff0c; 现实的漫游效果不是从摄像机的角度来看 &#xff0c; 但其仍然存在很多困难 。基于三维场景的视频融合 &#xff0c; 因其直观等特效在视频监控等相关领域有着…