tp5 mysql实现消息队列_TP5系列 | Queue消息队列

消费信息如下ThinkPHP5 Queue消息队列

优点

1、Queue内置了 Redis,Database,Topthink ,Sync这四种驱动,本文使用Redis驱动

2、Queue消息队列适用于大并发或者返回结果 时间有点长并需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送

3、Queue消息消息可进行发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等操作

流程图

fc3de0200cd2d3b9af66f628f32cf216.png

创建队列

文件路径:application\common\queue\TestQueue.php

TestQueue.php 参考代码

namespace app\common\queue;

use think\facade\Log;

use think\queue\Job;

class TestQueue

{

public function fire(Job $job, $data)

{

$isJobDone = $this->testJob($data);

// 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法

if ($isJobDone) {

$job->delete();

} else {

//通过这个方法可以检查这个任务已经重试了几次了

$attempts = $job->attempts();

echo $attempts;

if ($attempts == 0 || $attempts == 1) {

// 重新发布这个任务

$job->release(2); //$delay为延迟时间,延迟2S后继续执行

} elseif ($attempts == 2) {

$job->release(5); // 延迟5S后继续执行

}

}

}

/**

* @Desc: 任务执行失败后自动执行方法

* @param $data

*/

public function failed($data)

{

// ...任务达到最大重试次数后,失败了

Log::error('任务达到最大重试次数后,失败了 '.json_encode($data));

}

/**

* @Desc: 自定义需要加入的队列任务

*/

private function testJob($data)

{

$jsonData = json_encode($data);

echo "1、具体执行任务接受到的参数:{$jsonData} \r\n";

if($data){

echo "2、恭喜你!{$data['email']} 邮件发送成功了 \r\n";

return true;

}else{

echo "2、很遗憾,{$data['email']} 邮件发送失败了 \r\n";

return false;

}

}

}

配置队列

1、这里使用Redis驱动来存储队列消息

2、队列配置文件路径:application\config\queue

配置参考代码

return [

'connector' => 'Redis',

'expire' => 3600,

'default' => 'REDIS_QUEUE',

'host' => 'dnmp-redis',

'port' => 6379,

'password' => '',

'select' => 0,

'timeout' => 0,

'persistent' => false,

];

生产者参考代码

/**

* @Desc: 生产者生产消息

*/

public function productMsg()

{

// 当前任务所需的业务数据,不能为 resource 类型,其他类型最终将转化为json形式的字符串

$data = [

'email' => rand(11,99).'@qq.com',

'username' => 'Tinywan'

];

// 当前任务归属的队列名称,如果为新队列,会自动创建

$queueName = 'testQueue';

// 将该任务推送到消息队列,等待对应的消费者去执行

$isPushed = Queue::push(TestQueue::class, $data, $queueName);

// database 驱动时,返回值为 1|false; redis驱动时,返回值为 随机字符串|false

if ($isPushed !== false) {

echo '['.$data['email'].']'." 队列加入成功 \r\n";

} else {

echo "队列加入失败 \r\n";

}

}

为了方便演示,这里使用cli模式。

执行生产者:php product_msg.php

# php product_msg.php

[27@qq.com] 队列加入成功

# php product_msg.php

[77@qq.com] 队列加入成功

1、这时候消息已经被持久化到Redis中去了(通过列表去存储)

2、推送成功,虽然我们这时候已经写好了消费者,但是我们并没有开始消费。但是推送消息依然是成功的。这个就是中间件的优势。他连接两个系统,但是又不会互相耦合,生产者并不会因为消费者的异常而影响到自己。

3、消息推送成功之后,如果没有消费者,消息会堆积在队列中。不过别怕,消息堆积很正常,并且一般的中间件堆积能力是非常强的。比如阿里就宣传自己mq可以堆积上亿条数据。

查看Redis消息与队列

> docker exec -it dnmp-redis redis-cli

127.0.0.1:6379> keys *

127.0.0.1:6379> keys *

1) "queues:testQueue"

127.0.0.1:6379> TYPE queues:testQueue

list

127.0.0.1:6379> LRANGE queues:testQueue 0 -1

1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"

2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"

127.0.0.1:6379>

消费者

开始消费消息。执行cli 命令 php think queue:work--queue队列名称

# php think queue:work --queue testQueue

1、具体执行任务接受到的参数: {"email":"27@qq.com","username":"Tinywan"}

2、恭喜你!27@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

这里每消费掉一条消息,Redis数据库中将会减少一条消息

查看Redis队列消息

127.0.0.1:6379> LRANGE queues:testQueue 0 -1

1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"

127.0.0.1:6379>

命令行挂起守护进程执行

/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256

