【Python】RocketMQ 基础使用

目录

1. 介绍

2. 实践

2.1. 启动消费者

2.2. 启动生产者


1. 介绍

RocketMQ是一个开源的分布式消息传递系统,最初由阿里巴巴集团开发并于2012年开源。它旨在解决高可靠性、高吞吐量、低延迟和可伸缩性等大规模分布式系统下的消息通信需求。

RocketMQ的设计目标是提供一种灵活、可靠、高性能的消息传递解决方案,适用于各种场景,包括在线消息通信、日志处理、流式处理、事件驱动架构等。

下面是对RocketMQ的主要特点和关键功能的简要介绍:

  1. 分布式架构:RocketMQ采用了分布式架构,支持水平扩展和高可用性。它的架构包括多个消息生产者、多个消息消费者和多个消息服务器(Broker)。消息生产者将消息发送到Broker,消息消费者从Broker订阅并消费消息。

  2. 高可靠性和容错性:RocketMQ通过复制和故障转移机制,提供高可靠性和容错性。它支持主从同步复制和异步复制方式,确保消息不会丢失,并且在Broker故障时能够自动切换到备用Broker。

  3. 高吞吐量和低延迟:RocketMQ通过优化存储、网络和消息传递等方面的性能,实现高吞吐量和低延迟。它支持批量发送和批量消费消息,有效地减少网络开销和提高消息处理效率。

  4. 灵活的消息模型:RocketMQ支持多种消息模型,包括点对点(P2P)和发布-订阅(Pub-Sub)模型。在P2P模型中,消息生产者直接发送消息给特定的消费者;在Pub-Sub模型中,消息生产者发布消息到特定的主题(Topic),消息消费者订阅感兴趣的主题并接收相应的消息。

  5. 丰富的消息过滤和顺序消息支持:RocketMQ提供灵活的消息过滤功能,可以根据消息的属性、标签或SQL表达式进行过滤。此外,它还支持顺序消息,确保相同主题的消息按照发送顺序被消费。

  6. 可伸缩性和扩展性:RocketMQ具有良好的可伸缩性和扩展性,可以根据需求增加或减少Broker、生产者和消费者的数量,以适应不断增长的消息流量。

  7. 丰富的生态系统和社区支持:RocketMQ拥有活跃的开源社区,提供了丰富的文档、示例和工具。此外,它还与其他开源项目(如Apache Storm、Apache Flume、Apache Flink等)集成,为用户提供更多选择和灵活性。

总之,RocketMQ是一个功能强大的分布式消息传递系统,具有高可靠性、高吞吐量、低延迟和可伸缩性等特点。它适用于构建大规模分布式系统中的消息通信基础设施,为开发者提供了一种可靠、高效的消息传递解决方案。

2. 实践

注意:以下配置需从阿里云 RocketMQ 获取

HTTP_ENDPOINT = "xxx"
ACCESS_KEY = "xxx"
SECRET_KEY = "xxx"

TOPIC = "xxx"
GROUP_ID = "xxx"
INSTANCE_ID = "xxx"

2.1. 启动消费者

""" 
The code was copied from this link
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-transactional-messages-2?spm=a2c4g.11186623.0.0.4b6f8707cKp9eN
"""
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *HTTP_ENDPOINT = "xxx"
ACCESS_KEY = "xxx"
SECRET_KEY = "xxx"TOPIC = "xxx"
GROUP_ID = "xxx"
INSTANCE_ID = "xxx"
# 初始化client。
mq_client = MQClient(# 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。HTTP_ENDPOINT,# 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。# AccessKey ID,阿里云身份验证标识。ACCESS_KEY,# AccessKey Secret,阿里云身份验证密钥。SECRET_KEY)
# 消息所属的Topic,在消息队列RocketMQ版控制台创建。
topic_name = TOPIC
# 您在消息队列RocketMQ版控制台创建的Group ID。
group_id = GROUP_ID
# Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
# 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instance_id = INSTANCE_IDconsumer = mq_client.get_consumer(instance_id, topic_name, group_id)# 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
# 长轮询时间3秒(最多可设置为30秒)。
wait_seconds = 3
# 一次最多消费3条(最多可设置为16条)。
batch = 3
print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \% (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)))
while True:try:# 长轮询消费消息。recv_msgs = consumer.consume_message(batch, wait_seconds)for msg in recv_msgs:print(("Receive, MessageId: %s\nMessageBodyMD5: %s \\nMessageTag: %s\nConsumedTimes: %s \\nPublishTime: %s\nBody: %s \\nNextConsumeTime: %s \\nReceiptHandle: %s \\nProperties: %s\n" % \(msg.message_id, msg.message_body_md5,msg.message_tag, msg.consumed_times,msg.publish_time, msg.message_body,msg.next_consume_time, msg.receipt_handle, msg.properties)))except MQExceptionBase as e:# Topic中没有消息可消费。if e.type == "MessageNotExist":print(("No new message! RequestId: %s" % e.req_id))continueprint(("Consume Message Fail! Exception:%s\n" % e))time.sleep(2)continue# msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。# 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。try:receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]consumer.ack_message(receipt_handle_list)print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))except MQExceptionBase as e:print(("\nAk Message Fail! Exception:%s" % e))# 某些消息的句柄可能超时,会导致消息消费状态确认不成功。if e.sub_errors:for sub_error in e.sub_errors:print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \(sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))

