基于消息中间件的异步通信机制在系统解耦中的优化与实现


✨✨谢谢大家捧场,祝屏幕前的小伙伴们每天都有好运相伴左右,一定要天天开心哦!✨✨ 
🎈🎈作者主页: 喔的嘛呀🎈🎈
✨✨ 帅哥美女们,我们共同加油!一起进步!✨✨ 

目录

引言

一. 选择合适的消息中间件

二. 定义消息格式和通信协议

1. 定义消息格式

消息头

消息体

2. 定义通信协议

发送消息

接收消息

消息处理

3. 示例代码

定义消息格式

发送消息

接收消息

三、发布-订阅模式

1. 定义发布-订阅模式

2. 示例代码

发布消息

订阅消息

3. 运行示例

4. 异步处理消息

5. 解耦系统

6. 实现步骤

7. 实例场景

实例场景:电商系统订单处理

场景描述

实现步骤

示例代码

订单服务发送消息

库存服务接收消息

物流服务接收消息


引言

在现代分布式系统中,异步通信和解耦是非常重要的设计原则。通过使用消息中间件,可以实现系统间的异步通信和解耦,提高系统的可扩展性和可靠性。本文将介绍如何使用消息中间件来实现系统间的异步通信和解耦,并通过一个实际场景来演示。

一. 选择合适的消息中间件

选择合适的消息中间件需要考虑多个因素,包括项目需求、性能要求、可靠性、社区支持等。常见的消息中间件包括 RabbitMQ、Kafka、ActiveMQ、Redis 等,下面针对不同的需求给出一些选择建议:

  1. 消息传递模式

    • 点对点:适合使用 RabbitMQ、ActiveMQ 等传统消息中间件。
    • 发布-订阅:适合使用 RabbitMQ、Kafka 等支持广播消息的中间件。
  2. 可靠性

    • 如果对消息的可靠性要求较高,需要确保消息不会丢失,可以考虑使用 RabbitMQ、Kafka 等提供消息持久化和高可靠性的中间件。
  3. 性能

    • 如果需要处理大量的消息并且需要低延迟,可以考虑使用 Kafka,它是一个高吞吐量的消息中间件,适合大数据场景。
    • 如果对延迟要求较低,可以选择 RabbitMQ、ActiveMQ 等传统消息中间件。
  4. 社区支持和生态系统

    • 考虑选择一个有活跃社区支持和完善生态系统的消息中间件,这样可以更容易地解决问题和扩展功能。
  5. 技术栈兼容性

    • 考虑选择一个与你的技术栈兼容的消息中间件,避免出现集成上的问题。

综合考虑以上因素,可以选择最适合项目需求的消息中间件。

二. 定义消息格式和通信协议

定义消息格式和通信协议是使用消息中间件的关键步骤之一,它涉及到消息的结构、内容和交互方式。下面以 RabbitMQ 为例,演示如何定义消息格式和通信协议。

1. 定义消息格式

在 RabbitMQ 中,消息通常由两部分组成:消息头和消息体。消息头包含一些元数据信息,如消息的类型、路由键等;消息体包含实际的业务数据。

消息头
  • Content-Type:消息体的类型,如 application/jsontext/plain 等。
  • DeliveryMode:消息持久性标志,标识消息是否需要持久化存储,可选值为 1(持久化)和 2(非持久化)。
  • CorrelationId:消息关联标识,用于关联一组相关消息。
  • 其他自定义的消息头字段,根据业务需求定义。
消息体
  • 消息体可以是任意格式的数据,如 JSON、XML、文本等,根据业务需求定义。

2. 定义通信协议

通信协议定义了消息的交互方式,包括消息的发送、接收和处理流程。通信协议可以包括以下几个方面:

发送消息
  • 客户端向消息队列发送消息,包括指定交换机(Exchange)、路由键(Routing Key)和消息体。
接收消息
  • 服务端从消息队列接收消息,根据消息的交换机和路由键接收对应的消息。
消息处理
  • 客户端接收到消息后,根据消息的内容执行相应的业务逻辑。

3. 示例代码

