使用消息中间件实现系统间的异步通信和解耦

​​​​​​​目录

引言

一. 选择合适的消息中间件

二. 定义消息格式和通信协议

1. 定义消息格式

消息头

消息体

2. 定义通信协议

发送消息

接收消息

消息处理

3. 示例代码

定义消息格式

发送消息

接收消息

三、发布-订阅模式

1. 定义发布-订阅模式

2. 示例代码

发布消息

订阅消息

3. 运行示例

4. 异步处理消息

5. 解耦系统

6. 实现步骤

7. 实例场景

实例场景:电商系统订单处理

场景描述

实现步骤

示例代码

订单服务发送消息

库存服务接收消息

物流服务接收消息


引言

在现代分布式系统中,异步通信和解耦是非常重要的设计原则。通过使用消息中间件,可以实现系统间的异步通信和解耦,提高系统的可扩展性和可靠性。本文将介绍如何使用消息中间件来实现系统间的异步通信和解耦,并通过一个实际场景来演示。

一. 选择合适的消息中间件

选择合适的消息中间件需要考虑多个因素,包括项目需求、性能要求、可靠性、社区支持等。常见的消息中间件包括 RabbitMQ、Kafka、ActiveMQ、Redis 等,下面针对不同的需求给出一些选择建议:

  1. 消息传递模式

    • 点对点:适合使用 RabbitMQ、ActiveMQ 等传统消息中间件。
    • 发布-订阅:适合使用 RabbitMQ、Kafka 等支持广播消息的中间件。
  2. 可靠性

    • 如果对消息的可靠性要求较高,需要确保消息不会丢失,可以考虑使用 RabbitMQ、Kafka 等提供消息持久化和高可靠性的中间件。
  3. 性能

    • 如果需要处理大量的消息并且需要低延迟,可以考虑使用 Kafka,它是一个高吞吐量的消息中间件,适合大数据场景。
    • 如果对延迟要求较低,可以选择 RabbitMQ、ActiveMQ 等传统消息中间件。
  4. 社区支持和生态系统

    • 考虑选择一个有活跃社区支持和完善生态系统的消息中间件,这样可以更容易地解决问题和扩展功能。
  5. 技术栈兼容性

    • 考虑选择一个与你的技术栈兼容的消息中间件,避免出现集成上的问题。

综合考虑以上因素,可以选择最适合项目需求的消息中间件。

二. 定义消息格式和通信协议

定义消息格式和通信协议是使用消息中间件的关键步骤之一,它涉及到消息的结构、内容和交互方式。下面以 RabbitMQ 为例,演示如何定义消息格式和通信协议。

1. 定义消息格式

在 RabbitMQ 中,消息通常由两部分组成:消息头和消息体。消息头包含一些元数据信息,如消息的类型、路由键等;消息体包含实际的业务数据。

消息头
  • Content-Type:消息体的类型,如 application/jsontext/plain 等。
  • DeliveryMode:消息持久性标志,标识消息是否需要持久化存储,可选值为 1(持久化)和 2(非持久化)。
  • CorrelationId:消息关联标识,用于关联一组相关消息。
  • 其他自定义的消息头字段,根据业务需求定义。
消息体
  • 消息体可以是任意格式的数据,如 JSON、XML、文本等,根据业务需求定义。

2. 定义通信协议

通信协议定义了消息的交互方式,包括消息的发送、接收和处理流程。通信协议可以包括以下几个方面:

发送消息
  • 客户端向消息队列发送消息,包括指定交换机(Exchange)、路由键(Routing Key)和消息体。
接收消息
  • 服务端从消息队列接收消息,根据消息的交换机和路由键接收对应的消息。
消息处理
  • 客户端接收到消息后,根据消息的内容执行相应的业务逻辑。

3. 示例代码

