SpringBoot ApplicationListener实现发布订阅模式

文章目录

  • 前言
  • 一、Spring对JDK的扩展
  • 二、快速实现发布订阅模式


前言

发布订阅模式(Publish-Subscribe Pattern)通常又称观察者模式,它被广泛应用于事件驱动架构中。即一个事件的发布,该行为会通过同步或者异步的方式告知给订阅该事件的订阅者。JDK中提供了EventListener作为所有订阅者的接口规范(即所有的订阅者都应该实现该接口),而EventObject则作为所有事件发布者的实现规范(即所有事件发布者都应该继承该类)。对于观察者的原理不是本章讨论的重点,本章只是演示如何在SpringBoot中实现发布订阅模式。


一、Spring对JDK的扩展

Spring中,提供了接口ApplicationListener作为Spring观察者(也叫监听者)的实现规范,ApplicationListener其实是对JDKEventListener中的扩展,增加了onApplicationEvent方法作为触发监听的方法。而事件发布对象ApplicationEvent也是继承了JDK中的EventObject类,仅仅增加了参数timestamp用于记录事件创建的时间。也就是说如果要使用Spring提供的发布订阅模式,您的监听器应该实现ApplicationListener接口,通过onApplicationEvent方法获取监听的内容。事件则必须继承ApplicationEvent

ApplicationListener源码:

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {/*** Handle an application event.* @param event the event to respond to*/void onApplicationEvent(E event);}

ApplicationEvent源码:

public abstract class ApplicationEvent extends EventObject {/** use serialVersionUID from Spring 1.2 for interoperability. */private static final long serialVersionUID = 7099057708183571937L;/** System time when the event happened. */private final long timestamp;/*** Create a new {@code ApplicationEvent}.* @param source the object on which the event initially occurred or with* which the event is associated (never {@code null})*/public ApplicationEvent(Object source) {super(source);this.timestamp = System.currentTimeMillis();}/*** Return the system time in milliseconds when the event occurred.*/public final long getTimestamp() {return this.timestamp;}}

二、快速实现发布订阅模式

配置线程池是必要的,因为发布订阅模式的一个好处就是可以实现解耦,而解耦最好的方式就是采用异步线程处理。如果我们不配置线程池,则在spring中默认会采用同步的方式进行消息发布和订阅消费。这样一来就没有任何意义了。首先在yaml或者properties中配置线程池信息:

thread:executor:corePoolSize: 8 #核心线程keepAliveSeconds: 30000 # 活跃时间maxPoolSize: 16 #最大线程数queueCapacity: 100000 #最大队列长度

然后通过配置文件读取配置信息,创建线程池并注入IOC容器

package com.hl.by.common.thread;import com.hl.by.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ErrorHandler;import java.util.concurrent.ThreadPoolExecutor;/*** @Author: DI.YIN* @Date: 2024/3/6 9:39* @Version: 1.0.0* @Description: 线程池配置**/
@Configuration
public class ThreadPoolTaskExecutorConfig {//核心线程@Value("${thread.executor.corePoolSize}")private Integer corePoolSize;//存活时间@Value("${thread.executor.keepAliveSeconds}")private Integer keepAliveSeconds;//最大线程数@Value("${thread.executor.maxPoolSize}")private Integer maxPoolSize;//最大队列长度@Value("${thread.executor.queueCapacity}")private Integer queueCapacity;@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);threadPoolTaskExecutor.setCorePoolSize(corePoolSize);threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);threadPoolTaskExecutor.setQueueCapacity(queueCapacity);//设置拒绝策略,直接运行,不采用异步threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());threadPoolTaskExecutor.setThreadNamePrefix("Thread-Pool-Task-");return threadPoolTaskExecutor;}@DependsOn(value = "taskExecutor")@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)public SimpleApplicationEventMulticaster eventMulticaster() {SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());//设置错误处理器simpleApplicationEventMulticaster.setErrorHandler(new ErrorHandler() {@Overridepublic void handleError(Throwable throwable) {System.out.println("抛出异常:" + JsonUtils.writeObjectAsBeautifulJson(throwable));}});return simpleApplicationEventMulticaster;}
}