定义消息格式
public class Message {private String content;private String contentType;private int deliveryMode;private String correlationId;// 省略getter和setter方法
}
发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class SendMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);Message message = new Message();message.setContent("Hello, RabbitMQ!");message.setContentType("text/plain");message.setDeliveryMode(1); // 持久化message.setCorrelationId("123456");String messageJson = toJson(message);channel.basicPublish("", QUEUE_NAME, null, messageJson.getBytes());System.out.println(" [x] Sent '" + messageJson + "'");}}private static String toJson(Message message) {// 将 message 对象转换成 JSON 格式的字符串return "{ \"content\": \"" + message.getContent() + "\", \"contentType\": \"" + message.getContentType() + "\", \"deliveryMode\": " + message.getDeliveryMode() + ", \"correlationId\": \"" + message.getCorrelationId() + "\" }";}
}
接收消息
import com.rabbitmq.client.*;public class ReceiveMessage {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String messageJson = new String(delivery.getBody(), "UTF-8");Message message = fromJson(messageJson, Message.class);System.out.println(" [x] Received '" + messageJson + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}}private static <T> T fromJson(String json, Class<T> clazz) {// 将 JSON 格式的字符串转换成指定类型的对象// 这里可以使用 JSON 框架(如 Jackson、Gson)来实现return null;}
}

通过以上步骤,可以定义消息格式和通信协议,并使用 RabbitMQ 实现消息的发送和接收。

三、发布-订阅模式

发布-订阅模式是一种常见的消息传递模式,用于实现消息的广播和订阅。在发布-订阅模式中,消息发布者将消息发布到一个主题(Topic),而消息订阅者可以订阅感兴趣的主题,从而接收到相关消息。下面以 RabbitMQ 为例,演示如何使用发布-订阅模式。

1. 定义发布-订阅模式

在发布-订阅模式中,有一个交换机(Exchange)用来接收发布者发布的消息,并根据订阅者的绑定关系将消息路由到对应的队列。订阅者可以创建自己的队列,并将队列绑定到交换机上,从而接收到发布者发布的消息。

2. 示例代码

发布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Publisher {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "Hello, subscribers!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
订阅消息
import com.rabbitmq.client.*;public class Subscriber {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}}
}

3. 运行示例

  1. 先运行订阅者 Subscriber,它会创建一个队列并绑定到交换机上,开始监听消息。
  2. 然后运行发布者 Publisher,它会向交换机发布一条消息。
  3. 订阅者会接收到发布者发布的消息,并输出到控制台。

通过以上步骤,可以实现基于 RabbitMQ 的发布-订阅模式。

4. 异步处理消息

通过消息中间件实现异步处理消息,即发送消息后不需要立即等待结果,而是继续执行其他任务。这样可以提高系统的响应速度和吞吐量。

5. 解耦系统

通过消息中间件,系统之间的通信变成了基于消息的方式,系统不再直接依赖于对方的接口和实现细节,从而实现了系统之间的解耦。

6. 实现步骤

  • 定义消息格式和通信协议:确定消息的格式和通信协议,包括消息的内容结构、消息的生命周期等。
  • 配置消息中间件:在系统中配置和启动消息中间件,确保消息中间件正常运行。
  • 消息的发布和订阅:编写代码实现消息的发布和订阅逻辑,将消息发布到指定的主题,并订阅感兴趣的主题。
  • 处理接收到的消息:编写代码处理接收到的消息,根据消息的内容执行相应的业务逻辑。
  • 测试和验证:对系统进行测试和验证,确保消息的发布、订阅和处理功能正常运行。

7. 实例场景

实例场景:电商系统订单处理
场景描述

假设有一个电商系统,包含订单服务、库存服务和物流服务。当用户下单时,订单服务需要通知库存服务减少库存,通知物流服务发货。为了提高系统的可扩展性和可靠性,我们可以使用消息中间件来实现订单处理的异步通信和解耦。

实现步骤
  1. 定义消息格式和通信协议:定义订单消息的格式,包括订单号、商品信息等,并确定消息的交换机和队列名称。

  2. 配置消息中间件:在消息中间件中配置交换机和队列,并确保消息的持久化。

  3. 订单服务发送消息:订单服务在用户下单后,将订单消息发送到消息队列中。

  4. 库存服务订阅消息:库存服务订阅订单消息队列,接收并处理订单消息,减少库存。

  5. 物流服务订阅消息:物流服务也订阅订单消息队列,接收并处理订单消息,进行发货。

