tp8 使用rabbitMQ(3)发布/订阅

发布/订阅

当我们想把一个消息,发送给 多个消费者的时候,我们把这种模式叫做发布/订阅模式,比如我们做两个消费者,其中一个消费者把消息写入磁盘中,别一个消费者把消息结果输出到屏幕上,就要用到发布订阅模式

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

下面的图才是 rabbitmq 的完整模式, 中间是有交换机的
在这里插入图片描述

交换机

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。

我们之前的 简单队列和工作队列中,没有提来交换机的概念。

默认交换机

当我们使用RabbitMQ时,如果不指定交换机的类型,那么Rabbit会使用默认的一个交换机,这个默认的交换机类型是一个直连交换机(direct),后续新建的队列(queue)都会自动绑定到这个默认交换机上,绑定的路由键就是队列的名称,注意这个默认交换机的名称是一个空字符串 " "

交换机的种类有多种

直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)
头交换机的性能不好, 基本不用
用的最多的还是 扇形交换机 相当于是广播
前面两节中,我们只使用了下面的代码,其实是使用的默认交换机,没有定义,直接使用了

$channel->basic_publish($msg, '', 'hello');

发布订阅模式中,我们使用 扇形交换机 fanout 代码如下

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

上面两段代码比较,第一段,因为使用了默认的交换机,所以没有交换机的定义语句, 但是在发布的时候,中间那个参数是 “”,这样就指定了默认交换机的名称, 第二段,我们指明了要使用的交换机 fanout 所以在发布的时候,使用的是自定义的交换机名称


因为有了交换机,生产者代码中只需要把 message 发送给交换机就可以了, 所以生产者中不需要创建队列,创建队列放到 消费者中就可以了,(如果我们一定要把创建队列的时机放在生产者中,也是可以的, 个人根据需要灵活应用)

交换机和队列的绑定(这里应该是在消费者代码中出现的)

我们创建了交换机,并且有了N个队列,它们之间要建立绑定关系,才可以分发到相应有绑定的队列中

$channel->queue_bind($queue_name, 'hello');  //这样就把队列名称和交换机名称做了绑定

下面的 完整的代码示例

生产者

