【MQ02】基础简单消息队列应用

基础简单消息队列应用

在上一课中,我们已经学习到了什么是消息队列,有哪些消息队列,以及我们会用到哪个消息队列。今天,就直接进入主题,学习第一种,最简单,但也是最常用,最好用的消息队列模式。

最简单的队列功能

最简单的队列功能,无非就是将我们在数据结构与算法中学过的那个队列结构,变成一个外部功能组件。让各种语言和各种应用程序都可以通过这个队列来进行数据操作。这样的一个队列系统就称之为“消息队列中间件”。

一般,我们会将生产消息的程序,或者说,将数据放入到队列的一方称为 P (生产者,Producer);然后将队列称为Q(Queue);最后,将守候在队列前,等待从队列中获取数据的应用、程序或者代码段称为 C(消费者/客户端,Consumer)。

27021c06f480119d7c4c2696a2109768.png

这是 RabbitMQ 官网手册上的图,后面的相关图示我们也将直接使用它们的。在这个图中,有字母的部分就不多解释了。中间红色的一格一格的部分代表的就是 Q 。

通常来说,P 只管将数据放到 Q 中,Q 负责中间存储数据,然后 C 取出数据。数据的进出方式遵循经典数据结构队列中的规则,也就是 FIFO 先进先出。

是不是就和我们之前说过的一样,最简单的理解,它就是将队列这个数据结构抽成了一个第三方组件。这样就可以跨平台、跨应用、跨语言地进行数据存取了。

RebbitMQ 实现

好了,先来看 RabbitMQ 的实现。你需要先安装好 RabbitMQ ,我这里是使用的 Docker 安装的。

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

然后,也要安装好 PHP 的 Composer 组件,用于操作 RabbitMQ 消息队列,PHP 需要开启 sockets 扩展。

composer require php-amqplib/php-amqplib

5672 是 RabbitMQ 的服务端口,15672 则是它自带的一个管理工具的访问端口。具体的内容大家可以到官方文档中进行更加深入的学习。当然,也可以使用虚拟机方式来搭建测试环境,这个大家看自己的喜好吧。

接下来,我们先实现 P 端的代码,也就是生产者向消息队列中添加数据。

// 2.rq.p.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道// 定义队列
$channel->queue_declare('hello', false, false, false, false);// 创建消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello'); // 将消息放入队列中echo "生产者向消息队列中发送信息:Hello World!";$channel->close();
$connection->close();

注释很清晰了吧,如果你之前没有用 PHP 操作过 RabbitMQ 的话,那么直接复制这段代码就可以了。其中比较特殊的是  channel ,它是共享单个 TCP 连接的轻量级信道。这个概念可能是 RabbitMQ 相较于其它消息队列系统比较特别的。然后就是消息内容是通过一个 AMQPMessage 对象承载的,这个 AMQP 其实就是 RabbitMQ 的核心协议。协议是通信双方能够相互看明白对方的基础,能够建立通信的基础,就像我们之前在 Redis 中学习过的 RESP 协议一样。这里大家只需要知道 RabbitMQ 是使用这个协议就好了,而且它也支持其它的一些协议。发送完消息之后,记得关闭连接哦。

好了,接下来是我们的消费者/客户端实现。

// 2.rq.c.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel(); // 获取频道// 定义队列
$channel->queue_declare('hello', false, false, false, false);echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;// 定义接收数据的回调函数
$callback = function ($msg) {echo '接收到数据: ', $msg->body, PHP_EOL;
};
// 消费队列,获取到数据将调用 callback 回调函数
$channel->basic_consume('hello', '', false, true, false, false, $callback);// 频道是开启状态时,挂起程序,不停地执行
while ($channel->is_open()) {// 等待并监听频道中的队列信息// 发现上方 basic_consume 定义的队列有消息后// 就调用它对应的 callback$channel->wait();
}

看着好像多很多东西呀?其实上面一半和生产者是一样的。从中间开始,我们使用的是 Channel 对象的 basic_consume() 方法,这个方法最后有一个回调函数参数。然后在下面通过 wait() 方法持续监听队列中是否有数据。如果有数据了,就调用指定的回调函数。并将消息内容交给回调函数的参数。

注意哦,一般来说,消息队列的消费者,或者说是客户端,或者说是 C 端。大部分情况下可能都会是这样通过一个死循环挂起的。目的就是当我们运行起程序之后,可以不停地,不间断地一直处理队列中的消息数据。之前在学习 Swoole 时,另外如果你学习过 Go 语言的话,也会发现它们的 Http 服务中也是有类似的死循环代码来实现服务端挂起的。这个大家可以到我的 Swoole 系列中看看哦。

测试一下吧,运行一下生产者代码。

➜  source git:(master) ✗ php 2.rq.p.php 
生产者向消息队列中发送信息:Hello World!%

执行结束了,只是输出了一句话,没啥别的效果。那么我们再来运行一下消费者代码。

> php 2.rq.c.php 
等待消息,或者使用 Ctrl+C 退出程序。
接收到数据: Hello World!

