利用单线程池实现多线程并发顺序消费消息

1 背景

在某些场景下,我们需要保证消费消息的顺序性,可能要使用单线程处理任务。
这个在消息数量较少时,还是一个可行的方案,但在大量的数据消息情况下,单线程就显得力不从心了,所以这时候需要引入多线程。

2 方案

引入多线程又要保证顺序性,那么只能让同类事件路由到同一个线程中去执行。
这时一般需要使用哈希取余得到哈希槽位数组的下标。
为了提高可扩展可定制化,提供一个哈希函数的接口定义,供使用者实现自己的哈希算法。

    /*** 可哈希计算的任务*/public interface HashableTask extends Runnable {/*** 哈希码计算所需key** @return key值*/Object getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(Object key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}

另外为提高取余计算的效率一般使用位计算and,这要求哈希槽位数是2的幂次方,即如下:

		//校验线程数是否是2的幂次方if (Integer.bitCount(threadCount) != 1) {throw new IllegalArgumentException("threadCount  must be a power of 2");}//计算哈希槽位数组下标int hashcode = task.hash(task.getKey());int index = hashcode & executors.length-1;

自己实现满足顺序消费的线程池,需要实现每个线程对应一个队列,需要维护阻塞队列、线程挂掉、启动新线程,还有优雅停机问题。这些实现都不是那么简单,我们其实可以换个角度思考问题,将线程池线程来使用。这咋理解呢?就是一个线程池只有一个线程,这就实现了所谓的线程池线程的平替。即每个线程池应该这种定义

 ExecutorService executor= new ThreadPoolExecutor(1, 1,1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(size), threadFactory);

下面附上完整代码。

public class PartitionThreadPool {private final ExecutorService[] executors;private final int indexMask;public PartitionThreadPool(int threadCount, int singleQueueCapacity, ThreadFactory threadFactory){if (Integer.bitCount(threadCount) != 1) {throw new IllegalArgumentException("threadCount  must be a power of 2");}indexMask = threadCount - 1;executors = new ExecutorService[threadCount];for (int i = 0; i < threadCount; i++) {executors[i] = new ThreadPoolExecutor(1, 1,1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(singleQueueCapacity), threadFactory);}}public Future<?> submit(HashableTask task){int hashcode = task.hash(task.getKey());int index = hashcode & indexMask;return executors[index].submit(task);}public void shutdown(){for (ExecutorService executor : executors) {executor.shutdownNow();}}/*** 可哈希计算的任务*/public interface HashableTask extends Runnable {/*** 哈希码计算所需key** @return key值*/Object getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(Object key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}}

上面的代码还有些问题,你们发现了么?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/31001.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在4面体空间内2点结构占比

有一个4面体状空间&#xff0c;由3层甲烷状分子堆积而成&#xff0c;单个甲烷4面体边长10. 内有30个点&#xff0c;在30个点中取2点&#xff0c;有30*29/2435种取法。这里要求两个点的距离必须为6.123 在435个结构中只有40个符合要求 序数 结构 序数 结构 3 1 282 3 7…

利用 MyFlash 实现 MySQL 数据闪回

Github https://github.com/Meituan-Dianping/MyFlash MyFlash 限制 仅支持 5.6 与 5.7binlog 格式必须为 row&#xff0c;且 binlog_row_imagefull只能回滚DML&#xff08;增、删、改&#xff09; MySQL 准备 注&#xff1a; 本章 MySQL 是采用 Docker 部署&#xff0c;容器…

如何进行海外网络加速?告别卡顿与访问慢的方法

你是否经常在打开海外网站浏览网页时遇到响应缓慢的问题&#xff1f;或者在进行国际网络会议时&#xff0c;由于网络延迟影响与客户的交流&#xff1f;亦或是由于网络问题&#xff0c;导致OA、ERP、云储存等应用频繁因为数据包丢失而中断下载&#xff1f;如果你经常遇到这些问题…

centos7 离线安装zip和unzip

解压的时候发现不能解压&#xff0c;报-bash: unzip: command not found 1、访问https://www.rpmfind.net/linux/rpm2html/search.php?queryzip&submitSearch…&systemcentos&arch#/ 2、输入zip和centos搜索&#xff0c;选择el7下载 3、输入unzip和centos搜索&am…

显卡nvidia的CUDA和cuDNN的安装

显卡版本&#xff0c;和nvidia下载的 CUDA版本和CUDNN的关系 1. 显卡版本 nvidia-smi 硬件环境&#xff1a;显卡版本 4090 NVIDIA-SMI-555.85 我的驱动是510.85.02&#xff0c;驱动附带cuda12.5 2. nvidia下载的cuda版本 nvcc -V 我下载的是cuda12.5 cuda在安装版本过程…

【WPF编程宝典】第10讲:简单动画

1.基本动画 这些基本动画都应用与C#代码动画是WPF模型的核心部分&#xff0c;让动画动起来不需要使用计时器以及事件处理代码&#xff0c;可使用声明的方式创建动画。WPF动画第一条规则&#xff0c;每个动画都依赖于一个依赖项属性。WPF动画第二条规则&#xff0c;属性的动态化…

【Java 如何创建线程】

创建线程的方式一&#xff1a;继承Thread类 步骤&#xff1a; 定义一个类继承Thread类覆盖Thread类中的run&#xff08;&#xff09;方法直接创建Thread类的子类对象创建线程调用start方法开启线程并调用线程的任务run方法执行。 如果线程的目的是为了开启一条执行路径去运行…

互联网技术基础-计算机人必看

目录 1.Internet的工作原理 1、Internet是一个分组交换系统 2、路由器是Internet实现互连的“标准件” 3、TCP/IP是Internet的核心协议 4、客户机/服务器的工作模式 2. IP地址 2.1 IP地址分类 2.2特殊IP地址 2.3路由器和IP编制原则 2.4子网的划分 2.5 IPV6 3.域名系…

【因果推断python】46_估计量2

目录 连续型干预变量案例 非线性处理效果 关键思想 连续型干预变量案例 目标转换方法的另一个明显缺点是它仅适用于离散或二元处理。这是你在因果推理文献中经常看到的东西。大多数研究都是针对二元干预案例进行的&#xff0c;但您找不到很多关于连续干预的研究。这让我很困…

Bytebase 对接本地部署的 llama3 开启ChatSQL功能

Bytebase 是为开发人员、测试、DBA和运维工程师构建的数据库 DevOps 领域的&#xff0c;类 GitLab/GitHub 平台。 这篇文章主要关注 Bytebase SQL 编辑器中的 AI 增强功能。使用此功能您可以使用自然语言在 Bytebase SQL 编辑器中查询数据库。同时还能给出针对查询的索引建议&…

RK3568技术笔记十四 Ubuntu创建共享文件夹

单击“虚拟机”&#xff0c;单击“设置”&#xff0c;如图所示&#xff1a; 单击“选项”&#xff0c;选择“总是启用&#xff08;E&#xff09;”&#xff0c;单击“添加”&#xff0c;如图所示&#xff1a; 单击“下一步”&#xff0c;如图所示&#xff1a; 单击“浏览”添加…

Ranger配置图片及json文件预览

文章目录 前言下载apt下载pip下载 配置使用json文件预览方法一 修改scope用cat预览方法二 安装jq预览配置ranger 图片文件预览方法一 使用img2txt预览方法二 使用fim预览配置ranger 总结 前言 本文主要讲解Ranger12如何配置json及图片的预览设置&#xff0c;如下是ranger的介绍…

Spring AI 整合openAI的chatGpt

Spring AI支持ChatGPT&#xff0c;这是OpenAI的AI语言模型。ChatGPT在激发人们对人工智能驱动文本生成的兴趣方面发挥了重要作用。 SpringAi与Spring Boot 的整合详见上一篇文章&#xff1a; Spring AI 介绍以及与 Spring Boot 项目整合 下面分四个部分来分别说明和演示&#…

简单分享github

一、官网 GitHub: Let’s build from here GitHub 二、注册 通过简单的注册步骤&#xff0c;你就可以拥有一个属于自己的GitHub账号。再简单注册完成之后会需要验证你所输入的邮箱才能正常使用你的GitHub。 三、设置自己的库 在注册完成之后&#xff0c;完成一些简单的设置之…

Zookeeper 集群节点实现通信原理(一)

Zookeeper 集群节点实现通信原理(一) 多节点部署完启动时,为了选举发送自己节点选举的信息,是如何实现通信的,实现原理 在 Zookeeper 的多节点集群中,为了选举领导节点,每个节点需要互相通信以发送和接收选举信息。 这种通信是通过一种称为 Fast Leader Election 的算法…

简单且高效的水域物探轨迹坐标转换程序

简单且高效的水域物探轨迹坐标转换程序 前言 水上测线的高精度定位是水域物探的难题&#xff0c;水域磁法、水域地震实施时常采用船舶拖拽传感器进行走航式观测&#xff0c;GPS仪器放在船舶上测量&#xff0c;造成船舶位置与传感器位置存在偏差&#xff0c;后期资料整理需要校…

mvn dependency -D outputFile=dependency_tree.txt

命令解析 mvn dependency:tree&#xff1a;运行Maven的dependency:tree目标&#xff0c;生成项目的依赖树。 DoutputFiledependency_tree.txt&#xff1a;将生成的依赖树输出到dependency_tree.txt文件中&#xff0c;而且是每个独立模板输出自己的依赖。

互联网应用主流框架整合之Spring Boot基本概念

Spring Boot是用来简化Spring应用程序的搭建、开发、测试和部署过程的&#xff0c;该框架使用了特定的方式进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置&#xff0c;SpringBoot致力于快速应用开发(Rapid Application Development)领域的发展&#xff0c;它通过…

RTSP/Onvif安防监控平台EasyNVR抓包命令tcpdump使用不了的解决方法

安防视频监控汇聚EasyNVR智能安防视频监控平台&#xff0c;是基于RTSP/Onvif协议的安防视频平台&#xff0c;可支持将接入的视频流进行全平台、全终端分发&#xff0c;分发的视频流包括RTSP、RTMP、HTTP-FLV、WS-FLV、HLS、WebRTC等格式。平台可提供的视频能力包括&#xff1a;…

vscode插件开发之 - TestController

TesController概要介绍 TestController 组件是用于实现自定义测试框架和集成测试结果的。它允许开发者定义自己的测试运行器&#xff0c;以支持在VSCode中运行和展示测试。以下是一些使用 TestController 组件的主要场景&#xff1a; 自定义测试框架&#xff1a;如果你正在开发…