tp5 mysql实现消息队列_TP5系列 | Queue消息队列

消费信息如下ThinkPHP5 Queue消息队列

优点

1、Queue内置了 Redis,Database,Topthink ,Sync这四种驱动,本文使用Redis驱动

2、Queue消息队列适用于大并发或者返回结果 时间有点长并需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送

3、Queue消息消息可进行发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等操作

流程图

fc3de0200cd2d3b9af66f628f32cf216.png

创建队列

文件路径:application\common\queue\TestQueue.php

TestQueue.php 参考代码

namespace app\common\queue;

use think\facade\Log;

use think\queue\Job;

class TestQueue

{

public function fire(Job $job, $data)

{

$isJobDone = $this->testJob($data);

// 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法

if ($isJobDone) {

$job->delete();

} else {

//通过这个方法可以检查这个任务已经重试了几次了

$attempts = $job->attempts();

echo $attempts;

if ($attempts == 0 || $attempts == 1) {

// 重新发布这个任务

$job->release(2); //$delay为延迟时间,延迟2S后继续执行

} elseif ($attempts == 2) {

$job->release(5); // 延迟5S后继续执行

}

}

}

/**

* @Desc: 任务执行失败后自动执行方法

* @param $data

*/

public function failed($data)

{

// ...任务达到最大重试次数后,失败了

Log::error('任务达到最大重试次数后,失败了 '.json_encode($data));

}

/**

* @Desc: 自定义需要加入的队列任务

*/

private function testJob($data)

{

$jsonData = json_encode($data);

echo "1、具体执行任务接受到的参数:{$jsonData} \r\n";

if($data){

echo "2、恭喜你!{$data['email']} 邮件发送成功了 \r\n";

return true;

}else{

echo "2、很遗憾,{$data['email']} 邮件发送失败了 \r\n";

return false;

}

}

}

配置队列

1、这里使用Redis驱动来存储队列消息

2、队列配置文件路径:application\config\queue

配置参考代码

return [

'connector' => 'Redis',

'expire' => 3600,

'default' => 'REDIS_QUEUE',

'host' => 'dnmp-redis',

'port' => 6379,

'password' => '',

'select' => 0,

'timeout' => 0,

'persistent' => false,

];

生产者参考代码

/**

* @Desc: 生产者生产消息

*/

public function productMsg()

{

// 当前任务所需的业务数据,不能为 resource 类型,其他类型最终将转化为json形式的字符串

$data = [

'email' => rand(11,99).'@qq.com',

'username' => 'Tinywan'

];

// 当前任务归属的队列名称,如果为新队列,会自动创建

$queueName = 'testQueue';

// 将该任务推送到消息队列,等待对应的消费者去执行

$isPushed = Queue::push(TestQueue::class, $data, $queueName);

// database 驱动时,返回值为 1|false; redis驱动时,返回值为 随机字符串|false

if ($isPushed !== false) {

echo '['.$data['email'].']'." 队列加入成功 \r\n";

} else {

echo "队列加入失败 \r\n";

}

}

为了方便演示,这里使用cli模式。

执行生产者:php product_msg.php

# php product_msg.php

[27@qq.com] 队列加入成功

# php product_msg.php

[77@qq.com] 队列加入成功

1、这时候消息已经被持久化到Redis中去了(通过列表去存储)

2、推送成功,虽然我们这时候已经写好了消费者,但是我们并没有开始消费。但是推送消息依然是成功的。这个就是中间件的优势。他连接两个系统,但是又不会互相耦合,生产者并不会因为消费者的异常而影响到自己。

3、消息推送成功之后,如果没有消费者,消息会堆积在队列中。不过别怕,消息堆积很正常,并且一般的中间件堆积能力是非常强的。比如阿里就宣传自己mq可以堆积上亿条数据。

查看Redis消息与队列

> docker exec -it dnmp-redis redis-cli

127.0.0.1:6379> keys *

127.0.0.1:6379> keys *

1) "queues:testQueue"

127.0.0.1:6379> TYPE queues:testQueue

list

127.0.0.1:6379> LRANGE queues:testQueue 0 -1

1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"

2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"

127.0.0.1:6379>

消费者

开始消费消息。执行cli 命令 php think queue:work--queue队列名称

# php think queue:work --queue testQueue

1、具体执行任务接受到的参数: {"email":"27@qq.com","username":"Tinywan"}

2、恭喜你!27@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

这里每消费掉一条消息,Redis数据库中将会减少一条消息

查看Redis队列消息

127.0.0.1:6379> LRANGE queues:testQueue 0 -1

1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"

127.0.0.1:6379>

命令行挂起守护进程执行

/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256

--daemon 是否循环执行,如果不加该参数则该命令处理完下一个消息就退出 --queue 要处理的队列的名称 --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。 --memory 该进程允许使用的内存上限,以M为单位。