定义消息格式
public class Message {private String content;private String contentType;private int deliveryMode;private String correlationId;// 省略getter和setter方法
}
发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class SendMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);Message message = new Message();message.setContent("Hello, RabbitMQ!");message.setContentType("text/plain");message.setDeliveryMode(1); // 持久化message.setCorrelationId("123456");String messageJson = toJson(message);channel.basicPublish("", QUEUE_NAME, null, messageJson.getBytes());System.out.println(" [x] Sent '" + messageJson + "'");}}private static String toJson(Message message) {// 将 message 对象转换成 JSON 格式的字符串return "{ \"content\": \"" + message.getContent() + "\", \"contentType\": \"" + message.getContentType() + "\", \"deliveryMode\": " + message.getDeliveryMode() + ", \"correlationId\": \"" + message.getCorrelationId() + "\" }";}
}
接收消息
import com.rabbitmq.client.*;public class ReceiveMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String messageJson = new String(delivery.getBody(), "UTF-8");Message message = fromJson(messageJson, Message.class);System.out.println(" [x] Received '" + messageJson + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}}private static <T> T fromJson(String json, Class<T> clazz) {// 将 JSON 格式的字符串转换成指定类型的对象// 这里可以使用 JSON 框架(如 Jackson、Gson)来实现return null;}
}

通过以上步骤,可以定义消息格式和通信协议,并使用 RabbitMQ 实现消息的发送和接收。

三、发布-订阅模式

发布-订阅模式是一种常见的消息传递模式,用于实现消息的广播和订阅。在发布-订阅模式中,消息发布者将消息发布到一个主题(Topic),而消息订阅者可以订阅感兴趣的主题,从而接收到相关消息。下面以 RabbitMQ 为例,演示如何使用发布-订阅模式。

1. 定义发布-订阅模式

在发布-订阅模式中,有一个交换机(Exchange)用来接收发布者发布的消息,并根据订阅者的绑定关系将消息路由到对应的队列。订阅者可以创建自己的队列,并将队列绑定到交换机上,从而接收到发布者发布的消息。

2. 示例代码

发布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Publisher {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "Hello, subscribers!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
订阅消息
import com.rabbitmq.client.*;public class Subscriber {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}}
}

3. 运行示例

  1. 先运行订阅者 Subscriber,它会创建一个队列并绑定到交换机上,开始监听消息。
  2. 然后运行发布者 Publisher,它会向交换机发布一条消息。
  3. 订阅者会接收到发布者发布的消息,并输出到控制台。

通过以上步骤,可以实现基于 RabbitMQ 的发布-订阅模式。

4. 异步处理消息

通过消息中间件实现异步处理消息,即发送消息后不需要立即等待结果,而是继续执行其他任务。这样可以提高系统的响应速度和吞吐量。

5. 解耦系统

通过消息中间件,系统之间的通信变成了基于消息的方式,系统不再直接依赖于对方的接口和实现细节,从而实现了系统之间的解耦。

6. 实现步骤

  • 定义消息格式和通信协议:确定消息的格式和通信协议,包括消息的内容结构、消息的生命周期等。
  • 配置消息中间件:在系统中配置和启动消息中间件,确保消息中间件正常运行。
  • 消息的发布和订阅:编写代码实现消息的发布和订阅逻辑,将消息发布到指定的主题,并订阅感兴趣的主题。
  • 处理接收到的消息:编写代码处理接收到的消息,根据消息的内容执行相应的业务逻辑。
  • 测试和验证:对系统进行测试和验证,确保消息的发布、订阅和处理功能正常运行。

7. 实例场景

实例场景:电商系统订单处理

场景描述

假设有一个电商系统,包含订单服务、库存服务和物流服务。当用户下单时,订单服务需要通知库存服务减少库存,通知物流服务发货。为了提高系统的可扩展性和可靠性,我们可以使用消息中间件来实现订单处理的异步通信和解耦。

实现步骤

  1. 定义消息格式和通信协议:定义订单消息的格式,包括订单号、商品信息等,并确定消息的交换机和队列名称。

  2. 配置消息中间件:在消息中间件中配置交换机和队列,并确保消息的持久化。

  3. 订单服务发送消息:订单服务在用户下单后,将订单消息发送到消息队列中。

  4. 库存服务订阅消息:库存服务订阅订单消息队列,接收并处理订单消息,减少库存。

  5. 物流服务订阅消息:物流服务也订阅订单消息队列,接收并处理订单消息,进行发货。

示例代码

