EDA - Spring Boot构建基于事件驱动的消息系统

文章目录

  • 概述
  • 事件驱动架构的基本概念
  • 工程结构
  • Code
    • 创建事件和事件处理器
    • 创建事件总线
    • 创建消息通道和发送逻辑
    • 创建事件处理器
    • 消息持久化
    • 创建消息发送事件
    • 配置 Spring Boot 启动类
    • 测试
    • 消息消费
    • 运行项目

在这里插入图片描述


概述

在微服务架构和大规模分布式系统中,事件驱动架构(EDA)成为了非常重要的设计模式。通过事件驱动,我们可以解耦系统的各个组件,提高系统的可扩展性、可维护性和响应能力。

接下来,我们将演示一下如何在 Spring Boot 中实现一个基于事件驱动的消息发送和接收流程,从消息的发送、事件的发布到事件的监听。


事件驱动架构的基本概念

在事件驱动架构中,系统的各个组件通过事件进行通信。每个事件代表一个特定的行为或状态变化,当事件发布时,系统的其他部分可以响应这些事件并做出相应的处理。消息发送和接收的流程正是通过发布和监听事件来实现的。

接下来我们使用 Spring Boot 来实现一个基于事件驱动的消息系统。、

系统包含以下几个部分:

  • 消息发送: 消息将通过一个 MessageEventProcessor 进行处理,并且在处理完成后会发布一个事件。
  • 事件发布: 消息成功发送后,通过 ApplicationEventPublisher 发布一个 MessageSentEvent
  • 事件监听: 一个监听器会接收到发布的事件并进行相应的处理(比如记录日志、通知其他组件等)

工程结构

在这里插入图片描述

  • EventBus:事件总线,负责发布事件。
  • MessageEventProcessor:处理消息事件的处理器。
  • EventMessageEventMessageSentEvent:事件类,MessageEventMessageSentEvent继承自Event
  • MessageChannel:消息通道接口,EmailMessageChannel是其具体实现。
  • MessageRepository:消息存储库,用于保存消息事件。
  • MessageChannelConfig:消息通道配置,配置了消息通道的Bean。
  • MessageController:消息控制器,处理发送消息的请求。
  • MessageSentEventListener:监听消息发送事件的监听器。

Code

创建事件和事件处理器

Event.java - 定义基础事件

package com.artisan.booteventbus.domain;public abstract class Event {// 事件的基本字段
}

MessageEvent.java - 定义具体的消息事件

package com.artisan.booteventbus.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;import java.util.Map;@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class MessageEvent extends Event {private String message;private String channel;private Map<String, Object> metadata;}

EventHandler.java - 定义事件处理器接口

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.domain.Event;public interface EventHandler<T extends Event> {void handle(T event);
}

创建事件总线

EventBus.java - 用于发布事件

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.domain.Event;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;@Component
public class EventBus {private final ApplicationEventPublisher publisher;public EventBus(ApplicationEventPublisher publisher) {this.publisher = publisher;}public void publish(Event event) {publisher.publishEvent(event);}
}

创建消息通道和发送逻辑

MessageChannel.java - 定义消息通道接口

package com.artisan.booteventbus.service;import com.artisan.booteventbus.domain.MessageEvent;import java.util.concurrent.CompletableFuture;public interface MessageChannel {boolean supports(MessageEvent event);CompletableFuture<Void> sendAsync(MessageEvent event);
}

MessageChannelConfig.java - 初始化channel

