Guava-EventBus 源码解析


EventBus 采用发布订阅者模式的实现方式,它实现了泛化的注册方法以及泛化的方法调用,另外还考虑到了多线程的问题,对多线程使用时做了一些优化,观察者模式都比较熟悉,这里会简单介绍一下,重点介绍的是如何泛化的进行方法的注册以及调用,还有在单个线程和多线程不同的实现方式。

#发布订阅者模式
观察者模式又名发布订阅者模式,类结构图如下:

观察者类结构图

package com.dfan.设计模式.观察者模式;
import java.util.Arrays;
import java.util.List;public class ObeserverMode {static class TestResult {private List<IObserver> iObservers;public TestResult() {}public void register(IObserver iObserver){this.iObservers.add(iObserver);}public void declaration() {System.out.println("this is result");}public void notifyRunners(List<IObserver> iObservers) {for(IObserver iObserver : iObservers) {iObserver.run(this);}}public  void notifyRunner(IObserver iObserver){iObserver.run(this);}}static class UITestResult extends TestResult{public UITestResult() {super();}public void declaration() {System.out.println("i am ui test result");}public  void notifyRunner(IObserver iObserver){iObserver.run(this);}}interface IObserver{void run(TestResult testResult) ;}static class TestObserver implements IObserver {private TestResult testResult;public TestResult createTestResult(){return new TestResult();}public void run(TestResult testResult) {this.testResult = testResult;System.out.println("this is test obeserver");testResult.declaration();}}static class TestObserver1 implements IObserver {private TestResult testResult;public TestResult createTestResult(){return new TestResult();}public void run(TestResult testResult) {testResult = createTestResult();System.out.println("this is test obeserver 1");testResult.declaration();}}public static void main(String[] args) {IObserver testRunner = new TestObserver();UITestResult uiTestResult = new UITestResult();uiTestResult.notifyRunner(testRunner);System.out.println("华丽分割线");List<IObserver> observers = Arrays.asList(new TestObserver1(), new TestObserver());uiTestResult.notifyRunners(observers);}
}

#Guava EventBus
Guva中EventBus的机制就是观察者模式,因此符合观察者模式的一般结构:

监听者:监听来自被监听者的变更事件,完成动作变更
被监听者:发送变更事件给监听者,使监听者监听到变更事件后,完成动作变更

EventBus的用法简单总结为一句话就是:

订阅者向EventBus进行事件注册(register),表示对这个事件关心;
EventBus会向所有订阅发布者事件的订阅者进行事件的发送(post)

EventBus 区分 同步模式和异步模式,下面将根据这两个点进行展开
##同步模式
###向EventBus进行注册

/*** Returns all subscribers for the given listener grouped by the type of event they subscribe to.*/private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();Class<?> clazz = listener.getClass();for (Method method : getAnnotatedMethods(clazz)) {Class<?>[] parameterTypes = method.getParameterTypes();Class<?> eventType = parameterTypes[0];methodsInListener.put(eventType, Subscriber.create(bus, listener, method));}return methodsInListener;}/*** Registers all subscriber methods on the given listener object.*/void register(Object listener) {Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {Class<?> eventType = entry.getKey();Collection<Subscriber> eventMethodsInListener = entry.getValue();CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers == null) {CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);}eventSubscribers.addAll(eventMethodsInListener);}}
  • 其中 findAllSubscribers方法 目的是获取所有添加注解@Subscriber的方法,并将根据当前EventBusListener、以及加有@Subscriber注解的方法生成的Subscribe作为 Multimap<Class<?>, Subscriber>的value返回(其中key为方法[注释]的入参)

  • registerSubscriber注册到集合private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();中,其中该Map的key为EventType(即方法[注释]入参)。

EventBus发送事件给所有订阅者