订单服务发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class OrderService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "New order placed";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
库存服务接收消息
import com.rabbitmq.client.*;public class InventoryService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 处理订单消息,减少库存};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
物流服务接收消息
import com.rabbitmq.client.*;public class LogisticsService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 处理订单消息,发货};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}

通过以上步骤的简单演示,订单服务可以异步发送订单消息,库存服务和物流服务可以订阅订单消息并处理,实现了订单处理的异步通信和解耦。

通过以上步骤,可以使用消息中间件实现系统间的异步通信和解耦,提高系统的可扩展性和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/687682.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++ //练习 7.29 修改你的Screen类,令move、set和display函数返回Screen并检查程序的运行结果,在上一个练习中你的推测正确吗?

C Primer&#xff08;第5版&#xff09; 练习 7.29 练习 7.29 修改你的Screen类&#xff0c;令move、set和display函数返回Screen并检查程序的运行结果&#xff0c;在上一个练习中你的推测正确吗&#xff1f; 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; …

RIP协议详解

​RIP是最早的动态路由协议&#xff0c;虽然已经过时并且很少使用&#xff0c;但是可以通过学习RIP并且和ospf等现在正在使用的路由协议对比&#xff0c;了解其工作原理和过时原因&#xff0c;具有很强的学习性。 一、RIP协议简介 RIP&#xff08;Routing Information Protoc…

(OpenCV)图片拼接

前言 图片拼接在许多领域都有广泛的应用&#xff0c;包括但不限于以下几个方面&#xff1a; 全景摄影&#xff1a;在摄影中&#xff0c;通过将多张照片拼接在一起可以实现全景照片的效果。这在旅游景点、房地产展示等领域有着广泛的应用&#xff0c;能够提供更加生动、真实的视…

Bpmn-js 属性控制

我们可以通过bpmn-js来访问对应的BPMN图例的属性信息。对应的流程图中的每个图例元素&#xff08;如开始、结束、中间/边界事件等都通过businessObject属性存储对基础BPMN元素的引用。业务对象是从BPMN 2.0 XML导入并在导出过程中序列化的实际元素。使用业务对象来读取和写入BP…

如何减少HTTP请求次数

资料来源 : 小林coding 小林官方网站 : 小林coding (xiaolincoding.com) 如何减少HTTP请求次数? 减少 HTTP 请求次数自然也就提升了 HTTP 性能&#xff0c;可以从这 3 个方面入手: 减少重定向请求次数合并请求延迟发送请求 减少重定向请求次数 我们先来看看什么是重定向请…

美相关 APT 组织分析报告

获取方式&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1AsysdggUIbvB3PZ41MaJaQ?pwd8euh 提取码&#xff1a;8euh

Debug Monitor中断详细解析

文章目录 0 基本术语1 相关寄存器和指令1.1 Debug Halting Control and Status Register (DHCSR), 0xE000EDF01.2 Debug Exception and Monitor Control Register (DEMCR), 0xE000EDFC1.3 Debug Fault Status Register, DFSR, 0xE000ED301.4 BKPT指令 2 Debug Monitor中断示例2…

DNS域名解析过程、工具、文件配置

目录 DNS介绍 DNS域名层次结构 DNS域名解析过程 递归查询和迭代查询 DNS 查询的命令行工具&#xff1a;host、dig、nslookup host 语法 参数和选项 示例用法 dig 语法 参数和选项 示例用法 nslookup 语法 参数和选项 交互式命令 示例用法 配置 DNS 客户端 DNS介…

解读OpenAI视频生成模型Sora背后的原理:Diffusion Transformer

Diffusion Models视频生成-博客汇总 前言&#xff1a;OpenAI最近推出的视频生成模型Sora在效果上实现了真正的遥遥领先&#xff0c;很多博主都介绍过Sora&#xff0c;但是深入解读背后原理的博客却非常少。Sora的原理最主要的是核心模型主干《Scalable Diffusion Models with T…

Code Composer Studio (CCS) - Breakpoint (断点)

Code Composer Studio [CCS] - Breakpoint [断点] 1. BreakpointReferences 1. Breakpoint 选中断点右键 -> Breakpoint Properties… Skip Count&#xff1a;跳过断点总数&#xff0c;在断点执行之前设置总数 Current Count&#xff1a;当前跳过断电累计值 References […

