使用RabbitMQ实现微服务间的异步消息传递

使用RabbitMQ实现微服务间的异步消息传递

      • RabbitMQ简介
      • 安装RabbitMQ
        • 在Ubuntu上安装RabbitMQ
        • 在CentOS上安装RabbitMQ
      • 配置RabbitMQ
      • 创建微服务
        • 生产者服务
          • 安装依赖
          • 生产者代码
        • 消费者服务
          • 消费者代码
      • 运行微服务
      • 消息模式
        • 直接模式
          • 生产者代码
          • 消费者代码
        • 扇出模式
          • 生产者代码
          • 消费者代码
        • 主题模式
          • 生产者代码
          • 消费者代码
      • 高级特性
        • 持久化
          • 生产者代码
          • 消费者代码
        • 确认机制
          • 消费者代码
      • 监控和日志
        • 监控
        • 日志
      • 故障排除
      • 总结

在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。

安装RabbitMQ

RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。
在Ubuntu上安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
在CentOS上安装RabbitMQ
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置RabbitMQ

安装完成后,可以使用以下命令进行基本配置。
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问RabbitMQ管理界面:`http://localhost:15672`,默认用户名和密码都是`guest`。

创建微服务

我们将创建两个简单的微服务:生产者服务和消费者服务。
生产者服务
生产者服务负责发送消息到RabbitMQ。
安装依赖
pip install pika
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
消费者服务
消费者服务负责从RabbitMQ接收消息。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

运行微服务

先启动消费者服务,然后启动生产者服务。
# 启动消费者服务
python consumer.py# 启动生产者服务
python producer.py

消息模式

RabbitMQ支持多种消息模式,包括直接模式、扇出模式、主题模式和头部模式。
直接模式
直接模式是最简单的模式,消息会被发送到指定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
扇出模式
扇出模式将消息广播到所有绑定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='fanout_exchange', queue=queue_name)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
主题模式
主题模式允许更复杂的路由规则。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuebinding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

高级特性

RabbitMQ还支持许多高级特性,如持久化、确认机制、死信队列等。
持久化
可以配置消息和队列的持久化,以确保消息不会因为RabbitMQ服务器重启而丢失。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
确认机制
可以配置消费者在处理完消息后发送确认,以确保消息不会被重复处理。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='ack_queue')channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

监控和日志

RabbitMQ提供了丰富的监控和日志功能,可以用于监控和调试。

监控
可以通过RabbitMQ管理界面监控队列、交换机和连接等。

日志
可以通过配置文件调整日志级别和输出方式。

故障排除

如果RabbitMQ配置出现问题,可以使用以下命令进行故障排除。

sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server

总结

通过本文,你已经学习了如何使用RabbitMQ实现微服务间的异步消息传递。我们介绍了RabbitMQ的基本概念、安装方法、配置RabbitMQ、创建微服务、消息模式(直接模式、扇出模式、主题模式)、高级特性(持久化、确认机制)、监控和日志、故障排除等内容。掌握了这些知识,将有助于你在实际工作中更好地利用RabbitMQ来构建高效、可靠的微服务架构。
RabbitMQ管理界面示例
RabbitMQ消息传递模式示例

使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/884338.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机低能儿从0刷leetcode | 31.下一个排列

题目:31. 下一个排列 思路: 本题中,我们需要寻找“下一个排列”,也就是要找到增长最小的排列。 因此我们应该从尽可能从靠右侧(末尾)的位置开始增长。想象我们从末尾开始遍历数组,会遇到第一…

.net core NPOI以及NOPI mapper

我们在日常开发中对Excel的操作可能会比较频繁,好多功能都会涉及到Excel的操作。在.Net Core中大家可能使用Npoi比较多,这款软件功能也十分强大,而且接近原始编程。但是直接使用Npoi大部分时候我们可能都会自己封装一下,毕竟根据二…

Yolo V4详解

Yolo V4(You Only Look Once version 4)是一种先进的目标检测系统,于2020年推出。作为Yolo系列算法的最新版本,Yolo V4继承了其前代版本的优点,并在此基础上进行了多项改进,使得其性能得到了显著提升。本文…

Pandas数据结构之Series对象