/*** Gets an iterator representing an immutable snapshot of all subscribers to the given event at* the time this method is called.*/Iterator<Subscriber> getSubscribers(Object event) {ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());List<Iterator<Subscriber>> subscriberIterators =Lists.newArrayListWithCapacity(eventTypes.size());for (Class<?> eventType : eventTypes) {CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers != null) {// eager no-copy snapshotsubscriberIterators.add(eventSubscribers.iterator());}}return Iterators.concat(subscriberIterators.iterator());}/*** Posts an event to all registered subscribers.  This method will return* successfully after the event has been posted to all subscribers, and* regardless of any exceptions thrown by subscribers.** <p>If no subscribers have been subscribed for {@code event}'s class, and* {@code event} is not already a {@link DeadEvent}, it will be wrapped in a* DeadEvent and reposted.** @param event  event to post.*/public void post(Object event) {Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);if (eventSubscribers.hasNext()) {dispatcher.dispatch(event, eventSubscribers);} else if (!(event instanceof DeadEvent)) {// the event had no subscribers and was not itself a DeadEventpost(new DeadEvent(this, event));}}
  • getSubscribers 根据刚才提到的参数类型会查找对应的Subscribe,而且不止查指定的类型,还会对这个类型的继承体系上的其他参数类型也会查,比如对于String类型,他会找Serializable,CharSequence,Comparable,Object四种类型,
    举个例子说明下这种情况,在这个例子中,会有两个task被执行,分别是task1和task3
public class EventBusSyncEx {static class SimpleListener1 {/***订阅方式,通过@Subscribe进行事件订阅,方法名随意**/@Subscribepublic void task1(String s) {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("listener1 do task , String param:" + s);}@Subscribepublic void task3(Object s) {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("listener1 do task , Object param:" + s);}@Subscribepublic void task1(Integer s) {System.out.println("listener1 do task , int  param:" + s);}}public static class SimpleEventBusExample {public static void main(String[] args) {EventBus eventBus = new EventBus();eventBus.register(new SimpleListener1());System.out.println("Post Simple EventBus Example");eventBus.post("Simple EventBus Example");}}
}
  • subscribers 就是刚才注册subscriber的集合(Map<ParamType, Subscribers>),通过getSubscribers获取到了Subscribe之后,下面就是要根据这个Event的type来执行对应的Event了,首先这里引入一个属性dispatcher (事件分发器 : 用于分发事件给订阅对象的事件处理器,该对象在EventBus构造方法内部初始化,默认的实现是,该分发器将事件存入队列).

    PerThreadQueuedDispatcher: 默认实现,该分发器将事件存入队列,并保证在同一个线程上发送的事件能够按照他们发布的顺序被分发给所有的订阅者。

    private static final class PerThreadQueuedDispatcher extends Dispatcher 
    // This dispatcher matches the original dispatch behavior of EventBus.
    /*** Per-thread queue of events to dispatch.*/
    private final ThreadLocal<Queue<Event>> queue =new ThreadLocal<Queue<Event>>() {@Overrideprotected Queue<Event> initialValue() {return Queues.newArrayDeque();}};/*** Per-thread dispatch state, used to avoid reentrant event dispatching.*/
    private final ThreadLocal<Boolean> dispatching =new ThreadLocal<Boolean>() {@Overrideprotected Boolean initialValue() {return false;}};@Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);checkNotNull(subscribers);Queue<Event> queueForThread = queue.get();queueForThread.offer(new Event(event, subscribers));if (!dispatching.get()) {dispatching.set(true);try {Event nextEvent;while ((nextEvent = queueForThread.poll()) != null) {while (nextEvent.subscribers.hasNext()) {nextEvent.subscribers.next().dispatchEvent(nextEvent.event);}}} finally {dispatching.remove();queue.remove();}}
    }private static final class Event {private final Object event;private final Iterator<Subscriber> subscribers;private Event(Object event, Iterator<Subscriber> subscribers) {this.event = event;this.subscribers = subscribers;}
    }
    }
    
    • 这段代码有三个关键点需要注意:
      1. PerThreadQueuedDispatcher 通过Queue来对Event进行存储
      2. Queue以及 dispatching都是ThreadLocal变量,也就意味着每个线程维护自己的一个变量,即线程安全的
      3. nextEvent.subscribers.next().dispatchEvent(nextEvent.event);调用了 Subscribe的dispatchEvent (类似于文中开篇所讲的Observer模式中的被监听者中的iObserver.run(this),只是Observer模式中,是在被监听者中执行的,而EventBus中是在dispatcher中执行的) ,如果继续跟进代码会发现,这个dispatchEvent实际工作就是直接通过反射执行了Method方法(method.invoke(target, checkNotNull(event));)

至此,EventBus的同步执行方式已经分析完成


异步模式

异步模式, 它与同步模式的EventBus的主要区别有两点:

  1. 声明EventBus时,声明为 AsyncEventBus, 而AsyncEventBus的构造函数必须要传入一个 Executor
  2. 在Dispatcher上,AsyncEventBus 采用的事件分发器为 LegacyAsyncDispatcher
