EDA - Spring Boot构建基于事件驱动的消息系统

文章目录

  • 概述
  • 事件驱动架构的基本概念
  • 工程结构
  • Code
    • 创建事件和事件处理器
    • 创建事件总线
    • 创建消息通道和发送逻辑
    • 创建事件处理器
    • 消息持久化
    • 创建消息发送事件
    • 配置 Spring Boot 启动类
    • 测试
    • 消息消费
    • 运行项目

在这里插入图片描述


概述

在微服务架构和大规模分布式系统中,事件驱动架构(EDA)成为了非常重要的设计模式。通过事件驱动,我们可以解耦系统的各个组件,提高系统的可扩展性、可维护性和响应能力。

接下来,我们将演示一下如何在 Spring Boot 中实现一个基于事件驱动的消息发送和接收流程,从消息的发送、事件的发布到事件的监听。


事件驱动架构的基本概念

在事件驱动架构中,系统的各个组件通过事件进行通信。每个事件代表一个特定的行为或状态变化,当事件发布时,系统的其他部分可以响应这些事件并做出相应的处理。消息发送和接收的流程正是通过发布和监听事件来实现的。

接下来我们使用 Spring Boot 来实现一个基于事件驱动的消息系统。、

系统包含以下几个部分:

  • 消息发送: 消息将通过一个 MessageEventProcessor 进行处理,并且在处理完成后会发布一个事件。
  • 事件发布: 消息成功发送后,通过 ApplicationEventPublisher 发布一个 MessageSentEvent
  • 事件监听: 一个监听器会接收到发布的事件并进行相应的处理(比如记录日志、通知其他组件等)

工程结构

在这里插入图片描述

  • EventBus:事件总线,负责发布事件。
  • MessageEventProcessor:处理消息事件的处理器。
  • EventMessageEventMessageSentEvent:事件类,MessageEventMessageSentEvent继承自Event
  • MessageChannel:消息通道接口,EmailMessageChannel是其具体实现。
  • MessageRepository:消息存储库,用于保存消息事件。
  • MessageChannelConfig:消息通道配置,配置了消息通道的Bean。
  • MessageController:消息控制器,处理发送消息的请求。
  • MessageSentEventListener:监听消息发送事件的监听器。

Code

创建事件和事件处理器

Event.java - 定义基础事件

package com.artisan.booteventbus.domain;public abstract class Event {// 事件的基本字段
}

MessageEvent.java - 定义具体的消息事件

package com.artisan.booteventbus.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;import java.util.Map;@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class MessageEvent extends Event {private String message;private String channel;private Map<String, Object> metadata;}

EventHandler.java - 定义事件处理器接口

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.domain.Event;public interface EventHandler<T extends Event> {void handle(T event);
}

创建事件总线

EventBus.java - 用于发布事件

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.domain.Event;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;@Component
public class EventBus {private final ApplicationEventPublisher publisher;public EventBus(ApplicationEventPublisher publisher) {this.publisher = publisher;}public void publish(Event event) {publisher.publishEvent(event);}
}

创建消息通道和发送逻辑

MessageChannel.java - 定义消息通道接口

package com.artisan.booteventbus.service;import com.artisan.booteventbus.domain.MessageEvent;import java.util.concurrent.CompletableFuture;public interface MessageChannel {boolean supports(MessageEvent event);CompletableFuture<Void> sendAsync(MessageEvent event);
}

MessageChannelConfig.java - 初始化channel

