DDD 架构分层,MQ消息要放到那一层处理?

作者:小傅哥
博客:https://bugstack.cn

沉淀、分享、成长,让自己和他人都能有所收获!😄

本文的宗旨在于通过简单干净实践的方式教会读者,使用 Docker 配置 RocketMQ 并在基于 DDD 分层结构的 SpringBoot 工程中使用 RocketMQ 技术。因为大部分 MQ 的发送都是基于特定业务场景的,所以本章节也是基于 《MyBatis 使用教程和插件开发》 章节的扩展。

本章也会包括关于 MQ 消息的发送和接收应该处于 DDD 的哪一层的实践讲解和使用。

本文涉及的工程:

  • xfg-dev-tech-rocketmq:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rocketmq
  • RocketMQ Docker 安装:rocketmq-docker-compose-mac-amd-arm.yml
  • 导入测试库表 road-map.sql

一、案例背景

首先我们要知道,MQ 消息的作用是用于;解耦过长的业务流程应对流量冲击的消峰。如;用户下单支付完成后,拿到支付消息推动后续的发货流程。也可以是我们基于 《MyBatis 使用教程和插件开发》 中的案例场景,给雇员提升级别和薪资的时候,也发送一条MQ消息,用于发送邮件通知给用户。

  • 从薪资调整到邮件发送,这里是2个业务流程,通过 MQ 消息的方式进行连接。
  • 其实MQ消息的使用场景特别多,原来你可能使用多线程的一些操作,现在就扩展为多实例的操作了。发送 MQ 消息出来,让应用的各个实例接收并进行消费。

二、领域事件

因为我们本章所讲解的内容是把 RocketMQ 放入 DDD 架构中进行使用,那么也就引申出领域事件定义。所以我们先来了解下,什么是领域事件。

领域事件,可以说是解耦微服务设计的关键。领域事件也是领域模型中非常重要的一部分内容,用于标示当前领域模型中发生的事件行为。一个领域事件会推进业务流程的进一步操作,在实现业务解耦的同时,也推动了整个业务的闭环。

  • 首先,我们需要在领域模型层,添加一块 event 区域。它的存在是为了定义出于当前领域下所需的事件消息信息。信息的类型可以是model 下的实体对象、聚合对象。
  • 之后,消息的发送是放在基础设置层。本身基础设置层就是依赖倒置于模型层,所以在模型层所定义的 event 对象,可以很方便的在基础设置层使用。而且大部分开发的时候,MQ消息的发送与数据库操作都是关联的,采用的方式是,做完数据落库后,推送MQ消息。所以定义在仓储中实现,会更加得心应手、水到渠成。
  • 最后,就是 MQ 的消息,MQ 的消费可以是自身服务所发出的消息,也可以是外部其他微服务的消息。就在小傅哥所整体讲述的这套简明教程中 DDD 部分的触发器层。

三、环境安装

本案例涉及了数据库和RocketMQ的使用,都已经在工程中提供了安装脚本,可以按需执行。

这里主要介绍 RocketMQ 的安装;

1. 执行 compose yml

文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml - 关于安装小傅哥提供了不同的镜像,包括Mac、Mac M1、Windows 可以按需选择使用。

version: '3'
services:# https://hub.docker.com/r/xuchengen/rocketmq# 注意修改项;# 01:data/rocketmq/conf/broker.conf 添加 brokerIP1=127.0.0.1# 02:data/console/config/application.properties server.port=9009 - 如果8080端口被占用,可以修改或者添加映射端口rocketmq:image: livinphp/rocketmq:5.1.0container_name: rocketmqports:- 9009:9009- 9876:9876- 10909:10909- 10911:10911- 10912:10912volumes:- ./data:/home/app/dataenvironment:TZ: "Asia/Shanghai"NAMESRV_ADDR: "rocketmq:9876"
  • 在 IDEA 中打开 rocketmq-docker-compose-mac-amd-arm.yml 你会看到一个绿色的按钮在左侧侧边栏,点击即可安装。或者你也可以使用命令安装:# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d - 比较适合在云服务器上执行。
  • 首次安装可能使用不了,一个原因是 brokerIP1 未配置IP,另外一个是默认的 8080 端口占用。可以按照如下小傅哥说的方式修改。

2. 修改默认配合

  1. 打开 data/rocketmq/conf/broker.conf 添加一条 brokerIP1=127.0.0.1 在结尾