/*** Dispatches {@code event} to this subscriber using the proper executor.*/final void dispatchEvent(final Object event) {executor.execute(new Runnable() {@Overridepublic void run() {try {invokeSubscriberMethod(event);} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));}}});}private static final class LegacyAsyncDispatcher extends Dispatcher {/*** Global event queue.*/private final ConcurrentLinkedQueue<EventWithSubscriber> queue =Queues.newConcurrentLinkedQueue();@Overridevoid dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);while (subscribers.hasNext()) {queue.add(new EventWithSubscriber(event, subscribers.next()));}EventWithSubscriber e;while ((e = queue.poll()) != null) {e.subscriber.dispatchEvent(e.event);}}private static final class EventWithSubscriber {private final Object event;private final Subscriber subscriber;private EventWithSubscriber(Object event, Subscriber subscriber) {this.event = event;this.subscriber = subscriber;}}}

之所以异步模式传入Executor就是在通过 dispatchEvent 进行多线程的创建 new ThreadPoolExecutor().excute(new Runnable)
而之所以使用 LegacyAsyncDispatcher 目的还有一个就是这个 Dispatcher中使用的queue是ConcurrentLinkedQueue, 之所以使用这个Queue,后面会有专门的一个讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/857447.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

dial tcp 10.96.0.1:443: connect: no route to host

1、创建Pod一直不成功&#xff0c;执行kubectl describe pod runtime-java-c8b465b98-47m82 查看报错 Warning FailedCreatePodSandBox 2m17s kubelet Failed to create pod sandbox: rpc error: code Unknown desc failed to setup network for…

数据挖掘与分析——数据预处理

数据探索 波士顿房价数据集&#xff1a;卡内基梅隆大学收集&#xff0c;StatLib库&#xff0c;1978年&#xff0c;涵盖了麻省波士顿的506个不同郊区的房屋数据。 一共含有506条数据。每条数据14个字段&#xff0c;包含13个属性&#xff0c;和一个房价的平均值。 数据读取方法…

昨天gitee网站访问不了,开始以为电脑哪里有问题了

昨天gitee网站下午访问不了&#xff0c;开始以为是什么毛病。 结果同样的网络&#xff0c;手机是可以访问的。 当然就ping www.gitee.com 结果也下面那样是正常的 以为是好的&#xff0c;但就是访问www.gitee.com也是不行&#xff0c;后来用阿里云的服务器curl访问是下面情况&…

LabVIEW机器视觉在质量控制中的应用

基于LabVIEW的机器视觉系统在质量控制中应用广泛&#xff0c;通过图像采集、处理和分析&#xff0c;自动检测产品缺陷、测量尺寸和识别标记&#xff0c;提高生产效率和产品质量。下面介绍LabVIEW机器视觉系统在质量控制中的实现方法、应用场景及其优势。 项目背景 在现代制造业…

github连接报本地

一、创建GIthub账号 这里默认大家已经创建好了并且有加速器&#xff0c;能正常上网&#xff0c;然后才能进行下面的操作。 二、创建ssh公钥 网址&#xff1a;Sign in to GitHub GitHub Sign in to GitHub GitHub 进入下面的界面&#xff1a; 然后创建新的密钥 三、官方文…

MCT Self-Refine:创新集成蒙特卡洛树搜索 (MCTS)提高复杂数学推理任务的性能,超GPT4,使用 LLaMa-3 8B 进行自我优化

&#x1f4dc; 文献卡 题目&#xff1a; Accessing GPT-4 level Mathematical Olympiad Solutions via Monte Carlo Tree Self-refine with LLaMa-3 8B作者: Di Zhang; Xiaoshui Huang; Dongzhan Zhou; Yuqiang Li; Wanli OuyangDOI: 10.48550/arXiv.2406.07394摘要: This pape…

kettle无法启动问题_PENTAHO_JAVA_HOME

1&#xff0c;遇到spoon.bat启动报错&#xff1a;先增加pause看清错误信息 1.1&#xff0c;错误信息 1.2&#xff0c;因为本地安装jdk1.6无法支持现有版本kettle。只能手动执行kettle调用的java路径&#xff1b;如下 系统--高级系统设置--高级--环境变量 启动成功

fastapi教程(一):初识 fastapi

FastAPI 是一个用于构建 API 的现代、快速&#xff08;高性能&#xff09;的 web 框架&#xff0c;使用 Python 并基于标准的 Python 类型提示。 关键特性: 快速&#xff1a;可与 NodeJS 和 Go 并肩的极高性能&#xff08;归功于 Starlette 和 Pydantic&#xff09;。最快的 …