--daemon 是否循环执行,如果不加该参数则该命令处理完下一个消息就退出 --queue 要处理的队列的名称 --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。 --memory 该进程允许使用的内存上限,以M为单位。

流程图

9af54836e9052b6ea56f3642ec8ef8f7.png

消费信息如下

php think queue:work --daemon --queue testQueue

1、具体执行任务接受到的参数: {"email":"77@qq.com","username":"Tinywan"}

2、恭喜你!77@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

1、具体执行任务接受到的参数: {"email":"80@qq.com","username":"Tinywan"}

2、恭喜你!80@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

1、具体执行任务接受到的参数: {"email":"34@qq.com","username":"Tinywan"}

2、恭喜你!34@qq.com 邮件发送成功了

Processed: app\common\queue\TestQueue

1、命令行模式可以常驻内存不停的执行php代码。这样就可以达到类似于静态语言的java的效果。

2、一开始监听队列。刚刚在队列中堆积的消息立刻就被获取到,开始执行了代码。最后执行完成,删除了消息。

3、在 queue:work--daemon 单进程循环消费的时候,改了代码是不会生效的。这时脚本语言有点类似于静态语言在执行。所以需要我们用queue:restart重启 work 进程 。

命令行挂起守护进程执行

/usr/local/php/bin/php /data/wwwroot/default/thinkphp_5/think queue:work --daemon --queue testQueue --memory 256

查看进程是否在运行

# ps

PID USER TIME COMMAND

1 root 0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)

6 www-data 0:00 php-fpm: pool www

7 www-data 0:00 php-fpm: pool www

16 root 0:00 sh

56 root 0:00 sh

113 root 0:00 php think queue:work --daemon --queue testQueue

你再也不用守在终端了,以后只负责生产消息就可以了。Redis队列也不会积累消息了

其他(中间件)

中间件系统的定义是两个独立的不同的系统在中间构建起传递消息的工具。但是同一个系统也可以通过中间件来榨取性能,大家肯定项目中遇到过性能瓶颈。

比如发送邮件,发送短信,转换视频格式等等。这些业务都是比较耗性能,又对先后顺序不敏感的业务。这种业务就非常适合使用消息队列系统来异步处理,使性能提升。

重启队列和生成队列

002bc4b1faeeb034f9de0ec1d471ee53.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/526846.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql表里插不进去数据_Oracle数据中表值插不进去问题(转)

相信我们在进行测试的时候,有的时候会遇上数据库表的值插不进去的情况,在执行SQL语句的时候,好像卡住一样,没有反应。但是当你把SQL语句c&#xf…

java 类默认访问权限_Java类 成员 访问权限 默认

Java中的访问权限控制符有四个.作用域 当前类 同一package 子孙类 其他packagepublic √ √ √ √protected √ …

java创建临时文件夹_java创建临时文件