示例代码
订单服务发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class OrderService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "New order placed";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
库存服务接收消息
import com.rabbitmq.client.*;public class InventoryService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 处理订单消息,减少库存};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
物流服务接收消息
import com.rabbitmq.client.*;public class LogisticsService {private static final String EXCHANGE_NAME = "orders";private static final String QUEUE_NAME = "order_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for orders. To exit press Ctrl+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 处理订单消息,发货};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}

通过以上步骤的简单演示,订单服务可以异步发送订单消息,库存服务和物流服务可以订阅订单消息并处理,实现了订单处理的异步通信和解耦。

通过以上步骤,可以使用消息中间件实现系统间的异步通信和解耦,提高系统的可扩展性和可维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/13438.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GPT-SoVITS语音克隆部署与使用

GPT-SoVITS是一款强大的少量样本语音转换与语音合成开源工具。当前&#xff0c;GPT-SoVITS实现了如下几个方面的功能&#xff1a; 由参考音频的情感、音色、语速控制合成音频的情感、音色、语速可以少量语音微调训练&#xff0c;也可不训练直接推理可以跨语种生成&#xff0c;…

【AI】试用 ai 提取文章内容尝试

电梯产业面临这样一个问题&#xff0c;因为太多的品牌&#xff0c;将近 400 多个&#xff0c;甚至有宝马&#xff0c;奥迪&#xff0c;你敢相信&#xff0c;一家造汽车的造过电梯?不过好像想想也是&#xff0c;电梯是第二大交通工具&#xff0c;电梯从某种意义上来说&#xff…

无网环境禁止 WPS 提示登录,且基本功能按钮可用

目前 WPS 升级后&#xff0c;每次打开都会提示你登录 WPS&#xff0c;并且在未登录之前所有基本功能按钮是置灰状态&#xff0c;无法使用。 如此一来&#xff0c;在内网或无网环境&#xff0c;我们无法登陆 WPS &#xff0c;就给我们的使用带来了极大的不便&#xff0c;那么有没…

全网最全面的由浅到深的Kotlin基础教程(七)

前言 本篇文章接着上一篇文章全网最全面的由浅到深的Kotlin基础教程&#xff08;六&#xff09;继续进阶学习kotlin&#xff0c;建议看完上一篇文章&#xff0c;再来看本篇文章。本篇主要分析一些常用的kotlin函数源码&#xff0c;以及用kotlin简单实现Rxjava的几个操作符。坚…

全志A133 android10 调试vibrator震动马达

一&#xff0c;前提条件 全志使用的马达配置为上电震动&#xff0c;下电停止&#xff0c;需测试硬件是否正常。马达供电最好为独立供电&#xff0c;避免干扰。 二&#xff0c;适配步骤 1. dts中增加马达配置 motor_para {compatible "allwinner,sunxi-vibrator";…

BGP实验:联邦和发射器实验

BGP实验&#xff1a;联邦和发射器实验 一、实验拓扑 二、实验要求及分析 实验要求&#xff1a; 1、AS1存在两个环回&#xff0c;一个地址为192.168.1.0/24&#xff0c;该地址不能再任何协议中宣告&#xff1b; ​ AS3存在两个环回&#xff0c;一个地址为192.168.2.0/24&…

解决ModuleNotFoundError: No module named ‘open_clip‘问题

在使用stable diffusion大模型时&#xff0c;添加一些模型后启动df页面报错&#xff1a;ModuleNotFoundError: No module named open_clip 使用 pip install open_clip命令下载失败&#xff0c;报错&#xff1a; Looking in indexes: https://mirrors.aliyun.com/pypi/simple…

Redis【B站面试题】

前言 2023-07-27 22:44:59 出自B站 灰灰的Java面试 Redis Redis为什么快&#xff1f; 1.纯内存KV操作 Redis的操作都是基于内存的&#xff0c;CPU不是 Redis性能瓶颈,&#xff0c;Redis的瓶颈是机器内存和网络带宽。 在计算机的世界中&#xff0c;CPU的速度是远大于内存的速…

深度学习之基于TensorFlow人脸表情识别

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 人脸表情识别是计算机视觉领域的重要研究方向之一&#xff0c;它在人机交互、情感分析、安全监控等领…

Java 变量和作用域:理解变量的声明、初始化及其作用域

在Java编程语言中&#xff0c;变量和作用域是两个核心概念。理解变量的声明、初始化以及它们的作用域对于编写健壮且高效的代码至关重要。 变量的声明与初始化 变量的声明 在Java中&#xff0c;变量的声明指的是定义变量的名称和类型。在Java中&#xff0c;变量声明的一般语…

ubuntu使用记录——如何使用wireshark网络抓包工具进行检测速腾激光雷达的ip和端口号

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言wireshark网络抓包工具1.wireshark的安装2.wireshark的使用3.更改雷达ip 总结 前言 Wireshark是一款备受赞誉的开源网络协议分析软件&#xff0c;其功能之强大…

简述MyBatis中#{}引用和${}引用的区别

各位大佬光临寒舍&#xff0c;希望各位能赏脸给个三连&#xff0c;谢谢各位大佬了&#xff01;&#xff01;&#xff01; 目录 1.有无预编译 优点 缺点 2.SQL执行的快慢 3.能否被SQL注入 4.参数输入方式 5.总结 1.有无预编译 #{}是有预编译的而${}是没有预编译的&…

LiveGBS流媒体平台GB/T28181用户手册-服务器概览:通道信息、负载信息、CPU使用、存储使用、带宽使用(Mbps)、内存使用

LiveGBS用户手册-服务器概览&#xff1a;通道信息、负载信息、CPU使用、存储使用、带宽使用&#xff08;Mbps&#xff09;、内存使用 1、服务器概览1.1、通道信息1.2、负载信息1.2.1、信息说明1.2.2、会话列表 1.3、CPU使用1.4、存储使用1.5、带宽使用&#xff08;Mbps&#xf…

15:00面试,15:08出来,面试问的有点变态。。。。

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 从小厂出来&#xff0c;没想到在另一家公司又寄了。 到这家公司开始上班&#xff0c;加班是每天…

阿里云数据库 SelectDB 版全面商业化,开启现代化实时数据仓库的全新篇章

2024 年 5 月 21 日&#xff0c;由阿里云联合飞轮科技共同举办的「阿里云数据库 SelectDB 版商业化产品发布会」于线上召开。阿里巴巴集团副总裁、阿里云数据库产品事业部负责人李飞飞宣布&#xff0c;阿里云数据库 SelectDB 版在中国站及国际站全面发布&#xff0c;正式开启商…

ROS | 用C++和python实现运动控制功能

基础知识&#xff1a; 用C实现&#xff1a; C代码&#xff1a; 用python实现&#xff1a; Python代码&#xff1a;

数据库理论基本概念

数据库理论基本概念 三级模式和两级映像 外模式 > 用户和数据库系统的接口 -------- 外模式-概念模式映射 概念模式 > 数据的逻辑结构和特征的描述 -------- 概念模式-内模式映射 内模式 > 数据物理结构和存储方式的描述三级…

避雷:搭建ai知识库的6大注意事项

随着人工智能技术的发展&#xff0c;ai知识库成为众多企业追求的一个重要部分&#xff0c;帮助企业提高运营次效率&#xff0c;越来越受到人们的关注。但是&#xff0c;在搭建ai知识库的过程中&#xff0c;稍不留意&#xff0c;就会漏掉一些小细节&#xff0c;导致做出来的ai知…

GPT-4o 引领人机交互新风向的向量数据库Milvus Cloud 成本

成本 AIGC 时代对于冷热储存的呼唤 成本一直是向量数据库获得更广泛使用的最大阻碍之一,这个成本来自两点: 储存,绝大多数向量数据库为了保证低延迟,需要把数据全量缓存到内存或者本地磁盘。在这个动辄百亿量级的AI 时代,意味着几十上百 TB 的资源消耗。 计算,数据需…