Event Bus设计模式

    EventBus是消息中间件的设计思想,在此设计中有三个非常重要的角色(Bus、Registry、Dispatcher),Bus主要负责提供给外部使用的操作方法;Registry注册表用来整理记录所有注册在EventBus上的Subscriber;Dispatcher主要负责对Subscriber消息进行推送(用反射的方式执行方法),考虑到程序的灵活性,Dispatcher方法也提供了Executor的多态方式。

示例代码如下:

public interface Bus {
void register(Object subscriber);
void unregister(Object subscriber);
void post(Object event);
void post(Object event,String topic);
void close();
String getBusName();
}
import java.lang.reflect.Method;public interface EventContext {
String getSource();
Object getSubscriber();
Method getSubscribe();
Object getEvent();
}
public interface EventExceptionHandler {
void handle(Throwable cause,EventContext context);
}
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;import java.lang.annotation.Retention;
import java.lang.annotation.Target;@Retention(RUNTIME)
@Target(METHOD)
public @interface Subscribe {
String topic() default "default-topic";
}
public class Subscriber {
private final Object subscribeObject;
private final Object subscribeMethod;
private boolean disable=false;public Subscriber(Object subscribeObject,Object subscribeMethod) {
this.subscribeObject=subscribeObject;
this.subscribeMethod=subscribeMethod;
}public Object getSubscribeObject() {
return subscribeObject;
}public Object getSubscribeMethod() {
return subscribeMethod;
}public boolean isDisable() {
return disable;
}public void setDisable(boolean disable) {
this.disable = disable;
}
}
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;class Registry {
private final ConcurrentHashMap<String,ConcurrentLinkedQueue<Subscriber>> subscriberContainer=new ConcurrentHashMap<>();private List<Method> getSubscribeMethods(Object subscriber){
final List<Method> methods=new ArrayList<>();
Class<?> type=subscriber.getClass();
while(type!=null) {
Method[] declaredMethods=type.getDeclaredMethods();
Arrays.stream(declaredMethods).filter(m->m.isAnnotationPresent(Subscribe.class)
&&m.getParameterCount()==1
&&m.getModifiers()==Modifier.PUBLIC)
.forEach(methods::add);
type=type.getSuperclass();
}
return methods;
}private void tierSubscriber(Object subscriber,Method method) {
final Subscribe subscribe=method.getDeclaredAnnotation(Subscribe.class);
String topic=subscribe.topic();
this.subscriberContainer.computeIfAbsent(topic, key->new ConcurrentLinkedQueue());
this.subscriberContainer.get(topic).add(new Subscriber(subscriber,method));
}public void bind(Object subscriber) {
List<Method> subscribeMethods=getSubscribeMethods(subscriber);
subscribeMethods.forEach(m->tierSubscriber(subscriber,m));
}public void unbind(Object subscriber) {
this.subscriberContainer.forEach((key,queue)->queue.forEach(s->{
if(s.getSubscribeObject()==subscriber) {
s.setDisable(true);
}
}));
}public ConcurrentLinkedQueue<Subscriber> scanScriber(String topic) {
return this.subscriberContainer.get(topic);
}
}
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;public class Dispatcher {
private final Executor executorService;
private final EventExceptionHandler exceptionHandler;
public static final Executor SEQ_EXECUTOR_SERVICE=SeqExecutorService.INSTANCE;
public static final Executor PER_THREAD_EXECUTOR_SERVICE=PerThreadExecutorService.INSTANCE;private Dispatcher(Executor executorService,EventExceptionHandler handler) {
this.executorService=executorService;
this.exceptionHandler=handler;
}private void realInvokeSubscribe(Subscriber subscriber,Object event,Bus bus) {
Method method=(Method) subscriber.getSubscribeMethod();
Object object=subscriber.getSubscribeObject();
this.executorService.execute(()->{
try {
method.invoke(object, event);
} catch (Exception e) {
if(null!=exceptionHandler) {
exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(),subscriber,event));
}
e.printStackTrace();
}
});
}public void dispatch(Bus bus, Registry registry,Object event, String topic) {
ConcurrentLinkedQueue<Subscriber> subscribers=registry.scanScriber(topic);
if(null==subscribers) {
if(exceptionHandler!=null) {
exceptionHandler.handle(new IllegalArgumentException("The topic "+topic+" not bind yet"),
new BaseEventContext(bus.getBusName(),null,event));
}
return ;
}
subscribers.stream().filter(subscriber->!subscriber.isDisable())
.filter(subscriber->{
Method method=(Method) subscriber.getSubscribeMethod();
Class<?> aClass=method.getParameterTypes()[0];
return (aClass.isAssignableFrom(event.getClass()));
}).forEach(subscriber->this.realInvokeSubscribe(subscriber, event, bus));
}public void close() {
if(this.executorService instanceof ExecutorService) {
((ExecutorService)this.executorService).shutdown();
}
}static Dispatcher newDispatcher(EventExceptionHandler handler,Executor executor) {
return new Dispatcher(executor,handler);
}static Dispatcher seqDispatcher(EventExceptionHandler handler) {
return new Dispatcher(SEQ_EXECUTOR_SERVICE,handler);
}static Dispatcher perThreadDispatcher(EventExceptionHandler handler) {
return new Dispatcher(PER_THREAD_EXECUTOR_SERVICE,handler);
}private static class SeqExecutorService implements Executor{
private final static SeqExecutorService INSTANCE=new SeqExecutorService();
@Override
public void execute(Runnable command) {
command.run();
}
}private static class PerThreadExecutorService implements Executor{
private final static PerThreadExecutorService INSTANCE=new PerThreadExecutorService();
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}private static class BaseEventContext implements EventContext{
private final String eventBusName;
private final Subscriber subscriber;
private final Object event;
private BaseEventContext(String eventBusName,Subscriber subscriber,Object event) {
this.eventBusName=eventBusName;
this.subscriber=subscriber;
this.event=event;
}
@Override
public String getSource() {
return this.eventBusName;
}@Override
public Object getSubscriber() {
return this.subscriber!=null?subscriber.getSubscribeObject():null;
}@Override
public Method getSubscribe() {
return this.subscriber==null?(Method)subscriber.getSubscribeMethod():null;
}@Override
public Object getEvent() {
return this.event;
}
}
}
import java.util.concurrent.Executor;public class EventBus implements Bus{
private final Registry resitry=new Registry();
private String busName;
private static final String DEFAULT_BUS_NAME="default";
private static final String DEFAULT_TOPIC="default-topic";
private final Dispatcher dispatcher;public EventBus() {
this(DEFAULT_BUS_NAME,null,Dispatcher.SEQ_EXECUTOR_SERVICE);
}public EventBus(String busName) {
this(busName,null,Dispatcher.SEQ_EXECUTOR_SERVICE);
}EventBus(String busName,EventExceptionHandler handler,Executor executor){
this.busName=busName;
this.dispatcher=Dispatcher.newDispatcher(handler, executor);
}public EventBus(EventExceptionHandler exceptionHandler) {
this(DEFAULT_BUS_NAME,exceptionHandler,Dispatcher.SEQ_EXECUTOR_SERVICE);
}@Override
public void register(Object subscriber) {
this.resitry.bind(subscriber);
}@Override
public void unregister(Object subscriber) {
this.resitry.unbind(subscriber);
}@Override
public void post(Object event) {
this.post(event,DEFAULT_TOPIC);
}@Override
public void post(Object event, String topic) {
this.dispatcher.dispatch(this, resitry, event, topic);
}@Override
public void close() {
this.dispatcher.close();
}@Override
public String getBusName() {
return this.busName;
}}
import java.util.concurrent.ThreadPoolExecutor;public class AsyncEventBus extends EventBus{AsyncEventBus(String busName,EventExceptionHandler handler,ThreadPoolExecutor executor){
super(busName,handler,executor);
}public AsyncEventBus(String busName,ThreadPoolExecutor threadPoolExecutor) {
this(busName, null, threadPoolExecutor);
}public AsyncEventBus(ThreadPoolExecutor executor) {
this("default-async",null,executor);
}public AsyncEventBus(EventExceptionHandler handler,ThreadPoolExecutor executor) {
this("default-async",handler,executor);
}
}
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;public class SimpleScriberTest {@Subscribe
public void methodA(String message) {
System.out.println("==SimpleSubscriber==methodA=="+message);
}@Subscribe(topic="test")
public void methodB(String message) {
System.out.println("==SimpleSubscriber==methodB=="+message);
}public static void main(String[] args) {
//Bus bus=new EventBus("TestBus");
Bus bus=new AsyncEventBus("TestBus",(ThreadPoolExecutor)Executors.newFixedThreadPool(10));
bus.register(new SimpleScriberTest());
bus.post("Hello");
System.out.println("__________________");
bus.post("Hello", "test");
}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/220858.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

02-详解请求路由的实现和常见的断言工厂

请求路由 路由转发 第一步: 新建一个SpringBoot工程如gateway模块, 引入网关依赖和nacos服务发现依赖 <!--网关依赖--> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId&…

玄关柜和鞋柜是一回事吗?福州中宅装饰,福州装修

玄关柜和鞋柜虽然都用于存放鞋子&#xff0c;但它们在概念上有所不同。玄关柜是一个更大的概念&#xff0c;它包括鞋柜和其他功能区域&#xff0c;可以说鞋柜是玄关柜的一部分。 1️⃣概念上的不同 玄关柜是一种集成了鞋柜、挂衣架、换鞋凳等多种功能于一体的家居家具&#xf…

Nginx(四层+七层代理)+Tomcat实现负载均衡、动静分离

一、Tomcat多实例部署 具体步骤请看我之前的博客 写文章-CSDN创作中心https://mp.csdn.net/mp_blog/creation/editor/134956765?spm1001.2014.3001.9457 1.1 访问测试多实例的部署 1.2 分别在三个tomcat服务上部署jsp的动态页面 mkdir /usr/local/tomcat/webapps/test vim …

Fork和Join底层原理

文章目录 一、任务类型1. 简介2. CPU密集型3. IO密集型4. 线程数计算方法 二、Fork/Join框架1. 思想2. Fork/Join简介3. Fork/Join使用4. 底层原理5. 总结 一、任务类型 1. 简介 思考: 线程池的线程数设置多少合适? 我们调整线程池中的线程数量的最主要的目的是为了充分并合理…

数字孪生轻量化引擎——AMRT3D引擎

随着全球经济亟待复苏&#xff0c;作为科技发展主要需求技术之一&#xff0c;数字孪生已经成为全球多个国家重点布局行业。例如&#xff0c;美国工业互联网盟将数字孪生作为工业互联网落地的核心和关键&#xff0c;德国工业4.0参考架构将数字孪生作为重要内容。 数字孪生已经形…

SSRF攻击实例讲解

服务器端请求伪造&#xff08;SSRF&#xff09;攻击是一种网络安全漏洞&#xff0c;其中攻击者迫使受影响的服务器向攻击者指定的内部或外部系统发送请求。以下是一个SSRF攻击的实例讲解及其分析。 SSRF攻击实例 当然&#xff0c;下面提供另外三个SSRF&#xff08;服务器端请…

2022最新云存储网盘系统,文件分享系统与文件存储系统。

资源入口 2022 最新云存储网盘系统, 文件分享系统与文件存储系统。 测试环境&#xff1a;Apache MySQL5.6 PHP7.0 安装 PHP 扩展 exif、fileinfo 从 PHP 禁用函数中 删除 shell_exec、proc_open、putenv 这三个 PHP 函数 PS&#xff1a;整体还不错的系统&#xff0c;注意的…

【学习笔记】V8垃圾回收策略

V8 V8是一款主流的JavaScript执行引擎V8采用即时编译,速度比较快V8内存设限,64位操作系统中上限为1.5G,32位系统中不超过800M V8垃圾回收策略 采用分代回收的思想内存分为新生代\老生代针对不同对象采用不同算法 v8常用的GC算法: 分代回收、空间复制、标记清除、标记整理、…

实战——Mac M2 安装mat工具

线上环境出现内存飙升的情况&#xff0c;需要工具定位问题发生点就需要用到mat工具了&#xff0c;之前都是在intel芯片环境上安装的&#xff0c;现在换了m2芯片&#xff0c;导致出现了问题&#xff0c;经过一系列调研都解决了&#xff0c;特此记录下&#xff0c;以备后查 开发…

语音压缩扩展器电路芯片TA31101/TA31101F——内含压缩和扩展器,噪声低 工作电压低电流小

TA31101/TA31101F是一块语音压缩扩展器电路&#xff0c;内含信号整流器单元、可变增益单元、运算放大器等电路单元&#xff0c;适用于无绳电话等产品中作语音压缩与扩展。 TA31101采用DIP 16的封装形式封装&#xff0c;TA31101F采用SOP 16的封装形式。 主要特点&#xff1a;●…

利用tf-idf对特征进行提取

TF-IDF是一种文本特征提取的方法&#xff0c;用于评估一个词在一组文档中的重要性。 一、代码 from sklearn.feature_extraction.text import TfidfVectorizer import numpy as npdef print_tfidf_words(documents):"""打印TF-IDF矩阵中每个文档中非零值对应…

社交网络分析1:起源发展、不同领域的应用、核心概念

社交网络分析1&#xff1a;社交网络相关定义和概念 写在最前面关于课程 社交网络、社交网络分析社交网络发展阶段&#xff08;自己感兴趣&#xff09;1. 社交网络的起源2. 社交网络的演变3. 社交网络的成熟4. 发展阶段补充和展望 2023社交大变革&#xff08;自己感兴趣的点&…

【Linux系统编程】初步运用git工具

介绍&#xff1a; 使用git之前首先要先认识gitee/github&#xff0c;gitee/github是一个远程仓库网站。git是平台专门开发的一个操控工具&#xff0c;是一个开源的分布式版本控制系统&#xff0c;我们使用git工具来与gitee/github来取得联系。 git的推送使用&#xff1a; git既…

商城免费搭建之java鸿鹄云商 java电子商务商城 Spring Cloud+Spring Boot+mybatis+MQ+VR全景+b2b2c

鸿鹄云商 SAAS云产品概述 1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买家平台&#xff08;H5/公众号、小程序、APP端&#xff08;IOS/Android&#xff09;、微服务平台&#xff08;业务服务&#xff09; 2. 核心架构 Spring Cloud、Spring Boot、My…

光纤的连接

光纤在工程布线中&#xff0c;难免会遇到线不够长或者磨损折断的情况&#xff0c;要怎么处理呢&#xff1f; 首先看看光纤的结构&#xff1a; 纤芯&#xff1a;中心部分&#xff0c;光波在纤芯中传输。 包层&#xff1a;环绕纤芯&#xff0c;折射率低于纤芯&#xff0c;作用是…

Talk | UCSB博士生王丹青: 大语言模型的协作学习以及个性化生成评估

本期为TechBeat人工智能社区第555期线上Talk。 北京时间12月13日(周三)20:00&#xff0c;加州大学圣塔芭芭拉分校博士生—王丹青的Talk已准时在TechBeat人工智能社区开播&#xff01; 她与大家分享的主题是: “大语言模型的协作学习以及个性化生成评估”&#xff0c;介绍了她的…

基于JAVAEE技术校园车辆管理系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本校园车辆管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…

互联网加竞赛 python 机器视觉 车牌识别 - opencv 深度学习 机器学习

1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于python 机器视觉 的车牌识别系统 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;3分 &#x1f9ff; 更多资…

2023年最佳推荐 | 值得收藏的 5大 SaaS 知识库

随着数字化时代的到来&#xff0c;SaaS&#xff08;软件即服务&#xff09;已经成为企业和个人日常工作中的重要工具。在众多的SaaS应用中&#xff0c;知识库是不可或缺的一部分&#xff0c;它可以帮助我们更好地管理和利用知识&#xff0c;提高工作效率和创新能力。接下来就跟…

冲压模具市场调研:2023年该行业发展现状及前景分析

汽车冲压件模具是汽车车身生产的重要工艺装备&#xff0c;是汽车换型的主要制约因素。汽车冲压件模具具有尺寸大、型面复杂、精度要求高等特点&#xff0c;属于技术密集型产品。 汽车冲压模具能快速精密地把材料直接加工成零件或半成品并通过焊接、铆接、拼装等工艺装配成零部件…