(处理 Kafka 消息积压) - 高吞吐 + 零丢失的阻塞队列实战方案

一、分布式日志消费场景与挑战

在分布式日志系统中,Kafka 通常作为消息队列中间件,负责从日志生产者接收日志,并将其分发给日志消费者进行处理。为了平衡 Kafka 消费速度与日志处理速度,BlockingQueue 常被用作缓冲区,连接 Kafka 消费线程和多线程日志处理器。

典型架构:
1、Kafka 消费线程
从 Kafka 中持续拉取日志,放入 BlockingQueue 中。
2、多线程日志处理器
从 BlockingQueue 中取出日志,进行解析、存储或其他业务逻辑处理。
3、缓冲区(BlockingQueue)
作为生产者(Kafka 消费线程)和消费者(日志处理线程)之间的桥梁,平衡两者的速度差异。

主要挑战:
1、高吞吐需求
需要设计高效的线程模型,最大化日志处理吞吐量。
2、消息积压问题
当日志处理速度跟不上 Kafka 消费速度时,BlockingQueue 可能被填满,导致 Kafka 消费线程阻塞,甚至丢失消息。

二、基于 BlockingQueue 的高吞吐消费者线程模型

在分布式日志消费中,实现高吞吐的关键在于多线程并发处理和合理的线程模型设计。以下是一个典型的高吞吐消费者线程模型。

1、 消费者线程模型设计
为了高效消费和处理日志,我们可以将任务分为以下两部分:

  • Kafka 消费线程

负责从 Kafka 持续拉取日志,并将其放入 BlockingQueue。

  • 日志处理线程池

从 BlockingQueue 中取出日志,执行并发处理。

代码示例:

