观察者模式(下):如何实现一个异步非阻塞的EventBus框架?

上一节课中,我们学习了观察者模式的原理、实现、应用场景,重点介绍了不同应用场景下,几种不同的实现方式,包括:同步阻塞、异步非阻塞、进程内、进程间的实现方式。

同步阻塞是最经典的实现方式,主要是为了代码解耦;异步非阻塞除了能实现代码解耦之外,还能提高代码的执行效率;进程间的观察者模式解耦更加彻底,一般是基于消息队列来实现,用来实现不同进程间的被观察者和观察者之间的交互。

今天,我们聚焦于异步非阻塞的观察者模式,带你实现一个类似Google Guava EventBus的通用框架。等你学完本节课之后,你会发现,实现一个框架也并非一件难事。

对于上一节课的提到的采用异步非阻塞的形式处理handleRegSuccess

// 第一种实现方式,其他类代码不变,就没有再重复罗列
public class RegPromotionObserver implements RegObserver {private PromotionService promotionService; // 依赖注入@Overridepublic void handleRegSuccess(Long userId) {Thread thread = new Thread(new Runnable() {@Overridepublic void run() {promotionService.issueNewUserExperienceCash(userId);}});thread.start();}
}// 第二种实现方式,其他类代码不变,就没有再重复罗列
public class UserController {private UserService userService; // 依赖注入private List<RegObserver> regObservers = new ArrayList<>();private Executor executor;public UserController(Executor executor) {this.executor = executor;}public void setRegObservers(List<RegObserver> observers) {regObservers.addAll(observers);}public Long register(String telephone, String password) {//省略输入参数的校验代码//省略userService.register()异常的try-catch代码long userId = userService.register(telephone, password);for (RegObserver observer : regObservers) {executor.execute(new Runnable() {@Overridepublic void run() {observer.handleRegSuccess(userId);}});}return userId;}
}

        对于第一种实现方式,频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。第二种实现方式,尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了register()函数中,增加了这部分业务代码的维护成本。

EventBus框架功能需求介绍

        EventBus翻译为“事件总线”,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。其中,Google Guava EventBus就是一个比较著名的EventBus框架,它不仅仅支持异步非阻塞模式,同时也支持同步阻塞模式

public class UserController {private UserService userService; // 依赖注入private EventBus eventBus;private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;public UserController() {//eventBus = new EventBus(); // 同步阻塞模式eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE)); // 异步非阻塞模式}public void setRegObservers(List<Object> observers) {for (Object observer : observers) {eventBus.register(observer);}}public Long register(String telephone, String password) {//省略输入参数的校验代码//省略userService.register()异常的try-catch代码long userId = userService.register(telephone, password);eventBus.post(userId);return userId;}
}public class RegPromotionObserver {private PromotionService promotionService; // 依赖注入@Subscribepublic void handleRegSuccess(Long userId) {promotionService.issueNewUserExperienceCash(userId);}
}public class RegNotificationObserver {private NotificationService notificationService;@Subscribepublic void handleRegSuccess(Long userId) {notificationService.sendInboxMessage(userId, "...");}
}

利用EventBus框架实现的观察者模式,跟从零开始编写的观察者模式相比,从大的流程上来说,实现思路大致一样,都需要定义Observer,并且通过register()函数注册Observer,也都需要通过调用某个函数(比如,EventBus中的post()函数)来给Observer发送消息(在EventBus中消息被称作事件event)。

但在实现细节方面,它们又有些区别。基于EventBus,我们不需要定义Observer接口,任意类型的对象都可以注册到EventBus中,通过@Subscribe注解来标明类中哪个函数可以接收被观察者发送的消息。

1.Subscribe

Subscribe是一个注解,用于标明观察者中的哪个函数可以接收消息。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface Subscribe {}

2.ObserverAction

ObserverAction类用来表示@Subscribe注解的方法,其中,target表示观察者类,method表示方法。它主要用在ObserverRegistry观察者注册表中。