<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class PubSubMQProduce extends Command
{protected function configure(){// 指令配置$this->setName('pubsubmqproduce')->setDescription('发布订阅模式的生产者');}protected function execute(Input $input, Output $output){//获取连接$connection = $this->getConnectRabbitMQ();//创建通道$channel = $connection->channel();//创建交换机/*** params exchange  自定义交换机名称* params type  交换机的类型, 一般都会使用 扇形(fanout)* params passive 是否消极声名* params durable 是否持久化* params auto_delete 是否自动删除* params internal 设置是否内置的, true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式* params  nowait 相当于做一个异步版的声明,不等待返回,就让程序继续执行*/$channel->exchange_declare("exchangeName","fanout",false,false,false,false,false);//现在生产者只需要把消息发给交换机就可以了,所以不用在生产者中创建队列了(当然,想创建也是可以的)for ($i = 0; $i < 20; $i++) {$msgArr = ["name"=>"haha".$i,"age"=>'10'.$i,"sex"=>"female".$i];$msg = new AMQPMessage(json_encode($msgArr),["delivery_mode"=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);sleep(1);$channel->basic_publish($msg,"exchangeName");}$channel->close();$connection->close();}protected function getConnectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("队列连接失败");}}
}

消费者

<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class PubSubMQConsumer extends Command
{protected function configure(){// 指令配置$this->setName('pubsubmqconsumer')->setDescription('发布订阅模式的消费者');}protected function execute(Input $input, Output $output){$connection = $this->connectRabbitMQ();$channel = $connection->channel();//创建两个队列$channel->queue_declare("queueName1",false,false,false,false,false);$channel->queue_declare("queueName2",false,false,false,false,false);//绑定交换机和队列,交换机的名称是在生产者中定义的$channel->queue_bind("queueName1","exchangeName");$channel->queue_bind("queueName2","exchangeName");//设置消息处理函数$callback1 = function($msg){$msgArr = json_decode($msg->body,true);echo "这是(显示)处理数据的队列NO1  ".$msgArr["name"]."-11-".$msgArr["age"]."-11-".$msgArr["sex"].PHP_EOL;$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了};$callback2 = function($msg){$msgArr = json_decode($msg->body,true);echo "这是(保存)处理数据的队列NO2  ".$msgArr["name"]."-22-".$msgArr["age"]."-22-".$msgArr["sex"].PHP_EOL;$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了};$channel->basic_consume("queueName1","",false,false,false,false,$callback1);$channel->basic_consume("queueName2","",false,false,false,false,$callback2);while(count($channel->callbacks)){$channel->wait();}}protected function connectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("队列连接失败");}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/178446.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

产品化和商品化

我们经常会在IT产业听过以下岗位&#xff1a; 1、产品序列&#xff1a;产品行销经理 2、产品序列&#xff1a;产品经理、需求分析师、产品详细设计工程师、UIUE设计师 3、产品序列&#xff1a;业务架构师、应用架构师、数据架构师、技术架构师 4、研发序列&#xff1a;创新原型…

Java 图片验证码需求分析

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; 图片验证码 需求分析 连续因输错密码而登录失败时&#xff0c;记录其连续输错密码的累加次数&#xff1b;若在次数小于5时&#xff0c;用户输入正确的密码并成功登录…

前K个高频单词(Java详解)

一、题目描述 给定一个单词列表 words 和一个整数 k &#xff0c;返回前 k 个出现次数最多的单词。 返回的答案应该按单词出现频率由高到低排序。如果不同的单词有相同出现频率&#xff0c; 按字典顺序 排序。 示例1&#xff1a; 输入: words ["i", "love&…

浅谈硬件连通性测试几大优势

硬件连通性测试是确保硬件系统正常运行、提高系统可靠性和降低生产成本的关键步骤。在现代工程和制造中&#xff0c;将连通性测试纳入生产流程是一个明智的选择&#xff0c;有助于确保硬件产品的质量和性能达到最优水平。本文将介绍硬件连通性测试的主要优势有哪些! 一、提高系…

游戏测试和软件测试有什么区别

针对手游而言&#xff0c;游戏测试的本质是APP&#xff0c;所以不少手游的测试方式与APP测试异曲同工&#xff0c;然而也有所不同。APP更多的是具有一种工具&#xff0c;一款APP好不好用不重要&#xff0c;关键点在于实用。而游戏则具有一种玩具属性&#xff0c;它并不见得实用…

基于Python+requests编写的自动化测试项目-实现流程化的接口串联

框架产生目的&#xff1a;公司走的是敏捷开发模式&#xff0c;编写这种框架是为了能够满足当前这种发展模式&#xff0c;用于前后端联调之前&#xff08;后端开发完接口&#xff0c;前端还没有将业务处理完毕的时候&#xff09;以及日后回归阶段&#xff0c;方便为自己腾出学(m…

图像异常检测研究现状综述

论文标题&#xff1a;图像异常检测研究现状综述 作者&#xff1a;吕承侃 1, 2 沈 飞 1, 2, 3 张正涛 1, 2, 3 张 峰 1, 2, 3 发表日期&#xff1a;2022年6月 阅读日期 &#xff1a;2023年11月28 研究背景&#xff1a; 图像异常检测是计算机视觉领域的一个热门研究课题, 其目…

leetCode 39.组合总和 + 回溯算法 + 剪枝 + 图解 + 笔记

39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序 返回这些组合 can…

2015年五一杯数学建模A题不确定性条件下的最优路径问题解题全过程文档及程序

2015年五一杯数学建模 A题 不确定性条件下的最优路径问题 原题再现 目前&#xff0c;交通拥挤和事故正越来越严重的困扰着城市交通。随着我国交通运输事业的迅速发展&#xff0c;交通“拥塞”已经成为很多城市的“痼疾”。在复杂的交通环境下&#xff0c;如何寻找一条可靠、快…

HarmonyOS 数据持久化 Preferences 如何在页面中对数据进行读写

背景介绍 最近在了解并跟着官方文档尝试做一个鸿蒙app 小demo的过程中对在app中保存数据遇到些问题 特此记录下来 这里的数据持久化以 Preferences为例子展开 废话不多说 这里直接上节目(官方提供的文档示例:) 以Stage模型为例 1.明确preferences的类型 import data_prefer…

印刷企业建设数字工厂管理系统的工作内容有哪些

随着科技的不断进步&#xff0c;数字工厂管理系统在印刷企业中的应用越来越广泛。这种系统可以有效地整合企业内外资源&#xff0c;提高生产效率&#xff0c;降低生产成本&#xff0c;并为印刷企业提供更好的业务运营与管理模式。本文将从以下几个方面探讨印刷企业建设数字工厂…

如何用postman实现接口自动化测试

postman使用 开发中经常用postman来测试接口&#xff0c;一个简单的注册接口用postman测试&#xff1a; 接口正常工作只是最基本的要求&#xff0c;经常要评估接口性能&#xff0c;进行压力测试。 postman进行简单压力测试 下面是压测数据源&#xff0c;支持json和csv两个格…

Kibana部署

服务器 安装软件主机名IP地址系统版本配置KibanaElk10.3.145.14centos7.5.18042核4G软件版本&#xff1a;nginx-1.14.2、kibana-7.13.2-linux-x86_64.tar.gz 1. 安装配置Kibana &#xff08;1&#xff09;安装 [rootelk ~]# tar zxf kibana-7.13.2-linux-x86_64.tar.gz -C…

easyExcel 注解开发 快速以及简单上手 以及包含工具类

easyExcel 简单快速使用 1. mevan 这里版本我这里选的是 poi 4.1.2和 ali的easyexcel 的 3.3.1。 因为阿里easy是根据poi的依赖开发的有关系&#xff0c;两者需要对应要不然就会有很多bug和错误在运行时发生。需要版本对应&#xff0c;然而就是easy的代码也会有bug这个版本是比…

运动鞋品牌识别

一、前期工作 1. 设置GPU from tensorflow import keras from tensorflow.keras import layers,models import os, PIL, pathlib import matplotlib.pyplot as plt import tensorflow as tfgpus tf.config.list_physical_devices("GPU")if gpus:gpu0 …

Leetcode—18.四数之和【中等】

2023每日刷题&#xff08;四十一&#xff09; Leetcode—18.四数之和 实现代码 class Solution { public:vector<vector<int>> fourSum(vector<int>& nums, int target) {vector<vector<int>> ans;sort(nums.begin(), nums.end());int n …

chatgpt prompt提示词

ChatGPT 最近十分火爆&#xff0c;今天我也来让 ChatGPT 帮我阅读一下 Vue3 的源代码。 都知道 Vue3 组件有一个 setup函数。那么它内部做了什么呢&#xff0c;今天跟随 ChatGPT 来一探究竟。 实战 1.setup setup 函数在什么位置呢&#xff0c;我们不知道他的实现函数名称&…

12 网关实战:Spring Cloud Gateway基础理论

为什么需要网关? 传统的单体架构中只有一个服务开放给客户端调用,但是微服务架构中是将一个系统拆分成多个微服务,那么作为客户端如何去调用这些微服务呢?如果没有网关的存在,只能在本地记录每个微服务的调用地址。 无网关的微服务架构往往存在以下问题: 客户端多次请求…

人机交互3——多主题多轮对话

1.主动切换 2.被动切换 3.多轮状态记忆

3.2 Windows驱动开发:内核CR3切换读写内存

CR3是一种控制寄存器&#xff0c;它是CPU中的一个专用寄存器&#xff0c;用于存储当前进程的页目录表的物理地址。在x86体系结构中&#xff0c;虚拟地址的翻译过程需要借助页表来完成。页表是由页目录表和页表组成的&#xff0c;页目录表存储了页表的物理地址&#xff0c;而页表…