文章目录 1. DataFrame对象1.1 创建DataFrame对象1.2 DataFrame对象常用属性和方法1.3 布尔值列表获取DataFrame对象中部分数据1.4 DataFrame对象的运算 1. DataFrame对象 DataFrame是一个表格型的结构化数据结构,它含有一组或多组有序的列(Series&…

Tomcat 11 下载/安装 与基本使用

为什么要使用Tomcat? 使用Apache Tomcat的原因有很多,以下是一些主要的优点和特点: 1. 开源与免费 Tomcat是一个完全开源的项目,任何人都可以免费使用。它由Apache软件基金会维护,拥有一个活跃的社区,这…

换热器换热面积计算

1 容积式水加热器换热面积计算 式中Q—设计小时耗热量(W) ε—由于水垢、热媒分布不均匀等影响传热效率的系数,一般采用0.8~0.6 K—传热系数[W/(m2ˑ℃)],K值对加热器换热影响很大,主要取决于热媒种类和压力、热媒和…

幸福宝宝起名器

这段代码是一个简单的“幸福宝宝取名器”网页应用&#xff0c;主要功能是根据用户输入的姓氏、性别和生成数量&#xff0c;随机生成宝宝的名字。以下是代码的主要组成部分和功能简介&#xff1a; 1. HTML 结构 - 文档类型和语言&#xff1a;使用 <!DOCTYPE html> 声明文…

【数据结构与算法】LeetCode: 贪心算法

文章目录 LeetCode&#xff1a; 贪心算法买卖股票的最佳时机 &#xff08;Hot100&#xff09;买卖股票的最佳时机 II跳跃游戏 &#xff08;Hot100&#xff09;跳跃游戏 II&#xff08;Hot100&#xff09;划分字母区间 &#xff08;Hot100&#xff09;分发饼干K次取反后最大化的…

BLG与T1谁会赢?python制作预测程序,结果显示,BLG将打败T1

决赛预测 2024英雄联盟全球总决赛 2024年英雄联盟全球总决赛&#xff0c;今天晚上&#xff08;2024年11月2日22点&#xff09;就要开始了&#xff01;今年的总决赛的队伍是BLG与T1。当然一些老的lol玩家&#xff0c;现在可能对于lol关注不多&#xff0c;并不清楚这两个队伍。…

Spring Boot 3.x 整合 Druid 数据库连接池(含密码加密)

Spring Boot 3.x 整合 Druid 数据库连接池&#xff08;含密码加密&#xff09; 1. 为什么需要数据库连接池&#xff1f; 在传统的数据库连接中&#xff0c;每一次与数据库连接都会消耗大量的系统资源和时间。数据库连接池会提前创建一定数量的数据库连接保存在池中&#xff0…

麒麟V10SP1部署postgresql+postgis+pgrouting

1、查看当前操作系统版本&#xff1a; nkvers ############## Kylin Linux Version ################# Release: Kylin Linux Advanced Server release V10 (Tercel) Kernel: 4.19.90-17.5.ky10.aarch64 Build: Kylin Linux Advanced Server release V10 (SP1) /(Tercel)-…

Spring Boot框架在信息学科平台建设中的实用技巧

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了基于保密信息学科平台系统的开发全过程。通过分析基于保密信息学科平台系统管理的不足&#xff0c;创建了一个计算机管理基于保密信息学科平台系统的方案。文章介…

完美解决“找不到MSVCR110.dll无法继续执行代码

msvcr110.dll是一个动态链接库&#xff08;Dynamic Link Library&#xff0c;简称DLL&#xff09;文件&#xff0c;它是Microsoft Visual C 2012 Redistributable Package的一部分。这个库文件包含了大量预先编写的函数和资源&#xff0c;用于支持那些使用Visual C 2012或与之兼…

C++【string的模拟实现】

在前文我们讲解了string类接口使用&#xff08;C【string类的使用】(上),C【string类的使用】&#xff08;下&#xff09;&#xff09;&#xff0c;本片文章就来模拟实现string类。 注&#xff1a;本文实现的是string的部分重点内容&#xff0c;目的是为了更好的了解string&…

JS中计算时数据有误差解决方案

首先判断需要计算的数字是否为整数 // 判断一个数字是否为一个整数 export function isInt(num) {num Number(num);return Math.floor(num) num } 将一个浮点数转为整数&#xff0c;返回整数和倍数。如3.14 返回314 100 export function toInt(num) {var ret { times: 1,…

新能源汽车充电设施在储充电站的应用

0引言 全球能源和环境问题促使新能源汽车受到关注&#xff0c;但其推广受充电设施和能源供应限制。光伏站、储能站和电动汽车充放电站作为可再生能源利用和储存方式&#xff0c;具有巨大潜力。本研究旨在探索新能源汽车充电设施与这些站点的融合模式&#xff0c;以支持新能源汽…

【RESTful】RESTful API的设计原则

目录 引言一、协议二、域名三、版本&#xff08;Versioning&#xff09;四、路径&#xff08;Endpoint&#xff09;4.1 每个资源应有唯一的URI标识4.2 资源路径设计 五、HTTP动词5.1 常用HTTP动词及其作用5.2 RESTful API利用HTTP方法表示对商品资源的操作 六、使用自描述消息6…

Oracle 第11章:异常处理

在 Oracle PL/SQL 中&#xff0c;异常处理是一个重要的概念&#xff0c;它用于管理程序执行过程中可能发生的错误或特殊情况。异常可以是系统预定义的&#xff0c;也可以是由用户自定义的。 异常类型与处理机制 PL/SQL 提供了两种类型的异常&#xff1a; 预定义异常&#xf…

uniapp开发小程序【简单的实现点击下拉选择性别功能】

一、展示效果 二、代码 <template><view><view class="form_box"><view class="item"

Git 基础详解

1. 基本概念 Git是一个免费、开源的 分布式版本控制系统&#xff0c;可以高效处理小到大型的各种项目。 1.1 版本控制 版本控制&#xff1a;它是一种用于追踪和记录文件、目录、项目或软件的变化&#xff0c;以便将来查阅、比较、修订不同版本文件的系统 版本控制系统&…