public class ObserverAction {private Object target;private Method method;public ObserverAction(Object target, Method method) {this.target = Preconditions.checkNotNull(target);this.method = method;this.method.setAccessible(true);}public void execute(Object event) { // event是method方法的参数try {method.invoke(target, event);} catch (InvocationTargetException | IllegalAccessException e) {e.printStackTrace();}}
}

3.ObserverRegistry

ObserverRegistry类就是前面讲到的Observer注册表,是最复杂的一个类,框架中几乎所有的核心逻辑都在这个类中。这个类大量使用了Java的反射语法,不过代码整体来说都不难理解,其中,一个比较有技巧的地方是CopyOnWriteArraySet的使用。

public class ObserverRegistry {private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();public void register(Object observer) {Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {Class<?> eventType = entry.getKey();Collection<ObserverAction> eventActions = entry.getValue();CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);if (registeredEventActions == null) {registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());registeredEventActions = registry.get(eventType);}registeredEventActions.addAll(eventActions);}}public List<ObserverAction> getMatchedObserverActions(Object event) {List<ObserverAction> matchedObservers = new ArrayList<>();Class<?> postedEventType = event.getClass();for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {Class<?> eventType = entry.getKey();Collection<ObserverAction> eventActions = entry.getValue();if (postedEventType.isAssignableFrom(eventType)) {matchedObservers.addAll(eventActions);}}return matchedObservers;}private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();Class<?> clazz = observer.getClass();for (Method method : getAnnotatedMethods(clazz)) {Class<?>[] parameterTypes = method.getParameterTypes();Class<?> eventType = parameterTypes[0];if (!observerActions.containsKey(eventType)) {observerActions.put(eventType, new ArrayList<>());}observerActions.get(eventType).add(new ObserverAction(observer, method));}return observerActions;}private List<Method> getAnnotatedMethods(Class<?> clazz) {List<Method> annotatedMethods = new ArrayList<>();for (Method method : clazz.getDeclaredMethods()) {if (method.isAnnotationPresent(Subscribe.class)) {Class<?>[] parameterTypes = method.getParameterTypes();Preconditions.checkArgument(parameterTypes.length == 1,"Method %s has @Subscribe annotation but has %s parameters."+ "Subscriber methods must have exactly 1 parameter.",method, parameterTypes.length);annotatedMethods.add(method);}}return annotatedMethods;}
}

4.EventBus

EventBus实现的是阻塞同步的观察者模式。看代码你可能会有些疑问,这明明就用到了线程池Executor啊。实际上,MoreExecutors.directExecutor()是Google Guava提供的工具类,看似是多线程,实际上是单线程。之所以要这么实现,主要还是为了跟AsyncEventBus统一代码逻辑,做到代码复用。

public class EventBus {private Executor executor;private ObserverRegistry registry = new ObserverRegistry();public EventBus() {this(MoreExecutors.directExecutor());}protected EventBus(Executor executor) {this.executor = executor;}public void register(Object object) {registry.register(object);}public void post(Object event) {List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);for (ObserverAction observerAction : observerActions) {executor.execute(new Runnable() {@Overridepublic void run() {observerAction.execute(event);}});}}
}

5.AsyncEventBus

有了EventBus,AsyncEventBus的实现就非常简单了。为了实现异步非阻塞的观察者模式,它就不能再继续使用MoreExecutors.directExecutor()了,而是需要在构造函数中,由调用者注入线程池。

