Thinkphp+workman+redis实现多进程异步任务处理

前言


PHP本身并不直接支持多线程编程,因为PHP的设计初衷是作为一个脚本语言,主要面向的是Web开发。不过我们可以使用一些扩展和库来实现多进程的功能,提高系统性能,比如workerman和swoole。通过多进程异步执行任务。

安装workman


  • 简介

    官网:高性能PHP应用容器 workerman

    文档:

    workerman 手册

    Workerman · ThinkPHP5.0完全开发手册 · 看云 (kancloud.cn)

    ThinkPHP 5.1 Workerman 快速上手指南 · ThinkPHP5.1 Workerman上手指南 · 看云 (kancloud.cn)

  • 环境要求

    • PHP >= 7.2
    • Composer >= 2.0
  • 安装扩展

    composer require topthink/think-worker
    

Thinkphp5.0使用workman创建多进程任务


  • 1.在项目根目录(注意不是pubcli目录)下创建文件server.php,文件内容如下

    <?php
    define('APP_PATH', __DIR__ . '/application/');
    define('BIND_MODULE','workman/Worker');
    // 加载框架引导文件
    require __DIR__ . '/thinkphp/start.php';
    
  • 2.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?phpnamespace app\workman\controller;use think\worker\Server;class Worker extends Server
    {//websocket服务端地址和端口protected $socket = 'websocket://0.0.0.0:2346';//设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍protected $processes = 4;/*** 收到信息* @param $connection* @param $data*/public function onMessage($connection, $data){}/*** 当连接建立时触发的回调函数* @param $connection*/public function onConnect($connection){}/*** 当连接断开时触发的回调函数* @param $connection*/public function onClose($connection){}/*** 当客户端的连接上发生错误时触发* @param $connection* @param $code* @param $msg*/public function onError($connection, $code, $msg){echo "error $code $msg\n";}/*** 每个进程启动* @param $worker*/public function onWorkerStart($worker){echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;//监听redis队列$redis = new \Redis();$redis->connect('192.168.204.128', 6379);while (true) {//读取redis队列$data = $redis->lPop('test-queue');if ($data) {//处理业务echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;//模拟耗时任务sleep(5);echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;} else {echo '进程id ' . $worker->id . ' 空闲中,休息5秒'. PHP_EOL;sleep(5);}}}
    }
    

    这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

  • 3.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?phpnamespace app\index\controller;class Index
    {//新增队列数据public function addQueue(){$redis = new \Redis();$redis->connect('192.168.204.128', 6379);$redis->rPush('test-queue', '1');$redis->rPush('test-queue', '2');$redis->rPush('test-queue', '3');$redis->rPush('test-queue', '4');$redis->rPush('test-queue', '5');$redis->rPush('test-queue', '6');$redis->rPush('test-queue', '7');echo 'success';}
    }
    
  • 4.启动workman

    直接在根目录下运行第一步创建的server.php

    php server.php start
    

    可以看到下面的输出

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    

    注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 5.访问tp框架中的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    (这里开始将任务加入redis队列,然后workman开始消费队列)
    进程id 0 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 3 开始处理业务数据4
    进程id 2 开始处理业务数据2
    进程id 0 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 2 处理业务数据2 完成
    进程id 1 开始处理业务数据6
    进程id 3 处理业务数据4 完成
    进程id 0 开始处理业务数据5
    进程id 3 开始处理业务数据7
    进程id 2 空闲中,休息5秒
    进程id 1 处理业务数据6 完成
    进程id 0 处理业务数据5 完成
    进程id 3 处理业务数据7 完成
    (到这里所有的7项任务已经处理完成)
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    

