python rocketmq生产者消费者

  1. 安装依赖包
pip install rocketmq
  1. 生产者
    需要注意的是假如你用的java SDK 需要只是UNinname
    我们可以看到下列代码设置了tag以及key,在页面可以根据key查找消息

from rocketmq.client import Producer, Message
import jsonproducer = Producer('PID-XXX')
# producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
producer.set_namesrv_addr('192.168.214.134:9876')
producer.set_max_message_size(1024*1024)# producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
producer.start()msg = Message('ellis')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body(json.dumps({"key1":"value1"}))
ret = producer.send_sync(msg)print(ret.msg_id,ret.offset,ret.status)
producer.shutdown()
  1. 消费方式PullConsumer(全部消费)(可重复消费)

```python
from rocketmq.client import PullConsumerconsumer = PullConsumer('CID_XXX')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.start()for msg in consumer.pull('YOUR-TOPIC'):print(msg.id, msg.body)consumer.shutdown()
  1. 消费方式PushConsumer(即时消费)(不可重复消费)
import timefrom rocketmq.client import PushConsumerdef callback(msg):print(msg.id, msg.body.decode())consumer = PushConsumer('ellis1')
consumer.set_group("ellis1")
consumer.set_instance_name("ellis1")
# consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
consumer.set_namesrv_addr('127.0.0.1:9876')
# consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
其中tags是要订阅的TAG
consumer.subscribe('ellis', callback,TAGS)
consumer.start()while True:time.sleep(3600)consumer.shutdown()
  1. 生产者发送消息选择队列,以及设置顺序
from rocketmq.client import Producer, Message
import jsondef queue_selector(mq_size, msg, arg):return 1producer = Producer('PID-XXX')
# producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
producer.set_namesrv_addr('192.168.214.134:9876')
producer.set_max_message_size(1024*1024)# producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
producer.start()msg = Message('ellis')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body(json.dumps({"key1":"value1"}))ret = producer.send_oneway_orderly(msg,arg=1,queue_selector=queue_selector)producer.shutdown()
  1. 消费者顺序消费
import timefrom rocketmq.client import PushConsumerdef callback(msg):print(msg)print(msg.id, msg.body.decode())consumer = PushConsumer('ellis1',orderly=True)
consumer.set_group("ellis3")
consumer.set_instance_name("ellis1")
# consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
consumer.set_namesrv_addr('127.0.0.1:9876')
# consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.subscribe('ellis', callback)
consumer.start()while True:time.sleep(3600)consumer.shutdown()

https://github.com/apache/rocketmq-clients

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/15378.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【使用维纳滤波进行信号分离】基于维纳-霍普夫方程的信号分离或去噪维纳滤波器估计(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

BUU [网鼎杯 2020 朱雀组]phpweb

BUU [网鼎杯 2020 朱雀组]phpweb 众生皆懒狗。打开题目,只有一个报错,不知何从下手。 翻译一下报错,data()函数:,还是没有头绪,中国有句古话说的好“遇事不决抓个包” 抓个包果然有东西,仔细一看这不就分别是函数和参…

MySQL 数据库 【增删查改(二)】

目录 一、表的设计 1、一对一 2、一对多 3、多对多 二、新增 三、查询 1、聚合查询 (1)聚合函数: (2) group by 子句 (3)having 2、联合查询 (1)内连接 (2)外连接 (3)自链接 (4)…

142. 环形链表 II

142. 环形链表 II 中等 2.2K 相关企业 给定一个链表的头节点 head ,返回链表开始入环的第一个节点。 如果链表无环,则返回 null。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定…

服务器(容器)开发指南——SSH打洞开发

文章目录 SSH容器服务打包测试服务文件镜像打包 SSH打洞开发部署带SSH的容器SSH连接服务器(容器内部)SSH访问容器内的缺陷 IDE远程SSH开发VSCode远程SSH开发Jetbrains系列产品SSH远程开发 在进行定制化的服务开发时,我们有时候只能在固定的服…

Day 9 C++ 内存分区模型

目录 内存四区 代码区 全局区 栈区 堆区 内存四区意义: 程序运行前后内存变化 程序运行前 代码区 全局区 程序运行后 栈区 堆区 new操作符 基本语法 创建 释放(delete) 内存四区 代码区 代码区(Code Segment&…

没有使用IaC的DevOps系统都是耍流氓 |IDCF

作者:徐磊 文章首发地址:https://smartide.cn/zh/blog/2022-1010-iac/ 作为现代软件工程的基础实践,基础设施即代码(Infrastructure as Code, IaC)是云原生、容器、微服务以及DevOps背后的底层逻辑。应该说&#xff…

layui框架学习(34:数据表格_基本用法)

Layui中的数据表格模块table支持动态显示并操作表格数据,之前学习的页面元素中的表格主要是通过使用样式及属性对表格样式进行设置,而table模块支持动态显示、分页显示、排序显示、搜索等形式各样的动态操作,参考文献3中也给出了数据表格的各…

WPF实战学习笔记31-登录界面全局通知

UI添加消息聚合器 <md:Snackbarx:Name"LoginSnakeBar"Grid.ColumnSpan"2"Panel.ZIndex"1"MessageQueue"{md:MessageQueue}" />注册提示消息 文件&#xff1a;Mytodo.Views.LoginView.cs构造函数添加内容 //注册提示消息 aggre…

JavaScript---事件对象event

获取事件对象&#xff1a; 事件对象&#xff1a;是个对象&#xff0c;这个对象里有事件触发时的相关信息&#xff0c;在事件绑定的回调函数的第一个参数就是事件对象&#xff0c;一般命名为event、ev、e eg: 元素.addEventListener(click,function (e){}) 部分常用属性&…

IDEA好用的插件总结

IdeaVim 这个看个人喜好&#xff0c;我比较喜欢用vim&#xff0c;并且支持自定义修改按键绑定alibaba java code guidelines alibaba的java编程规范plantUML 绘制UML&#xff0c;支持语言显示plantUML integration 能够直接将代码转化为UML图&#xff0c;非常方便rainbow brack…

Git初始化

查看git版本 git --version 设置Git的配置变量 方法&#xff1a; 修改全局文件&#xff08;用户主目录下.gitconfig&#xff09;修改系统文件&#xff08;如/etc/gitconfig&#xff09; 用户姓名和邮件地址 修改用户名和邮件地址 git config --global user.name "用…

LLM系列 | 18 : 如何用LangChain进行网页问答

简介 一夕轻雷落万丝&#xff0c;霁光浮瓦碧参差。 紧接之前LangChain专题文章&#xff1a; 15:如何用LangChain做长文档问答&#xff1f;16:如何基于LangChain打造联网版ChatGPT&#xff1f;17:ChatGPT应用框架LangChain速成大法 今天这篇小作文是LangChain实践专题的第4…

细讲TCP三次握手四次挥手(四)

常见面试题 为什么TCP连接的时候是3次&#xff1f;2次不可以吗&#xff1f; 因为需要考虑连接时丢包的问题&#xff0c;如果只握手2次&#xff0c;第二次握手时如果服务端发给客户端的确认报文段丢失&#xff0c;此时服务端已经准备好了收发数(可以理解服务端已经连接成功)据…

Excel Power View教程_编程入门自学教程_菜鸟教程-免费教程分享

教程简介 Excel Power View 是一种数据可视化技术&#xff0c;用于创建交互式图表、图形、地图和其他视觉效果&#xff0c;以便直观呈现数据。 Excel Power View中&#xff0c;可以快速创建各种可视化效果&#xff0c;从表格和矩阵到饼图、条形图和气泡图&#xff0c;以及多个…

Qt: 查看qmake相关参数设置

Qt开发中&#xff0c;经常会遇到qmake相关问题&#xff0c;比如同时安装了多个Qt版本的情况。比如我的情况是系统自带了Qt 5.12.8, 但是开发中遇到一些兼容性问题&#xff0c;于是又手动安装了5.9.8。 查看qmake版本&#xff0c;qmake -v, 虽然项目中已经指定了5.9.8, 但是系统…

爬虫小白-如何调试列表页链接与详情链接不一样并三种方式js逆向解决AES-ECB

目录 一、网站分析二、定位监听三、熟悉AES-ECB四、调试分析五、node运行js六、Python执行js 一、网站分析 三年前的案例&#xff0c;我的原始文章网站 &#xff0c;如图我们直接点击标题进入到详情页&#xff0c;链接会发生跳转&#xff0c;且与我们在详情看到的链接&#xf…

React 中的常见 API 和生命周期函数

目录 useStateuseEffectuseRefdangerouslySetInnerHTML生命周期函数 constructorcomponentDidMountstatic getDerivedStateFromPropsshouldComponentUpdatecomponentDidUpdatecomponentWillUnmount useState useState 是 React 的一个 Hook&#xff0c;用于在函数组件中添加…

iOS开发-格式化时间显示刚刚几分钟前几小时前等

iOS开发-格式化时间显示刚刚几分钟前几小时前等 在开发中经常遇到从服务端获取的时间戳&#xff0c;需要转换显示刚刚、几分钟前、几小时前、几天前、年月日等格式。 主要用到了NSCalendar、NSDateComponents这两个类 NSString *result nil;NSCalendarUnit components (NSC…

【后端面经】微服务构架 (1-6) | 隔离:如何确保心悦会员体验无忧?唱响隔离的鸣奏曲!

文章目录 一、前置知识1、什么是隔离?2、为什么要隔离?3、怎么进行隔离?A) 机房隔离B) 实例隔离C) 分组隔离D) 连接池隔离 与 线程池隔离E) 信号量隔离F) 第三方依赖隔离二、面试环节1、面试准备2、基本思路3、亮点方案A) 慢任务隔离B) 制作库与线上库分离三、章节总结 …