可以看到,消费者先是输出了接收到的数据,这个数据其实是上一步我们运行生产者插入到队列中的数据。现在,这条数据打印出来了,其实就是相当于已经被我们消费了。当然,在实际业务中,你可能会对这些数据进行更复杂的业务操作。但在演示时,我这里只是打印了一下。然后,消费者会继续挂在这里等待下一条消息的到来。这时,你可以再次运行生产者代码,然后就会看到消费者这边直接就已经消费了。

Redis 实现

对于 Redis 的实现,其实非常简单,我们之前也已经学过的,那就是使用 List 这个数据结构。先来看生产者,直接 push 数据就好了。

// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);echo "生产者向消息队列中发送信息:Hello World!";$redis->lpush('hello', 'Hello World!');

消费者呢?当然就是我们之前已经学过的 pop 啦。

// 2.rs.p.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);echo "等待消息,或者使用 Ctrl+C 退出程序。", PHP_EOL;while(1){$data = $redis->rpop('hello');if ($data){echo '接收到数据: ', $data, PHP_EOL;}
}

在这里,我们使用的是 lpush() + rpop() 的方式,当然你也可以使用 rpush() + lpop() 的方式,大家思考一下,如果是 lpush() + lpop() ,是实现的什么数据结构呢?

同样地,在 Redis 的消费者中,我们也需要通过一个死循环挂起消费者,然后不停地获取数据进行处理。剩下的测试过程就和上面的 RabbitMQ 一样了。

我的实践

之前我就说过,我的消息队列实践不多。唯一的业务场景下实现的高并发消息队列应用其实就是上面的 Redis 这两段代码。真的,核心就是这点东西。但为了抗高并发,我是使用的 Swoole ,生产者是在 Hyperf 框架中通过控制器接收到数据后,直接就放到 Redis 里。然后消费者就是一个命令行,接着开 100 个协程,将获取到的消息数据丢给协程处理。

业务应用是游戏的日志上报,最高并发 20000+ ,入库日志量 3000万+ 。使用的是阿里云最便宜的那个 Redis 服务,4G 大小单实例的那款。

是不是见到了消息队列的恐怖能力。这个量级,对于任何消息队列应用来说问题都不大,RabbitMQ 被认为是比较慢的,但是,它的处理能力是每秒几万次请求。而 Redis ,就是以 Redis 的读写性能为基础的,大概每秒11万的读和8万的写。这个在之前的 Redis 学习中都已经说过了。

总结

今天通过代码,我们其实就已经学习到了整个消息队列中最核心的内容。没错,消息队列就是这么地简单,但又这么地实用。我的业务例子其实是异步解耦的一种实现。而对于另一种常见的场景,秒杀,大家想一想,是不是也可以直接通过这样一个简单的队列就能够实现了。当然,可能还比较简陋,也需要考虑更多的东西,但是,一秒内处理几万条请求和一秒内让几万条请求入队,这个差别可大了去了。

其实,从队列的思想就可以看出,我们用数据库也可以实现队列,插入数据是入队,然后倒序查询出来一条就可以视为出队。但是呢,数据库的性能往往和专业的消息队列以及 NoSQL 工具都是有很大的差距的。因此,其实还是那句话,把握本质和思想,工具用啥都好说。

测试代码:

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/2.rq.c.php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/649664.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

百度百科词条编辑规则是什么?

百度百科词条编辑规则是指在百度百科平台上编辑和创建词条时需要遵循的一系列标准和指南。百度百科作为全球最大的中文百科全书,旨在为用户提供准确、全面、客观的知识信息。为了确保词条内容的质量,百度设定了严格的编辑规则。伯乐网络传媒来给大家分享…

用navigator.sendBeacon完成网页埋点异步请求记录用户行为,当网页关闭的时候,依然后完美完成接口请求,不会因为浏览器关闭了被中断请求。

代码用例 <template><div :class"$options.name"><el-button type"primary" click"sendBeacon">navigator.sendBeacon 请求埋点接口 发送json对象数据</el-button></div> </template><script> expor…

java web 职位推荐系系统Myeclipse开发mysql数据库协同过滤算法java编程计算机网页项目

一、源码特点 java Web职位推荐系统是一套完善的java web信息管理系统 &#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0…

【小白教程】幻兽帕鲁服务器一键搭建 | 支持更新 | 自定义配置

幻兽帕鲁刚上线就百万在线人数&#xff0c;官方服务器的又经常不稳定&#xff0c;所以这里给大家带来最快捷的搭建教程&#xff0c;废话不多说直接开始。 步骤一&#xff1a;准备服务器 服务器建议 Linux 系统&#xff0c;资源占用低&#xff0c;而且一键脚本只需要一条命令&am…

安卓程序开发——搭建主页框架

一、实验目的 搭建项目框架掌握Android Activity组件使用和Intent机制&#xff0c;加强对Activity生命周期的理解&#xff0c;掌握Fragment的使用。 二、实验设备及器件 Android Studio 三、实验内容 1.创建一个Android应用&#xff0c;设置工程名MobileShop&#xff0c;包…

携程开源 基于真实请求与数据的流量回放测试平台、自动化接口测试平台AREX