# 集群名称
brokerClusterName = DefaultCluster
# BROKER 名称
brokerName = broker-a
# 0 表示 Master, > 0 表示 Slave
brokerId = 0
# 删除文件时间点,默认凌晨 4 点
deleteWhen = 04
# 文件保留时间,默认 48 小时
fileReservedTime = 48
# BROKER 角色 ASYNC_MASTER为异步主节点,SYNC_MASTER为同步主节点,SLAVE为从节点
brokerRole = ASYNC_MASTER
# 刷新数据到磁盘的方式,ASYNC_FLUSH 刷新
flushDiskType = ASYNC_FLUSH
# 存储路径
storePathRootDir = /home/app/data/rocketmq/store
# IP地址
brokerIP1 = 127.0.0.1
  1. 打开 ``data/console/config/application.properties修改server.port=9009` 端口。
server.address=0.0.0.0
server.port=9009
  • 修改配置后,重启服务。

3. RockMQ登录与配置

3.1 登录

RocketMQ 此镜像,会在安装后在控制台打印登录账号信息,你可以查看使用。

登录:http://localhost:9009/

3.2 创建Topic

  • 也可以使用命令创建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq

3.3 创建消费者组

  • 也可以使用命令创建:docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group

四、工程实现

1. 工程结构

  • MQ 的使用无论是 RocketMQ 还是 Kafka 等,都很简单。但在使用之前,要考虑好怎么在架构中合理的使用。如果最初没有定义好这些,那么胡乱的任何地方都能发送和接收MQ,最后的工程将非常难以维护。
  • 所以这里整个MQ的生产和消费,是按照整个 DDD 领域事件结构进行设计。分为在 domain 使用基础层生产消息,再有 trigger 层接收消息。

2. 配置文件

引入POM

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

添加配置

# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: xfg-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 10producer:# 发送同一类消息的设置为同一个group,保证唯一group: xfg-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false

3. 定义领域事件

源码cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent

@EqualsAndHashCode(callSuper = true)
@Data
public class SalaryAdjustEvent extends BaseEvent<AdjustSalaryApplyOrderAggregate> {public static String TOPIC = "xfg-mq";public static SalaryAdjustEvent create(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {SalaryAdjustEvent event = new SalaryAdjustEvent();event.setId(RandomStringUtils.randomNumeric(11));event.setTimestamp(new Date());event.setData(adjustSalaryApplyOrderAggregate);return event;}}
  • 每个领域的消息,都有领域自己定义。发送的时候再交给基础设施层来发送。

4. 消息发送

源码cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher

@Component
@Slf4j
public class EventPublisher {@Setter(onMethod_ = @Autowired)private RocketMQTemplate rocketmqTemplate;/*** 普通消息** @param topic   主题* @param message 消息*/public void publish(String topic, BaseEvent<?> message) {try {String mqMessage = JSON.toJSONString(message);log.info("发送MQ消息 topic:{} message:{}", topic, mqMessage);rocketmqTemplate.convertAndSend(topic, mqMessage);} catch (Exception e) {log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);// 大部分MQ发送失败后,会需要任务补偿}}/*** 延迟消息** @param topic          主题* @param message        消息* @param delayTimeLevel 延迟时长*/public void publishDelivery(String topic, BaseEvent<?> message, int delayTimeLevel) {try {String mqMessage = JSON.toJSONString(message);log.info("发送MQ延迟消息 topic:{} message:{}", topic, mqMessage);rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(), 1000, delayTimeLevel);} catch (Exception e) {log.error("发送MQ延迟消息失败 topic:{} message:{}", topic, JSON.toJSONString(message), e);// 大部分MQ发送失败后,会需要任务补偿}}}
  • 在基础设施层提供 event 事件的处理,也就是 MQ 消息的发送。

源码cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository

@Resource
private EventPublisher eventPublisher;@Override
@Transactional(rollbackFor = Exception.class, timeout = 350, propagation = Propagation.REQUIRED, isolation = Isolation.DEFAULT)
public String adjustSalary(AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate) {// ... 省略部分代码 eventPublisher.publish(SalaryAdjustEvent.TOPIC, SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));return orderId;
}

在 SalaryAdjustRepository 仓储的实现中,做完业务流程开始发送 MQ 消息。这里有2点要注意;

  1. 消息发送,不要写在数据库事务中。因为事务一直占用数据库连接,需要快速释放。
  2. 对于一些强MQ要求的场景,需要在发送MQ前,写入一条数据库 Task 记录,发送消息后更新 Task 状态为成功。如果长时间未更新数据库状态或者为失败的,则需要由任务补偿进行处理。

5. 消费消息

源码cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener

@Component
@Slf4j
@RocketMQMessageListener(topic = "xfg-mq", consumerGroup = "xfg-group")
public class SalaryAdjustMQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {log.info("接收到MQ消息 {}", s);}}
  • 消费消息,配置消费者组合消费的主题,之后就可以接收到消息了。接收以后你可以做自己的业务,如果抛出异常,消息会进行重新接收处理。

六、测试验证

1. 单独发送消息测试

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {@Setter(onMethod_ = @Autowired)private RocketMQTemplate rocketmqTemplate;@Testpublic void test() throws InterruptedException {while (true) {rocketmqTemplate.convertAndSend("xfg-mq", "我是测试消息");Thread.sleep(3000);}}}
  • 这里方便你来发送消息,验证流程。

2. 业务流程消息验证

@Test
public void test_execSalaryAdjust() throws InterruptedException {AdjustSalaryApplyOrderAggregate adjustSalaryApplyOrderAggregate = AdjustSalaryApplyOrderAggregate.builder().employeeNumber("10000001").orderId("100908977676003").employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build()).employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder().adjustTotalAmount(new BigDecimal(100)).adjustBaseAmount(new BigDecimal(80)).adjustMeritAmount(new BigDecimal(20)).build()).build();String orderId = salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);log.info("调薪测试 req: {} res: {}", JSON.toJSONString(adjustSalaryApplyOrderAggregate), orderId);Thread.sleep(Integer.MAX_VALUE);
}
23-07-29.15:40:52.307 [main            ] INFO  HikariDataSource       - HikariPool-1 - Start completed.
23-07-29.15:40:52.445 [main            ] INFO  EventPublisher         - 发送MQ消息 topic:xfg-mq message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
23-07-29.15:40:52.517 [main            ] INFO  ISalaryAdjustApplyServiceTest - 调薪测试 req: {"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"} res: 100908977676004
23-07-29.15:40:52.520 [ConsumeMessageThread_1] INFO  SalaryAdjustMQListener - 接收到MQ消息 {"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29 15:40:52.425"}
  • 当执行一次加薪调整后,就会接收到MQ消息了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/49677.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多个微信号怎么快速发圈、自动加好友、自动回复?

一键助你快速发圈、批量自动加好友、自动回复&#xff0c;好用哭了&#xff01; 微信管理系统是一个聚合管理多个微信账号的利器&#xff0c;让你的微信管理变得简单高效。不管你是电商、微商&#xff0c;还是拥有多个微信号的用户&#xff0c;这一款微信管理软件都可以满足你的…

【数据结构】 LinkedList的模拟实现与使用

文章目录 &#x1f340;什么是LinkedList&#x1f334;LinkedList的模拟实现&#x1f6a9;创建双链表&#x1f6a9;头插法&#x1f6a9;尾插法&#x1f6a9;任意位置插入&#x1f6a9;查找关键字&#x1f6a9;链表长度&#x1f6a9;打印链表&#x1f6a9;删除第一次出现关键字为…

javaScript:带你深入了解基本数据类型和引用类型

目录 一.前言 二.Ecmascript 规定的变量有两种 补充 1.基本数据类型&#xff08;重点掌握&#xff09; 基本数据类型的特点 2.引用数据类型 &#xff08;重点掌握&#xff09; 引用数据类型的特点 三.什么是栈&#xff1f;堆&#xff1f; 下面代码帮助了解 解释 官…

spring boot分装通用的查询+分页接口

背景 在用spring bootmybatis plus实现增删改查的时候&#xff0c;总是免不了各种模糊查询和分页的查询。每个数据表设计一个模糊分页&#xff0c;这样代码就造成了冗余&#xff0c;且对自身的技能提升没有帮助。那么有没有办法实现一个通用的增删改查的方法呢&#xff1f;今天…

爬虫工具的选择与使用:阐述Python爬虫优劣势

作为专业爬虫ip方案解决服务商&#xff0c;我们每天都面对着大量的数据采集任务需求。在众多的爬虫工具中&#xff0c;Python爬虫凭借其灵活性和功能强大而备受青睐。本文将为大家分享Python爬虫在市场上的优势与劣势&#xff0c;帮助你在爬虫业务中脱颖而出。 一、优势篇 灵活…

MySQL的基础操作

前言 对MySQL的一些基础操作做一下学习性的总结&#xff0c;基本上是照着视频写的。 MySQL的安装 MySQL的下载 MySQL :: Download MySQL Community Server (Archived Versions)https://downloads.mysql.com/archives/community/ 配置环境变量 下载之后直接解压&#xff0c…

c++ 虚函数类对象模型

一、复杂的菱形继承及菱形虚拟继承 单继承&#xff1a;一个子类只有一个直接父类时称这个继承关系为单继承。 多继承&#xff1a;一个子类有两个或以上直接父类时称这个继承关系为多继承。 菱形继承&#xff1a;菱形继承是多继承的一种特殊情况。 菱形继承的问题&#xff1a;从…

2023年京东儿童智能手表行业数据分析(京东销售数据分析)

儿童消费市场向来火爆&#xff0c;儿童智能手表作为能够实现定位导航&#xff0c;信息通讯&#xff0c;SOS求救&#xff0c;远程监听&#xff0c;智能防丢等多功能的智能可穿戴设备&#xff0c;能够通过较为精准的定位功能和安全防护能力保障儿童的安全&#xff0c;因而广受消费…

C#详解-Contains、StartsWith、EndsWith、Indexof、lastdexof

目录 简介: 过程: 举例1.1 举例1.2 ​ 总结: 简介: 在C#中Contains、StarsWith和EndWith、IndexOf都是字符串函数。 1.Contains函数用于判断一个字符串是否包含指定的子字符串&#xff0c;返回一个布尔值&#xff08;True或False&#xff09;。 2.StartsWith函数用于判断一…

数据结构-二叉树

在学习二叉树之前.必须先要掌握一些树的重要概念: 结点的度:一个结点含有的子树个数称为该结点的度.树的度:一棵树中,所有节点度的最大值称为树的度.叶子结点:度为0的结点称为叶子节点.(也叫终端结点)双亲结点:若一个结点含有子结点,则这个结点称为其子结点的双亲结点(也叫父节…

USB隔离器电路分析,SA8338矽塔sytatek电机驱动,源特科技VPS8701,开关电源,电源 大师

一、 USB隔离器电路分析 进行usb隔离可以使用USB隔离模块 ADUM3160 ADUM4160 注意&#xff1a;B0505S 最大带载0.16A&#xff0c;副边需要带载能力需要改变方案 比如移动硬盘至少需要0.5A 用充电宝、18650、设计5V1A输出电源 二、 1A隔离电压方案

redis乐观锁+启用事务解决超卖

乐观锁用于监视库存&#xff08;watch&#xff09;&#xff0c;然后接下来就启用事务。 启用事务&#xff0c;将减库存、下单这两个步骤&#xff0c;放到一个事务当中即可解决秒杀问题、防止超卖。 但是&#xff01;&#xff01;&#xff01;乐观锁&#xff0c;会带来" …

Leetcode67 二进制求和

给你两个二进制字符串 a 和 b &#xff0c;以二进制字符串的形式返回它们的和。 代码 class Solution {public String addBinary(String a, String b) {StringBuilder res new StringBuilder();int carry 0;int i a.length() - 1, j b.length() - 1;while(i > 0 || j &…

keepalived+lvs+nginx高并发集群

keepalivedlvsnginx高并发集群 简介&#xff1a; keepalivedlvsnginx高并发集群&#xff0c;是通过LVS将请求流量均匀分发给nginx集群&#xff0c;而当单机nginx出现状态异常或宕机时&#xff0c;keepalived会主动切换并将不健康nginx下线&#xff0c;维持集群稳定高可用 1.L…

Linux系统之安装OneNav个人书签管理器

Linux系统之安装OneNav个人书签管理器 一、OneNav介绍1.OneNav简介2.OneNav特点 二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、检查本地环境3.1 检查本地操作系统版本3.2 检查系统内核版本3.3 检查本地yum仓库状态 四、安装httpd服务4.1 安装httpd4.2 启动httpd服务4…

低代码开发ERP:精打细算,聚焦核心投入

企业数字化转型已经成为现代商业环境中的一项关键任务。如今&#xff0c;企业面临着日益激烈的竞争和不断变化的市场需求。在这样的背景下&#xff0c;数字化转型不仅是企业生存的必然选择&#xff0c;也是取得竞争优势和实现可持续发展的关键因素。 在数字化转型的过程中&…

神经网络入门

前言 本文主要介绍最基础的神经网络&#xff0c;包括其结构&#xff0c;学习方法&#xff0c; C \texttt{C} C 的实现代码。 Python \texttt{Python} Python 的代码可以搜索互联网得到。 前排提示&#xff1a;本人涉及一丁点数学知识。 神经网络的结构 神经网络包括多个层…

[Linux]进程概念

[Linux]进程概念 文章目录 [Linux]进程概念进程的定义进程和程序的关系Linux下查看进程Linux下通过系统调用获取进程标示符Linux下通过系统调用创建进程-fork函数使用 进程的定义 进程是程序的一个执行实例&#xff0c;是担当分配系统资源&#xff08;CPU时间&#xff0c;内存…

二、数学建模之整数规划篇

1.定义 2.例题 3.使用软件及解题 一、定义 1.整数规划&#xff08;Integer Programming&#xff0c;简称IP&#xff09;&#xff1a;是一种数学优化问题&#xff0c;它是线性规划&#xff08;Linear Programming&#xff0c;简称LP&#xff09;的一个扩展形式。在线性规划中&…

构造不包含字母和数字的webshell

文章目录 利用不含字母与数字进行绕过知识介绍题目方法一&#xff1a;异或操作绕过方法二&#xff1a;取反进行绕过 过滤不是很严格的情况进阶绕过利用php7特性直接绕过 利用不含字母与数字进行绕过 知识介绍 <?phpecho "A"^"";?>从运行结果为! …