绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandlerworkHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费

定义的哈希函数接口和统一的基础父类

public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>*     默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>*     建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}

测试代码

public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile  int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/31160.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手机携号转网接口查询与对接指南:技术开发者必备手册

在当今通信技术飞速发展的背景下&#xff0c;手机携号转网已成为推动电信市场竞争、保障用户权益的重要手段。而对于技术开发者而言&#xff0c;掌握手机携号转网接口的查询与对接方法&#xff0c;无疑是提升服务兼容性和用户满意度的关键。 比如如下应用场景&#xff1a; 1.…

算法训练营day65-岛屿连同问题

深搜-岛屿&#xff1a;99. 岛屿数量 (kamacoder.com) // 深度搜索 dfs #include<bits/stdc.h>using namespace std;int dir[4][2] {0, 1, 1, 0, -1, 0, 0, -1};void dfs(vector<vector<int>>& map, vector<vector<bool>>& findednode,…

常用加密算法之 SM4 简介及应用

一、SM4 简介 SM4 是中国国家密码管理局提出的一种分组密码算法&#xff0c;也称为 SMS4。它属于对称加密算法&#xff0c;分组长度为 128 比特&#xff0c;密钥长度也为 128 比特。SM4 算法采用了与 AES 类似的轮函数结构&#xff0c;但具体的 S 盒和线性变换与 AES 不同&…

项目八 OpenStack存储管理

任务一 理解OpenStack块存储服务 1.1 •Cinder的主要功能 • 提供 持久性块存储资源&#xff0c;供 Nova 计算服务的虚拟机实例使用 。 • 为 管理块存储设备提供一套方法&#xff0c;对卷实现从创建到删除的整个生命周期 管理。 • 将 不同的后端存储进行封装&#xff0c;对外…

Zynq学习笔记--了解中断配置方式

目录 1. 简介 2. 工程与代码解析 2.1 Vivado 工程 2.2 Vitis 裸机代码 2.3 关键代码解析 3. 总结 1. 简介 Zynq 中的中断可以分为以下几种类型&#xff1a; 软件中断&#xff08;Software Generated Interrupt, SGI&#xff09;&#xff1a;由软件触发&#xff0c;通常…

NXP RT1060学习总结 - fsl_flexcan CAN FD 函数说明 -2

概要 CAN测试源码&#xff1a; https://download.csdn.net/download/qq_35671135/89425377 根据fsl_flexcan.h文件从文件末尾往前面梳理&#xff0c;总共CAN FD处理函数&#xff1b; 使用的是RT1064开发板进行测试。 11、设置FlexCAN FD帧的比特率 函 数 &#xff1a;statu…

C++数据格式化6 - uint转换成二六进制字符串

1. 关键词2. strfmt.h3. strfmt.cpp4. 测试代码5. 运行结果6. 源码地址 1. 关键词 C 数据格式化 字符串处理 std::string int bin 跨平台 2. strfmt.h #pragma once#include <string> #include <cstdint> #include <sstream> #include <iomanip>na…

游戏心理学Day20

扩展的8种玩家 完成主义者 此类玩家关心的是成就和进展&#xff0c;其主要目的是完成游戏的主要目标&#xff0c;其次是完成游戏的次要目标之后才是游戏中的其他内容&#xff0c;在多人游戏中完成主义者会致力于炫耀自己的状态和财富。如果游戏以胜负为目标&#xff0c;那么此…

Day 46 Redis缓存集群

Redis缓存集群 redis缓存服务 缓存数据库 缓存 ​ 读取数据 cpu ​ L1 L2 L3 L4 ​ 一级缓存 二级缓存 ​ cs context switch 上下文交换 free -m ​ buffer cache mysql服务器 ​ 缓存 表缓存 数据缓存 nginx ​ expire 1d ​ 304响应码 ​ 200 ​ 301 ​ 30…

RS485中继器的作用你还不知道?

RS485是一种串行通信协议&#xff0c;支持设备间长距离通信。RS485中继器则像“传声筒”&#xff0c;能放大衰减信号&#xff0c;延长通信距离&#xff0c;隔离噪声&#xff0c;扩展分支。在实际场景中&#xff0c;如工厂内&#xff0c;通过中继器可确保控制室与远距离机器间通…

