使用RabbitMQ实现微服务间的异步消息传递

使用RabbitMQ实现微服务间的异步消息传递

      • RabbitMQ简介
      • 安装RabbitMQ
        • 在Ubuntu上安装RabbitMQ
        • 在CentOS上安装RabbitMQ
      • 配置RabbitMQ
      • 创建微服务
        • 生产者服务
          • 安装依赖
          • 生产者代码
        • 消费者服务
          • 消费者代码
      • 运行微服务
      • 消息模式
        • 直接模式
          • 生产者代码
          • 消费者代码
        • 扇出模式
          • 生产者代码
          • 消费者代码
        • 主题模式
          • 生产者代码
          • 消费者代码
      • 高级特性
        • 持久化
          • 生产者代码
          • 消费者代码
        • 确认机制
          • 消费者代码
      • 监控和日志
        • 监控
        • 日志
      • 故障排除
      • 总结

在现代分布式系统中,微服务架构越来越受到欢迎。微服务之间需要进行高效、可靠的消息传递。RabbitMQ作为一个成熟的开源消息中间件,能够很好地满足这一需求。本文将详细介绍如何使用RabbitMQ实现微服务间的异步消息传递。

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它支持多种消息模式,如发布/订阅、路由、主题等。

安装RabbitMQ

RabbitMQ可以在多种操作系统上安装,包括Linux、macOS和Windows。
在Ubuntu上安装RabbitMQ
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
在CentOS上安装RabbitMQ
sudo yum install epel-release
sudo yum install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

配置RabbitMQ

安装完成后,可以使用以下命令进行基本配置。
sudo rabbitmq-plugins enable rabbitmq_management
sudo systemctl restart rabbitmq-server
访问RabbitMQ管理界面:`http://localhost:15672`,默认用户名和密码都是`guest`。

创建微服务

我们将创建两个简单的微服务:生产者服务和消费者服务。
生产者服务
生产者服务负责发送消息到RabbitMQ。
安装依赖
pip install pika
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')message = 'Hello World!'
channel.basic_publish(exchange='', routing_key='hello', body=message)
print(f'Sent: {message}')
connection.close()
消费者服务
消费者服务负责从RabbitMQ接收消息。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

运行微服务

先启动消费者服务,然后启动生产者服务。
# 启动消费者服务
python consumer.py# 启动生产者服务
python producer.py

消息模式

RabbitMQ支持多种消息模式,包括直接模式、扇出模式、主题模式和头部模式。
直接模式
直接模式是最简单的模式,消息会被发送到指定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')message = 'Direct message'
channel.basic_publish(exchange='', routing_key='direct_queue', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='direct_queue')channel.basic_consume(queue='direct_queue', auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
扇出模式
扇出模式将消息广播到所有绑定的队列。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')message = 'Fanout message'
channel.basic_publish(exchange='fanout_exchange', routing_key='', body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='fanout_exchange', exchange_type='fanout')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='fanout_exchange', queue=queue_name)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
主题模式
主题模式允许更复杂的路由规则。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')routing_key = 'kern.critical'
message = 'Critical kernel message'
channel.basic_publish(exchange='topic_exchange', routing_key=routing_key, body=message)
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_exchange', exchange_type='topic')result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queuebinding_keys = ['*.critical', 'kern.*']
for binding_key in binding_keys:channel.queue_bind(exchange='topic_exchange', queue=queue_name, routing_key=binding_key)channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

高级特性