2.2. 启动生产者

""" 
The code was copied from this link
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-transactional-messages-2?spm=a2c4g.11186623.0.0.4b6f8707cKp9eN
"""
import sysfrom mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time
import threadingHTTP_ENDPOINT = "xxx"
ACCESS_KEY = "xxx"
SECRET_KEY = "xxx"TOPIC = "xxx"
GROUP_ID = "xxx"
INSTANCE_ID = "xxx"# 初始化client。
mq_client = MQClient(# 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。HTTP_ENDPOINT,# 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。# AccessKey ID,阿里云身份验证标识。ACCESS_KEY,# AccessKey Secret,阿里云身份验证密钥。SECRET_KEY)
# 消息所属的Topic,在消息队列RocketMQ版控制台创建。
topic_name = TOPIC
# Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
# 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instance_id = INSTANCE_IDproducer = mq_client.get_producer(instance_id, topic_name)# 循环发送4条消息。
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))try:for i in range(msg_count):msg = TopicMessage(# 消息内容。"I am test message %s.hello" % i,# 消息标签。"tag %s" % i)# 设置消息的自定义属性。msg.put_property("a", i)# 设置消息的Key。msg.set_message_key("MessageKey")re_msg = producer.publish_message(msg)print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))except MQExceptionBase as e:if e.type == "TopicNotExist":print("Topic not exist, please create it.")sys.exit(1)print("Publish Message Fail. Exception:%s" % e)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/802187.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue对比react18

1.模板语法-——>jsx JSX表达式用{}包裹&#xff0c;vue模板表达式用{{}}包裹&#xff0c;其余一致。 注意:if语句、switch语句、变量声明属于语句&#xff0c;不是表达式&#xff0c;不能出现在{}或{{}}中 <!--vue--> <template><div><p>I have…

Java开发面试题分享

目录 1、简述MyISAM和InnoDB的区别 2、简述Hash和B树索引的区别 3、简述MyBatis的实现逻辑 4、#{}和${}的区别 5、简述Mybatis的优缺点 6、当实体类中的属性名和表中的字段名不一样时怎么办&#xff1f; 7、resultType与resultMap的区别 8、如何执行批量插入 9、Mybat…

Unity自己实现的中英文的切换(简单好抄)

关键技术&#xff08;读取文件的方法&#xff0c;Split()分割字符串&#xff09; 1.搭建一个这样的场景&#xff0c;场景中有3个文本&#xff08;用新版的&#xff09;&#xff0c;一个空对象&#xff0c;一个按钮 2.编写翻译文本&#xff08;编写一个txt文本&#xff0c;在文…

jq命令简易教程——Linux中处理JSON数据的利器

在shell脚本中&#xff0c;当我们需要对JSON数据&#xff08;例如ceph、kubernetes等一些命令的输出&#xff0c;或是调用API获得的响应&#xff09;进行处理和提取时&#xff0c;如果使用传统的文本三剑客sed、awk和grep&#xff0c;命令将会非常臃肿不可读。虽然这三个命令在…

腾讯云视频点播配置说明 | Modstart

开通云点播 开通云点播 云点播VOD_音视频点播_直播回看_音视频上传、存储转码AI处理方案-腾讯云 获取腾讯云 SecretId 和 SecretSecret 注册并且登录 腾讯云

14. 【Android教程】文本输入框 EditText

在上一节我们讲到了 TextView&#xff0c;它用来显示一段文本。这一节可以算作成是 TextView 的延续&#xff0c;因为从功能上 EditText 在 TextView 的基础之上多了一个输入的功能&#xff1b;从代码上 EditText 是继承自 TextView 的子类&#xff0c;所以我们可以大胆的理解为…

下载python电子书

下面展示一些 内联代码片。 import requests from lxml import etree from urllib import parse from pprint import pprint from tqdm import tqdm class PythonBook: def init(self): self.url“https://m.jb51.net/books/list476_1.html” self.url_page“https://m.jb51.n…

数字乡村发展新模式:科技创新引领农业现代化与乡村振兴协同发展

随着信息技术的飞速发展&#xff0c;数字乡村已成为新时代农业现代化与乡村振兴协同发展的新模式。科技创新作为推动这一模式的核心动力&#xff0c;正引领着乡村产业结构的优化升级&#xff0c;促进农村经济的全面振兴&#xff0c;让农民在现代化的进程中共享发展成果。 一、科…

transformer上手(1) —— transformer介绍

1 起源与发展 2017 年 Google 在《Attention Is All You Need》中提出了 Transformer 结构用于序列标注&#xff0c;在翻译任务上超过了之前最优秀的循环神经网络模型&#xff1b;与此同时&#xff0c;Fast AI 在《Universal Language Model Fine-tuning for Text Classificat…

vue页面跳转过渡动画与防止抖动

目录 整页跳转动画页面抖动我的代码 整页跳转动画 总是看到别人的页面有个淡入淡出效果&#xff0c;但是自己一直不知道怎么实现&#xff0c;感觉不能是每个组件都加一个动画&#xff0c;于是我去看了vue的官方文档。 官方给了这两个东西&#xff1a; <transition> 元…

STM32存储左右互搏 SDIO总线读写SD/MicroSD/TF卡

STM32存储左右互搏 SDIO总线读写SD/MicroSD/TF卡 SD/MicroSD/TF卡是基于FLASH的一种常见非易失存储单元&#xff0c;由接口协议电路和FLASH构成。市面上由不同尺寸和不同容量的卡&#xff0c;手机领域用的TF卡实际就是MicroSD卡&#xff0c;尺寸比SD卡小&#xff0c;而电路和协…

基于Java SpringBoot+Vue的体育用品库存管理系统

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

力扣739. 每日温度

Problem: 739. 每日温度 文章目录 题目描述思路复杂度Code 题目描述 思路 若本题目使用暴力法则会超时&#xff0c;故而使用单调栈解决&#xff1a; 1.创建结果数组res&#xff0c;和单调栈stack&#xff1b; 2.循环遍历数组temperatures&#xff1a; 2.1.若当stack不为空同时…

如何判断一个linux机器是物理机还是虚拟机

https://blog.csdn.net/qq_32262243/article/details/132571117 第一种方式&#xff1a;dmesg命令 [rootnshqae01adm03 ~]# dmesg | grep -i hypervisor [ 0.000000] Hypervisor detected: Xen PV [ 1.115297] VPMU disabled by hypervisor. 在我的机器上 dmesg也是能够用来判…

【C语言】扫雷【附源码】

一、扫雷游戏规则 尽快找到雷区中的所有不是地雷的格子,而不许踩到地雷。点开的数字是几&#xff0c;则说明该数字旁边的8个位置中有几个雷&#xff0c;如果挖开的是地雷&#xff0c;则会输掉游戏。 二、代码思路&#xff1a; 宏定义&#xff1a; Row 和 Col 定义了棋盘的行数和…

计算机研究生规划

一、计算机研究生技术栈 两条腿走路: 左侧工程实践能力&#xff1a;要掌握python编程语言&#xff0c;它和机器学习、神经网络&#xff08;这两门几乎是必须掌握的技能&#xff09;的学习有很大关系 右侧学术创新能力 二、编程语言能力提升 左边基础&#xff0c;右边教你写…

在ubuntu系统上安装ffmpeg支持rrweb使用rrvideo对视频文件转mp4格式遇到的一些问题及解决办法

在ubuntu系统上安装ffmpeg支持rrweb使用rrvideo对视频文件转mp4格式遇到的一些问题及解决办法 1,ubuntu系统上安装ffmpeg4.4.1稳定版本1,ubuntu系统上安装ffmpeg4.4.1稳定版本 按照ChatGPT3.5来 sudo apt updatesudo apt install build-essential git sudo apt-get instal…

上传应用程序到苹果应用商店的工具和要点

引言 在今天的移动应用市场中&#xff0c;将应用程序上传到苹果应用商店&#xff08;App Store&#xff09;是许多开发者的首要任务之一。然而&#xff0c;不同操作系统下的开发者可能需要使用不同的工具和遵循不同的要求来完成这一任务。本文将介绍在 macOS、Windows 和 Linu…

蓝桥杯算法题:练功

【问题描述】 小明每天都要练功&#xff0c;练功中的重要一项是梅花桩。 小明练功的梅花桩排列成 n 行 m 列&#xff0c;相邻两行的距离为 1&#xff0c;相邻两列的距离也为 1。 小明站在第 1 行第 1 列上&#xff0c;他要走到第 n 行第 m 列上。小明已经练了一段时间&#xff…

gcc/g++:编译阶段翻译成平台汇编代码

编译阶段翻译成平台汇编代码&#xff0c;是在预编译阶段上加码&#xff0c;将C/C代码翻译成平台相关的汇编代码。 示例&#xff1a; 1&#xff09;用户程序 /*brief test demo-for-compile-to-asm? show you hereauthor wenxuanpeiemail 15873152445163.com(query for any q…