可以看到除了注入线程池之外,还注入了自定义的SimpleApplicationEventMulticaster 对象并将创建的线程池设置到SimpleApplicationEventMulticaster中。因为SimpleApplicationEventMulticaster是处理发布订阅的核心类,通过multicastEvent方法进行事件发布。可以看到multicastEvent中,循环遍历订阅该事件的所有监听器,并判断是否配置了线程池Executor,如果配置了则将发布操作扔入线程池中异步处理,否则将同步处理发布事件操作。很多情况发现我们的事件发布与监听处理是在一个线程中执行,就是因为我们未设置线程池,导致发布订阅无法异步实现。

	@Overridepublic void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));//获取线程池Executor executor = getTaskExecutor();for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {if (executor != null) {//如果配置了线程池,则放入线程池中异步处理executor.execute(() -> invokeListener(listener, event));}else {//未配置线程池,则同步处理invokeListener(listener, event);}}}

完成以上配置后,就可以定义发布者、订阅者和发布事件了。现在我们定义一个类MessageSource作为发布者发布的事件,结构如下:

import lombok.Data;/*** @Author: DI.YIN* @Date: 2024/3/6 13:41* @Version:* @Description: 消息实体**/
@Data
public class MessageSource {private String id;private String msg;private String title;
}

定义好发布事件后,我们定义一个事件发布者MessageEvent,并指定其发布的事件类型是MessageSourceMessageSource 的子类,结构如下:

import org.springframework.context.ApplicationEvent;/*** @Author: DI.YIN* @Date: 2024/3/6 13:39* @Version: 1.0.0* @Description: 消息事件**/
public class MessageEvent<T extends MessageSource> extends ApplicationEvent {/*** Create a new {@code ApplicationEvent}.** @param source the object on which the event initially occurred or with*               which the event is associated (never {@code null})*/public MessageEvent(MessageSource source) {super(source);}
}

现在已经定义好了发布事件MessageSource,事件发布者MessageEvent,此时我们可以定义一个事件订阅者MessageListener,用于监听事件发布者MessageEvent发布的事件。代码如下:

import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;/*** @Author: DI.YIN* @Date: 2024/3/6 10:19* @Version:* @Description:**/
@Component
public class MessageListener implements ApplicationListener<MessageEvent> {@Overridepublic void onApplicationEvent(MessageEvent event) {MessageSource source = (MessageSource)event.getSource();System.out.println("消息监听器监听到消息:===>"+ JSONObject.toJSONString(source));}
}