[java]代码库/*** 创建临时文件** param prefix* 临时文件名的前缀* param suffix* 临时文件名的后缀* param dirName* 临时文件所在的目录,如果输入null,则在用户的文档目录下创建临时文件* return 临时文件创建成功返回true,否则返回false*…

java quartz2.1_quartz 2.1学习(一)

quartz是一种开源任务调度框架,提供了强大的任务调度机制,Quartz允许开发人员灵活地定义触发器的调度时间表,并可对触发器和任务进行关联映射。废话不多说了,介绍一下编程的基本步骤:实现Job接口,编码实现需…

java http setheader_response.setHeader各种用法详解

本文主要介绍了response.setHeader各种用法。具有很好的参考价值,下面跟着小编一起来看下吧一秒刷新页面一次 response.setHeader("refresh","1");二秒跳到其他页面 response.setHeader("refresh","2;URLotherPagename");没…

datagridview取消默认选中_C# WinForm 取消DataGridView的默认选中Cell 使其不反蓝

dataGridView1.Rows[0].Selected false;默认情况下 DataGridView绑定数据后会选中首行首列为实现其没有默认不选中(即绑定后 看不到首行首列反蓝)之前将dataGridView1.Rows[0].Selected false;放在窗体的构造函数中 怎么都看似不起效果 首行首列还是反蓝后来尝试放在窗体的Lo…

java多线程的优点_【java多线程的优点】

作者:Jakob Jenkov 翻译:古圣昌 校对:欧振聪尽管面临很多挑战,在java学习中多线程有一些优点使得它一直被使用。这些优点是:资源利用率更好程序设计在某些情况下更简单程序响应更快资源利用率更好想…

java boolean是什么_java中的boolean与Boolean有什么不同

java中的boolean与Boolean有什么不同发布时间:2020-11-11 15:59:21来源:亿速云阅读:74作者:Leah这篇文章给大家介绍java中的boolean与Boolean有什么不同,内容非常详细,感兴趣的小伙伴们可以参考借鉴&#x…

linux cmake编译安装mysql_Linux源码安装MySQL 5.6.12 (Cmake编译)

Linux源码安装MySQL 5.6.12 (Cmake编译)1.安装make编译器(默认系统自带)下载地址:tar zxvf make-3.82.tar.gzcd make-3.82./configuremakemake install2.安装bison下载地址:tar zxvf bison-2.5.tar.gzcd bison-2.5./configuremakemake install3.安装gcc-…

啊哈java_1.桶排序——啊哈算法java实现

/*** 题目:* 5个人考试得分分别为 5分,3分,5分,2分,8分;满分是10分;* 要将 5 3 5 2 8 这个数组进行降序排序;* 即排序后变为 8 5 5 3 2;* *//*** 桶排序解法: 建一个大小为11的一维数组a,a[0]~a[10]元素都初…

java成员变量的初始化_Java成员变量初始化过程

import java.util.*;public class Main{public static void main(String[] args){Student s new Student(5);s.show();}}class Person{public Person(){System.out.println("父初始化");show();}public void show(){System.out.println("父show");}}class…

java校招面试题_java校招面试编程题及答案.docx

java校招面试编程题及答案java校招面试编程题及答案  Java集合框架为Java编程语言的基础,也是Java面试中很重要的一个知识点。这里,我列出了一些关于Java集合的重要问题和答案。   集合框架是什么?说出一些集合框架的优点?   每种编程语言中都有…

合并两个有序数组 java_合并两个有序的数组

/*** 写在前面,题目要求的是将有序数组合并,那么有可能这所谓的有序是顺序或者逆序* 所以,应该在开始的时候判断一下* 然后,在比较的时候应该根据顺序逆序来写判断逻辑* 不过常规应该是顺序递增,然后就有了以下的代码&…

arp linux 清空_Linux怎么清理ARP缓存

1、系统初始arp环境[rootesx ~]# arp -nAddress HWtype HWaddress Flags Mask Iface192.168.1.175 ether 00:24:1D:97:B6:7F C vswif0192.168.1.120 ether 00:1F:C6:3A:DC:81 C vswif0192.168.1.51 (incomplete) vswif02、执行清除所有arp 缓存命令[rootesx ~]# arp -n|awk /^[…

ctf mysql hash传递_分享个 CTF 小工具 bruteHASH

别问,问就是为了 CTF思路源于一次三小时十二题的内部 CTF 竞赛,其中一道简单 MISC 给出明文范围(字母数字)和 MD5 开头,要求穷举出 flag——这当然不难,python 十几行代码搞定,但是运行出结果竟然用了近 20 分钟&#…

JAVA怎么实现网页退出系统_java后台实现js关闭本页面,父页面指定跳转或刷新操作...

关闭本页面,跳转到百度response.setCharacterEncoding("gbk");PrintWriter outresponse.getWriter();out.print("");out.print("");关闭本页面,刷新父页面response.setCharacterEncoding("gbk");PrintWriter ou…

huffman树java_HuffmanTree - java实现

该思想借鉴于《2019版数据结构高分笔记(c语言版)》- 第7版最近事多,有时间会把思路在这里阐述一下代码思路如下/*** add() 输入names[] weights[]* sort()排序* generateTree()生成树* preOrder() 先序遍历生成节点编码code* inOrder() 前序遍历输出编码** root* / …

java crontriggerbean_java – 使用JobStoreTX为石英聚类配置CronTriggerFactoryBean

我们正在使用Quartz 2.1.5;我们设置了以下属性:org.quartz.jobStore.classorg.quartz.impl.jdbcjobstore.JobStoreTXorg.quartz.jobStore.driverDelegateClassorg.quartz.impl.jdbcjobstore.CloudscapeDelegateorg.quartz.jobStore.useProperties trueorg.quartz.j…

利用文本文档运行java程序_java代码创建文件夹和读取文本文件txt的内容(可运行)...

java代码创建文件夹和读取文本文件txt的内容(可运行)读取txt 的内容 和 创建一个 新的文件夹package com.sec.file;import java.io.BufferedReader;import java.io.File;import java.io.FileReader;public class ReaderFile {public static void main(String[] args) throws Ex…

java ssh 那一层应该捕获异常_java ssh异常(大神来看看啊)

提一个小问题呢!ssh框架整合时 我看别人的代码都不对异常做处理,这是为什么呢 如不比如Dao操作数据库的代码中都没用throws异常,那service层中的应该是取不到抛出的异常才对啊,那这样在service层事务的交给Spring来管理的service层取不到异常…