xtu oj 1215 A+B V

题目描述 小明很喜欢做ab&#xff0c;他但经常忘记进位&#xff0c;所以他算881290,而不是100。 现在你给了小明一些ab的算式&#xff0c;请问他算出来会是什么&#xff1f; 输入 第一行是一个整数K&#xff0c;表示样例的个数。 每个样例占一行&#xff0c;为两个整数a,b&a…

CCF编程能力等级认证GESP—C++7级—20231209

CCF编程能力等级认证GESP—C7级—20231209 单选题&#xff08;每题 2 分&#xff0c;共 30 分&#xff09;判断题&#xff08;每题 2 分&#xff0c;共 20 分&#xff09;编程题 (每题 25 分&#xff0c;共 50 分)商品交易纸牌游戏 答案及解析单选题判断题编程题1编程题2 单选题…

深度探索Python集合:从基本操作到高级用法

在Python编程中&#xff0c;集合(Set)作为一种高效且功能强大的内置数据结构&#xff0c;常用于处理不包含重复元素的无序数据集合。本文将详述Python集合的基本操作、进阶技巧以及在实际场景中的应用。 一、集合基础 Python集合初始化可通过大括号 {} 或者 set() 函数实现&a…

Vue实现多个input输入,光标自动聚焦到下一个input

遇到一个需求&#xff0c;需要实现和移动端短信输入一样&#xff0c;输入内容后&#xff0c;光标会进入下一个输入框 需要用到2个事件 keydown事件发生在键盘的键被按下的时候 keyup 事件在按键被释放的时候触发 <template><div class"box"><el-fo…

(ruoyi-vue3.8.6版本基础上)升级Spring Boot 3.x时遇到的问题与解决办法

升级Spring Boot 3.x时遇到的问题与解决办法 在升级Spring Boot应用到3.x版本的过程中,可能会遇到一系列的依赖、配置及JDK版本相关问题。本文将列出几个常见问题及其对应的解决方案。 问题一:javax.xml.bind.JAXBException异常 问题描述 在更新Spring Boot依赖后,如果缺…

OpenAI重磅发布Sora——首个视频生成模型:利用文本-视频人工智能将想象变为现实

想象一下&#xff0c;现在你有一段文本话描述的故事&#xff0c;通过输入这段文本&#xff0c;就可以立刻展开一个生动详细的视频。这就是 OpenAI 神奇的 Sora&#xff0c;一个革命性的文本到视频的 AI 模型。Sora于2024年2月推出&#xff0c;凭借其仅凭文字提示就能生成现实和…

CSS 不同颜色的小圆角方块组成的旋转加载动画

<template><!-- 创建一个装载自定义旋转加载动画的容器 --><view class="spinner"><!-- 定义外部包裹容器,用于实现整体旋转动画 --><view class="outer"><!-- 定义四个内部小方块以形成十字形结构 --><view clas…

攻防演练后的一点随记

攻防演练 攻防演练算是告一段落了&#xff0c;各位红队和蓝队的兄弟们都辛苦了&#xff0c;写一点随记&#xff0c;供大家参考。 记得第一次参加攻防演练是在2018年&#xff0c;当时被派到北京&#xff0c;在某个政企单位做攻防演练支撑工作&#xff0c;然后2020年又被紧急派到…

SSE 协议详细教程

Server-Sent Events&#xff08;SSE&#xff09;是一种服务器到客户端的单向通信协议&#xff0c;它基于HTTP协议&#xff0c;允许服务器向客户端推送数据。SSE常用于实现实时更新的功能&#xff0c;例如在新闻网站或股票市场中显示最新消息。本文将详细介绍SSE协议的原理、实现…

Vue首屏优化,12个提速建议

文章目录 代码拆分和懒加载&#xff1a;代码拆分懒加载 图片优化&#xff1a;组件懒渲染&#xff1a;数据预获取和缓存&#xff1a;服务器端渲染&#xff08;SSR&#xff09;&#xff1a;代码压缩和合并&#xff1a;使用 CDN 加速&#xff1a;监控和性能分析&#xff1a;代码优…