记一次hyperf框架封装swoole自定义进程

背景

公司准备引入swoole和rabbitmq来处理公司业务。因此,我引入hyperf框架,想用swoole的多进程来实现。

自定义启动服务封装
<?php
/*** 进程启动服务【manager】*/
declare(strict_types=1);namespace App\Command;use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use App\Process\BaseProcess;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;/*** @Command*/
#[Command]
class TaskProcessCommand extends HyperfCommand
{const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';/*** @var ContainerInterface*/protected $container;protected $coroutine = false;public function __construct(ContainerInterface $container){$this->container = $container;parent::__construct('task');}public function configure(){parent::configure();$this->setDescription('自定义进程任务');$this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');$this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart 启动/关闭/重启');}public function handle(){$action = $this->input->getArgument('action');if ($action === 'start') {$this->start();} elseif ($action === 'stop') {$this->stop();} elseif ($action === 'restart') {$this->restart();} else {echo "不支持的action, 请输入 -h 参数查看" . PHP_EOL;}}/*** 重启:php bin/hyperf.php task restart*/protected function restart(){$this->stop();$this->start();}/*** 停止:php bin/hyperf.php task stop*/protected function stop(){if (file_exists(self::MANAGER_PROCESS_PID_PATH)) {//后期可以写入数据表,根据状态进行重启$managerPid = file_get_contents(self::MANAGER_PROCESS_PID_PATH);echo "stopping...\n";echo "kill pid $managerPid \n";$managerPid = intval($managerPid);$startTime = time();$timeout = config('server.settings.max_wait_time', 10);@Process::kill($managerPid);//等待主进程结束while (@Process::kill($managerPid, 0)) {//waiting process stopecho "waiting...\r";usleep(100000);echo "              \r";echo "waiting.\r";usleep(100000);echo "              \r";//超时 强杀所有子进程if ($managerPid > 0 && time() - $startTime >= $timeout) {echo "wait timeout, kill -9 child process, pid: $managerPid \n";echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/ {print $2}'|xargs kill -9") . PHP_EOL;}}unlink(self::MANAGER_PROCESS_PID_PATH);echo "stopped. \n";} else {echo "找不到manager pid, path: " . self::MANAGER_PROCESS_PID_PATH;}}/*** 启动:php bin/hyperf.php task start* 守护进程启动:php bin/hyperf.php task start -d*/protected function start(){$processConfig = config('processes');if ($processConfig) {echo "start now.\n";$daemonize = $this->input->getOption('daemonize');if ($daemonize) {//重定向标准输出到指定日志文件fclose(STDOUT);fclose(STDERR);$STDOUT = fopen(BASE_PATH . '/runtime/logs/taskProcess_output.log', 'ab');$STDERR = fopen(BASE_PATH . '/runtime/logs/taskProcess_error.log', 'ab');Process::daemon(true, true);}//save pidfile_put_contents(self::MANAGER_PROCESS_PID_PATH, getmypid());//TODO 后期可以根据需要写入配置或者数据表,开启多个主进程、挂载多个子进程BaseProcess::setProcessName('manager');//主进程$startFuncMap = [];foreach ($processConfig as $processClass) {$processObj = new $processClass;if ($processObj->isEnable && ($processObj instanceof BaseProcess) && isset($processObj->nums) && $processObj->nums > 0) {for ($i = 0; $i < $processObj->nums; $i++) {$startFuncMap[] = [[$processObj, 'handle'],$processObj->enableCoroutine ?? false,$i,];}}}$pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);$pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {[$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];if ($enableCoroutine) {run(function () use ($func, $pool, $workerId, $idx) {$pm = $func[0];//process下的类$idx += 1;BaseProcess::setProcessName($pm->name . "[{$idx}/{$pm->nums}]");//多个子进程call_user_func($func, $pool, $workerId);});} else {$func($pool, $workerId);//baseProcess下的handle}});$pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {echo 'process Message,data=' .json_encode($data). PHP_EOL;});//进程关闭$pool->on("WorkerStop", function (Swoole\Process\Pool $pool, int $workerId) {echo "process WorkerId={$workerId} is stopped". PHP_EOL;});$pool->start();} else {printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);}}/*** 查看运行状态:php bin/hyperf.php task status*/protected function status(){//TODO 查看任务执行状态}public function getProcess($pid = -1){if ($pid === -1) {$pid = getmypid();}return static::$process[$pid] ?? null;}public function getAllProcess(){return static::$process;}
}
基础process封装

