RabbitMQ 入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

发布/订阅

在上篇第二部分教程中,我们搭建了一个工作队列。每个任务之分发给一个工作者(worker)。在本篇教程中,我们要做的之前完全不一样——分发一个消息给多个消费者(consumers)。这种模式被称为“发布/订阅”。

为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。

在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受者(receiver)把日志输出到屏幕上。

最终,日志消息被广播给所有的接受者(receivers)。

交换器(Exchanges)

前面的教程,我们发送消息到队列并从中取出消息。现在是时候介绍RabbitMq中完整的消息模型了。

让我们简单的概括一下之前的教程:

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需要把消息发送给一个交换器(exchange)。交换器非常简单,它一边从发布者方接收消息,一边把消息推入队列。交换器必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过exchange type来定义的。

有几个可供选择的交换器类型:AMQPEXTYPEDIRECT,AMQPEXTYPEFANOUT,AMQPEXTYPEHEADER orAMQPEXTYPETOPIC。我们在这里主要说明AMQPEXTYPE_FANOUT。先创建一个fanout类型的交换器,命名为logs:

$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->declare();

fanout交换器很简单,你可能从名字上就能猜测出来,它把消息发送给它所知道的所有队列。这正是我们的日志系统所需要的。

交换器列表

rabbitmqctl能够列出服务器上所有的交换器:

$ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.

这个列表中有一些叫做amq.*的交换器。这些都是默认创建的,不过这时候你还不需要使用他们。

匿名的交换器

前面的教程中我们对交换器一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串(“”)默认的交换器。 回想我们之前是如何发布一则消息:

``` $exchange->publish($message, $routeKey);

```

exchange参数就是交换器的名称。空字符串代表默认或者匿名交换器:消息将会根据指定的routing_key分发到指定的队列。

在PHP的AMQP中如果exchange设置为匿名的话,是报错的:PHP Fatal error: Uncaught exception ‘AMQPExchangeException’ with message ‘Invalid exchange name given, must be between 1 and 255 characters long.’

现在,我们就可以发送消息到一个具名交换器了:

$exchange->publish($message, '');

临时队列

你还记得之前我们使用的队列名吗( hello和task_queue)?给一个队列命名是很重要的——我们需要把工作者(workers)指向正确的队列。如果你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的。

但是这并不适用于我们的日志系统。我们打算接收所有的日志消息,而不仅仅是一小部分。我们关心的是最新的消息而不是旧的。为了解决这个问题,我们需要做两件事情。

首先,当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只要在调用$queue->declare();方法的时候,不提供queue参数就可以了:

$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declare();

这时候我们可以通过$queue->getName();获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,当与消费者(consumer)断开连接的时候,这个队列应当被删除。我们可以使用exclusive标识。

$queue->setFlags(AMQP_EXCLUSIVE);

绑定(Bindings)

我们已经创建了一个fanout类型的交换器和一个队列。现在我们需要告诉交换器如何发送消息给我们的队列。交换器和队列之间的关系我们称之为绑定(binding)。

$queue->bind($exchangeName, $queue->getName());

现在,logs交换器将会把消息添加到我们的队列中。

绑定列表。

你可以使用rabbitmqctl list_bindings队列出所有存在的绑定。.

整合代码

发布日志消息的程序看起来和之前的没有太大区别。最重要的改变就是我们把消息发送给logs交换器而不是匿名交换器。在发送的时候我们需要提供routingkey参数,但是它的值会被fanout交换器忽略。

emit_log.php

<?php$exchangeName = 'logs';
$message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1];$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
//$exchange->declare();
$exchange->declareExchange();$exchange->publish($message, '');
var_dump("[x] Sent $message");$connection->disconnect();

正如你看到的那样,在连接成功之后,我们声明了一个交换器,这一个是很重要的,因为不允许发布消息到不存在的交换器。

如果没有绑定队列到交换器,消息将会丢失。但这个没有所谓,如果没有消费者监听,那么消息就会被忽略。

receive_logs.php

<?php$exchangeName = 'logs';$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
//$exchange->declare();
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
//$exchange->declare();
$queue->declareQueue();
$queue->bind($exchangeName, '');var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {$queue->consume('callback');
}
$connection->disconnect();function callback($envelope, $queue) {$msg = $envelope->getBody();var_dump(" [x] Received:" . $msg);$queue->nack($envelope->getDeliveryTag());
}

