2-rabbitmq-发布订阅、发布订阅高级之Routing(按关键字匹配)、发布订阅高级之Topic(按关键字模糊匹配)、基于rabbitmq实现rpc

1 发布订阅
2 发布订阅高级之Routing(按关键字匹配)
2.1 发布订阅高级之Topic(按关键字模糊匹配)
3 基于rabbitmq实现rpc

1 发布订阅

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()channel.exchange_declare(exchange='m1',exchange_type='fanout')channel.basic_publish(exchange='m1',routing_key='',body='lqz nb')connection.close()

订阅者(启动几次订阅者会生成几个队列)

import pikacredentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m1',exchange_type='fanout')# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)def callback(ch, method, properties, body):print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

1 发布订阅高级之Routing(按关键字匹配)

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()channel.exchange_declare(exchange='m2',exchange_type='direct')channel.basic_publish(exchange='m2',routing_key='bnb', # 多个关键字,指定routing_keybody='lqz nb')connection.close()

订阅者1

import pikacredentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bnb')def callback(ch, method, properties, body):print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

订阅者2

import pikacredentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')def callback(ch, method, properties, body):print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

2.1 发布订阅高级之Topic(按关键字模糊匹配)

发布者

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()channel.exchange_declare(exchange='m3',exchange_type='topic')channel.basic_publish(exchange='m3',# routing_key='lqz.handsome', #都能收到routing_key='lqz.handsome.xx', #只有lqz.#能收到body='lqz nb')connection.close()

订阅者1

只能加一个单词

#可以加任意单词字符

import pikacredentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m3',exchange_type='topic')# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='lqz.#')def callback(ch, method, properties, body):print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

订阅者2

import pikacredentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()# exchange='m1',exchange(秘书)的名称
# exchange_type='topic' , 模糊匹配
channel.exchange_declare(exchange='m3',exchange_type='topic')# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='lqz.*')def callback(ch, method, properties, body):queue_name = result.method.queue # 发送的routing_key是什么print("消费者接受到了任务: %r" % body)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)channel.start_consuming()

3 基于rabbitmq实现rpc

服务端

import pika
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()# 起翰监听任务队列
channel.queue_declare(queue='rpc_queue')def on_request(ch, method, props, body):n = int(body)response = n + 100# props.reply_to  要放结果的队列.# props.correlation_id  任务ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id= props.correlation_id),body=str(response))ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume( queue='rpc_queue',on_message_callback=on_request,)
channel.start_consuming()

客户端

import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):credentials = pika.PlainCredentials("admin", "admin")self.connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))self.channel = self.connection.channel()# 随机生成一个消息队列(用于接收结果)result = self.channel.queue_declare(queue='',exclusive=True)self.callback_queue = result.method.queue# 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response, auto_ack=True)def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())# 客户端 给 服务端 发送一个任务:  任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称self.channel.basic_publish(exchange='',routing_key='rpc_queue', # 服务端接收任务的队列名称properties=pika.BasicProperties(reply_to = self.callback_queue, # 用于接收结果的队列correlation_id = self.corr_id, # 任务ID),body=str(n))while self.response is None:self.connection.process_data_events()return self.responsefibonacci_rpc = FibonacciRpcClient()response = fibonacci_rpc.call(50)
print('返回结果:',response)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/217237.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

昇腾910安装驱动出错,降低Centos7.6的内核版本

零、问题描述: 在安装Atlas800-9000服务器的驱动的时候,可能会出现错误:Dkms install failed, details in : /var/log/ascend_seclog/ascend_install.log 如下所示: [rootlocalhost ~]# ./Ascend-hdk-910-npu-driver_23.0.rc3_l…

Git远程操作

目录 1.远程操作 1.1理解分布式版本控制系统 1.2远程仓库. 1.2.1新建远程仓库 1.2.2克隆远程仓库 1.2.3向远程仓库推送 1.2.4拉取远程仓库 1.3配置Git 1.3.1忽略特殊文件 1.3.2给命令配置别名 2.标签管理 2.1理解标签 2.2创建标签 2.3操作标签 1.远程操作 1.1理…

MacOS上配置Jenkins开机自启动

之前文章有写过,如何在Windows环境下,通过vbsbat脚本的组合拳实现Jenkins的开机自启动,最近换了电脑,又要搭建Jenkins了,顺带整理分享下MacOS上配置Jenkins开机自启动的方法。 具体配置步骤: 打开终端应用…

HTTP深度解析:构建高效与安全网络的关键知识

1. HTTP基础及其组件 我首先想和大家分享的是HTTP的基础知识。HTTP,即超文本传输协议,是互联网上最常用的协议之一。它定义了浏览器和服务器之间数据交换的规则,使得网页内容可以从服务器传输到我们的浏览器上。想象一下,每当你点…

Eclipse 将已有maven工程转为微服务

以下是将已有Maven工程转为微服务的步骤: 将已有的Maven工程中的每个模块作为独立的服务,每个模块都需要有独立的pom.xml文件,以便进行单独的构建和部署。 引入Spring Boot和Spring Cloud的依赖,以便于实现微服务化的相关功能&am…

为 PHP 引入 Python 生态的经验分享

