kafka入门(三):kafka多线程消费

kafka消费积压

如果生产者发送消息的速度过快,或者是消费者处理消息的速度太慢,那么就会有越来越多的消息无法及时消费,也就是消费积压。

消费积压时,可以使用多线程消费,提高消费速度。

kafka多线程消费的代码:

public class ThirdMultiConsumerThreadDemo {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumerThread consumerThread = new KafkaConsumerThread(props, TOPIC,Runtime.getRuntime().availableProcessors());consumerThread.start();}/**** kafka配置* @return*/public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);return props;}/*** kafka消费者线程*/public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties props, String topic, int threadNumber) {kafkaConsumer = new KafkaConsumer<>(props);kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records =kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {log.error("run error", e);} finally {kafkaConsumer.close();}}}/*** 处理消息*/public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {//处理records.for (ConsumerRecord<String, String> record : records) {System.out.println("==========>record:"+record.value() + ",thread:" + Thread.currentThread().getName());}}}}

发送消息后,使用多线程消息,运行结果如下:

==========>record:{"id":"1234","name":"lin"},thread:pool-1-thread-1
==========>record:{"id":"5678","name":"chen"},thread:pool-1-thread-2
==========>record:{"id":"91011","name":"wu"},thread:pool-1-thread-3

参考资料:

《深入理解Kafka:核心设计与实践原理》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/200793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA连接Redis注意事项

禁用Linux防火墙 [atguiguhadoop102 ~]$ sudo systemctl stop firewalld [atguiguhadoop102 ~]$ sudo systemctl disable firewalld 修改redis.conf # 注释掉bind 127.0.0.1 # bind 127.0.0.1 # protected-mode默认为yes&#xff0c;修改为no protected-mode no logfile /opt/…

C# Demo--汉字转拼音

1.Nuget安装NPOI及Pinyin4net 2.Demo 代码部分 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using NPOI.SS.UserModel; using NPOI.HSSF.UserModel; using NPOI.XSSF.UserModel; using System.IO;…

基于ssm vue的社区互助平台源码和论文

摘 要 随着社区互助规模的不断扩大&#xff0c;社区互助数量的急剧增加&#xff0c;有关社区互助的各种信息量也在不断成倍增长。面对庞大的信息量&#xff0c;就需要有社区互助管理来提高社区互助管理工作的效率。通过这样的系统&#xff0c;我们可以做到信息的规范管理和快速…

​HTML代码混淆技术:原理、应用和实现方法详解

​HTML代码混淆技术&#xff1a;原理、应用和实现方法详解 HTML代码混淆是一种常用的反爬虫技术&#xff0c;它可以有效地防止爬虫对网站数据的抓取。本文将详细介绍HTML代码混淆技术的原理、应用以及实现方法&#xff0c;帮助大家更好地了解和运用这一技术。 一、HTML代码混淆…

Day51力扣打卡

打卡记录 Plus and Multiply&#xff08;模拟&#xff09; 链接 要满足 a x b ∗ y n a^x b * y n axb∗yn 的关系&#xff0c;可以枚举满足 b ∗ y n − a x b * y n - a ^ x b∗yn−ax 的可余条件。 t int(input()) for _ in range(t):n, a, b map(int, input().…

七、VMware虚拟机安装和docker容器部署项目

1、安装虚拟机详情 2、ping不通baidu.com 3、安装docker详情 4、docker安装mysql 5、docker安装redis 6、docker部署可执行jar包 7、练习&#xff1a;Docker部署若依前后端分离版【保姆级教程】

电脑上mp4视频文件无缩略图怎么办

前言&#xff1a;有时候电脑重装后电脑上的mp4视频文件无缩略图&#xff0c;视频文件数量比较多的时候查找比较麻烦 以下方法亲测有效&#xff1a; 1、下载MediaPreview软件 2、软件链接地址&#xff1a;https://pan.baidu.com/s/1bzVJpmcHyGxXNjnzltojtQ?pwdpma0 提取码&…

python笔记:dtaidistance

1 介绍 用于DTW的库纯Python实现和更快的C语言实现 2 DTW举例 2.1 绘制warping 路径 from dtaidistance import dtw from dtaidistance import dtw_visualisation as dtwvis import numpy as np import matplotlib.pyplot as plts1 np.array([0., 0, 1, 2, 1, 0, 1, 0, 0…

合并两个有序数组(leetcode_刷题1)

目录 题目&#xff1a;合并两个有序数组 题目分析方向1&#xff1a; 题目分析方向2&#xff1a; 题目&#xff1a;合并两个有序数组 题目要求&#xff1a; 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums…

MySQL二 | 函数

目录 函数 字符串函数 数值函数 日期函数 流程控制函数 聚合函数 函数 字符串函数 concat(s1,s2,...sn)#字符串拼接 lower(str) upper(str) lpad(str,n,pad)左填充,pad对str左边进行填充,达到n的长度 rpad(str,n,pad) trim(str)去除头部和尾部的空格 substring(s…

java之hashmap介绍

java之hashmap介绍 Java中的HashMap是一种基于哈希表实现的Map接口。它允许使用键值对&#xff08;key-value pairs&#xff09;存储和检索数据&#xff0c;并提供了高效的插入、查找和删除操作。 以下是关于Java HashMap的详细用法介绍&#xff1a; 1)创建HashMap对象&#…

深入理解 Spring Boot 的 ApplicationRunner 接口

介绍 在 Spring Boot 应用程序启动时&#xff0c;有时我们需要执行一些特定的任务。Spring Boot 提供了 ApplicationRunner 接口&#xff0c;允许我们在应用程序完全启动后执行自定义的逻辑。本文将深入介绍 ApplicationRunner 接口&#xff0c;以及如何通过它来实现应用程序启…

飞行汽车开发原理(上)

前言 小节的安排是由浅入深&#xff0c;要按顺序读&#xff1b;有电路知识基础的同学可跳到“计算机电路”一节开始。因为知识点之间有网状依赖&#xff0c;没办法按分类来讲。 为了避免过于深入、越讲越懵&#xff0c;很多描述仅为方便理解、不求严谨。 半导体特性 导体&a…

目标检测YOLO系列从入门到精通技术详解100篇-【图像处理】图像配准

目录 前言 算法原理 图像配准开源工具 用到的数据集 图像配准分类

毕业论文及各种办公文件word页码的设置大全

当我们在写论文或者报告的时候&#xff0c;经常需要我们给文档设置页码&#xff0c;用于页码统计&#xff0c;也方便后期的查阅和阅读&#xff0c;但是经常遇到特殊的要求或者情况&#xff0c;比如删除了某个页的页码&#xff0c;那么整个文档目录的页码就会全部被删除&#xf…

Excel——多列合并成一列的4种方法

Excel怎么将多列内容合并成一列&#xff1f; 怎么将多个单元格的内容连接起来放在一个单元格里&#xff1f; 比如下图&#xff0c;要将B、C、D列的内容&#xff0c;合并成E列那样&#xff0c;该怎么做呢&#xff1f; △图1 本文中&#xff0c;高潜老师将给大家介绍 4种 将多…

基于SpringBoot的仓库管理系统设计与实现附带源码和论文

博主24h在线&#xff0c;想要源码文档部署视频直接私聊&#xff0c;全网最低价&#xff0c;9.9拿走&#xff01; 【关键词】仓库管理系统&#xff0c;jsp编程技术&#xff0c;mysql数据库&#xff0c;SSM&#xff0c;Springboot 目 录 摘 要 Abstract 第1章 绪论 1.1 课题…

分布式搜索引擎(Elastic Search)+消息队列(RabbitMQ)部署

一、分布式搜索引擎&#xff1a;Elastic Search Elastic Search的目标就是实现搜索。是一款非常强大的开源搜索引擎&#xff0c;可以帮助我们从海量数据中快速找到需要的内容。在数据量少的时候&#xff0c;我们可以通过索引去搜索关系型数据库中的数据&#xff0c;但是如果数…

SQL int(1) 和 int (10)的区别

前言 最近遇到个问题&#xff0c;有个表的要加个user_id字段&#xff0c;user_id字段可能很大&#xff0c;于是我提mysql工单alter table xxx ADD user_id int(1)。领导看到我的sql工单&#xff0c;于是说&#xff1a;这int(1)怕是不够用吧&#xff0c;接下来是一通解释。 其…

python爬虫混肴DES案例:某影视大数据平台

声明&#xff1a; 该文章为学习使用&#xff0c;严禁用于商业用途和非法用途&#xff0c;违者后果自负&#xff0c;由此产生的一切后果均与作者无关 一、找出需要加密的参数 js运行atob(‘aHR0cHM6Ly93d3cuZW5kYXRhLmNvbS5jbi9Cb3hPZmZpY2UvQk8vTW9udGgvb25lTW9udGguaHRtbA’…