基于SpringBoot实现一个可扩展的事件总线

基于SpringBoot实现一个可扩展的事件总线

前言

在日常开发中,我们经常会用到事件总线,SpringBoot通过事件多播器的形式为我们提供了一个事件总线,但是在开发中我们经常会用到其他的实现,比如Guava、Disruptor的。我们将基于SpringBoot封装一套底层驱动可扩展的,统一api的事件驱动组件。

环境准备

jdk1.8
spring-boot-autoconfigure
Guava
Disruptor

pom文件如下

Copy<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.billetsdoux</groupId><artifactId>eventBus</artifactId><version>1.0.0</version><name>eventBus</name><description>eventBus</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><scope>compile</scope><version>5.8.9</version></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency></dependencies></project>

组件介绍

整体架构#

目录结构如下:

[外链图片转存中…(img-m0HXGURP-1703487623977)]

我们的核心是一个EventListenerRegistry,它便是我们提供统一api的入口,它有两个方法,一个是init方法,在SpringBoot容器启动的时候会去注册我们所有的事件监听器,publish 方法则为事件发布的方法。这里我为它提供了3种实现,GuavaSpringDisruptor

[外链图片转存中…(img-AtJntQUE-1703487623978)]

EventModel#

这是我们定义的事件模型,topic为事件主题,我们通过不同的topic对应不同的事件处理器,entity为具体的事件对象模型

Copypackage com.billetsdoux.eventbus.model;import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@NoArgsConstructor
public class EventModel<T> implements Serializable {/***  事件发布主题*/private String topic;/***  事件对象模型*/private T entity;
}

EventListener#

EventListener 为事件消费接口,定义了2个方法,topic() 为监听的事件topic,onMessage()为事件的回调接口。

Copypackage com.billetsdoux.eventbus;/***  消费接口*/
public interface EventListener<T> {String topic();void onMessage(T message);
}

EventListenerRegistry#

这边是我们之前介绍的事件核心接口,它提供两个接口 initRegistryEventListener 负责注册我们所定义的所有事件监听器,publish 负责发送消息,我们底层的驱动需要继承这个接口。

Copypublic interface EventListenerRegistry<P> {void initRegistryEventListener(List<EventListener> eventConsumerList);void publish(P param);
}

SpringEventListenerRegistry#

这是我们通过Spring为我们提供的消息多播器来实现的一个事件驱动。这个类被@Component标记,那么它会在容器启动的时候,通过构造器为我们注入 eventListeners ,applicationContext 。eventListeners 为所有实现了EventListener接口,并被注入到容器里面的类。

initRegistryEventListener 这是一个空方法,因为他们本身已经在容器中了,所以不需要注册了

publish: 直接调用applicationContext.publishEvent就可以了。

Copypackage com.billetsdoux.eventbus.spring;
import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.List;@RequiredArgsConstructor
@Slf4j
@Component
public class SpringEventListenerRegistry implements EventListenerRegistry<EventModel> {final ApplicationContext applicationContext;final List<EventListener> eventListeners;@Overridepublic void initRegistryEventListener(List<EventListener> eventConsumerList) {}@Overridepublic void publish(EventModel param) {applicationContext.publishEvent(param);}@PostConstructpublic void init(){log.info("开始初始化Spring事件监听器的组件服务");initRegistryEventListener(eventListeners);log.info("完成初始化Spring事件监听器的组件服务");}
}

GuavaEventListenerRegistry#

基于Guava来实现的事件总线,我们首先还是需要容器帮我们注入eventListeners。相较于Spring我们需要自己定义一个Guava的EventBus,然后把我们的Listener注册到这个EventBus中。

publish方法则是调用EventBus的post方法到。