编译安装 phpy 可以作为 PHP 的扩展,也可以作为 Python 的 C 模块。既可以在 PHP 代码中调用 Python 的库,也可以在 Python 中调用 PHP 的类和函数。 作为 Python 模块时依赖 PHP 的 embed SAPI ,检查 PHP 的目录中,确保存在 libphp.so ll /opt/php-8.…

19-数据结构-查找-散列查找

目录 一、散列查找结构思路图 二、哈希函数 三、解决冲突 1.开放地址法 1.1.线性探测法(线性探测再散列法) 1.2.平方探测法(二次探测再散列) 1.3.再散列法(双散列法) 2.拉链法 2.1简介 四、散列查…

如何在nacos中的配置在不同的环境服务下可实现配置共享

其实在微服务启动时,会去nacos读取多个配置文件,例如: [spring.application.name].yaml,例如:nacos-order-service.yaml[spring.application.name]-[spring.profiles.active].yaml,例如:nacos-o…

区块链技术的未来,了解去中心化应用的新视角

小编介绍:10年专注商业模式设计及软件开发,擅长企业生态商业模式,商业零售会员增长裂变模式策划、商业闭环模式设计及方案落地;扶持10余个电商平台做到营收过千万,数百个平台达到百万会员,欢迎咨询。 随着…

12月12日作业

设计一个闹钟 头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimerEvent> #include <QTime> #include <QTime> #include <QTextToSpeech>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass …

Linux实用操作(超级实用)

Linux实用操作篇-上篇&#xff1a;Linux实用操作-上篇-CSDN博客 Linux实用操作篇-下篇&#xff1a;Linux实用操作篇-下篇-CSDN博客 一、各类小技巧&#xff08;快捷键&#xff09; 1.1 ctrl c 强制停止 Linux某些程序的运行&#xff0c;如果想要强制停止它&#xff0c;可以…

VCG 标记使用(BitFlags)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 对于网格的每个单形,我们都有一个称为BitFlags的组件,该组件存储固定大小的32位向量,用于各种需求。管理这些标志的相关类:vcg::tri::UpdateFlags与vcg::tri::UpdateSelection。主要的标记有:删除标记、边界标记…

配置android sudio出现的错误

导入demo工程&#xff0c;配置过程参考&#xff1a; AndroidStudio导入项目的正确方式&#xff0c;修改gradle配置 错误&#xff1a;Namespace not specified. Specify a namespace in the module’s build file. 并定位在下图位置&#xff1a; 原因&#xff1a;Android 大括号…

使用docker编排容器

使用Dockerfile构建一个自定义的nginx 首先用docker拉一个nginx镜像 docker pull nginx拉取完成后&#xff0c;编辑一个Dockerfile文件 vim Dockerfile命令如下所示,FROM 后面跟的你的基础镜像&#xff0c;而run则是表示你构建镜像时需要执行的指令&#xff0c;下面的指令意…

day16_java多线程(入门了解)

多线程入门 一、线程和进程 进程 进程&#xff1a;是指一个内存中运行的应用程序&#xff0c;每个进程都有一个独立的内存空间和系统资源&#xff0c;一个应用程序可以同时运行多个进程&#xff1b;进程也是程序的一次执行过程&#xff0c;是系统运行程序的基本单位&#xff1…

Argon-Theme 轻盈、简洁、美观的 WordPress 主题-供大家学习研究参考

特性 轻盈美观 - 使用 Argon Design System 前端框架,细节精致,轻盈美观高度可定制化 - 可自定义主题色、布局(双栏/单栏/三栏)、顶栏、侧栏、Banner、背景图、日夜间模式不同背景、背景沉浸、浮动操作按钮等,提供了丰富的自定义选项夜间模式 - 支持日间、夜间、纯黑三种模式…

12.12

do_irq.c #include "key_it.h" int flag10; int flag20; int flag30; extern void printf(const char *fmt, ...); unsigned int i 0; void do_irq(void) {// 获取中断号&#xff0c;根据中断号的不同进行不同的中断处理int irqno;irqno GICC->IAR & 0x3ff…

【Flink on k8s】 -- flink kubernetes operator 1.7.0 发布

目录 前言 重大特性 1、自动伸缩 2、版本支持 3、savepoint 触发改进 4、jdk 支持 前言 Flink 官方博客于 2023-11-22 发布了 flink kubernetes operator 1.7.0 发布的消息。这个版本对自动缩放进行了大量的改进&#xff0c;包括与 Kubernetes 的完全分离&#xff0c;以便…

力扣每日一题----2008. 出租车的最大盈利

这题我们是怎么思考的呢&#xff1f; 已知有乘客最多30000个&#xff0c;有最多100000个地点&#xff0c;那么通过算法时间复杂度&#xff0c;不可能是O(n^2), 那么我们就可以去看题目&#xff0c;题目又是最多盈利多少元&#xff1f;那么很容易联想到动态规划&#xff0c;并…

湖南大学-电路与电子学-2021期末A卷★(不含解析)

【写在前面】 电路与电子学好像是从2020级开设的课程&#xff0c;故实际上目前只有2020与2021两个年级考过期末考试。 本份卷子的参考性很高&#xff0c;这是2020级的期末考卷。题目都是很典型的&#xff0c;每一道题都值得仔细研究透。 特别注意&#xff1a;看得懂答案跟写得…