package com.artisan.booteventbus.config;import com.artisan.booteventbus.service.MessageChannel;
import com.artisan.booteventbus.service.impl.EmailMessageChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.ArrayList;
import java.util.List;@Configuration
public class MessageChannelConfig {@Beanpublic List<MessageChannel> messageChannels() {List<MessageChannel> channels = new ArrayList<>();channels.add(new EmailMessageChannel());// 可以继续添加其他类型的通道return channels;}
}

EmailMessageChannel.java - 实现邮件发送通道

package com.artisan.booteventbus.service.impl;import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.service.MessageChannel;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;@Slf4j
public class EmailMessageChannel implements MessageChannel {@Overridepublic boolean supports(MessageEvent event) {return "email".equals(event.getChannel());}@Overridepublic CompletableFuture<Void> sendAsync(MessageEvent event) {return CompletableFuture.runAsync(() -> {// 模拟邮件发送System.out.println(Thread.currentThread().getName() + "- Sending email: " + event.getMessage());log.info("Sending email: {}", event.getMessage());});}
}

创建事件处理器

MessageEventProcessor.java - 处理消息事件,保存事件并发送

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.dao.MessageRepository;
import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.domain.MessageSentEvent;import com.artisan.booteventbus.service.MessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Async;import java.util.List;@Component
public class MessageEventProcessor implements EventHandler<MessageEvent> {private final EventBus eventBus;private final MessageRepository messageRepository;private final List<MessageChannel> channels;@Autowiredpublic MessageEventProcessor(EventBus eventBus, MessageRepository messageRepository, List<MessageChannel> channels) {this.eventBus = eventBus;this.messageRepository = messageRepository;this.channels = channels;}/*** @param event* Asyn 请使用自定义线程池,这里仅仅是 为了演示异步*/@Async@Overridepublic void handle(MessageEvent event) {// 1. 消息持久化messageRepository.save(event);// 2. 通道路由MessageChannel channel = channels.stream().filter(ch -> ch.supports(event)).findFirst().orElseThrow();// 3. 异步发送channel.sendAsync(event).thenRun(() -> eventBus.publish(new MessageSentEvent(event)));}}

消息持久化

MessageRepository.java - 用于消息的持久化(可以使用内存或数据库)

package com.artisan.booteventbus.dao;import com.artisan.booteventbus.domain.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;import java.util.ArrayList;
import java.util.List;@Slf4j
@Repository
public class MessageRepository {private final List<MessageEvent> messageStore = new ArrayList<>();public void save(MessageEvent event) {// 模拟存储messageStore.add(event);System.out.println(Thread.currentThread().getName() + " - Message saved: " + event.getMessage());log.info("Message saved {}", event.getMessage());}
}

创建消息发送事件

MessageSentEvent.java - 定义发送后的事件

package com.artisan.booteventbus.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageSentEvent extends Event {private MessageEvent originalEvent;}

配置 Spring Boot 启动类

package com.artisan;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@EnableAsync(proxyTargetClass=true)
@SpringBootApplication
public class BootEventBusApplication {public static void main(String[] args) {SpringApplication.run(BootEventBusApplication.class, args);}}

测试

为了测试整个架构,创建一个控制器来模拟发送消息。

package com.artisan.booteventbus.controller;import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.bus.EventBus;
import com.artisan.booteventbus.bus.MessageEventProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.HashMap;@RestController
@RequestMapping("/messages")
public class MessageController {private final EventBus eventBus;private final MessageEventProcessor eventProcessor;@Autowiredpublic MessageController(EventBus eventBus, MessageEventProcessor eventProcessor) {this.eventBus = eventBus;this.eventProcessor = eventProcessor;}@RequestMapping("/send")public String sendMessage(@RequestParam String message, @RequestParam String channel) {MessageEvent event = new MessageEvent(message, channel, new HashMap<>());eventProcessor.handle(event); // 异步处理消息return "Message is being processed";}
}

消息消费

package com.artisan.booteventbus.listeners;import com.artisan.booteventbus.domain.MessageSentEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MessageSentEventListener {@Async@EventListenerpublic void handleMessageSentEvent(MessageSentEvent event) {// 模拟处理事件System.out.println(Thread.currentThread().getName() + " - Received MessageSentEvent: " + event.getOriginalEvent().getMessage());log.info("Sending email: {}", event.getOriginalEvent().getMessage());}
}

运行项目

http://localhost:8080/messages/send?message=artisan&channel=email

在这里插入图片描述
在这里插入图片描述

当然了,你也可以基于此种模式,使用kafka
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CAD C# 批量替换当前图中块、标注

本案例功能为选择当前文档中一个块&#xff08;旧块&#xff09;&#xff0c;然后选择新图元&#xff08;新块&#xff09;&#xff0c;运行插件后新块将替换图中所有的旧块。 效果如下&#xff1a; public static class Class1{//选取对象替换块定义[CommandMethod("TT&…

SpringBoot快速使用

一些名词的碎碎念: 1> 俩种网络应用设计模式 C/S 客户端/服务器 B/S 浏览器/服务器 俩者对比: 2> 集群和分布式的概念 集群: 分布式: 例子: 一个公司有一个人身兼多职 集群: 招聘N个和上面这个人一样身兼多职 分布式: 招聘N个人,分担上面这个人的工作,进行工作的拆分. 工…

苹果公司即将为iPhone和智能家居改用自主研发的蓝牙和Wi-Fi芯片

美股快讯&#xff1a;苹果公司即将为iPhone和智能家居改用自主研发的蓝牙和Wi-Fi芯片 苹果公司计划从明年开始在其设备上改用国产芯片进行蓝牙和Wi-Fi连接&#xff0c;此举将逐步淘汰目前由博通提供的部分部件。这种代号为Proxima的芯片已经开发了数年&#xff0c;现在计划用于…

快速理解分布式事务Seate基本知识

Seata是一款开源的分布式事务解决方案&#xff0c;致力于提供高性能和简单易用的分布式事务服务。Seata将为用户提供了AT、TCC、SAGA和XA事务模式&#xff0c;为用户打造一站式的分布式解决方案。 一.Seate的三大角色 在 Seata 的架构中&#xff0c;一共有三个角色&#xff1a;…

前端项目初始化搭建(二)

一、使用 Vite 创建 Vue 3 TypeScript 项目 PS E:\web\cursor-project\web> npm create vitelatest yf-blog -- --template vue-ts> npx > create-vite yf-blog --template vue-tsScaffolding project in E:\web\cursor-project\web\yf-blog...Done. Now run:cd yf-…

多模态大模型(二)——用Transformer Encoder和Decoder的方法(BLIP、CoCa、BEiTv3)

文章目录 BLIP: Bootstrapping Language-Image Pre-training for Unified Vision-Language Understanding and Generation 理解、生成我都要&#xff0c;一个很有效的、根据图片生成caption的工具1. BLIP的研究动机2. BLIP的模型结构3. CapFilt Model4. BLIP的训练过程 CoCa: C…

【理想汽车中科院】基于模仿学习的端到端自动驾驶数据缩放规律

论文: https://arxiv.org/pdf/2412.02689 项目: https://github.com/ucaszyp/Driving-Scaling-Law 0. 摘要 端到端自动驾驶范式因其可扩展性而最近吸引了大量关注。然而&#xff0c;现有方法受到现实世界数据规模有限的制约&#xff0c;这阻碍了对端到端自动驾驶相关扩展规律…

【工具介绍】可以批量查看LableMe标注的图像文件信息~

在图像处理和计算机视觉领域&#xff0c;LabelMe是一个广泛使用的图像标注工具&#xff0c;它帮助我们对图像中的物体进行精确的标注。但是&#xff0c;当标注完成后&#xff0c;我们常常需要一个工具来批量查看这些标注信息。 今天&#xff0c;我要介绍的这款exe程序&#xf…

链式栈的实现及其应用

目录 一、链式栈结构模型 二、链式栈的实现 2.1创建 2.2压栈 2.3出栈 2.4判断栈是否为空 2.5查看栈顶 2.6释放栈 三、应用 链式栈实际上就是基于链表&#xff0c;压栈和弹栈可分别看作头插和头删&#xff0c;链表尾部就是栈底&#xff0c;头指针就是栈顶指针 一、链式…

day12 接口测试 ——入门→精通→实战(1)

【没有所谓的运气&#x1f36c;&#xff0c;只有绝对的努力✊】 目录 1、接口测试分类 1.1 内部接口&#xff1a; 1.2 外部接口&#xff1a; 2、目前接口架构设计 2.1、基于SOAP架构&#xff0c; 2.2、基于RPC架构&#xff0c; 2.3、基于RestFul架构&#xff0c; 2.3.1…

程序的调试

一名优秀的程序员也是一名出色的侦探&#xff0c;每一次调试都是尝试破案的过程 目录 前言 一、什么是调试&#xff1f; 二、调试 1.调试是什么 2.基本步骤 三、调试注意事项 1.怎么写出易于调试的代码 assert(断言) const 2.常见错误 总结 前言 主要是怎么调试&#xff0c;调…

FPGA实现GTP光口数据回环传输,基于Aurora 8b/10b编解码架构,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的所有工程源码总目录----方便你快速找到自己喜欢的项目我这里已有的 GT 高速接口解决方案 3、工程详细设计方案工程设计原理框图用户数据发送模块基于GTP高速接口的数据回环传输架构GTP IP 简介GTP 基本结构GTP 发送和接收…

25.DDD数量关系

学习视频来源&#xff1a;DDD独家秘籍视频合集 https://space.bilibili.com/24690212/channel/collectiondetail?sid1940048&ctype0 文章目录 关系型数据库的数量关系领域模型的数量关系实现聚合数量关系聚合内聚合间具体说明代码 数量关系是本质吗&#xff1f;领域对象之…

每天40分玩转Django:Django视图和URL

Django视图和URL 一、课程概述 学习项目具体内容预计用时视图基础函数视图、类视图、视图装饰器90分钟URL配置URL模式、路由系统、命名URL60分钟请求处理请求对象、响应对象、中间件90分钟 二、视图基础 2.1 函数视图 # blog/views.py from django.shortcuts import render…

语言模型(序列模型)

终于快要毕业了&#xff0c;乘着还在还在研究室&#xff0c;把最后一章sequence模型也学完吧。 Sequence Model 一&#xff1a;基础知识1&#xff1a;符号的定义2&#xff1a;词典(Vocabulary) 与编码(Encoding) 二&#xff1a;RNN(Recurrent Neural Networks) 循环神经网络1&…

【自然语言处理与大模型】使用llama.cpp将HF格式大模型转换为GGUF格式

llama.cpp的主要目标是在本地和云端的各种硬件上以最小的设置和最先进的性能实现LLM推理。是一个专为大型语言模型&#xff08;LLM&#xff09;设计的高性能推理框架&#xff0c;完全使用C和C编写&#xff0c;没有外部依赖&#xff0c;这使得它可以很容易地被移植到不同的操作系…

npm : 无法加载文件 D:\nodejs\npm.ps1

问题描述 npm run serve 启动一个Vue项目&#xff0c;报错如下&#xff1a; npm : 无法加载文件 D:\nodejs\npm.ps1&#xff0c;因为在此系统上禁止运行脚本。有关详细信息&#xff0c;请参阅 https:/go.microsoft.com/fwlink/? LinkID135170 中的 about_Execution_Policies。…

线程池+线程安全+常见锁

目录 一、线程池1、日志与策略模式2、线程池设计3、线程安全的单例模式&#xff08;1&#xff09;单例模式的特点&#xff08;2&#xff09;饿汉实现方式和懒汉实现方式&#xff08;i&#xff09;饿汉方式实现单例模式&#xff08;ii&#xff09;懒汉方式实现单例模式&#xff…

数据结构6.4——归并排序

基本思想&#xff1a; 归并排序是建立在归并操作上的一种有效的排序算法&#xff0c;该算法是采用分治法的一个非常典型的应用。将已有的子序列合并&#xff0c;得到完全有序的序列&#xff1b;即先使每个子序列有序&#xff0c;再使子序列段间有序。若将两个有序表合并成一个…

revit转gltf,revit转3dtiles,如何将Revit模型转为3DTiles格式并在Cesiumjs中高效可视化

Revit模型导出gltf、glb与3dtiles有多种方式&#xff0c;但一般的商业工具收费普遍较高&#xff1a;Cesiumlab导出3dTile格式数据&#xff0c;Cesiumlab暂时可试用3天&#xff0c;会员版收费每年800&#xff1b;BimAngleEngine导出3dTile格式数据BimAngleEngine暂时可试用30天&…