流程图

9af54836e9052b6ea56f3642ec8ef8f7.png

消费信息如下

php think queue:work --daemon --queue testQueue

1、具体执行任务接受到的参数: {"email":"77@qq.com","username":"Tinywan"}

2、恭喜你!77@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

1、具体执行任务接受到的参数: {"email":"80@qq.com","username":"Tinywan"}

2、恭喜你!80@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

1、具体执行任务接受到的参数: {"email":"34@qq.com","username":"Tinywan"}

2、恭喜你!34@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

1、命令行模式可以常驻内存不停的执行php代码。这样就可以达到类似于静态语言的java的效果。

2、一开始监听队列。刚刚在队列中堆积的消息立刻就被获取到,开始执行了代码。最后执行完成,删除了消息。

3、在 queue:work--daemon 单进程循环消费的时候,改了代码是不会生效的。这时脚本语言有点类似于静态语言在执行。所以需要我们用queue:restart重启 work 进程 。

命令行挂起守护进程执行

/usr/local/php/bin/php /data/wwwroot/default/thinkphp_5/think queue:work --daemon --queue testQueue --memory 256

查看进程是否在运行

# ps

PID USER TIME COMMAND

1 root 0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)

6 www-data 0:00 php-fpm: pool www

7 www-data 0:00 php-fpm: pool www

16 root 0:00 sh

56 root 0:00 sh

113 root 0:00 php think queue:work --daemon --queue testQueue

你再也不用守在终端了,以后只负责生产消息就可以了。Redis队列也不会积累消息了

其他(中间件)

中间件系统的定义是两个独立的不同的系统在中间构建起传递消息的工具。但是同一个系统也可以通过中间件来榨取性能,大家肯定项目中遇到过性能瓶颈。

比如发送邮件,发送短信,转换视频格式等等。这些业务都是比较耗性能,又对先后顺序不敏感的业务。这种业务就非常适合使用消息队列系统来异步处理,使性能提升。

重启队列和生成队列

002bc4b1faeeb034f9de0ec1d471ee53.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/526846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java创建临时文件夹_java创建临时文件

[java]代码库/*** 创建临时文件** param prefix* 临时文件名的前缀* param suffix* 临时文件名的后缀* param dirName* 临时文件所在的目录,如果输入null,则在用户的文档目录下创建临时文件* return 临时文件创建成功返回true,否则返回false*…

linux cmake编译安装mysql_Linux源码安装MySQL 5.6.12 (Cmake编译)

Linux源码安装MySQL 5.6.12 (Cmake编译)1.安装make编译器(默认系统自带)下载地址:tar zxvf make-3.82.tar.gzcd make-3.82./configuremakemake install2.安装bison下载地址:tar zxvf bison-2.5.tar.gzcd bison-2.5./configuremakemake install3.安装gcc-…

JAVA怎么实现网页退出系统_java后台实现js关闭本页面,父页面指定跳转或刷新操作...

关闭本页面,跳转到百度response.setCharacterEncoding("gbk");PrintWriter outresponse.getWriter();out.print("");out.print("");关闭本页面,刷新父页面response.setCharacterEncoding("gbk");PrintWriter ou…

java 布尔逻辑运算符_Java运算符

Java语言提供许多操作符。操作符是特殊的符号(symbol),它对一个或者两个、三个的操作数进行运算,然后返回一个结果,最简单的就像我们一年级学到的 -号。一般地,可以将运算符分为四大类:算数运算符、位运算符、关系运算…

Java自动化获取页面主题_基于Selenium2+Java的UI自动化(4) - WebDriver API简单介绍

1. 启动浏览器前边有详细介绍启动三种浏览器的方式(IE、Chrome、Firefox);private WebDriver driver null;private String chromeDriverDir "D:\\workspace\\A_Test\\resource\\chromedriver.exe";/*** 打开谷歌浏览器;*/public void openCh…

js java 反射机制_java 类加载机制和反射机制

一.类的加载机制jvm把class文件加载到内存,并对数据进行校验、解析和初始化,最终形成jvm可以直接使用的java类型的过程。(1)加载将class文件字节码内容加载到内存中,并将这些静态数据转换成方法区中的运行时数据结构,在堆中生成一…

lambda 流 peek java_JDK8 流与λ表达式

λ表达式什么是λ表达式λ表达式有三部分组成:参数列表,箭头(->),以及一个表达式或者语句块。public int add(int x, int y) {return x y;}转换为λ表达式(int x, int y) -> x y;去除参数类型(x, y) -> x y;无参 以及 只有一个参…

理解java虚拟机工作后了解吗_JAVA入门到再次入门——深入理解JAVA虚拟机(二)|七日打卡...