Copy
package com.billetsdoux.eventbus.guava;import cn.hutool.core.thread.ThreadUtil;import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.objenesis.instantiator.util.ClassUtils;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.ExecutorService;@Component("guava")
@Slf4j
public class GuavaEventListenerRegistry implements EventListenerRegistry<EventModel> {EventBus eventBus;final List<EventListener> eventListeners;public GuavaEventListenerRegistry(List<EventListener> eventListeners) {this.eventListeners = eventListeners;}@Overridepublic void initRegistryEventListener(List<EventListener> eventConsumerList) {final ExecutorService executor = ThreadUtil.newExecutor(10, 20, 300);eventBus = new AsyncEventBus(GuavaEventListenerRegistry.class.getName(),executor);eventConsumerList.forEach(param->{log.info("注册监听器:{}",param.getClass().getName());eventBus.register(ClassUtils.newInstance(param.getClass()));});}@Overridepublic void publish(EventModel param) {eventBus.post(param);}@PostConstructpublic void init(){log.info("开始初始化Guava事件监听器的组件服务");initRegistryEventListener(eventListeners);log.info("完成初始化Guava事件监听器的组件服务");}}

DisruptorEventListenerRegistry#

Disruptor的实现相对来说麻烦一点,它首先需要一个实现了EventFactory接口的类,它提供一个newInstance接口来创建事件对象模型。

具体的使用方式可以参考我这篇博文:Disruptor入门

EventModelFactory

我们首先还是需要注入我们的Listener,只是这里在init的时候是将我们的Listener交给我们的Disruptor去处理,我们先将Listener转成EventHandler,所以我们的监听器接口具体实现的时候除了实现我们定义的EventListener接口外还需要继承Disruptor的EventHandler接口。 调用disruptor.handleEventsWith(dataListener); 把我们的Listener交给Disruptor去管理。最后再启动Disruptor。

publish:调用Disruptor的RingBuffer来进行消息的发送。

