Kafka系列之如何提高消费者消费速度

前言

在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。

实现方案

为了提高消费者的消费速度,我们可以采取以下措施:

  • 将主题的分区数量增大,如 20,通过concurrency将消费者的消费线程数增大到 10(2个pod),提高消息处理的并发能力。
  • 将每次批量拉取消息的数量max.poll.records增大到 500,提高单次处理消息的数量。
  • 将消息切分成批次,将单个批次的数据处理业务逻辑放进线程池中异步进行,提高并发处理消息的速度。
  • 将异步线程池的拒绝模式调整为 CallerRunsPolicy,这个配置非常重要。当线程池的任务队列已满且所有线程都在忙碌时,新的任务将由提交任务的线程(即调用者线程)来执行。否则在消息量特别大的情况下,很可能会因为线程池任务队列满了而丢失数据。
  • 将异步线程池的队列容量设置为 0,这样意味着所有任务必须立即由线程池中的线程来处理,减少在队列中的等待时间。
  • 在数据上报的时候进行幂等性验证,防止重复上报数据。
@Component
public class OrderConsumer {@Resource(name = "execThreadPool")private ThreadPoolTaskExecutor execThreadPool;@KafkaListener(id = "record_consumer",topics = "record",groupId = "g_record_consumer",concurrency = "10",properties = {"max.poll.interval.ms:300000", "max.poll.records:500"})public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {execThreadPool.submit(()-> {// 业务逻辑});ack.acknowledge();}}

ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,用于管理和执行多线程任务。它是 TaskExecutor 接口的实现,提供了在 Spring 应用程序中创建和配置线程池的便捷方式。

ThreadPoolTaskExecutor主要特点:

  • 线程池配置: ThreadPoolTaskExecutor 允许你配置核心线程数、最大线程数、队列容量等线程池属性。

  • 线程创建和销毁: 它会根据任务的需求自动创建和销毁线程,避免不必要的线程创建和销毁开销。

  • 线程复用: 线程池中的线程可以被复用,从而减少线程创建的开销。

  • 队列管理: 当线程池达到最大线程数时,新任务会被放入队列中等待执行。

  • 拒绝策略: 当线程池已满并且队列也已满时,可以配置拒绝策略来处理新任务的方式。
    RejectedExecutionHandler 是 Java 线程池的一个重要接口,用于定义当线程池已满并且无法接受新任务时,如何处理被拒绝的任务。当线程池的队列和线程都已满,新任务就会被拒绝执行,这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。
    在 Java 中,有几种内置的 RejectedExecutionHandler 实现可供选择,每种实现都有不同的拒绝策略:
    AbortPolicy(默认策略): 这是默认的拒绝策略,它会抛出一个 RejectedExecutionException 异常,表示任务被拒绝执行。
    CallerRunsPolicy: 当线程池已满时,将任务返回给提交任务的调用者(Caller)。这意味着提交任务的线程会尝试执行被拒绝的任务。
    DiscardPolicy: 这个策略会默默地丢弃被拒绝的任务,不会产生任何异常。
    DiscardOldestPolicy: 这个策略会丢弃队列中最老的任务,然后尝试将新任务添加到队列中。

    除了这些内置的策略,你还可以实现自定义的 RejectedExecutionHandler 接口,以定义特定于你应用程序需求的拒绝策略。你可以根据业务需求来决定拒绝策略,比如记录日志、通知管理员、重试等。

@Configuration
public class ThreadPoolConfig {@Beanprivate ThreadPoolTaskExecutor execThreadPool() {ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();pool.setCorePoolSize(50);  // 核心线程数pool.setMaxPoolSize(10000);  // 最大线程数pool.setQueueCapacity(0);  // 等待队列sizepool.setKeepAliveSeconds(60);  // 线程最大空闲存活时间pool.setWaitForTasksToCompleteOnShutdown(true);pool.setAwaitTerminationSeconds(60);  // 程序shutdown时最多等60秒钟让现存任务结束pool.setRejectedExecutionHandler(new CallerRunsPolicy());  // 拒绝策略return pool;}
}

通过以上方案,我们可以提高消费侧的TPS,同时杜绝重复上报的现象,极大提高数据准确性和用户体验。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/49879.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

A Comprehensive Study of Knowledge Editing for Large Language Models

大型语言模型&#xff08;LLMs&#xff09;在理解和生成与人类交流密切相关的文本方面表现出了非凡的能力。然而&#xff0c;一个主要的限制在于训练期间的大量计算需求&#xff0c;这是由于它们的广泛参数化而产生的。世界的动态性质进一步加剧了这一挑战&#xff0c;需要经常…

10 ES6的模板字符串

ES6模板字符串&#xff08;Template Literals&#xff09;是一种新的字符串表示方式&#xff0c;它提供了一种更为强大和灵活的方式来构建字符串。以下是ES6模板字符串的详细介绍&#xff1a; 基本语法 模板字符串使用反引号&#xff08;&#xff09;包围&#xff0c;而不是传…

Unity UGUI 之Text 控件

本文仅作学习笔记与交流&#xff0c;不作任何商业用途 本文包括但不限于unity官方手册&#xff0c;唐老狮&#xff0c;麦扣教程知识&#xff0c;引用会标记&#xff0c;如有不足还请斧正 1.Text是什么 UI里面写文本的&#xff08;注意是legacy Text&#xff0c;而不是TextmeshP…

leetcode-136. 只出现一次的数字

题目描述 给你一个 非空 整数数组 nums &#xff0c;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现两次。找出那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法来解决此问题&#xff0c;且该算法只使用常量额外空间 示例 1 &#xff1a; 输入&…

搞懂Java继承,这样写代码轻松又高效!

大家好&#xff0c;我是小欧&#xff01; 今天我们来聊聊Java中的一个重要概念——继承。初学者常常觉得继承复杂&#xff0c;但掌握了它之后&#xff0c;代码写起来会更轻松高效。接下来&#xff0c;我会用大白话、简单易懂的例子&#xff0c;帮你彻底搞懂Java继承。 什么是继…

前端网页打开PC端本地的应用程序实现方案

最近开发有一个需求&#xff0c;网页端有个入口需要跳转三维大屏&#xff0c;而这个大屏是一个exe应用程序。产品需要点击这个入口&#xff0c;并打开这个应用程序。这个就类似于百度网盘网页跳转到PC端应用程序中。 这里我们采用添加自定义协议的方式打开该应用程序。一开始可…

Spring 系列

SpringBoot 实体类&#xff08;Entity&#xff09;层 实体类&#xff08;Entity&#xff09;通常属于模型层&#xff08;Model Layer&#xff09;或领域层&#xff08;Domain Layer&#xff09;。它们代表应用程序中的核心业务数据结构&#xff0c;与数据库表结构紧密对应。在…

springboot项目从jdk8升级为jdk17过程记录

背景&#xff1a;公司有升级项目jdk的规划&#xff0c;计划从jdk8升级到jdk11 开始 首先配置本地的java_home 参考文档&#xff1a;Mac环境下切换JDK版本及不同的maven-CSDN博客 将pom.xml中jdk1.8相关的版本全部改为jdk17&#xff0c;主要是maven编译插件之类的&#xff0c…

mysql定时备份

为什么写这篇文章 最近项目里面需要定时备份mysql的数据&#xff0c;网上找了下&#xff0c;找到了一些比较好的解决方案。但是发现有几个地方与自己不匹配&#xff0c;我期望有如下 备份过程不能锁表&#xff0c;网上很多都是会锁表备份定时任务无法执行&#xff0c;但是手动…

【如何在Jenkins的从节点切换NPM镜像源查看和切换】

【问题】 Jenkins打包时&#xff0c;前端npm构建时很慢&#xff0c;所有需要更换镜像源 【自查】 找到Jenkins从节点上的nodejs安装的路径&#xff0c;进入bin目录 执行./npm -v查看是不能正常查看&#xff0c; [rootlocalhost bin]# ./npm -v /usr/bin/env: ‘node’: No su…

redis 基础命令

1.数据库命令 select 库名&#xff1b;切换库 flushdb 清空库 flushall 清空所有库 redis支持的数据类型有很多&#xff0c;使用最频繁的有String 字符串类型&#xff0c;List队列&#xff0c;Hash&#xff0c;Zset有序集合&#xff0c;Set集合。 2.字符串类型命令 表示k…

【Python机器学习】k-近邻算法简单实践——电影分类

k-近邻算法&#xff08;KNN&#xff09;的工作原理是&#xff1a;存在一个样本数据集合&#xff0c;也被称为训练样本集&#xff0c;并且样本集中每个数据都存在标签&#xff0c;即我们知道样本集中每一数据与所属分类的对应关系&#xff0c;输入没有标签的数据后&#xff0c;将…

解决Java模块系统下的InaccessibleObjectException

前言 随着Java平台的演进&#xff0c;模块系统&#xff08;Project Jigsaw&#xff09;的引入为Java生态系统带来了更高级别的封装性和安全性。然而&#xff0c;这一进步也带来了新的挑战&#xff0c;特别是在处理反射和依赖于内部类实现的场景中。本文旨在深入解析java.lang.…

如何在Linux上使用Ansible自动化部署

Ansible是一个开源的自动化工具&#xff0c;可以帮助开发人员和系统管理员对大规模的服务器进行自动化部署和管理。它使用SSH协议来在远程服务器上执行任务&#xff0c;并通过模块化的方式提供了丰富的功能&#xff0c;可以轻松地管理服务器配置、软件部署和应用程序运行。 在…

Flink之重启策略

目录 1、固定延迟重启策略 2、失败率重启策略 3、不重启策略 在设置完 CheckPoint() 检查点机制后&#xff0c;不设置重启策略的话&#xff0c;&#xff0c;可以无限重启程序&#xff0c;那么设置的检查点机制也就没有什么意义了。因此&#xff0c;在生产实践中&#xff0c;…

android手势监听

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 未经允许不得转载 目录 一、导读二、概览三、使用四、 如何实…

昇思25天学习打卡营第17天 | CycleGAN图像风格迁移互换

通过深入学习CycleGAN模型&#xff0c;我对无监督图像到图像的转换技术有了更深的理解。CycleGAN不仅能在没有成对训练样本的情况下实现域之间的转换&#xff0c;而且在保持内容结构的同时成功转换图像风格&#xff0c;这在许多应用中都非常有用&#xff0c;如艺术风格转换、季…

面向RDF的三元组数据库

文章目录 开源RDF三元组数据库RDF4J开源RDF三元组数据库RDF-3X开源RDF三元组数据库gStore商业RDF三元组数据库Virtuoso商业RDF三元组数据库AllegroGraph商业RDF三元组数据库GraphDB商业RDF三元组数据库Blazegraph商业RDF三元组数据库Stardog由于RDF是W3C推荐的表示语义网上关联…

VAE、GAN与Transformer核心公式解析

VAE、GAN与Transformer核心公式解析 VAE、GAN与Transformer&#xff1a;三大深度学习模型的异同解析 【表格】VAE、GAN与Transformer的对比分析 序号对比维度VAE&#xff08;变分自编码器&#xff09;GAN&#xff08;生成对抗网络&#xff09;Transformer&#xff08;变换器&…

Python从0到100(四十三):数据库与Django ORM 精讲

前言&#xff1a; 零基础学Python&#xff1a;Python从0到100最新最全教程。 想做这件事情很久了&#xff0c;这次我更新了自己所写过的所有博客&#xff0c;汇集成了Python从0到100&#xff0c;共一百节课&#xff0c;帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…