甘肃旅游服务平台的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;公告信息管理&#xff0c;景点管理&#xff0c;酒店管理&#xff0c;基础数据管理&#xff0c;美食管理 前台账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0…

HTML静态网页成品作业(HTML+CSS)——故宫介绍网页(4个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有4个页面。 二、作品演示 三、代…

Docker:安装RediSearch全文搜索

1、简述 在本文中&#xff0c;我们将介绍如何使用Docker快速、简便地安装RediSearch&#xff0c;Redis的全文搜索模块。RediSearch提供了高效的全文搜索功能&#xff0c;通过Docker安装&#xff0c;可以轻松地在任何环境中部署和管理RediSearch。 官网地址&#xff1a;https:/…

【GUI】LVGL无操作系统移植以及移植过程错误处理

目录 介绍 1. 删除源码 2. 导入lvgl到项目screen_mcu中 3. keil添加分组和头文件 4. 移植显示 5. 移植触摸 6. 添加测试案例 6.1. 测试按钮 6.2. 测试音乐界面 7. 提供时钟 错误处理 L6218E错误 出现花屏 屏幕颜色不对 内存分配 介绍 本文 主要介绍GD32移植…

BlockingQueue详解(含动画演示)

目录 BlockingQueue详解0、BlockingQueue简介BlockingQueue接口中方法注释BlockingQueue的实现&#xff0c;总结计划 1、ArrayBlockingQueue简介2、ArrayBlockingQueue的继承体系3、ArrayBlockingQueue的构造方法①、 ArrayBlockingQueue(int capacity)②、ArrayBlockingQueue(…

计算机组成原理 —— 存储系统(概述)

计算机组成原理 —— 存储系统&#xff08;概述&#xff09; 存储系统按层次划分按照存储介质分类按照存储方式分类按照信息可更改性分类根据信息的可保存性分类存储器性能指标 我们今天来学习计算机组成原理中的存储系统&#xff1a; 存储系统 存储系统是计算机系统中用于存…

vue实现的商品列表网页

一、商品列表效果如下 二、代码&#xff1b; vue实现的商品列表网页 &#xff0c; 图片在vue项目的Public文件夹里的 imgs中 <template><div class"common-layout"><!-- el-container:外层容器。 当子元素中包含 <el-header> 或 <el-foo…

mysql:简单理解mysql mvcc的可重复读

# 原理 假设有这样的sql begin select&#xff08;或update、insert、delete&#xff09; ... commit当执行【begin】的时候&#xff0c;标记有一个新事务要开始&#xff0c;但是事务还没有真正开始&#xff0c;事务id还没有产生当执行事务里面的第一个sql语句时&#xff08;…

java之url任意跳转漏洞

1 漏洞介绍 URLRedirect url重定向漏洞也称url任意跳转漏洞&#xff0c;网站信任了用户的输入导致恶意攻击&#xff0c;url重定向主要用来钓鱼&#xff0c;比如url跳转中最常见的跳转在登陆口&#xff0c;支付口&#xff0c;也就是一旦登陆将会跳转任意自己构造的网站&#xf…

Xshell7免费版下载安装使用

​一、下载安装​ 1.打开官网下载 https://www.xshell.com/zh/free-for-home-school/ 2.选择合适的下载路径&#xff0c;点击下载按钮&#xff0c;然后按照提示完成安装。 二、Xshell7的使用&#xff0c;Xhell连接Linux 1.连接之前&#xff0c;确保在Linux中开启SSH。参考&a…

YOLOv8中的C2f模块

文章目录 一、结构概述二、模块功能 一、结构概述 C2f块:首先由一个卷积块(Conv)组成&#xff0c;该卷积块接收输入特征图并生成中间特征图特征图拆分:生成的中间特征图被拆分成两部分&#xff0c;一部分直接传递到最终的Concat块&#xff0c;另一部分传递到多个Botleneck块进…

QT基础 - 文本文件读写

目录 零. 前言 一.读取文件 二. 写入文件 三. 和二进制读写的区别 零. 前言 在 Qt 中&#xff0c;对文本文件进行读写操作是常见的任务之一。这对于保存和加载配置信息、处理数据文件等非常有用。 Qt 提供了多种方式来读写文本文件&#xff0c;使得文件操作变得相对简单和…