前言为什么叫做入门到到再次入门请参考前一篇或个人博客,在此不再赘述,嗯哼,了解了JVM的基本运行流程以及内存结构,算是初步认识了JVM,跟着课本往前走,继续了解根据JVM的内存模型探索java当中变量的可见性以…

java访问错误404_如何解决 Java web 项目中的 404 错误

在使用 Tomcat 进行 Java Web 开发的时候,经常会遇到以下 HTTP 404 错误:错误代码为 HTTP 404(未找到),描述信息是:“The origin server did not find a current representation for the target resource or is not willing to di…

java double 的精度_Java Double的精度问题

Java.text类 DecimalFormatjava.lang.Objectjava.text.Formatjava.text.NumberFormatjava.text.DecimalFormatvoid setMaximumFractionDigits(int newValue) 设置某个数的小数部分中所允许的最大数字位数。void setMinimumFractionDigits(int newValue) …

java餐饮管理系统图片,基于jsp的酒店餐饮管理系统-JavaEE实现酒店餐饮管理系统 - java项目源码...

基于jspservletpojomysql实现一个javaee/javaweb的酒店餐饮管理系统, 该项目可用各类java课程设计大作业中, 酒店餐饮管理系统的系统架构分为前后台两部分, 最终实现在线上进行酒店餐饮管理系统各项功能,实现了诸如用户管理, 登录注册, 权限管理等功能, 并实现对各类酒店餐饮管…

php 验证码一直不对,ThinkPHP验证码老是出错怎么办

ThinkPHP验证码老是出错的解决办法:1、找到服务器php配置文件php.ini在网站根目录下建一个info.php文件。例如:D:\wwwRoot\wp 这个是网站的根目录,在此目录下,新建一个txt文档,输入如下代码:然后另存为info…

如何在php中插入数据并修改,php怎么同时向2张表里插入数据

情况是这个样子的:我要做一个发消息的表,因为接受人可能是多个,所以又给接收人一单独的表,(这种方案好还是全部都放到一张表里好点呢?)2张表的字段如下:message_id是第一张表的主键,如果收件人有…

java设计模式之道文字版,Java Web设计模式之道 PDF

资源名称:Java Web设计模式之道 PDF第一部分 仙人指路——设计模式简介第1章 设计模式概述1.1 设计模式是什么1.2 软件设计模式的发展历程1.3 作者阐述软件设计模式的主要方式第二部分 设计红宝书——设计模式原则详解第2章 设计原则之开闭原则2.1 何谓开闭原则2.2 …

matlab变量由非标量,matlab中的if语句

有条件性地执行语句语法if expressionstatementsend描述MATLAB计算表达式,如果产生一个逻辑真或者非零结果,然后就执行一条或者多条MATLAB命令语句。当有嵌套if时,每一个if必须和一个相应的end匹配。当你在if语句里面嵌套使用else if或者else…

rodbc 连接oracle,R語言 使用RODBC連接oracle數據庫

使用R語言有多種包可以連接oracle數據庫,我今天在這里講一下使用使用RODBC連接oracle數據庫。1. 如果你的本地是windows系統的話,你需要安裝oracle客戶端。2. 然后需要在ODBC管理者界面配置你要進行連接的數據庫數據及使用的驅動等信息。如下圖所示&…

oracle实验七 答案,Oracle表的常用查询实验(七)

Oracle表的常用查询实验(七)1.问题描述:有一个商品信息表,该表反应了各种商品的销售情况,一个产品是按照gid和gname两个字段来区分的,一个产品可能会有多个型号。create table T_Goods(Id int primary key,GId varchar2(10) not n…

oracle ogg00423,【案例】Oracle报错PLS-00378 PLS-00439产生原因和MOS官方解决办法

【案例】Oracle报错PLS-00378 PLS-00439产生原因和MOS官方解决办法时间:2016-11-12 21:31 来源:Oracle研究中心 作者:代某人 点击:次天萃荷净PLS-00378此版本的PL / SQL编译单元无效原因:编译单元是一个文件,其中包含的PL / SQL传递给编译器的源代…

linux系统如何备份系统软件,Linux折腾记(十三):我该如何备份系统

在前面的一些文章中,我反复提到经常会把系统搞崩溃,所以备份系统就是一件不容忽视的事情。由于Linux系统本身的优越性,系统的备份和还原还是比较容易的。主要表现在以下方面:1.Linux系统所有的数据都以文件的形式存在,…

linux kset subsystem 3.10内核,Kobject、Kset 和 Subsystem

2014年5月12日Kobject 、Kset 和 Subsy stem - 海王 - 博客园http://www.doczj.com/doc/93b5b6113b3567ec112d8a49.html/leav en/archiv e/2010/04/24/1719191.html 4/8件(2)4. Linux input 子系统 io 控制字段(2)5. linux 内核定时器 tim er_list 详解(2) (2)把kobject 的 kse…