这样我们就完成了。如果你想把日志保存到文件中,只需要打开控制台输入:

 php receive_logs.php > logs_from_rabbit.log

如果你想在屏幕中查看日志,那么打开一个新的终端然后运行:

 php receive_logs.php

当然还要发送日志:

php emit_log.php

 

使用方法,需要打开三个终端,两个用来运行receive_logs.php脚本【先启动】,一个用来运行emit_log.php脚本【后启动】,结果为两个receive_logs.php脚本会同时受到消息

运行效果:

 

转载于:https://www.cnblogs.com/-mrl/p/11102740.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/277450.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Proxmox VE 安装、配置、使用之第二章 Proxmox VE 的安全性

第一章 Proxmox VE 的安全性一、 角色及权限图2-1-1二、 Root 的密码安全性把 Root 的实际密码给出去, 在任何系统都是不符合安全规范的!所以在 Linux 里面, 最好把有需要 root 权限的使用者 放到 sudoers 的群组.# sudo usermod -a -G sudo testuserPVE 的权限设定方式 是由 u…

java allocate_Java中volatile关键字的最全总结

一、简介volatile是Java提供的一种轻量级的同步机制。Java 语言包含两种内在的同步机制&#xff1a;同步块(或方法)和 volatile 变量&#xff0c;相比于synchronized(synchronized通常称为重量级锁)&#xff0c;volatile更轻量级&#xff0c;因为它不会引起线程上下文的切换和调…

缩放手势 ScaleGestureDetector 源码解析,这一篇就够了

其实在我们日常的编程中&#xff0c;对于缩放手势的使用并不是很经常&#xff0c;这一手势主要是用在图片浏览方面&#xff0c;比如下方例子。但是&#xff08;敲重点&#xff09;&#xff0c;作为 Android 入门的基础来说&#xff0c;学习 ScaleGestureDetector 的使用&#x…

postgres的数据库备份和恢复

备份和恢复 一条命令就可以解决很简单: 这是备份的命令&#xff1a; pg_dump -h 127/0.0.1 -U postgres databasename > databasename.bak 指令解释&#xff1a; pg_dump 是备份数据库指令&#xff0c;164.82.233.54是数据库的ip地址&#xff08;必须保证数据库允许外部访…

java如何实现封装_java如何实现封装

Java中类的封装是如何实现的封装是将对象的信息隐藏在对象内部&#xff0c;禁止外部程序直接访问对象内部的属性和方法。 java封装类通过三个步骤实现&#xff1a; (1)修改属性的可见性&#xff0c;限制访问。 (2)设置属性的读取方法。 (3)在读取属性的方法中&#xff0c;添加对…

php token 验证,PHP如何实现Token验证

PHP如何实现Token验证首先将Token进行解析&#xff1b;然后根据解析出来的信息部分验证是否过期&#xff0c;如果未过期再将解析出的信息部分进行加密&#xff1b;最后将加密出来的数据和解析出来签名进行比对&#xff0c;如果相同则验证成功。示例代码&#xff1a;<?php f…

值得一用的Windows软件

该清单仅本人使用后所作推荐&#xff0c;可能会比较主观&#xff0c;所以仅供参考哈。可能某些软件链接会失效&#xff0c;可以自行百度搜索下载即可。 杀软 火绒安全&#xff1a;国内杀毒软件的一股清流&#xff0c;界面简洁&#xff0c;无推广。现在已经开启了 5.0 公测&…

Python字符串处理全攻略(四):常用内置方法轻松掌握

文章目录 引言Python字符串常用内置方法切片功能介绍语法示例注意事项 str.isalpha()功能介绍语法示例注意事项 str.isdigit()功能介绍语法示例注意事项总结 str.isalnum()功能介绍语法示例注意事项总结 str.isupper()功能介绍语法示例注意事项 islower()功能介绍语法示例注意事…

php空间限制磁盘限额,ORA-01536:超出表空间XXXX的空间限额

问题描述&#xff1a;在FMIS2600用户下进行某个DDL或DML操作时&#xff0c;提示&#xff1a;ORA-01536&#xff1a;超出表空间FMIS2600 的空间限额 或者 ORA-01950: 对表空间/*******************ORA-01536&#xff1a;超出表空间XXXX的空间限额*******************//*********…

01爬虫基本原理及Requests库下载

