文章目录
- 首先是解决如何运行gatewayworker
- 调试gatewayworker程序
- 向指定客户端发送消息
- 在TP框架中调用Gateway的API
- 总结说明
测试环境 windows10;PHP7.2;TP5.1;
这里只介绍如何使用TP集成的workerman扩展库think-worker
,原生workerman的使用请参考官方文档
TP5.1集成了workerman,使用composer require topthink/think-worker=2.0.*
安装即可。
TP5.1只能安装think-worker2.0版本,最新的think-worker3.0版本是给TP6.0用的,但依赖安装workerman的版本是最新的。
虽然集成了,但是在windows下使用还是有许多问题,比如直接运行命令php think woker:gateway
会报错GatewayWorker Not Support On Windows.
windows解决方案 ,Linux下可以直接运行(应该吧~)。
官方的使用文档也不够详细,只列举了worker
和worker:server
两种运行方式的简单示列。但是大部分使用workerman都是奔着GatewayWorker去的,毕竟自己用workerman完全搭建还是需要技术和时间的。
单纯的使用workerman,直接运行php think worker
或php think worker:server
就可以,调试也非常简单,TP官方文档有说明就不赘述了,重点是gatewayworker。
首先是解决如何运行gatewayworker
根据workerman的文档,windows下不能在同一个php文件中运行多个worker,所以需要修改tinkphp的命令行
新建自定义命令行文件application\common\command\Workerman.php
<?phpnamespace app\common\command;use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Config;
use Workerman\Worker;/*** Worker 命令行*/
class Workerman extends Command
{protected function configure(){$this->setName('workerman')->addArgument('service', Argument::OPTIONAL, 'workerman service: gateway|register|businessworker', null)->addOption('host', 'H', Option::VALUE_OPTIONAL, 'the host of workerman server', null)->addOption('port', 'P', Option::VALUE_OPTIONAL, 'the port of workerman server', null)->addOption('daemon', 'd', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.')->setDescription('workerman Server for ThinkPHP');}public function execute(Input $input, Output $output){$service = $input->getArgument('service');$option = Config::pull('gateway_worker');if ($input->hasOption('host')) {$host = $input->getOption('host');} else {$host = !empty($option['host']) ? $option['host'] : '0.0.0.0';}if ($input->hasOption('port')) {$port = $input->getOption('port');} else {$port = !empty($option['port']) ? $option['port'] : '2347';}$registerAddress = !empty($option['registerAddress']) ? $option['registerAddress'] : '127.0.0.1:1236';switch ($service) {case 'register':$this->register($registerAddress);break;case 'businessworker':$this->businessWorker($registerAddress, isset($option['businessWorker']) ? $option['businessWorker'] : []);break;case 'gateway':$this->gateway($registerAddress, $host, $port, $option);break;default:$output->writeln("<error>Invalid argument action:{$service}, Expected gateway|register|businessworker .</error>");exit(1);break;}Worker::runAll();}/*** 启动register* @access public* @param string $registerAddress* @return void*/public function register($registerAddress){// 初始化registernew Register('text://' . $registerAddress);}/*** 启动businessWorker* @access public* @param string $registerAddress registerAddress* @param array $option 参数* @return void*/public function businessWorker($registerAddress, $option = []){// 初始化 bussinessWorker 进程$worker = new BusinessWorker();$this->option($worker, $option);$worker->registerAddress = $registerAddress;}/*** 启动gateway* @access public* @param string $registerAddress registerAddress* @param string $host 服务地址* @param integer $port 监听端口* @param array $option 参数* @return void*/public function gateway($registerAddress, $host, $port, $option = []){// 初始化 gateway 进程if (!empty($option['socket'])) {$socket = $option['socket'];unset($option['socket']);} else {$protocol = !empty($option['protocol']) ? $option['protocol'] : 'websocket';$socket = $protocol . '://' . $host . ':' . $port;unset($option['host'], $option['port'], $option['protocol']);}$gateway = new Gateway($socket, isset($option['context']) ? $option['context'] : []);// 以下设置参数都可以在配置文件中重新定义覆盖$gateway->name = 'Gateway';$gateway->count = 4;$gateway->lanIp = '127.0.0.1';$gateway->startPort = 2000;$gateway->pingInterval = 30;$gateway->pingNotResponseLimit = 0;$gateway->pingData = '{"type":"ping"}';$gateway->registerAddress = $registerAddress;// 全局静态属性设置foreach ($option as $name => $val) {if (in_array($name, ['stdoutFile', 'daemonize', 'pidFile', 'logFile'])) {Worker::${$name} = $val;unset($option[$name]);}}$this->option($gateway, $option);}/*** 设置参数* @access protected* @param Worker $worker Worker对象* @param array $option 参数* @return void*/protected function option($worker, array $option = []){// 设置参数if (!empty($option)) {foreach ($option as $key => $val) {$worker->$key = $val;}}}
}
在application\command.php
命令行参数配置文件中添加
return ['workerman' => '\\app\\common\\command\\Workerman',
];
打开三个cmd命令窗口,分别运行
php think workerman register
php think workerman businessworker
php think workerman gateway
运行结果
调试gatewayworker程序
添加Events监听事件文件application\workerman\Events.php
,这里偷懒直接复制了官方的Events文件,自己写的话,方法没写全运行时会报错退出,所以干脆直接全部复制,修改一下命名空间即可。
<?phpnamespace app\workerman;use GatewayWorker\Lib\Gateway;
use think\worker\Application;
use Workerman\Worker;/*** Worker 命令行服务类*/
class Events
{/*** onWorkerStart 事件回调* 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次** @access public* @param \Workerman\Worker $businessWorker* @return void*/public static function onWorkerStart(Worker $businessWorker){$app = new Application;$app->initialize();}/*** onConnect 事件回调* 当客户端连接上gateway进程时(TCP三次握手完毕时)触发** @access public* @param int $client_id* @return void*/public static function onConnect($client_id){Gateway::sendToCurrentClient("Your client_id is $client_id");}/*** onWebSocketConnect 事件回调* 当客户端连接上gateway完成websocket握手时触发** @param integer $client_id 断开连接的客户端client_id* @param mixed $data* @return void*/public static function onWebSocketConnect($client_id, $data){var_export($data);}/*** onMessage 事件回调* 当客户端发来数据(Gateway进程收到数据)后触发** @access public* @param int $client_id* @param mixed $data* @return void*/public static function onMessage($client_id, $data){Gateway::sendToAll($data);}/*** onClose 事件回调 当用户断开连接时触发的方法** @param integer $client_id 断开连接的客户端client_id* @return void*/public static function onClose($client_id){GateWay::sendToAll("client[$client_id] logout\n");}/*** onWorkerStop 事件回调* 当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。** @param \Workerman\Worker $businessWorker* @return void*/public static function onWorkerStop(Worker $businessWorker){echo "WorkerStop\n";}
}
修改配置监听文件config\gateway_worker.php
// BusinsessWorker配置'businessWorker' => ['name' => 'BusinessWorker','count' => 1,'eventHandler' => '\app\workerman\Events', // 原来是\think\worker\Events,改成自己的监听文件位置],
添加前端测试文件,这里使用的是vue,关于前端如何使用webSocket,网上到处都是,也很简单。
// vue测试代码片段
export default {data () {return {websocket: null}},mounted () {this.websocket = new WebSocket('ws://127.0.0.1:2348') // 使用gateway的地址this.websocket.onmessage = evt => {console.log(evt.data) // 打印接收的消息}}
}
重启businessworker服务,运行vue,前端控制台会打印
Your client_id is 7f00000107d000000001
这是在Events文件监听事件onConnect中的程序,当客户端连接时,向当前客户端发送信息,多开几个窗口,测试多客户端连接时的效果。
向指定客户端发送消息
首先需要明确的是,gatewayworker只能通过client_id识别客户端,每产生一次连接,就会生成一个client_id,即便是同一个页面,发生了多次连接,gatewayworker也会认为是不同的客户端。
实际业务中客户端往往是以用户id或其他形式的id作为区分,所以实际业务中需要将client_id和业务id进行绑定并做判断,这里做测试就不深入讨论了,直接用client_id进行测试
修改前端文件
// vue模板代码片段
<template><el-row type="flex"><el-select v-model="selectClientId"><el-option v-for="(item, index) in clients" :key="index" :value="item" :label="item" /></el-select><el-input v-model="message"></el-input><el-button @click="submit">发送</el-button></el-row>
</template>
// vue js代码片段data () {return {websocket: null,clients: [], // client用户列表selectClientId: '', // 选择的用户message: '' // 需要发送的消息}},methods: {submit () {const data = {client_id: this.selectClientId, // 指定的客户端idmessage: this.message}this.websocket.send(JSON.stringify(data))}},mounted () {this.websocket = new WebSocket('ws://127.0.0.1:2348') // 使用gateway的地址this.websocket.onmessage = evt => {const data = JSON.parse(evt.data)if (data.type === 'login') {this.clients.push(data.client_id)}console.log(data.message)}}
修改监听文件,修改了onConnect和onMessage两个监听回调
# ...public static function onConnect($client_id){// Gateway::sendToCurrentClient("Your client_id is $client_id");$message = ['type' => 'login','client_id' => $client_id,'message' => 'user ' . $client_id . ' is login',];Gateway::sendToAll(json_encode($message));}# ...public static function onMessage($client_id, $data){// Gateway::sendToAll($data);$data = json_decode($data, true);$form_client = $client_id;$to_client = $data['client_id'];$message = $data['message'];$send_message = ['type' => 'message','message' => "user {$form_client} send {$message} to you",];if ($to_client) {// 如果有指定用户,则发送给指定用户Gateway::sendToClient($to_client, json_encode($send_message));} else {// 没有指定用户,发送给全部Gateway::sendToAll($data);}}
重启worker服务,测试效果
workerman的官方文档中明确指出不建议直接通过客户端发送消息,而是通过原来的框架处理业务逻辑
与ThinkPHP等框架结合
总体原则:
现有mvc框架项目与GatewayWorker独立部署互不干扰
所有的业务逻辑都由网站页面post/get到mvc框架中完成
GatewayWorker不接受客户端发来的数据,即GatewayWorker不处理任何业务逻辑,GatewayWorker仅仅当做一个单向的推送通道
仅当mvc框架需要向浏览器主动推送数据时才在mvc框架中调用Gateway的API(GatewayClient)完成推送
在TP框架中调用Gateway的API
workerman官方文档建议使用GatewayClient
提供的API发送数据,这个需要额外安装composer require workerman/gatewayclient
,使用方法在官方文档中有说明,和使用gateway一样。但在TP的实际测试中,无需安装也可以正常使用,这里使用的是GatewayWorker\Lib\Gateway
,也不需要配置参数,可以直接使用。
TP处理业务逻辑的控制器
<?phpnamespace app\index\controller;use GatewayWorker\Lib\Gateway;
use think\Controller;class Index extends Controller
{public function index(){$client_id = $this->request->get('client_id');$send_message = $this->request->get('message');$message = ['type' => 'message','message' => $this->request->get('message'),];Gateway::sendToClient($client_id, json_encode($message));}
}
浏览器直接访问或ajax访问效果一致,运行结果
至此,TP5.1中使用think-worker调试基本通过,剩下的就是根据实际业务逻辑进行处理了。
总结说明
在官方文档 与ThinkPHP等框架结合 的使用说明中和案例中发现,不需要在Events监听文件中写业务逻辑和判断,所有的业务逻辑都可以在TP框架中完成,Events的作用仅仅是将client_id
告诉客户端。
而在tink-worker原来的Events文件中,当客户端连接时,就向当前客户端发送过一条信息"Your client_id is 7f00000107d000000001"
,使用正则匹配就能拿到client_id,无需更改文件。
那么TP5.1的think-worker的使用可以简化如下
- windows下修改gatewayworker的启动方式,Linux无需更改(我也没有测试)。
- php业务逻辑中使用
GatewayWorker\Lib\Gateway
调用gateway的API给客户端发送消息。
所以不需要过多的更改gateway的配置文件,也不需要额外的建立监听文件,就可以直接使用gateway了,当然windows环境下因为机制问题,所以更改了启动方式。饶了一大圈回来,发现think-worker的使用方式是如此简单,所以官方文档是觉得太简单了所以没有给使用说明的必要么😓