Copy
/***  事件工厂*  Disruptor 通过EventFactory在RingBuffer中预创建Event的实例* @param <T>*/
public class EventModelFactory<T> implements EventFactory<EventModel<T>> {@Overridepublic EventModel<T> newInstance() {return new EventModel<>();}
}
Copy@Slf4j
@RequiredArgsConstructor
@Component("disruptor")
@Scope("prototype") // 线程安全问题
public class DisruptorEventListenerRegistry implements EventListenerRegistry<EventModel>,AutoCloseable {/***  disruptor事件处理器*/@Getter@Setterprivate Disruptor<EventModel> disruptor;@NonNullfinal List<EventListener> eventListeners;/***  RingBuffer的大小*/private final int DEFAULT_RING_SIZE = 1024 * 1024;/***  事件工厂*/private EventFactory<EventModel> eventFactory = new EventModelFactory();@Overridepublic void initRegistryEventListener(List<EventListener> eventConsumerList) {disruptor = new Disruptor<>(eventFactory, DEFAULT_RING_SIZE, createThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());EventHandler[] dataListener = eventConsumerList.stream().map(param -> {EventListener<EventModel> eventModelEventListener = param;return eventModelEventListener;}).collect(Collectors.toList()).toArray(new EventHandler[eventConsumerList.size()]);log.info("注册服务信息接口:{}",dataListener);disruptor.handleEventsWith(dataListener);disruptor.start();}@Overridepublic void publish(EventModel param) {publishEvent(param);}public void publishEvent(EventModel... eventModels){Objects.requireNonNull(disruptor, "当前disruptor核心控制器不可以为null");Objects.requireNonNull(eventModels, "当前eventModels事件控制器不可以为null");// 发布事件final RingBuffer<EventModel> ringBuffer = disruptor.getRingBuffer();try {final List<EventModel> dataList = Arrays.stream(eventModels).collect(Collectors.toList());for (EventModel element : dataList) {// 请求下一个序号long sequence = ringBuffer.next();// 获取该序号对应的事件对象EventModel event =  ringBuffer.get(sequence);event.setTopic(element.getTopic());event.setEntity(element.getEntity());ringBuffer.publish(sequence);}}catch (Exception e) {log.error("error",e);}}/***  关闭处理机制* @throws Exception*/@Overridepublic void close() throws Exception {if (Objects.nonNull(disruptor)) disruptor.shutdown();}@PostConstructpublic void init(){log.info("开始初始化Disruptor事件监听器的组件服务");initRegistryEventListener(eventListeners);log.info("完成初始化Disruptor事件监听器的组件服务");}private static ThreadFactory createThreadFactory(){AtomicInteger integer = new AtomicInteger();return r-> new Thread(r,"disruptor-"+integer.incrementAndGet());}}

至此我们已经实现了我们的目标三个EventListenerRegistry,我们接下来看看我们Listener如何实现。

BaseEventListener#

我们刚说过我们的Listener需要同时实现EventHandler跟EventListener,所以我们定义一个抽象类,注意这个EventListener是我们定义的,EventHandler是Disruptor定义的。

Copypublic abstract class BaseEventListener<T> implements EventListener<T>, EventHandler<T> {}

ExecutableEventListener#

我们定义一个抽象类ExecutableEventListener 我们来实现一下里面的方法。

对于Spring跟Guava来说只需要在方法上添加注解便可以在事件发生的时候回调过来,而对于Disruptor来说它的回调是继承EventHandler里面的onEvent方法。所以我们在onEvent里面手动调用onMessage方法,让所有的消息都转发给onMessage处理。

@org.springframework.context.event.EventListener Spring的回调注解

@Subscribe 的回调注解

onMessage:我们先调用topic()方法获取Listener方法的topic,这个方法我们这里先不实现,交给具体的实现类去实现这个方法。我们再定义一个handle的抽象方法,则是我们具体的消息处理逻辑的方法,也交给具体的实现类去实现。

Copy@Slf4j
public abstract class ExecutableEventListener extends BaseEventListener<EventModel<?>> {@org.springframework.context.event.EventListener@Subscribe@Overridepublic void onMessage(EventModel<?> message) {log.info("收到消息:{}",message);if (topic().equals(message.getTopic())){handle(message);}}@Overridepublic void onEvent(EventModel<?> event, long sequence, boolean endOfBatch) throws Exception {onMessage(event);}/***  具体消息处理方法* @param message*/protected abstract void handle(EventModel<?> message);}

至此我们的核心代码就开发完成了,现在定义两个注解,让我们能够在项目中启用它。

EnableEventBus:在启动类上添加这个注解以启用EventBus

[外链图片转存中…(img-OjGimDb2-1703487623978)]

EventBusConfiguration:配置一下Spring的包扫描路径
img

测试

我们把我们刚写的项目install到本地maven仓库,以便我们在项目中能够引用它。我们新建一个SpringBootWeb项目添加这个依赖测试下

在pom中添加

Copy  <dependency><groupId>com.billetsdoux</groupId><artifactId>eventBus</artifactId><version>1.0.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

在启动类上添加这个注解启用

img

添加一个事件监听器,继承ExecutableEventListener,监听”blog“消息的主题。

img

添加一个Controller来测试一下我们的事件总线。它根据我们的type来选择不同的底层驱动:spring,guava,disruptor

[外链图片转存中…(img-ZndsBWCc-1703487623979)]

我们打包成docker镜像然后启动。
Dockerfile如下:

CopyFROM openjdk:8-jre-slim
MAINTAINER billtsdouxWORKDIR /appADD target/eventbus_blog*.jar app.jarEXPOSE 8080ENV JVM_OPTS="-Xmx256m -Xms256m" \TZ=Asia/ShanghaiRUN ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \&& echo $TZ > /etc/timezoneENTRYPOINT ["sh","-c","java -jar $JVM_OPTS app.jar"]

我们这里配置一下打包后镜像的名称,已经启动的容器名称跟监听的端口。

[外链图片转存中…(img-xKzuGe5r-1703487623980)]

构建成功

img

并且也启动一个容器:

[外链图片转存中…(img-QETw4G24-1703487623980)]

查看日志可以看到我们内置的三个监听器注册器已经成功启动了。

[外链图片转存中…(img-ScQl7W0a-1703487623980)]

我们测试一下接口:可以看到根据我们选择的不同类型我们可以选择不同的实现。

[外链图片转存中…(img-aFD49ezm-1703487623981)]

[外链图片转存中…(img-CnPONtYE-1703487623981)]

[外链图片转存中…(img-mDQW65LN-1703487623981)]

后言

如果提供这个三个不够用,我们还可以通过实现这个接口EventListenerRegistry来扩展我们的事件总线组件,再注入到容器中,在调用的时候选择具体的实现就好了。

标签: java , Spring

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/576507.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据技术之 Kafka (第 1 章 Kafka 概述)

第 1 章 Kafka 概述 1.1 定义 Kafka 是一个分布式的基于发布/订阅模式的消息队列&#xff08;Message Queue&#xff09;&#xff0c;主要应用于大数据实时处理领域。 1.2 消息队列 1.2.1 传统消息队列的应用场景 MQ传统应用场景之异步处理 使用消息队列的好处 1&a…

那些你无比崇拜的厉害人,是如何建构知识体系的

那些你无比崇拜的厉害人&#xff0c;是如何建构知识体系的&#xff1f; 2018-04-04 六合同风 文 | Lachel 高效思维达人&#xff0c;知识管理专家&#xff0c;深度思考践行者&#xff0c;领英、36氪特约作家 来源 | L先生说&#xff08;ID&#xff1a;lxianshengmiao&#x…

大数据技术之 Kafka (第 2 章 Kafka快速入门)

第 2 章 Kafka 快速入门 下载安装kafka集群 1.需要jdk 2.需要zookeeper&#xff0c;这个东西在最新版的Kafka中内置。 3.下载Kafka安装包 &#xff08;下载官网地址&#xff1a;Apache Kafka&#xff09; 一&#xff0c;下载Kafka安装包 二&#xff0c;Kafka安装包上传li…

使用ICSharpCode.TextEditor制作一个语法高亮显示的XML编辑器

本文转载&#xff1a;http://www.cnblogs.com/lefay/archive/2010/07/25/1784919.html转载于:https://www.cnblogs.com/51net/archive/2012/04/21/2462431.html

文因互联 CEO 鲍捷:确保搞砸人工智能项目的十种方法

文因互联 CEO 鲍捷&#xff1a;确保搞砸人工智能项目的十种方法 原文链接 原创&#xff1a; 鲍捷 文因互联 前天 做成一件事儿不容易&#xff0c;而坑恒在。 鲍捷博士于5月10日在将门创投的线上 talk 中盘点了人工智能项目的大坑小坑&#xff0c;选出了看上去非常反常识的十…

启动kafka失败报内存不足(Cannot allocate memory)

原因分析&#xff0c;因为我的虚拟机内存一共才1G 查看一下我的虚拟机内存还剩余多少&#xff1f; 天呐&#xff0c;我的只有60M了 我们看下kafka的启动脚本 vim kafka-server-start.sh 看见下面的脚本文件内容没有&#xff0c;默认是1G&#xff0c;而我只有60M&#xff0…

Introduce Null Object(引入Null对象)

Introduce Null Object&#xff08;引入Null对象&#xff09;你需要再三检查某对象是否为null。将null值替换为null对象。if (customer null) plan BillingPlan.basic(); else plan customer.getPlan(); 动机多态的最根本好处在于&#xff1a;你不必再向对象询问"你是…

zookeeper Error contacting service. It is probably not running

通过命令zkServer.sh start之后&#xff0c;显示STARTED 通过client测试报错 无法连接 连接关闭 Connecting to localhost:2181 2020-03-29 12:44:31,855 [myid:] - INFO [main:Environment100] - Client environment:zookeeper.version3.4.13-2d71af4dbe22557fda74f9a9b430…

引言

推荐系统从0到1_引言什么是推荐&#xff1f;什么是推荐系统&#xff1f;推荐系统的应用什么是推荐&#xff1f; 说起推荐&#xff0c;就不得不说搜索。搜索这里指信息检索&#xff0c;在大量的信息中&#xff0c;我们需要找到自己需要的信息&#xff0c;就用到了搜索引擎&…

JNI的方式调用DLL(SO)(上)

最近有个项目要调用原有的DLL文件&#xff0c;本来准备使用JNA&#xff0c;可是客户不同意&#xff0c;非要使用JNI的方式。万般无奈之下&#xff0c;只能重新使用JNI&#xff0c;现将JAVA调用动态链接库的一般方法总结如下。 JNI是Java Native Interface&#xff08;JAVA本地调…

搭建推荐系统所需要的材料

搭建推荐系统所需要的材料人力物力推荐系统的原材料物品流量最后人力物力 在上一小节中《推荐系统的必要性》里已经讨论过一个推荐业务团队需要哪些人员储备&#xff0c;需要什么核心技术等。这里在重点说下物力&#xff0c;即机器资源设备。 往往从头开始搭建这样一个团队&a…

kafka:topic为什么要进行分区?副本机制是如何做的?

kafka为什么要在topic里加入分区的概念&#xff1f;如果没有分区,topic中的segment消息写满后,直接给订阅者不是也可以吗&#xff1f; Kafka可以将主题划分为多个分区&#xff08;Partition&#xff09;&#xff0c;会根据分区规则选择把消息存储到哪个分区中&#xff0c;只要如…

将某字符串切割成阵列并排序列出

老师布置的另外一道题是将字符串"a;b;d;z;y;u"切割成阵列并排序列出。老师出这题也许是让Insus.NET掌握或复习Array.Sort()函数&#xff0c;不管怎样&#xff0c;先按自己的想法实现&#xff0c;然后是等老师的意见。protected void Page_Load(object sender, EventA…

关于管理,这5大误区你越早知道越好

关于管理&#xff0c;这5大误区你越早知道越好 原创&#xff1a; 陈春花 管理的常识 绝大部分人都感觉自己已经非常努力地工作&#xff0c;但结果却不尽如人意&#xff0c;到底问题出在哪里&#xff1f; 就让北京大学国家发展研究院管理学教授、北大国发院BiMBA商学院院长陈春…

大数据技术之 Kafka (第 3 章 Kafka 架构深入 ) Log存储解析

Kafka 工作流程 Kafka 中消息是以 topic 进行分类的&#xff0c;生产者生产消息&#xff0c;消费者消费消息&#xff0c;都是面向 topic的。 topic 是逻辑上的概念&#xff0c;而 partition 是物理上的概念&#xff0c;每个 partition 对应于一个 log 文件&#xff0c;该 log…

推荐系统整体框架概览

推荐系统整体框架概览推荐系统整体架构 推荐系统的核心组成部分离线核心节点服务UI总结推荐系统整体架构 先说点题外话&#xff0c;最近在看的书中讲到了怎么进行自学的方法&#xff0c;分了十个层级。第一个便是要了解所学内容的概况&#xff0c;也就是轮廓&#xff0c;大的东…

大数据技术之 Kafka (第 3 章 Kafka 架构深入 ) Kafka 生产者

3.2.1 分区策略 1&#xff09;分区的原因 &#xff08;1&#xff09;方便在集群中扩展&#xff0c;每个 Partition 可以通过调整以适应它所在的机器&#xff0c;而一个 topic又可以有多个 Partition 组成&#xff0c;因此整个集群就可以适应任意大小的数据了&#xff1b; &a…

MySQL定时备份实现

一、备份数据库 –all-databases 备份所有数据库 /opt/mysqlcopy/all_$(date “%Y-%m-%d %H:%M:%S”).sql 备份地址 docker exec -it 容器名称 sh -c "mysqldump -u root -ppassword --all-databases > /opt/mysqlcopy/all_$(date "%Y-%m-%d %H:%M:%S").sq…

程序员健身的重要性

程序员健身的重要性人的身体就是人的灵魂最好的写照&#xff01; &#xff0d;&#xff0d;&#xff0d; 路德维系.维特根斯坦 健身不仅是保持健康体魄的关键要素之一&#xff0c;也是灵活的、具有创造性的脑力活动的基础。 &#xff0d;&#xff0d;&#xff0d; 约翰.肯尼迪 …

Java 8 中的哈希表

JDK 的代码是开源的&#xff0c;我们打开idea开发工具&#xff0c;引入jdk1.8 找到hashmap HashMap 是基于 HashTable 的一种数据结构&#xff0c;在普通哈希表的基础上&#xff0c;它支持多线程操作以及空的 key 和 value。 在 HashMap 中定义了几个常量: static final in…