携程开源 基于真实请求与数据的流量回放测试平台、自动化接口测试平台AREX 官网文档 基于真实请求与数据的流量回放测试平台、自动化接口测试平台AREX 这篇文章稍稍水一下&#xff0c;主要讲下部署过程里踩的坑&#xff0c;因为部署的过程主要是运维同学去处理了&#xff0c;我…

【Spring 篇】MyBatis DAO层实现:数据之舞的精灵

欢迎来到MyBatis DAO层的神奇世界&#xff0c;这里将为你揭示DAO层的奥秘&#xff0c;让你成为数据之舞的精灵。无论你是初学者还是想要深入了解DAO层的开发者&#xff0c;这篇博客将引导你踏入MyBatis DAO层的王国&#xff0c;一探其中的精彩。 舞台1&#xff1a;DAO层的角色…

人脸识别 FaceNet人脸识别(一种人脸识别与聚类的统一嵌入表示)

人脸识别 FaceNet人脸识别&#xff08;一种人脸识别与聚类的统一嵌入表示&#xff09; FaceNet的简介Facenet的实现思路训练部分 FaceNet的简介 Facenet的实现思路 import torch.nn as nndef conv_bn(inp, oup, stride 1):return nn.Sequential(nn.Conv2d(inp, oup, 3, stride…

Redis 面试题 | 14.精选Redis高频面试题

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

手机视频压缩怎么压缩?一键瘦身~

现在手机已经成为我们日常生活中必不可少的工具&#xff0c;而在手机的应用领域中&#xff0c;文件的传输和存储是一个非常重要的问题。很多用户都会遇到这样一个问题&#xff0c;那就是在手机上存储的文件太多太大&#xff0c;导致手机存储空间不足&#xff0c;那么怎么在手机…

初识MQRabbitMQ快速入门

一、同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&#xff0c;需要实时响应。 异步通讯&#xff1a;就像发邮件&#xff0c;不需要马上回复。 两种方式各有优劣&#xff0c;打电话可以立即得到响应&#xff0c;但是你却不能…

【LUA】mac状态栏添加天气

基于网络上的版本修改的&#xff0c;找不到出处了。第一个摸索的lua脚本&#xff0c;调了很久。 主要修改&#xff1a;如果风速不大&#xff0c;就默认不显示&#xff0c;以及调整为了一些格式 local urlApi http://.. --这个urlApi去申请个免费的就可以了 然后打开对应的json…

七分钟交友匿名聊天室源码

多人在线聊天交友工具&#xff0c;无需注册即可畅所欲言&#xff01;你也可以放心讲述自己的故事&#xff0c;说出自己的秘密&#xff0c;因为谁也不知道对方是谁。 运行说明&#xff1a; 安装依赖项&#xff1a;npm install 启动&#xff1a;node app.js 运行&#xff1a;直接…

每次请求sessionid变化【SpringBoot+Vue】

引言&#xff1a;花了一晚上的时间&#xff0c;终于把问题解决了&#xff0c;一开始后端做完后,用apifox所有接口测试都是可以的,但当前端跑起来后发现接收不到后端的数据。 当我写完前后端&#xff0c;主页面和获取当前页面信息接口后&#xff0c;配置了cros注解 CrossOrigin…

keil使用教程

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据 总结 前言 例如&#xff1a;随着人工智能的不断发展&#xff0c;机器学习这门技术也越来越重…

Hive实战 —— 电商数据分析(全流程详解 真实数据)

目录 前言需求概述数据清洗数据分析一、前期准备二、项目1. 数据准备和了解2.确定数据粒度和有效列3.HDFS创建用于上传数据的目录4.建库数仓分层 5.建表5.1近源层建表5.2. 明细层建表为什么要构建时间维度表&#xff1f;如何构建时间维度表&#xff1f; 5.3 轻聚层建表6. 指标数…

前端工程化之:webpack1-6(编译过程)

一、webpack编译过程 webpack 的作用是将源代码编译&#xff08;构建、打包&#xff09;成最终代码。 整个过程大致分为三个步骤&#xff1a; 初始化编译输出 1.初始化 初始化时我们运行的命令 webpack 为核心包&#xff0c; webpack-cli 提供了 webpack 命令&#xff0c;通过…

数学经典教材有哪些

说实话&#xff0c;国内大学教材编写的初衷&#xff0c;就是让学生自己看不懂。。 不信你去看看同济大学出版社的高数书籍。 给大家推荐两本国外的数学书&#xff0c;质量吊打国内大部分教材&#xff08;特别是同济的高数教材&#xff09;。如果我大学能看到这些教材&#xf…

android camera系列(Camera1、Camera2、CameraX)的使用以及输出的图像格式

一、Camera 1.1、结合SurfaceView实现预览 1.1.1、布局 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:app"http://schemas.android.com/apk/res-au…

PuLP库-多数线性规划问题

投标价格重预算 背景 甲方需要采购一批物资&#xff0c;采购数量为甲方给定的预计采购数量&#xff0c;并限制了采购总价。例甲方采购预算清单如下&#xff0c;采购总预算为不超过 3175 元 采购内容采购数量投标单价投标报价合计电脑10空调20洗衣机8桌子7打印机35合计 注&a…