TP5.1及TP6使用workman创建多进程任务


  • 1.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?phpnamespace app\workman;use think\worker\Server;class Worker extends Server
    {protected $host = '127.0.0.1';protected $port = 2346;protected $protocol = 'websocket';protected $option = ['count'   => 4, //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍'name'    => 'think'];/*** 收到信息* @param $connection* @param $data*/public function onMessage($connection, $data){}/*** 当连接建立时触发的回调函数* @param $connection*/public function onConnect($connection){}/*** 当连接断开时触发的回调函数* @param $connection*/public function onClose($connection){}/*** 当客户端的连接上发生错误时触发* @param $connection* @param $code* @param $msg*/public function onError($connection, $code, $msg){echo "error $code $msg\n";}/*** 每个进程启动* @param $worker*/public function onWorkerStart($worker){echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;//监听redis队列$redis = new \Redis();$redis->connect('127.0.0.1', 6379);while (true) {//读取redis队列$data = $redis->lPop('test-queue');if ($data) {//处理业务echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;//模拟耗时任务sleep(5);echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;} else {echo '进程id ' . $worker->id . ' 空闲中,休息5秒' . PHP_EOL;sleep(5);}}}
    }
    

    这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

  • 2.指定workman服务类名

    修改config/worker_server.php,将worker_class的值改为app\workman\Worker

    'worker_class'   => 'app\workman\Worker', // 自定义Workerman服务类名 支持数组定义多个服务
    
  • 3.启动workman

    直接在根目录下运行命令php think worker:server或者php think worker:server -d即可启动,如果要调整workman参数,修改config/worker_server.php中的选项即可

    php think worker:server
    

    可以看到下面的输出

    Starting Workerman server...
    ----------------------- WORKERMAN -----------------------------
    Workerman version:3.5.35          PHP version:7.3.4
    ------------------------ WORKERS -------------------------------
    worker               listen                              processes status
    none                 websocket://127.0.0.1:2346          1         [ok]
    workman进程启动,进程id 0
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    

    注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 4.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?php
    namespace app\controller;use app\BaseController;class Index extends BaseController
    {//新增队列数据public function addQueue(){$redis = new \Redis();$redis->connect('127.0.0.1', 6379);$redis->rPush('test-queue', '1');$redis->rPush('test-queue', '2');$redis->rPush('test-queue', '3');$redis->rPush('test-queue', '4');$redis->rPush('test-queue', '5');$redis->rPush('test-queue', '6');$redis->rPush('test-queue', '7');echo 'success';}
    }
    
  • 5.访问上面的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Starting Workerman server...
    Workerman[think] start in DEBUG mode
    -------------------------------------------- WORKERMAN ---------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ----------------------------------------------
    proto   user            worker          listen                        processes    status
    tcp     root            think           websocket://127.0.0.1:2346    4             [OK]
    ----------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    workman进程启动,进程id 2
    进程id 2 空闲中,休息5秒
    进程id 0 开始处理业务数据2
    进程id 3 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 2 开始处理业务数据4
    进程id 0 处理业务数据2 完成
    进程id 0 开始处理业务数据5
    进程id 3 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 3 开始处理业务数据6
    进程id 1 开始处理业务数据7
    进程id 2 处理业务数据4 完成
    进程id 2 空闲中,休息5秒
    进程id 0 处理业务数据5 完成
    进程id 1 处理业务数据7 完成
    进程id 3 处理业务数据6 完成
    进程id 0 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/761195.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java-常见面试题收集(三)

八 集合 1 List,Set,Map 是否继承自 Collection 接口 List,Set 接口继承于 Collection 接口   Map 没有继承于 Collection 接口 2 List,Set,Map 三者的区别 ① List、Set 都是继承自 Collection 接口&#xff0c;Map 则不是 ② List 特点&#xff1a;元素有放入顺序&#x…

【C#】Conventions(惯例)最佳实践和准则

在C#中,Conventions(惯例)是指编写代码时的一套最佳实践和准则。这些惯例旨在提高代码的可读性、一致性和可维护性。虽然这些惯例不是语言的强制规则,但遵循它们可以使你的代码更加清晰和专业。 以下是一些常见的C#编码惯例: 命名约定: 使用有意义的、描述性的名称。类名和公…

P8711 [蓝桥杯 2020 省 B1] 整除序列 存疑解决篇 Python

[蓝桥杯 2020 省 B1] 整除序列 题目描述 有一个序列&#xff0c;序列的第一个数是 n n n&#xff0c;后面的每个数是前一个数整除 2 2 2&#xff0c;请输出这个序列中值为正数的项。 输入格式 输入一行包含一个整数 n n n。 输出格式 输出一行&#xff0c;包含多个整数…

Zinx框架的高级用法

一、使用框架提供的实用类 zinx框架已经提供了常用的IO通道类-TCP。 阅读Tcp相关类的使用文档&#xff0c;将之前的3个案例用TCP的方式实现。 步骤&#xff1a; 创建Tcp数据通道类继承ZinxTcpData&#xff0c;重写GetInputNextStage函数&#xff0c;内容跟之前标准输入通道类…

Mysql2-sql语句

一、MySQL数据库表操作 MySQL表的基本概念 在windows中有个程序叫做excel. 而Excel文件中存在了如sheet1、sheet2、sheet3的表, 所有的sheet都存储在这个Excel文件中, 在某个sheet中有相应的数据. 回到数据库和表的关系上来说, 这个Excel文件就是一个数据库, 所有的sheet就是…

基于SpringBoot和Vue的大学生租房系统的设计与实现

今天要和大家聊的是一款今天要和大家聊的是一款基于SpringBoot和Vue的大学生租房系统的设计与实现。 &#xff01;&#xff01;&#xff01; 有需要的小伙伴可以通过文章末尾名片咨询我哦&#xff01;&#xff01;&#xff01; &#x1f495;&#x1f495;作者&#xff1a;李同…

【C++】弥补C语言的不足(②有默认参数的函数)

&#x1f33b;缺省参数 我们先来看一个简单地例子&#xff0c;对于在函数的定义中三个形参都给定默认值&#xff1a; #include <iostream> using namespace std; void fun(int a 10, int b 20, int c 30) {cout << "a " << a << endl;…

【考研数学】跟张宇,一看就会,一做就废,怎么办?

刚开始考研的时候都是这种情况&#xff0c;建议降低习题难度 刚开始就做1000题的都是勇士 1000题适合在强化阶段做&#xff0c;因为1000题中的题目&#xff0c;综合度高&#xff0c;需要做题者掌握比较多的解题技巧&#xff0c;而且对于计算能力要求也比较高。初学者肯定是不…

C语言经典面试题目(二十六)

1、解释一下C语言中的函数原型及其作用。 函数原型是指在函数定义之前声明函数的参数类型、返回类型和函数名称的一种声明方式。函数原型的作用包括&#xff1a; 编译器检查&#xff1a;函数原型能够告诉编译器函数的返回类型和参数类型&#xff0c;从而能够在编译阶段检查函…

【OJ】动归练习一

个人主页 &#xff1a; zxctscl 如有转载请先通知 题目 1. 前言2. 1137第 N 个泰波那契数2.1 分析2.2 代码 3. 面试题 08.01. 三步问题3.1 分析3.2 代码 4. 746使用最小花费爬楼梯4.1 分析4.1.1 以i位置为终点4.1.2 以i位置为起点 4.2 代码4.2.1以i位置为终点4.2.2以i位置为起点…

深浅拷贝与初始化列表

一、深拷贝与浅拷贝 浅拷贝&#xff1a;简单的赋值拷贝操作 深拷贝&#xff1a;在堆区重新申请空间 由于栈上的数据先进后出&#xff0c;所以p2后释放&#xff0c;在执行析构代码时&#xff0c;新建的堆区数据就被释放&#xff0c;再当p1进行释放时&#xff0c;由于堆区数据…

【力扣每日一题】lc1793. 好子数组的最大分数(单调栈)

LC1793. 好子数组的最大分数 题目描述 给你一个整数数组 nums &#xff08;下标从 0 开始&#xff09;和一个整数 k 。 一个子数组 (i, j) 的 分数 定义为 min(nums[i], nums[i1], ..., nums[j]) * (j - i 1) 。 一个 好 子数组的两个端点下标需要满足 i < k < j 。 请…

【每日一题】13. 罗马数字转整数

13. 罗马数字转整数 给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 示例 1&#xff1a; 输入&#xff1a;s “Hello World” 输出&…

【leetcode】67.二进制求和

前言&#xff1a;剑指offer刷题系列 问题&#xff1a; 给你两个二进制字符串 a 和 b &#xff0c;以二进制字符串的形式返回它们的和。 示例&#xff1a; 输入&#xff1a;a "1010", b "1011" 输出&#xff1a;"10101"思路1&#xff1a; …

【Nginx】反向代理解决跨域问题

电脑A写前端代码&#xff0c;电脑B写后端代码&#xff0c;电脑A用Nginx解决跨域问题&#xff0c;从而调用后端的接口。 为什么nginx反向代理可以实现跨域请求&#xff1f; 因为浏览器的同源策略&#xff08;Same-Origin Policy&#xff09;。 在同源策略下&#xff0c;无法向…

一命通关广度优先遍历

前言 在这篇文章之前&#xff0c;已对非线性结构遍历的另一种方法——深度优先遍历进行了讲解&#xff0c;其中很多概念词都是共用的。为了更好的阅读体验&#xff0c;最好先在掌握或起码了解dfs的基础上&#xff0c;再来阅读本文章&#xff0c;否则因为会有很多概念词看不明白…

nodejs的中雪花算法(Snowflake)

介绍 雪花算法&#xff08;Snowflake&#xff09;是Twitter开发的一种分布式唯一ID生成算法&#xff0c;用于生成全局唯一的ID。雪花算法的核心思想是利用时间戳和机器ID来生成唯一的ID&#xff0c;确保在分布式环境下生成的ID不会重复。 雪花算法生成的ID是一个64位的整数&a…

如何修复WordPress网站媒体库上传文件失败的问题

公司最近推出了一系列新产品&#xff0c;为了更新网站的视频和图片&#xff0c;我们需要将它们上传至网站媒体库。然而&#xff0c;在上传视频时&#xff0c;我们却遇到了一些问题。系统提示说&#xff0c;我们尝试上传的视频文件大小超出了站点的最大上传限制。尽管我们的视频…

计算机网络:性能指标

计算机网络&#xff1a;性能指标 速率带宽吞吐量时延时延带宽积往返时间利用率丢包率 本博客介绍计算机网络的性能指标&#xff0c;我们可以从不同的方面来度量计算机网络的性能。常用的计算机网络性能指标有以下 8 个&#xff0c;他们是&#xff1a;速率、带宽、吞吐量、时延、…

P1481 魔族密码

P1481 魔族密码 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 字典树 在插入字符串 s s s时&#xff0c;不断记录 s 0... k s_{0...k} s0...k​的个数取最大即可。 #include <bits/stdc.h> using namespace std; const int N 1e5 21; int cnt[N], tr[N][30], idx,…