RabbitMQ还支持许多高级特性,如持久化、确认机制、死信队列等。
持久化
可以配置消息和队列的持久化,以确保消息不会因为RabbitMQ服务器重启而丢失。
生产者代码
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)message = 'Persistent message'
channel.basic_publish(exchange='', routing_key='durable_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(f'Sent: {message}')
connection.close()
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='durable_queue', durable=True)channel.basic_consume(queue='durable_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()
确认机制
可以配置消费者在处理完消息后发送确认,以确保消息不会被重复处理。
消费者代码
import pikadef on_message_received(ch, method, properties, body):print(f'Received: {body.decode()}')ch.basic_ack(delivery_tag=method.delivery_tag)connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.queue_declare(queue='ack_queue')channel.basic_consume(queue='ack_queue', on_message_callback=on_message_received)print('Waiting for messages...')
channel.start_consuming()

监控和日志

RabbitMQ提供了丰富的监控和日志功能,可以用于监控和调试。

监控
可以通过RabbitMQ管理界面监控队列、交换机和连接等。

日志
可以通过配置文件调整日志级别和输出方式。

故障排除

如果RabbitMQ配置出现问题,可以使用以下命令进行故障排除。

sudo rabbitmqctl status
sudo journalctl -u rabbitmq-server

总结

通过本文,你已经学习了如何使用RabbitMQ实现微服务间的异步消息传递。我们介绍了RabbitMQ的基本概念、安装方法、配置RabbitMQ、创建微服务、消息模式(直接模式、扇出模式、主题模式)、高级特性(持久化、确认机制)、监控和日志、故障排除等内容。掌握了这些知识,将有助于你在实际工作中更好地利用RabbitMQ来构建高效、可靠的微服务架构。
RabbitMQ管理界面示例
RabbitMQ消息传递模式示例

使用RabbitMQ可以显著提高微服务间消息传递的可靠性和效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/884338.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Yolo V4详解

Yolo V4(You Only Look Once version 4)是一种先进的目标检测系统,于2020年推出。作为Yolo系列算法的最新版本,Yolo V4继承了其前代版本的优点,并在此基础上进行了多项改进,使得其性能得到了显著提升。本文…

Pandas数据结构之Series对象

文章目录 1. DataFrame对象1.1 创建DataFrame对象1.2 DataFrame对象常用属性和方法1.3 布尔值列表获取DataFrame对象中部分数据1.4 DataFrame对象的运算 1. DataFrame对象 DataFrame是一个表格型的结构化数据结构,它含有一组或多组有序的列(Series&…

Tomcat 11 下载/安装 与基本使用

为什么要使用Tomcat? 使用Apache Tomcat的原因有很多,以下是一些主要的优点和特点: 1. 开源与免费 Tomcat是一个完全开源的项目,任何人都可以免费使用。它由Apache软件基金会维护,拥有一个活跃的社区,这…

换热器换热面积计算

1 容积式水加热器换热面积计算 式中Q—设计小时耗热量(W) ε—由于水垢、热媒分布不均匀等影响传热效率的系数,一般采用0.8~0.6 K—传热系数[W/(m2ˑ℃)],K值对加热器换热影响很大,主要取决于热媒种类和压力、热媒和…

幸福宝宝起名器

这段代码是一个简单的“幸福宝宝取名器”网页应用&#xff0c;主要功能是根据用户输入的姓氏、性别和生成数量&#xff0c;随机生成宝宝的名字。以下是代码的主要组成部分和功能简介&#xff1a; 1. HTML 结构 - 文档类型和语言&#xff1a;使用 <!DOCTYPE html> 声明文…

BLG与T1谁会赢?python制作预测程序,结果显示,BLG将打败T1

决赛预测 2024英雄联盟全球总决赛 2024年英雄联盟全球总决赛&#xff0c;今天晚上&#xff08;2024年11月2日22点&#xff09;就要开始了&#xff01;今年的总决赛的队伍是BLG与T1。当然一些老的lol玩家&#xff0c;现在可能对于lol关注不多&#xff0c;并不清楚这两个队伍。…

Spring Boot 3.x 整合 Druid 数据库连接池(含密码加密)

Spring Boot 3.x 整合 Druid 数据库连接池&#xff08;含密码加密&#xff09; 1. 为什么需要数据库连接池&#xff1f; 在传统的数据库连接中&#xff0c;每一次与数据库连接都会消耗大量的系统资源和时间。数据库连接池会提前创建一定数量的数据库连接保存在池中&#xff0…

完美解决“找不到MSVCR110.dll无法继续执行代码

msvcr110.dll是一个动态链接库&#xff08;Dynamic Link Library&#xff0c;简称DLL&#xff09;文件&#xff0c;它是Microsoft Visual C 2012 Redistributable Package的一部分。这个库文件包含了大量预先编写的函数和资源&#xff0c;用于支持那些使用Visual C 2012或与之兼…

C++【string的模拟实现】

在前文我们讲解了string类接口使用&#xff08;C【string类的使用】(上),C【string类的使用】&#xff08;下&#xff09;&#xff09;&#xff0c;本片文章就来模拟实现string类。 注&#xff1a;本文实现的是string的部分重点内容&#xff0c;目的是为了更好的了解string&…

新能源汽车充电设施在储充电站的应用

0引言 全球能源和环境问题促使新能源汽车受到关注&#xff0c;但其推广受充电设施和能源供应限制。光伏站、储能站和电动汽车充放电站作为可再生能源利用和储存方式&#xff0c;具有巨大潜力。本研究旨在探索新能源汽车充电设施与这些站点的融合模式&#xff0c;以支持新能源汽…

uniapp开发小程序【简单的实现点击下拉选择性别功能】

一、展示效果 二、代码 <template><view><view class="form_box"><view class="item"

Git 基础详解

1. 基本概念 Git是一个免费、开源的 分布式版本控制系统&#xff0c;可以高效处理小到大型的各种项目。 1.1 版本控制 版本控制&#xff1a;它是一种用于追踪和记录文件、目录、项目或软件的变化&#xff0c;以便将来查阅、比较、修订不同版本文件的系统 版本控制系统&…

Yarn介绍 | 组成 | 工作流程

1、理论 Apache YARN&#xff08;Yet another Resource Negotiator的缩写&#xff09;是Hadoop集群的资源管理系统&#xff0c;负责为计算程序提供服务器计算资源&#xff0c;相当于一个分布式的操作系统平台&#xff0c;而MapReduce等计算程序则相当于运行于操作系统之上的应用…

uniapp开发【选择地址-省市区功能】,直接套用即可

一、效果展示 二、代码 <template><view><view class="user_info"><view class="item"

《Web性能权威指南》-WebRTC-读书笔记

本文是《Web性能权威指南》第四部分——WebRTC的读书笔记。 第一部分——网络技术概览&#xff0c;请参考网络技术概览&#xff1b; 第二部分——无线网络性能&#xff0c;请参考无线网络性能&#xff1b; 第三部分——HTTP&#xff0c;请参考HTTP&#xff1b; 第四部分——浏览…

高效水电管理:Spring Boot在大学城的应用

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理大学城水电管理系统的相关信息成为必然。开…

(linux驱动学习 - 12). IIC 驱动实验

目录 一.IIC 总线驱动相关结构体与函数 1.i2c_adapter 结构体 2.i2c_algorithm 结构体 3.向系统注册设置好的 i2c_adapter 结构体 - i2c_add_adapter 4.向系统注册设置好的 i2c_adapter 结构体 - i2c_add_numbered_adapter 5.删除 I2C 适配器 - i2c_del_adapter 二.IIC 设…

selenium操作已开启的浏览器,方便调试

一、谷歌浏览器配置&#xff1a; 在所安装的谷歌下面&#xff0c;执行下面命令&#xff0c;打开谷歌浏览器&#xff0c;用来selenium的操作&#xff1a; 注意事项&#xff1a;端口需要不被占用&#xff0c;--user-data-dir"D:\workspace\chrome-data"这个路径需要有…

特殊矩阵的压缩存储

一维数组的存储结构 ElemType arr[10]; 各数组元素大小相同&#xff0c;且物理上连续存放。 数组元素a[i]的存放地址 LOC i * sizeof(ElemType)。&#xff08;LOC为起始地址&#xff09; 二维数组的存储结构 ElemType b[2][4];二维数组也具有随机存取的特性&#xff08;需…

河南高校大数据实验室建设案例分享

泰迪智能科技在与中国各地高校的合作中积累了丰富的经验&#xff0c;尤其是在大数据和人工智能领域。过去多年里与河南省内多所高校在大数据领域进行了积极的探索和建设&#xff0c;形成了一系列具有特色的大数据实验室。这些实验室不仅促进了高校内部的科研创新&#xff0c;也…