现在我们就实现了一个订阅发布模式,事件对象MessageSource,事件发布者MessageEvent专门用于发布MessageSource类型的事件,事件监听者MessageListener 则专门监听MessageEvent发布的事件。可以创建一个接口用于测试发布订阅是否成功。


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;/*** @Author: Greyfus* @Create: 2024-03-02 14:08* @Version:* @Description:*/
@RestController
@RequestMapping("/mock")
public class TestController {@Autowiredprivate ApplicationContext applicationContext;@RequestMapping(method = RequestMethod.POST, value = "/publishMessage", consumes = MediaType.APPLICATION_JSON_VALUE)public void publishMessage() throws Exception {//构建信息实体MessageSource messageSource = new MessageSource();messageSource.setId(String.valueOf(1));messageSource.setTitle("日志消息");messageSource.setMsg("调用了接口publishMessage");//构建消息事件MessageEvent<MessageSource> messageEvent = new MessageEvent(messageSource);//发布事件applicationContext.publishEvent(messageEvent);}
}

通过用postman调用接口/mock/feign可以看到MessageListener 成功接受到了MessageEvent发布的MessageSource事件。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/753837.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Error response from daemon Get server gave HTTP response to HTTPS client

使用docker compose拉起docker镜像时&#xff0c;若出现如下报错 Error response from daemon: Get "https://devops.test.cn:5000/v2/": http: server gave HTTP response to HTTPS client表示Docker守护进程无法从指定url获取响应&#xff0c; 可能原因有以下&…

苍穹外卖-day09:用户端历史订单模块(理解业务逻辑),商家端订单管理模块(理解业务逻辑),校验收货地址是否超出配送范围(相关API)

用户端历史订单模块 1. 查询历史订单&#xff08;分页查询&#xff09; 1.1 需求分析和设计 产品原型&#xff1a; 业务规则 分页查询历史订单可以根据订单状态查询展示订单数据时&#xff0c;需要展示的数据包括&#xff1a;下单时间、订单状态、订单金额、订单明细&#…

软考76-上午题-【面向对象技术3-设计模式】-创建型设计模式01

一、创建型设计模式一览 二、创建型设计模式 2-1、创建型设计模式的概念 一个类创建型模式使用继承改变被实例化的类&#xff1b; 一个对象创建型模式将实例化委托给另一个对象。 对应java的new一个对象。 2-2、简单工厂模式&#xff08;静态工厂方法&#xff09; 简单工厂…

猫头虎分享已解决Bug || TypeError: Cannot interpret ‘float‘ value as integer.

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

利用自定义 URI Scheme 在 Android 应用中实现安全加密解密功能

在现代移动应用开发中&#xff0c;安全性和用户体验是至关重要的考虑因素。在 Android 平台上&#xff0c;开发人员可以利用自定义 URI Scheme 和 JavaScript 加密解密技术来实现更安全的数据传输和处理。本文将介绍如何在 Android 应用中注册自定义 URI Scheme&#xff0c;并结…

计算机组成原理——自己制作一个cpu

cpu包括单周期cpu、中断cpu、多周期cpu 代码实现之后在实验箱看效果&#xff0c;并且看波形图 单周期波形 中断cpu 多周期cpu 1.单周期CPU总体电路图 如图是一个简单的基本上能够在单周期CPU上完成所要求设计的指令功能的数据通路和必要的控制线路图。其中指令和数据各存储在不…

超越想象的数据可视化:五大工具引领新潮流

在数据分析领域&#xff0c;数据可视化工具是每位分析师的得力助手。它们能够将复杂的数据转化为直观、易懂的图表和图像&#xff0c;帮助分析师快速洞察数据背后的规律与趋势。下面&#xff0c;我将从数据分析师的角度&#xff0c;为大家介绍五个常用的数据可视化工具。 一、…

【vue.js】文档解读【day 5】| ref模板引用

如果阅读有疑问的话&#xff0c;欢迎评论或私信&#xff01;&#xff01; 本人会很热心的阐述自己的想法&#xff01;谢谢&#xff01;&#xff01;&#xff01; 文章目录 模板引用前言访问模板引用模板引用与v-if、v-show的结合v-for中的模板引用函数模板引用 模板引用 前言 …

Vue.js+SpringBoot开发食品生产管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 加工厂管理模块2.2 客户管理模块2.3 食品管理模块2.4 生产销售订单管理模块2.5 系统管理模块2.6 其他管理模块 三、系统展示四、核心代码4.1 查询食品4.2 查询加工厂4.3 新增生产订单4.4 新增销售订单4.5 查询客户 五、…

二蛋赠书十八期:《一本书讲透Elasticsearch:原理、进阶与工程实践》

Elasticsearch 是一种强大的搜索和分析引擎&#xff0c;被广泛用于各种应用中&#xff0c;以其强大的全文搜索能力而著称。 不过&#xff0c;在日常管理 Elasticsearch 时&#xff0c;我们经常需要对索引进行保护&#xff0c;以防止数据被意外修改或删除&#xff0c;特别是在进…

PC-DARTS: PARTIAL CHANNEL CONNECTIONS FOR MEMORY-EFFICIENT ARCHITECTURE SEARCH

PC-DARTS&#xff1a;用于内存高效架构搜索的部分通道连接 论文链接&#xff1a;https://arxiv.org/abs/1907.05737 项目链接&#xff1a;https://github.com/yuhuixu1993/PC-DARTS ABSTRACT 可微分体系结构搜索(DARTS)在寻找有效的网络体系结构方面提供了一种快速的解决方案…

和解费用3362万美元,谁来守护跨境卖家的“钱包”

公司向原告支付3362万美元(包括原告方主张的损害赔偿金2500万美元及原告方支付的律师费用862万美元)&#xff1b; 公司不得通过任何方式访问或使用原告的产品或数据&#xff1b; 公司不得向最终用户提供维修帮助服务(属于公司汽车诊断产品中的辅助维 修功能&#xff0c;不影响…

MTLAB 批量下载 脑医学图像数据集BrainWeb: Simulated Brain Database

MTLAB 批量下载 脑医学图像数据集BrainWeb: Simulated Brain Database BrainWeb数据集的网址&#xff1a;https://brainweb.bic.mni.mcgill.ca/brainweb/ 1. 了解 BrainWeb: Simulated Brain Database 这是一个模拟大脑数据的数据库&#xff08;SBD&#xff0c;Simulated Br…

C#重新认识笔记_ 点乘,叉乘

C#重新认识笔记_ 点积&#xff0c;叉乘 一、Dot Product 点乘&#xff1a; &#xff08;Ax*Bx&#xff09;&#xff08;Ay*By&#xff09;&#xff08;Az*Bz&#xff09;Dot Product 点积 利用点积&#xff0c;可以了解&#xff0c;两个向量(vector)的相关信息&#xff0c; …

【系统架构设计师】软件架构设计 02

系统架构设计师 - 系列文章目录 01 系统工程与信息系统基础 02 软件架构设计 文章目录 系统架构设计师 - 系列文章目录 文章目录 前言 一、软件架构的概念 ★★★ 二、基于架构的软件开发 ★★★★ 三、软件架构风格 ★★★★ 1.数据流风格 2.调用/返回风格 3.独立构件风格…

Qt学习--继承(并以分文件实现)

基类 & 派生类 一个类可以派生自多个类&#xff0c;这意味着&#xff0c;它可以从多个基类继承数据和函数。定义一个派生类&#xff0c;我们使用一个类派生列表来指定基类。类派生列表以一个或多个基类命名。 总结&#xff1a;简单来说&#xff0c;父类有的&#xff0c;子…

操作系统系列学习——信号量临界区保护

文章目录 前言信号量临界区保护 前言 一个本硕双非的小菜鸡&#xff0c;备战24年秋招&#xff0c;计划学习操作系统并完成6.0S81&#xff0c;加油&#xff01; 本文总结自B站【哈工大】操作系统 李治军&#xff08;全32讲&#xff09; 老师课程讲的非常好&#xff0c;感谢 【哈…

【力扣精选算法100道】——带你了解(数组模拟栈)算法

目录 &#x1f4bb;比较含退格的字符串 &#x1f388;了解题意 &#x1f388;分析题意 &#x1f6a9;栈 &#x1f6a9;数组模拟栈 &#x1f388;实现代码 844. 比较含退格的字符串 - 力扣&#xff08;LeetCode&#xff09; &#x1f4bb;比较含退格的字符串 &#x1f3…

Maven项目 快速修复log4j 漏洞

1、log4j 漏洞介绍 log4j的漏洞介绍以及原理请参考文章 &#xff0c;网址详见下面文章 Log4j漏洞原理及修复_linux log4j漏洞修复方案-CSDN博客&#xff0c;遇到这个漏洞要升级log4j 的jar包到2.17.0 以上。 2、项目快速处理方案 由于maven 管理jar 的spring 项目或者…

通过点击按钮实现查看全屏和退出全屏的效果

动态效果如图&#xff1a; 可以通过点击按钮&#xff0c;或者esc键实现全屏和退出全屏的效果 实现代码&#xff1a; <template><div class"hello"><el-button click"fullScreen()" v-if"!isFullscreen">查看全屏</el-butt…