嵌入式实验---实验三 定时器实验

一、实验目的 1、掌握STM32F103定时器程序设计流程&#xff1b; 2、熟悉STM32固件库的基本使用。 二、实验原理 1、使用SysTick定时方式控制LED闪烁&#xff1b; 2、使用通用定时器产生PWM脉冲&#xff0c;通过调整占空比实现两个目标&#xff1a; &#xff08;1&#xf…

前端导出excel xlsx 代码复制即用

确保安装了最新版本的 xlsx 库&#xff1a; npm install xlsx 2.在需要使用的文件中进行命名导入&#xff08;Named Import&#xff09;&#xff1a; import { utils, writeFile } from xlsx; 3.使用 utils 和 writeFile 替代默认导入的 XLSX 对象&#xff1a; const data [[…

RPM命令和YUM命令

目录 一、RPM软件包 1.1、RPM概述 1.2、查询已安装的rpm软件信息 1.3、查询未安装的 RPM 软件包文件中信息 1.4、安装、升级、卸载 RPM 软件包 二、YUM常规命令 三、手动配置Apache&#xff08;http&#xff09;服务 3.1、前提条件 3.2、开始配置 3.3、开启验证服务 …

迁移Docker容器

将 Docker 容器从一台服务器迁移到另一台服务器&#xff0c;主要包括以下步骤&#xff1a;保存容器的镜像&#xff0c;导出数据卷&#xff0c;传输文件到新服务器&#xff0c;然后在新服务器上重新运行容器。以下是具体的步骤和相应的命令&#xff1a; 1. 保存容器的镜像 首先…

GitHub爆赞!终于有大佬把《Python学习手册》学习笔记分享出来了

这份笔记的目标是为了给出一份比较精炼&#xff0c;但是又要浅显易懂的Python教程。《Python学习手册》中文第四版虽然比较简单&#xff0c;但是措辞比较罗嗦&#xff0c;而且一个语法点往往散落在多个章节&#xff0c;不方便读者总结。 我在做笔记时&#xff0c;将一个知识点…

PFC 离散元数值模拟仿真技术与应用

近几年&#xff0c;随着计算能力的提高和算法的优化&#xff0c;离散元仿真技术得到了快速发展&#xff0c;并在学术界产生了大量研究成果。在 PFC 离散元计算中无需给定材料的宏观本构关系和对应的参数&#xff0c;这些传统的参数和力学特性在程序中可以自动得到。据调查&…

深入了解常用负载均衡软件

在构建高性能、高可用的分布式系统时&#xff0c;负载均衡技术扮演着至关重要的角色。它通过合理分发网络请求到后端服务器集群&#xff0c;从而有效提升系统吞吐量、减少响应延迟、并保障系统的稳定运行。本文将介绍几种常用的负载均衡软件&#xff0c;包括它们的优缺点、应用…

Gartner发布2024年人工智能技术成熟度曲线:29项决定人工智能领域发展方向的前沿和趋势性技术

人工智能投资已达到新高&#xff0c;重点是生成式人工智能&#xff0c;但在大多数情况下&#xff0c;该技术尚未实现预期的商业价值。这项研究通过分析各种人工智能创新&#xff08;其中许多创新正在快速发展&#xff09;&#xff0c;帮助人工智能领导者确定其他值得投资的技术…

VScode开发ARM环境搭建

1. vscode安装 直接访问官网: Visual Studio Code - Code Editing. Redefined 2. 安装插件 2.1. 安装Embedded IDE 2.2. 安装Cortex-debug 3. 工程初始化 3.1. 导入现有工程&#xff08;推荐&#xff09; 3.2. 或可创建新的工程 3.2.1. 选择Cortex-M项目 指定项目名称&…

Qemu虚拟机在线迁移到VMware

libvirt版本&#xff1a;libvirt-10.0.0qemu版本&#xff1a;qemu-8.2.0 在生产环境中&#xff0c;大多数的场景是 vmware 虚拟机迁移到 qemu 环境&#xff0c;一般是通过关机然后导出、导入磁盘镜像来实现。 如果要将 qemu 环境虚拟机迁移到 vmware 怎么办呢&#xff1f;要求…