此处可以用hyperf框架自带的,也可以自己封装

<?phpdeclare (strict_types = 1);namespace App\Process;use Swoole;
use Swoole\Process\Pool;abstract class BaseProcess {/*** 进程数* @var integer*/public $nums = 0;/*** 进程名称* @var string*/public $name = '';/*** 是否启用协程* @var bool*/public $enableCoroutine = true;/*** 是否随进程启动服务* @var bool*/public $isEnable = true;protected $isRunning = true;protected $process;static $signal = 0;function __construct() {//进程自动命名if (empty($this->name)) {$this->name = trim(str_replace('\\', '.', str_replace(__NAMESPACE__, '', get_called_class())), '.');}}final public function handle(Pool $pool, int $workerId): void {try {$this->processInit($pool->getProcess());$this->beforeRun();while (true) {//进程结束信号if (BaseProcess::$signal === SIGTERM) {$this->onProcessExit();break;}$this->run();}} catch (\Throwable $e) {throw $e;}}protected function onProcessExit() {$this->isRunning = false;}protected function processInit($process) {$this->process = $process;echo "process {$this->name} start, pid: " . getmypid().PHP_EOL;//注册信号处理器,实现优雅重启(等待任务执行完后或者等待超时)pcntl_signal(SIGTERM, function () {BaseProcess::$signal = SIGTERM;$maxWaitTime = config('server.settings.max_wait_time', 5);$sTime = time();//检查进程任务状态Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {$coStat = \Swoole\Coroutine::stats();//如果主循环结束,且其它协程任务执行完,清理定时器以退出进程if (!$this->isRunning && $coStat['coroutine_num'] <= 1) {Swoole\Timer::clearAll();$this->process->exit();}//等待超时,强制结束进程elseif (time() - $sTime >= $maxWaitTime) {Swoole\Timer::clearAll();if ($this->isRunning) {$this->onProcessExit();}$this->process->exit();}});});}public static function setProcessName(string $name) {swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);}/*** 事件循环前调用* @return [type] [description]*/abstract function beforeRun();/*** 事件循环,注意这里不能使用死循环* @return [type] [description]*/abstract function run();}
使用demo

demo1

<?phpdeclare (strict_types = 1);namespace App\Process;/*** test*/
class TestProcess extends BaseProcess {/*** 进程数* @var integer*/public $nums = 5;public $enableCoroutine = true;/*** 不随服务启动进程* @var bool */public $isEnable = false;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体echo date('Y-m-d H:i:s').PHP_EOL;usleep(1000);}}

demo2

<?phpnamespace App\Process;use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;class UpdateZeroStock extends BaseProcess
{const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';/*** 进程数* @var integer*/public $nums = 1;public $enableCoroutine = true;/*** 随服务启动进程* @var bool */public $isEnable=true;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体$this->updateZeroStock();echo date('Y-m-d H:i:s').PHP_EOL;sleep(300);}public function updateZeroStock(){// 1.全量更新$list_hq = HqchipGoodsModel::select(['id','spu','stock','manufacturer'])->where('excute_time','<',8)->limit(1000)->get();$container = ApplicationContext::getContainer();$redis = $container->get(Redis::class);$producer = ApplicationContext::getContainer()->get(Producer::class);$today = date('Y-m-d');if($list_hq){foreach ($list_hq as $item){$spu = trim($item['spu']);$zeroStockKey =  $this->getZeroStockKey($today,'hqchip',$item['manufacturer']);if($redis->exists($zeroStockKey) && !$redis->hGet($zeroStockKey,$spu)){$sendData = $item;$sendData['appKey'] = $this->appSecretKey();$sendData['platform'] = 'hqchip';$message = new UpdateZeroStockProducer($sendData);$res = $producer->produce($message);echo date('Y-m-d H:i:s') . 'rabbitmq hqchip sendMq: ' .$res . PHP_EOL;}}}}/*** 零库存缓存KEY* @param $brand* @param $sku* @return string*/private function getZeroStockKey($day,$platfrom,$brand){return self::ZERO_STOCK_KEY .":". $platfrom .":" . $day . ":" . $brand;}/*** 密钥生产* @return string*/private function appSecretKey(){$a = 'chipmall-spider&V2&' . date('Y-m-d');$appKey = base64_encode(md5($a)   .'||'. base64_encode(time() . '|' . $a));return $appKey;}
}
在配置中进程需要执行的服务

在这里插入图片描述

以守护进程方式启动服务
php bin/hyperf.php task start -d

在这里插入图片描述
查看进程命令

ps -ef|grep taskProcess

在这里插入图片描述

疑惑

这次封装还存在两个点需要完善!!!
1.重复执行:

php bin/hyperf.php task start -d

会启动多个manager进程

2、没有封装查看进程状态的status方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/86265.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言每日一题(10):无人生还

文章主题&#xff1a;无人生还&#x1f525;所属专栏&#xff1a;C语言每日一题&#x1f4d7;作者简介&#xff1a;每天不定时更新C语言的小白一枚&#xff0c;记录分享自己每天的所思所想&#x1f604;&#x1f3b6;个人主页&#xff1a;[₽]的个人主页&#x1f3c4;&#x1f…

Weblogic反序列化漏洞(CVE-2018-2628/CVE-2023-21839复现)

内容目录 Weblogic反序列化漏洞(CVE-2018-2628/CVE-2023-21839)weblogic中间件CVE-2018-2628漏洞描述影响版本漏洞复现修复方案 CVE-2023-21839漏洞描述影响版本漏洞复现修复方案 Weblogic反序列化漏洞(CVE-2018-2628/CVE-2023-21839) weblogic中间件 WebLogic是美国Oracle公司…

在给应用ASO优化时要注意些什么

应用名称是搜索引擎优化和转化率优化非常重要的元素。用户在搜索结果页面中看到我们的应用程序&#xff0c;这是他们决定是否想要更多地了解我们应用的地方。当用户已经在查看产品页面时&#xff0c;应用程序名称也会影响转化率&#xff0c;如果列表元数据有吸引力&#xff0c;…

异步回调

Future 设计的初衷&#xff1a;对将来的某个事件的结果进行建模 package com.kuang.future;import com.kuang.pc.C;import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.uti…

静态路由与默认路由配置

实验原理&#xff1a; 路由分类 &#xff08;1&#xff09;根据目的网络的不同&#xff0c;路由可以划分为&#xff1a; 特定网络路由&#xff1a;目的网络为目的主机所在网络的IP地址&#xff0c;其子网掩码表示的前缀长度为32位&#xff08;对于IPv4地址&#xff09;&…

stm32_标准库_中断_按键点灯|蜂鸣器

配置流程 需要对AFIO、EXTI、NVIC、GPIOB进行配置形成通路将中断连接至CPU APB2总线连接的寄存器 LED灯代码 #include "stm32f10x.h" // Device header #include "Delay.h"GPIO_InitTypeDef GIPO_InitStruct;//结构体配置GPIO EXTI_InitTypeDef EXTI_…

WPS文件找回怎么做?文件恢复,4个方法!

“我平时习惯了用wps来记录一些工作心得或重点&#xff0c;不知道什么原因&#xff0c;有些很重要的文件莫名不见了&#xff0c;有什么方法可以帮我找回wps文件吗&#xff1f;” wps作一个常用的办公软件&#xff0c;有效的提高了我们的工作效率。在日常使用wps时&#xff0c;可…

非常详细的git-flow分支管理流程配置及使用

非常详细的git-flow分支管理流程配置及使用。 git-flow有两个涵义,一个是指软件开发领域的版本管理流程Gitflow。另一个是指git命令工具git flow。 目前业界主流的版本管理流程是Gitflow 和 trunk-based。 Gitflow流行的比较早。但是目前的流行度要低于 trunk-based模式工作…

MySQL8.0版安装教程 + Workbench可视化配置教程(史上最细、一步一图解)

文章目录 一、安装MySQL1、选择版本&#xff0c;点击“Download”进行下载2、双击下载好的安装包&#xff0c;点击运行3、选择安装类型为“Custom”4、依次进行选择&#xff0c;选到MySQL Servers 8.0.33 -X64&#xff0c;点击向右的箭头5、选中MySQL Servers 8.0.33 -X64&…

Mysql004:用户管理

前言&#xff1a;本章节讲解的是mysql中的用户管理&#xff0c;包括&#xff08;管理数据用户&#xff09;、&#xff08;控制数据库的访问权限&#xff09;。 目录 1. 查询用户 2. 创建用户 3. 修改用户密码 4. 删除用户 5. 权限控制 1. 查询用户 在mysql数据库中&#xff0…

古代有没有电子元器件?

手机&#xff0c;电脑&#xff0c;电视等等电子产品&#xff0c;无时无刻充斥在我们的生活中&#xff0c;如果有一天突然没有了这些功能多样的电子产品&#xff0c;估计大部分人都会一时之间难以适应。 这就好比正在上网&#xff0c;结果突然被人断了网&#xff0c;导致无网络连…

基于vue3 + ant-design 自定义SVG图标iconfont的解决方案;ant-design加载本地iconfont.js不显示图标问题

基于vue3 ant-design 自定义SVG图标iconfont的解决方案&#xff1b; ant-design加载本地iconfont.js不显示图标问题 一、准备工作 1、首先去阿里巴巴矢量图标库自定义添加自己的图标&#xff1b;网站地址https://www.iconfont.cn/ 整个步骤是&#xff1a;选择图标–添加到项…

变压器(电抗器) 红外测温作业指导书

1 范围 本标准化作业指导书规定了变压器(电抗器)红外测温(一般检测)工作的准备工作、测温流程图、 现场操作方法、测温周期和标准、测温记录管理等要求。 本标准化作业指导书适用于指导变压器(电抗器)红外测温的一般性检测工作。 2 规范性引用文件 下列文件对于本文件的应用…

积木报表 JimuReport v1.6.2-GA5版本发布—高危SQL漏洞安全加固版本

项目介绍 一款免费的数据可视化报表&#xff0c;含报表和大屏设计&#xff0c;像搭建积木一样在线设计报表&#xff01;功能涵盖&#xff0c;数据报表、打印设计、图表报表、大屏设计等&#xff01; Web 版报表设计器&#xff0c;类似于excel操作风格&#xff0c;通过拖拽完成报…

Linux复习-安装与熟悉环境(一)

这里写目录标题 虚拟机ubuntu系统配置镜像Linux命令vi编辑器3个模式光标命令vi模式切换命令vi拷贝与粘贴命令vi保存和退出命令vi的查找命令vi替换命令 末行模式复制、粘贴、剪切gcc编译器 虚拟机 VMware16 官网下载&#xff1a;vmware官网 网盘下载&#xff1a; 链接&#xff…

Pytorch 深度学习实践 day01(背景)

准备 线性代数&#xff0c;概率论与数理统计&#xff0c;Python理解随机变量和分布之间的关系 人类智能和人工智能 人类智能分为推理和预测 推理&#xff1a;通过外界信息的输入&#xff0c;来进行的推测 预测&#xff1a;例如&#xff0c;看到一个真实世界的实体&#xff…

idea更改java项目名

做了一个普通的java项目&#xff08;使用socket进行网络通信的练手项目&#xff09;&#xff0c;需要更改项目名&#xff0c;更改过程记录在这里。 修改项目名可能会出现很多错误&#xff0c;建议先备份当前项目 1.在idea里&#xff0c;右键项目名——》选择Refactor——》选择…

容器技术所涉及Linux内核关键技术

目录 一、容器技术前世今生 1.1 1979年 — chroot 1.2 2000年 — FreeBSD Jails 1.3 2001年 — Linux VServer 1.4 2004年 — Solaris容器 1.5 2005年 — OpenVZ 1.6 2006年 — Process容器 1.7 2007年 — Control Groups 1.8 2008年 — LXC 1.9 2011年 — Warden 1…

Nginx 代理 MySQL 连接

文章目录 Nginx 代理 MySQL 连接1. 前言2. 部署 Nginx&#xff0c;MySQL3. ngx_stream_core_module 配置方式3.1 stream3.2 server3.3 listen3.4 配置示例 4. 限制访问 IP4.1 allow4.2 deny4.3 配置示例 5. 综合案例 Nginx 代理 MySQL 连接 原文地址&#xff1a;https://mp.wei…