public class AsyncEventBus extends EventBus {public AsyncEventBus(Executor executor) {super(executor);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SSH框架简介篇

文章目录 概述目录结构 strutsSpringHibernate总结 概述 SSH框架&#xff08;Struts Spring Hibernate&#xff09;是一种广泛应用的Java企业级开发框架组合&#xff0c;它将Struts、Spring和Hibernate三个优秀的框架有机地结合在一起&#xff0c;提供了一套完整的解决方案&…

Linux系统编程:文件系统和inode

目录 一. 磁盘的结构和读写数据的方式 1.1 磁盘级文件和内存级文件 1.2 磁盘的物理结构 1.3 访问磁盘数据的方式 二. 磁盘文件系统 2.1 磁盘的分区管理方法 2.2 文件名和inode的关系 三. 结合文件系统对文件创建和删除的相关问题的理解 3.1 文件创建时操作系统进行的工…

Mysql8.0的bin log日志

文章目录 一、 Mysql8.0 的bin log 日志关闭1.1、查看是否已开启 bin log 日志1.2、关闭 bin log 日志1.3、 设置 bin log 日志的时长1.3.1、第一种设置方式&#xff1a;1.3.2、第二种设置方式 一、 Mysql8.0 的bin log 日志关闭 Mysql8.0默认开启 binlog 记录功能&#xff0c…

如何配置Git工具

①安装Git&#xff1a;首先确保你已经在计算机上安装了Git。你可以从Git官方网站&#xff08;https://git-scm.com/&#xff09;下载适合你操作系统的安装程序&#xff0c;并按照提示进行安装。 ② 配置用户信息&#xff1a;在命令行终端中&#xff0c;使用下面的命令来配置你…

51单片机--DS1302时钟

文章目录 DS1302引脚定义和应用电路内部结构框图寄存器的定义时序定义BCD码DS1302时钟代码 DS1302 DS1302是美国DALLAS公司推出的一款实时时钟电路芯片。它具有高性能和低功耗的特点&#xff0c;可以通过SPI三线接口与CPU进行同步通信。DS1302能够提供秒、分、时、日、星期、月…

【SQL应知应会】表分区(一)• MySQL版

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习&#xff0c;有基础也有进阶&#xff0c;有MySQL也有Oracle 分区表 • MySQL版 一、分区表1.非分区表2.分区表2…

Autograd:自动求导

Autograd&#xff1a;自动求导 PyTorch中&#xff0c;所有神经网络的核心是 autograd 包。先简单介绍一下这个包&#xff0c;然后训练我们的第一个的神经网络。 autograd 包为张量上的所有操作提供了自动求导机制。它是一个在运行时定义(define-by-run&#xff09;的框架&…

利用集合框架实现-超市会员管理系统

借助集合框架来实现超市会员管理系统&#xff0c;实现以下功能&#xff1a; 1.开卡 2.积分累计 3.查询剩余积分 4.积分兑换 5.修改密码 6.退出 -------------------------------------------------------------------------------------------------- 展示&#x…

【信号去噪和分类】基于小波的隐马尔可夫模型统计信号处理(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

C语言实现扫雷【经典】

前言   本篇文章要实现的是扫雷游戏&#xff0c;其代码实现与上一篇的三子棋游戏类同&#xff0c;都是在棋盘的基础上&#xff0c;与电脑进行对抗&#xff0c;不同的是&#xff0c;扫雷游戏一开始电脑就已经随机布置好了所有“雷”。 请戳 --->三子棋 扫雷游戏 1. 扫雷游…

【新日语】第17課

练习A 语法练习 一、これはnanakoという電子マネーです。 这是叫做nanako的电子钱包。 解析&#xff1a; これ&#xff1a;“这个”&#xff1b;指示代词 ⇒ これ&#xff0f;あれ&#xff0f;それ「第4課ーP33」。 二、これは留学生が読む雑誌です。 这个是留学生读的杂志…

MySQL每日一练——MySQL多表查询进阶挑战

目录 1、首先创建表 t_dept: t_emp: 2、插入数据 t_dept表&#xff1a; t_tmp表: 3、修改表 4、按条件查找 1、首先创建表 t_dept: CREATE TABLE t_dept (id INT(11) NOT NULL AUTO_INCREMENT,deptName VARCHAR(30) DEFAULT NULL,address VARCHAR(40) DEFAULT NULL,P…

Python结巴中文分词笔记

&#x1f4da; jieba库基本介绍 &#x1f310; jieba库概述 Jieba是一个流行的中文分词库&#xff0c;它能够将中文文本切分成词语&#xff0c;并对每个词语进行词性标注。中文分词是自然语言处理的重要步骤之一&#xff0c;它对于文本挖掘、信息检索、情感分析等任务具有重要…

Linux服务器丢包故障的解决思路及引申的TCP/IP协议栈理论

Linux服务器丢包故障的解决思路及引申的TCP/IP协议栈理论 我们使用Linux作为服务器操作系统时&#xff0c;为了达到高并发处理能力&#xff0c;充分利用机器性能&#xff0c;经常会进行一些内核参数的调整优化&#xff0c;但不合理的调整常常也会引起意想不到的其他问题&#x…

Elasticsearch原理剖析

一、 Elasticsearch结构 Elasticsearch集群方案由EsMaster、EsClient和EsNode1、EsNode2、EsNode3、EsNode4、EsNode5、EsNode6、EsNode7、EsNode8、EsNode9进程组成&#xff0c;如下图所示&#xff0c;模块说明如表下所示。 说明如表&#xff1a; 名称说明ClientClient使用H…

Android系统启动流程分析

当按下Android系统的开机电源按键时候&#xff0c;硬件会触发引导芯片&#xff0c;执行预定义的代码&#xff0c;然后加载引导程序(BootLoader)到RAM&#xff0c;Bootloader是Android系统起来前第一个程序&#xff0c;主要用来拉起Android系统程序&#xff0c;Android系统被拉起…

C# Linq 详解四

目录 概述 二十、SelectMany 二十一、Aggregate 二十二、DistinctBy 二十三、Reverse 二十四、SequenceEqual 二十五、Zip 二十六、SkipWhile 二十七、TakeWhile C# Linq 详解一 1.Where 2.Select 3.GroupBy 4.First / FirstOrDefault 5.Last / LastOrDefault C# Li…

Prompt本质解密及Evaluation实战与源码解析(一)

第9章 Prompt本质解密及Evaluation实战与源码解析 9.1 Customer Service案例 本节主要谈提示词(Prompt)内部的工作机制,围绕案例、源码、论文三个维度展开。首先,我们可以看一下代码部分,这是对基于大模型应用程序开发的一个评估(Evaluation),这显然是一个至关重要的内…

排序子序列,倒置字符串讲解(图文并茂)

目录 1.排序子序列 2.倒置字符串 1.排序子序列 排序子序列_牛客笔试题_牛客网 (nowcoder.com) 首先题干中提到非递增序列和非递减序列&#xff0c;那么我们就要先弄明白什么是上述2种序列&#xff1a; 非递增序列&#xff1a;a[i] > a[i1] 如&#xff1a;3 2 1 或者 3 3 …

使用docker简单创建一个python容器

/root/docker_python目录结构&#xff1a; . |-- demo | -- main.py -- docker-compose.ymlmain.py内容&#xff1a; # codingutf-8 # -*- coding: utf-8 -*-if __name__ __main__:print("hello world")docker-compose.yml内容&#xff1a; version: "3&q…