package com.artisan.booteventbus.config;import com.artisan.booteventbus.service.MessageChannel;
import com.artisan.booteventbus.service.impl.EmailMessageChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.ArrayList;
import java.util.List;@Configuration
public class MessageChannelConfig {@Beanpublic List<MessageChannel> messageChannels() {List<MessageChannel> channels = new ArrayList<>();channels.add(new EmailMessageChannel());// 可以继续添加其他类型的通道return channels;}
}

EmailMessageChannel.java - 实现邮件发送通道

package com.artisan.booteventbus.service.impl;import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.service.MessageChannel;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CompletableFuture;@Slf4j
public class EmailMessageChannel implements MessageChannel {@Overridepublic boolean supports(MessageEvent event) {return "email".equals(event.getChannel());}@Overridepublic CompletableFuture<Void> sendAsync(MessageEvent event) {return CompletableFuture.runAsync(() -> {// 模拟邮件发送System.out.println(Thread.currentThread().getName() + "- Sending email: " + event.getMessage());log.info("Sending email: {}", event.getMessage());});}
}

创建事件处理器

MessageEventProcessor.java - 处理消息事件,保存事件并发送

package com.artisan.booteventbus.bus;import com.artisan.booteventbus.dao.MessageRepository;
import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.domain.MessageSentEvent;import com.artisan.booteventbus.service.MessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Async;import java.util.List;@Component
public class MessageEventProcessor implements EventHandler<MessageEvent> {private final EventBus eventBus;private final MessageRepository messageRepository;private final List<MessageChannel> channels;@Autowiredpublic MessageEventProcessor(EventBus eventBus, MessageRepository messageRepository, List<MessageChannel> channels) {this.eventBus = eventBus;this.messageRepository = messageRepository;this.channels = channels;}/*** @param event* Asyn 请使用自定义线程池,这里仅仅是 为了演示异步*/@Async@Overridepublic void handle(MessageEvent event) {// 1. 消息持久化messageRepository.save(event);// 2. 通道路由MessageChannel channel = channels.stream().filter(ch -> ch.supports(event)).findFirst().orElseThrow();// 3. 异步发送channel.sendAsync(event).thenRun(() -> eventBus.publish(new MessageSentEvent(event)));}}

消息持久化

MessageRepository.java - 用于消息的持久化(可以使用内存或数据库)

package com.artisan.booteventbus.dao;import com.artisan.booteventbus.domain.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;import java.util.ArrayList;
import java.util.List;@Slf4j
@Repository
public class MessageRepository {private final List<MessageEvent> messageStore = new ArrayList<>();public void save(MessageEvent event) {// 模拟存储messageStore.add(event);System.out.println(Thread.currentThread().getName() + " - Message saved: " + event.getMessage());log.info("Message saved {}", event.getMessage());}
}

创建消息发送事件

MessageSentEvent.java - 定义发送后的事件

package com.artisan.booteventbus.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageSentEvent extends Event {private MessageEvent originalEvent;}

配置 Spring Boot 启动类

package com.artisan;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@EnableAsync(proxyTargetClass=true)
@SpringBootApplication
public class BootEventBusApplication {public static void main(String[] args) {SpringApplication.run(BootEventBusApplication.class, args);}}

测试

为了测试整个架构,创建一个控制器来模拟发送消息。

package com.artisan.booteventbus.controller;import com.artisan.booteventbus.domain.MessageEvent;
import com.artisan.booteventbus.bus.EventBus;
import com.artisan.booteventbus.bus.MessageEventProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.HashMap;@RestController
@RequestMapping("/messages")
public class MessageController {private final EventBus eventBus;private final MessageEventProcessor eventProcessor;@Autowiredpublic MessageController(EventBus eventBus, MessageEventProcessor eventProcessor) {this.eventBus = eventBus;this.eventProcessor = eventProcessor;}@RequestMapping("/send")public String sendMessage(@RequestParam String message, @RequestParam String channel) {MessageEvent event = new MessageEvent(message, channel, new HashMap<>());eventProcessor.handle(event); // 异步处理消息return "Message is being processed";}
}

消息消费

package com.artisan.booteventbus.listeners;import com.artisan.booteventbus.domain.MessageSentEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MessageSentEventListener {@Async@EventListenerpublic void handleMessageSentEvent(MessageSentEvent event) {// 模拟处理事件System.out.println(Thread.currentThread().getName() + " - Received MessageSentEvent: " + event.getOriginalEvent().getMessage());log.info("Sending email: {}", event.getOriginalEvent().getMessage());}
}

运行项目

http://localhost:8080/messages/send?message=artisan&channel=email

在这里插入图片描述
在这里插入图片描述

当然了,你也可以基于此种模式,使用kafka
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63950.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能GitHub Copilot副驾驶®提示和技巧

简介 智能 GitHub Copilot 副驾驶 代表了开发者历史上的一个重要里程碑工具。它象征着人工智能辅助编程新时代的开始&#xff0c;它显着提高了开发人员的生产力&#xff0c;提高了代码质量&#xff0c;并且还对开发团队的整体福祉产生了积极影响。随着智能 GitHub Copilot 副驾…

CAD C# 批量替换当前图中块、标注

本案例功能为选择当前文档中一个块&#xff08;旧块&#xff09;&#xff0c;然后选择新图元&#xff08;新块&#xff09;&#xff0c;运行插件后新块将替换图中所有的旧块。 效果如下&#xff1a; public static class Class1{//选取对象替换块定义[CommandMethod("TT&…

java 缓存篇2

缓存的部署方式 单机主从哨兵集群 特性主从&#xff08;Master-Slave&#xff09;哨兵&#xff08;Sentinel&#xff09;集群&#xff08;Cluster&#xff09;数据分片不支持不支持支持&#xff0c;基于 slot 进行水平分片高可用性部分支持&#xff08;手动故障转移&#xff…

SpringBoot快速使用

一些名词的碎碎念: 1> 俩种网络应用设计模式 C/S 客户端/服务器 B/S 浏览器/服务器 俩者对比: 2> 集群和分布式的概念 集群: 分布式: 例子: 一个公司有一个人身兼多职 集群: 招聘N个和上面这个人一样身兼多职 分布式: 招聘N个人,分担上面这个人的工作,进行工作的拆分. 工…

苹果公司即将为iPhone和智能家居改用自主研发的蓝牙和Wi-Fi芯片

美股快讯&#xff1a;苹果公司即将为iPhone和智能家居改用自主研发的蓝牙和Wi-Fi芯片 苹果公司计划从明年开始在其设备上改用国产芯片进行蓝牙和Wi-Fi连接&#xff0c;此举将逐步淘汰目前由博通提供的部分部件。这种代号为Proxima的芯片已经开发了数年&#xff0c;现在计划用于…

Linux 切换用户的两种方法

sudo -su user1 与 su - user1 都可以让当前用户切换到 user1 的身份执行命令或进入该用户的交互式 Shell。但它们在权限认证方式、环境变量继承和 Shell 初始化过程等方面存在一些差异。 权限认证方式 su - user1 su 是 “switch user” 的缩写&#xff0c;默认情况下需要你输…

快速理解分布式事务Seate基本知识

Seata是一款开源的分布式事务解决方案&#xff0c;致力于提供高性能和简单易用的分布式事务服务。Seata将为用户提供了AT、TCC、SAGA和XA事务模式&#xff0c;为用户打造一站式的分布式解决方案。 一.Seate的三大角色 在 Seata 的架构中&#xff0c;一共有三个角色&#xff1a;…

前端项目初始化搭建(二)

一、使用 Vite 创建 Vue 3 TypeScript 项目 PS E:\web\cursor-project\web> npm create vitelatest yf-blog -- --template vue-ts> npx > create-vite yf-blog --template vue-tsScaffolding project in E:\web\cursor-project\web\yf-blog...Done. Now run:cd yf-…

SQL最佳实践:避免使用COUNT=0

如果你遇到类似下面的 SQL 查询&#xff1a; SELECT * FROM customer c WHERE 0 (SELECT COUNT(*)FROM orders oWHERE o.customer_id c.customer_id);意味着有人没有遵循 SQL 最佳实践。该语句的作用是查找没有下过订单的客户&#xff0c;其中子查询使用了 COUNT 函数统计客…

多模态大模型(二)——用Transformer Encoder和Decoder的方法(BLIP、CoCa、BEiTv3)

文章目录 BLIP: Bootstrapping Language-Image Pre-training for Unified Vision-Language Understanding and Generation 理解、生成我都要&#xff0c;一个很有效的、根据图片生成caption的工具1. BLIP的研究动机2. BLIP的模型结构3. CapFilt Model4. BLIP的训练过程 CoCa: C…

【理想汽车中科院】基于模仿学习的端到端自动驾驶数据缩放规律

论文: https://arxiv.org/pdf/2412.02689 项目: https://github.com/ucaszyp/Driving-Scaling-Law 0. 摘要 端到端自动驾驶范式因其可扩展性而最近吸引了大量关注。然而&#xff0c;现有方法受到现实世界数据规模有限的制约&#xff0c;这阻碍了对端到端自动驾驶相关扩展规律…

【工具介绍】可以批量查看LableMe标注的图像文件信息~

在图像处理和计算机视觉领域&#xff0c;LabelMe是一个广泛使用的图像标注工具&#xff0c;它帮助我们对图像中的物体进行精确的标注。但是&#xff0c;当标注完成后&#xff0c;我们常常需要一个工具来批量查看这些标注信息。 今天&#xff0c;我要介绍的这款exe程序&#xf…

链式栈的实现及其应用

目录 一、链式栈结构模型 二、链式栈的实现 2.1创建 2.2压栈 2.3出栈 2.4判断栈是否为空 2.5查看栈顶 2.6释放栈 三、应用 链式栈实际上就是基于链表&#xff0c;压栈和弹栈可分别看作头插和头删&#xff0c;链表尾部就是栈底&#xff0c;头指针就是栈顶指针 一、链式…

day12 接口测试 ——入门→精通→实战(1)

【没有所谓的运气&#x1f36c;&#xff0c;只有绝对的努力✊】 目录 1、接口测试分类 1.1 内部接口&#xff1a; 1.2 外部接口&#xff1a; 2、目前接口架构设计 2.1、基于SOAP架构&#xff0c; 2.2、基于RPC架构&#xff0c; 2.3、基于RestFul架构&#xff0c; 2.3.1…

程序的调试

一名优秀的程序员也是一名出色的侦探&#xff0c;每一次调试都是尝试破案的过程 目录 前言 一、什么是调试&#xff1f; 二、调试 1.调试是什么 2.基本步骤 三、调试注意事项 1.怎么写出易于调试的代码 assert(断言) const 2.常见错误 总结 前言 主要是怎么调试&#xff0c;调…

FPGA实现GTP光口数据回环传输,基于Aurora 8b/10b编解码架构,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的所有工程源码总目录----方便你快速找到自己喜欢的项目我这里已有的 GT 高速接口解决方案 3、工程详细设计方案工程设计原理框图用户数据发送模块基于GTP高速接口的数据回环传输架构GTP IP 简介GTP 基本结构GTP 发送和接收…

如何快速切换更新电脑网络的ip地址

1.ADSL拨号更换IP地址 这种更换 IP地址的方法其实就是我们平时使用的宽带拨号&#xff0c;每次拨号得到的IP地址都不同&#xff0c;但是这种方法无法使用于光纤宽带&#xff0c;并且使用这种方法更换的IP地址&#xff0c;一般只会变更最后一个号段&#xff0c;前三个号段的数字…

25.DDD数量关系

学习视频来源&#xff1a;DDD独家秘籍视频合集 https://space.bilibili.com/24690212/channel/collectiondetail?sid1940048&ctype0 文章目录 关系型数据库的数量关系领域模型的数量关系实现聚合数量关系聚合内聚合间具体说明代码 数量关系是本质吗&#xff1f;领域对象之…

每天40分玩转Django:Django视图和URL

Django视图和URL 一、课程概述 学习项目具体内容预计用时视图基础函数视图、类视图、视图装饰器90分钟URL配置URL模式、路由系统、命名URL60分钟请求处理请求对象、响应对象、中间件90分钟 二、视图基础 2.1 函数视图 # blog/views.py from django.shortcuts import render…

SAS - Subtractive Port

在SAS&#xff08;串行连接SCSI&#xff0c;Serial Attached SCSI&#xff09;协议中&#xff0c;subtractive port 是一种特殊类型的端口&#xff0c;主要用于设备间的路由功能。它的作用是在路径选择过程中充当默认路径&#xff0c;以处理未明确指定路径的请求。以下是它的定…