// 定义阻塞队列,作为缓冲区  
BlockingQueue<String> logQueue = new LinkedBlockingQueue<>(10000);  // Kafka 消费线程  
Thread kafkaConsumerThread = new Thread(() -> {  while (true) {  try {  // 从 Kafka 拉取日志  String log = kafkaConsumer.poll(Duration.ofMillis(100));  if (log != null) {  logQueue.put(log); // 放入阻塞队列  }  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  }  }  
});  // 日志处理线程池  
ExecutorService logProcessorPool = Executors.newFixedThreadPool(10);  
for (int i = 0; i < 10; i++) {  logProcessorPool.submit(() -> {  while (true) {  try {  // 从阻塞队列中取日志  String log = logQueue.take();  processLog(log); // 处理日志  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  }  }  });  
}  // 启动 Kafka 消费线程  
kafkaConsumerThread.start();

2、设计要点分析
阻塞队列的大小:
队列大小需要根据系统的内存限制和吞吐量需求进行合理配置。过小的队列可能导致 Kafka 消费线程频繁阻塞,过大的队列则可能占用过多内存,影响系统性能。

线程池的大小:
日志处理线程池的线程数需要根据业务逻辑的复杂度和 CPU 核心数调整。一般情况下,线程数可以设置为 CPU 核心数的 2 倍(I/O 密集型任务)或相等(CPU 密集型任务)。

Kafka 消费速率:
使用 Kafka 的 poll 方法可以批量拉取日志,适当调整批量大小可以提高消费效率。建议设置批量大小与队列容量相匹配,避免一次性拉取过多数据。

三、如何避免队列满了导致消息丢失?

在高并发场景下,如果日志处理速度跟不上 Kafka 消费速度,BlockingQueue 很可能被填满,导致 Kafka 消费线程阻塞,甚至引发消息丢失问题。以下是几种常见的解决方案:

1、流控机制:动态调整 Kafka 消费速率
流控机制的核心思想是根据队列的剩余容量动态调整消费速率,确保生产和消费的平衡。具体实现方法如下:

暂停 Kafka 消费线程
当队列接近满时,暂停 Kafka 消费线程;当队列有足够空间时,恢复消费。
实现示例:

Thread kafkaConsumerThread = new Thread(() -> {  while (true) {  try {  // 如果队列已满,暂停消费  if (logQueue.remainingCapacity() == 0) {  Thread.sleep(100); // 暂停 100ms  continue;  }  // 拉取日志并放入队列  String log = kafkaConsumer.poll(Duration.ofMillis(100));  if (log != null) {  logQueue.put(log);  }  } catch (InterruptedException e) {  Thread.currentThread().interrupt();  }  }  
});

动态调整批量大小
根据队列的剩余容量,动态调整 Kafka 拉取日志的批量大小,避免一次性拉取过多数据导致队列溢出。

2、自定义阻塞队列:持久化溢出日志
默认的 BlockingQueue 会在队列满时阻塞生产线程,但我们可以通过自定义队列,在队列满时将溢出的日志持久化到磁盘,避免数据丢失。

自定义队列实现示例:

class DiskBackedQueue extends LinkedBlockingQueue<String> {  private final File backupFile = new File("backup.log");  @Override  public boolean offer(String log) {  boolean success = super.offer(log);  if (!success) {  // 队列满时,将日志写入磁盘  try (FileWriter writer = new FileWriter(backupFile, true)) {  writer.write(log + System.lineSeparator());  } catch (IOException e) {  e.printStackTrace();  }  }  return success;  }  
}

通过这种方式,即使队列满了,日志也不会丢失,而是被安全地存储到磁盘中。

3、消息回写 Kafka:使用死信队列
当队列满时,可以将日志重新写入 Kafka 的另一个主题(通常称为“死信队列”),以便后续重新消费。

实现步骤:

1、当 BlockingQueue 满时,捕获 offer 方法的失败状态。
2、使用 Kafka Producer 将日志写入死信队列。
实现代码:

if (!logQueue.offer(log)) {  kafkaProducer.send(new ProducerRecord<>("dead_letter_topic", log)); // 写入死信队列  
}

这种方式可以保证即使队列溢出,日志也不会丢失,而是被转移到另一个 Kafka 主题中等待后续处理。

4、提升队列处理能力
如果队列溢出频繁发生,可以通过以下方式提升处理能力:

  • 增加日志处理线程数

扩展线程池规模,以提高日志的处理速度。

  • 优化日志处理逻辑

减少单条日志的处理耗时,例如使用批量处理或异步存储。

  • 多队列分流

根据日志的类型或来源,将日志分配到多个队列,每个队列独立消费。

四、总结与最佳实践

在分布式日志系统中,BlockingQueue 是实现高吞吐和缓冲的重要工具,但在高并发场景下,消息积压和队列溢出可能导致数据丢失。以下是本文总结的最佳实践:

1、高吞吐消费者线程模型:

  • 使用 Kafka 消费线程与日志处理线程池分工协作。
  • 根据吞吐量需求调整队列大小和线程池规模。

2、流控机制避免队列溢出:

  • 动态调整 Kafka 消费速率,确保生产与消费平衡。
  • 暂停或限制 Kafka 消费线程的拉取操作。

3、自定义队列或持久化机制:

  • 自定义队列将溢出日志存储到磁盘或回写 Kafka。
  • 使用死信队列保存无法及时处理的日志。

4、提升处理能力:

  • 增加线程池规模或优化日志处理逻辑。
  • 使用多队列分流,将日志按类型分配到不同的队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68319.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Unity】unity3D 调用LoadSceneAsync 场景切换后比较暗 部门材质丢失

解决方法&#xff1a;两个场景使用同样灯光 现象 直接进入第二个场景是可以正常显示 调用LoadSceneAsync来切换后&#xff0c;第二个场景出现比较暗的情况 解决方法&#xff1a;两个场景使用同样灯光&#xff0c;在loading 的场景中加入灯光。 Light—Directional Light 如果…

红日-VulnStack靶场一

http://vulnstack.qiyuanxuetang.net/vuln/ 一、环境部署 win7(被攻击机/关火墙) web服务器 1张外网网卡(桥接192.168.1.105)&#xff0c;一张内网网卡192.168.52.143/255.255.255.0/192.168.52.2 DNS 192.168.52.138 winser2008 域控服务器 1张…

实现linux硬盘smart检测

一、下载交叉编译libatasmart库 下载链接&#xff1a;https://www.linuxfromscratch.org/blfs/view/svn/general/libatasmart.html libatasmart库编译依赖libudev库&#xff0c;交叉编译器前先准备依赖的libudev: 设置libudev的环境变量&#xff0c;并通过configure编译文件生…

蓝桥杯算法|基础笔记(1)

**时间复杂度** 一、概念理解 时间复杂度是用来衡量算法运行时间随输入规模增长而增长的量级。它主要关注的是当输入规模趋向于无穷大时&#xff0c;算法执行基本操作的次数的增长趋势&#xff0c;而不是精确的运行时间。 二、分析代码中的基本操作 确定关键操作 在一段代码…

Uniapp判断设备是安卓还是 iOS,并调用不同的方法

在 UniApp 中&#xff0c;可以通过 uni.getSystemInfoSync() 方法来获取设备信息&#xff0c;然后根据系统类型判断当前设备是安卓还是 iOS&#xff0c;并调用不同的方法。 示例代码 export default {onLoad() {this.checkPlatform();},methods: {checkPlatform() {// 获取系…

K8S 节点选择器

今天我们来实验 pod 调度的 nodeName 与 nodeSelector。官网描述如下&#xff1a; 假设有如下三个节点的 K8S 集群&#xff1a; k8s31master 是控制节点 k8s31node1、k8s31node2 是工作节点 容器运行时是 containerd 一、镜像准备 1.1、镜像拉取 docker pull tomcat:8.5-jre8…

Multi-Agent如何设计

文章小结 研究背景和目的 在单一大语言模型长期主导人工智能领域的背景下&#xff0c;多智能体系统在对话任务解决中逐渐崭露头角。 虽然先前的研究已经展示了多智能体系统在推理任务和创造性工作中的潜力&#xff0c;但对于其在对话范式方面的局限性以及单个智能体的影响&am…

Web端实时播放RTSP视频流(监控)

一、安装ffmpeg: 1、官网下载FFmpeg: Download FFmpeg 2、点击Windows图标,选第一个:Windows builds from gyan.dev 3、跳转到下载页面: 4、下载后放到合适的位置,不用安装,解压即可: 5、配置path 复制解压后的\bin路径,配置环境变量如图: <

keepalived双机热备(LVS+keepalived)实验笔记

目录 前提准备&#xff1a; keepalived1&#xff1a; keepalived2&#xff1a; web1&#xff1a; web2&#xff1a; keepalived介绍 功能特点 工作原理 应用场景 前提准备&#xff1a; 准备4台centos&#xff0c;其中两台为keepalived&#xff0c;两台为webkeepalive…

CentOS 7 下 Nginx 的详细安装与配置

1、安装方式 1.1、通过编译方式安装 下载Nginx1.16.1的安装包 https://nginx.org/download/nginx-1.16.1.tar.gz 下载后上传至/home目录下。 1.2、通过yum方式安装 这种方式安装更简单。 2、通过编译源码包安装Nginx 2.1、安装必要依赖 sudo yum -y install gcc gcc-c sudo…

八股学习 Redis

八股学习 Redis 常见场景常见问题问题1、2示例场景缓存穿透解决方案一解决方案二 问题3示例场景缓存击穿解决方案 问题4示例场景缓存雪崩解决方案 问题5示例场景双写一致性强一致方案允许延时一致方案 问题6RDB方式AOF方式两种方式对比 问题7数据过期策略惰性删除定期删除 问题…

【全套】基于机器学习的印度森林火灾发生概率的分析与预测

【私信送源码文档】基于机器学习的印度森林火灾发生概率的分析与预测 对应的ppt 摘 要 随着全球气候变化的不断加剧&#xff0c;火灾的频发和规模逐渐增大&#xff0c;成为备受关注的问题。本文旨在提高对火灾发生概率的准确性&#xff0c;为火灾的预防和管理提供科学支持。在…

RabbitMQ中有哪几种交换机类型?

大家好&#xff0c;我是锋哥。今天分享关于【RabbitMQ中有哪几种交换机类型&#xff1f;】面试题。希望对大家有帮助&#xff1b; RabbitMQ中有哪几种交换机类型&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在RabbitMQ中&#xff0c;交换机&#xf…

HTML拖拽功能(纯html5+JS实现)

1、HTML拖拽--单元行拖动 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><…

SpringMVC复习笔记

文章目录 SpringMVC 概念和基本使用SpringMVC 简介SpringMVC 核心组件和调用流程SpringMVC 基本使用第一步&#xff1a;导入依赖第二步&#xff1a;Controller 层开发第三步&#xff1a;SpringMVC 配置类配置核心组件第四步&#xff1a;SpringMVC 环境搭建第五步&#xff1a;部…

记录一次Android Studio的下载、安装、配置

目录 一、下载和安装 Android Studio 1、搜索下载Android studio ​2、下载成功后点击安装包进行安装&#xff1a; 3、这里不用打勾&#xff0c;直接点击安装 &#xff1a; 4、完成安装&#xff1a; 5、这里点击Cancel就可以了 6、接下来 7、点击自定义安装&#xff1a…

字节序 大端和小端

目录 什么是 大端存储和小端存储&#xff1f;为什么会有大小端转换问题如何检查自己电脑 是大端还是小端&#xff1f;大端小端处理函数使用位运算操作来手动转换大端和小端。使用标准库中的htonl和ntohl函数代码示例&#xff1a; 什么是 大端存储和小端存储&#xff1f; 大端模…

金融项目实战 03|JMeter脚本实现手工接口测试

目录 一、环境说明 1、项目环境搭建 2、Mock说明 二、构造测试数据 1、通过系统页面构造 2、通过接口构造 3、通过数据库构造【推荐】 4、案例&#xff1a;构造借款业务数据 三、JMeter执行接口测试用例 1、获取图片验证码、获取短信验证码 2、注册脚本 3、登录脚本…

【优先算法】滑动窗口--(结合例题讲解解题思路)(C++)

目录 1. 例题1&#xff1a;最大连续1的个数 1.1 解题思路 1.2代码实现 1.3 错误示范如下&#xff1a;我最开始写了一种&#xff0c;但是解答错误&#xff0c;请看&#xff0c;给大家做个参考 2. 将 x 减到 0 的最小操作数 2.1解题思路 2.2代码实现 1. 例题1&#xff…

JDK17语法新增

1.yield关键字: 2.var关键字&#xff1a; 3.密封类&#xff1a; ⼀般应⽤在类和接⼝中&#xff0c;对接⼝和类的实现和继承进⾏约束。主要使⽤的关键字是 final 。当这个类被 final 修饰了&#xff0c;被修饰的类就变成完全封闭的状态了&#xff0c;所有类都没办法继承。…