一、爬虫基本原理 1.什么是爬虫 ​ 爬虫就是爬取数据 2.什么是互联网&#xff1f; ​ 就是由一堆网络设备&#xff0c;把一台台的电脑互联在一起 3.互联网建立的目的 ​ 数据的传递和数据共享 4.什么是数据&#xff1f; ​ 例如&#xff1a; ​ 电商平台的商业信息&#xff08;…

php 怎么实现收藏功能,php收藏功能如何实现

php收藏功能如何实现php收藏功能的实现方法&#xff1a;首先创建好数据库表 &#xff1b;然后创建前台代码&#xff0c;实现登录界面&#xff1b;接着通过html实现收藏样式&#xff1b;最后使用php进行后台处理即可。推荐&#xff1a;《PHP视频教程》这是数据库表话不多说上代码…

quartus FIR仿真笔记

第一章&#xff1a; 最近百度了一些fir滤波器的资料&#xff0c;都没有自己想要的。容我吐槽一大段文字> 在旧版的quartus中&#xff0c;比如13.0&#xff0c;有两个fir滤波器的选项&#xff0c;如下所示&#xff1a; 网上很多都是讲不带II的那个&#xff0c;而在新版的quar…

git常用命令及分支简介

2019独角兽企业重金招聘Python工程师标准>>> 1、git基本命令 1&#xff09;git add 将想要快照的内容写入缓存区 2&#xff09;git status -s "AM" 状态的意思是&#xff0c;这个文件在我们将它添加到缓存之后又有改动 3&#xff09;git commit -m 第一次…

企业私有云部署im,视频服务

1&#xff0c;安全问题 2&#xff0c;员工跨地域 3&#xff0c;内部视频培训 考勤申请&#xff0c;设备借用申请 名片申请 会议室预订 审批 内网&#xff0c;局域网部署 Android源码 https://github.com/starrtc/android-demo ios源码https://github.com/starrtc/ios-demo

Leetcode怎么调试java代码,在Clion上调试LeetCode代码

在Clion上调试LeetCode代码在leetcode上做题调试起来总有些不方便&#xff0c;所以查阅了一些资料后&#xff0c;按以下配置&#xff0c;自我感觉效率还行&#xff0c;分享给大家。祝大家刷题愉快。并附上自己整理的leetcode400题题表。Leetcode400题&#xff1a;notion地址依赖…

来入门一下kotlin吧

Kotlin是什么&#xff1f; Kotlin是一种在java虚拟机上运行的静态类型的编程语言&#xff0c;被称之为 Android 世界的Swift&#xff0c;由 JetBrains 设计开发并开源。 Kotlin的优势&#xff01; Kotlin可以编译成java字节码&#xff0c;也可以编译成JavaScript。方便在没有ja…

ReactNative 触摸事件处理

ReactNative触摸事件处理 对RN触摸事件的捕获与冒泡机制的理解 组件A、B、C结构 组件A组件B组件C 捕获、冒泡机制 sequenceDiagram A->>A: 是否捕获&#xff1f;若是则停止向下一级传递 A->>B: B->>B: 是否捕获&#xff1f;若是则停止向下一级传递 B->&g…

程序员如何面试才能拿到offer

一、概述 面试&#xff0c;难还是不难&#xff1f;取决于面试者的底蕴&#xff08;气场技能&#xff09;、心态和认知及沟通技巧。面试其实可以理解为一场聊天和谈判&#xff0c;在这过程中有心理、思想上的碰撞和博弈。其实你只需要搞清楚一个逻辑&#xff1a;“面试官为什么会…

Generative Adversarial Learning Towards Fast Weakly Supervised Detection(CVPR2018)阅读笔记

弱监督目标检测相对于一般的目标检测任务来说&#xff0c;训练样本不需要实例级别的标注&#xff0c;只需要图片级别的标注&#xff0c;即告诉图片中有什么而不需标注位置信息&#xff0c;这种标注图片容易获取&#xff0c;能节省标注时间及精力。现有的大部分方法在进行若监督…

如何添加JWT生成的token在请求头中

前言 在我们使用JWT来做用户的验证时&#xff0c;我们登陆生成对应的token,并加入到请求的参数中发送到后台提供相关的权限校验。这个时候我们需要使用到传递请求头参数传递的问题&#xff0c;下面是两种方式。 1.ajax提交方式 1&#xff09;.方法一&#xff1a; $.ajax({ type…