
在之前的博文中,我们已经学完了如果使用swoole搭建websocket长连接,也学会了swoole的多进程数据共享操作。
但在一个完整的websocket长连接日常操作链中,服务端往往会主动给在线的用户单独推送消息,会群发一些消息。
在Swoole-websocket中给我们提供了一个onRequest
事件,该事件用于监听外部请求。
也就是我们可以通过http请求向websocket中调取数据,进而发送消息。
onRequest
的示例代码如下:
官网文档是:https://wiki.swoole.com/wiki/page/397.html
$this->_ws->on('request', function ($request, $response) {//var_dump($request);# 如果你是get的,就改成get,可以用dump看看$request$param = $request->post;$data = [];$data['code'] = 3;$data['user_nice'] = '系统通知';$data['content'] = $param['content'];# 下面我们来广播消息if (empty($param['user_id'])) {# 群发$this->broadcast($this->_ws, $this->json($data));# 返回消息$this->endRequest('200', '发送成功', $request, $response);} else {# 单发if (empty($this->_ws->user[$param['user_id']]['fd'])) {# 返回消息$this->endRequest('500', '客户不存在', $request, $response);} else {$user = $this->_ws->user[$param['user_id']];if ($user['status'] == 0) {# 返回消息$this->endRequest('500', '客户已下线', $request, $response);} else {$this->_ws->push($user['fd'], $this->json($data));# 返回消息$this->endRequest('200', '发送成功', $request, $response);}}}
});
其中最重要的是endRequest这个方法的代码,我们接着往下看:/*** request事件返回值
*/
private function endRequest($code, $msg, $request, $response) {$json = ['code' => "$code",'msg' => "$msg",];# 输出响应$return = json_encode($json, JSON_UNESCAPED_UNICODE);# 需要end事件,否注会报500错误,并无结果返回# 不知道为啥,CLI模式下这个事件一次请求会有2次监听,但发现最后一次其中的server->request_uri会有个/favicon.ico参数# 所以凭借这个参数,我们可以做判断,放弃掉第一次监听返回# 还有,如果我们直接在onRequest中过滤掉第一次监听,那第二次监听就不会执行,也会报500错误# 所以我们只能在返回的时候做下手脚//if($request->server['request_uri'] == '/favicon.ico') {$response->end($return);//}# 而且我发现经过这样处理,onRequest事件那边也只会有一次请求了,特别奇怪。# 而且这样返回之后,浏览器直接请求还是报500错误。# 熟悉Swoole的朋友可以在下方留言,指教下我的疑惑。
}
从上面的注释中我们可以看出,endRequest
的输出值很奇怪,它支持CLI
模式下运行,但该模式下的会有2次endRequest
监听,需要使用server->request_uri
的/favicon.ico
参数进行拦截输出返回值,否注将报错。
而通过CURL
或浏览器发包的方式则不能拦截,同时这种请求方式只有1次endRequest
监听,所以不能拦截返回值。
同时需要注意,endRequest
同步输出返回值,不能直接使用echo
,而是需要把返回内容放在$response->end()
中。
下面我们来看看完整的server.php
端代码:
<?php
// +----------------------------------------------------------------------
// 小黄牛blog - Swoole 即时通讯交互处理
// +----------------------------------------------------------------------
// Copyright (c) 2018 https://xiuxian.junphp.com All rights reserved.
// +----------------------------------------------------------------------
// Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// Author: 小黄牛 <1731223728@qq.com>
// +----------------------------------------------------------------------
class Server{/*** 客户端身份存储器*/private $_table = []; /*** WS的启动实例*/private $_ws;/*** host-IP,0.0.0.0表示允许接收所有请求*/private $_host = '0.0.0.0';/*** 端口号*/private $_port = '9502';/*** 最大服务端心跳重连次数*/private $_max = 3;/*** 强制心跳重连启动状态*/private $_status = true;/*** 这是启动服务端的入口*/public function run() { $this->start_service(); $this->start_table(); $this->request();$this->start_handshake();$this->start_message();$this->end();}/*** ①启动websocker服务*/private function start_service() {# 创建websocket服务器对象,监听0.0.0.0:9502端口$this->_ws = new swoole_websocket_server($this->_host, $this->_port);$this->_ws->set(['worker_num' => 4,// 开4个工作进程]);}/*** ①创建Table服务*/private function start_table() {# 创建最大只能存储1024个用户的数据$this->_table = new swoole_table(1024);# 创建字段$this->_table->column('fd', swoole_table::TYPE_INT, 8); // FD$this->_table->column('status', swoole_table::TYPE_INT, 8); // 离线状态$this->_table->column('heartbeat', swoole_table::TYPE_INT, 8); // 心跳重连数$this->_table->column('user_id', swoole_table::TYPE_STRING, 32); // 会员ID$this->_table->column('user_nice', swoole_table::TYPE_STRING, 32); // 会员名称$this->_table->create();# 将表附加到ws实例里,方便后续使用$this->_ws->user = $this->_table;}/*** ②监听WebSocket握手申请*/private function start_handshake() {# 监听WebSocket连接打开事件$this->_ws->on('open', function ($ws, $request){# 这里可以做些鉴权验证之类的});}/*** ③监听客户端消息发送请求*/private function start_message() {# 监听WebSocket消息事件$this->_ws->on('message', function ($ws, $frame) {$data = json_decode($frame->data, true);$user_id = $data['user_id'];# 加入存储器$this->_ws->user->set($user_id, ['fd' => $frame->fd, # FD'status' => 1, # 设置上线状态'heartbeat' => 0, # 重置心跳重连数'user_id' => $data['user_id'], # 用户ID'user_nice' => $data['user_nice'], # 用户昵称]);# 登录广播处理if ($data['code'] == 1) {# 发送广播上线消息$data['content'] = '【'.$data['user_nice'].'】骑着小黄牛上线啦~!';$this->broadcast($ws, $this->json($data), $user_id);# 心跳重连检测} else if ($data['code'] == 4) {$this->broadcast($ws, $frame->data, $user_id);$this->timer();# 其他请求} else {# 广播消息$this->broadcast($ws, $frame->data, $user_id);}});} /*** ④监听客户端退出事件*/private function end() {# 这里加入了unset,清除open存储器,防止存储器无限增大# 监听WebSocket连接关闭事件$this->_ws->on('close', function ($ws, $fd) {$user = null;foreach ($this->_ws->user as $k=>$v) {if ($v['fd'] == $fd) {$user = $v;}}# 如果没用用户就跳过if (!$user) {return false;}# 获取用户ID$user_id = $user['user_id'];# 获取用户nice$user_nice = $user['user_nice'];# 设置离线状态$this->_ws->user->set($user_id, ['status' => 0, # 设置离线状态]);$data = ['code' => 2,'user_id' => $user_id,'user_nice' => $user_nice,'content' => '【'.$user_nice.'】骑着小扫帚灰溜溜的走了~~!'];# 广播消息$this->broadcast($ws, $this->json($data));});$this->_ws->start();}/*** ④监听外部请求推送事件*/private function request() {$this->_ws->on('request', function ($request, $response) {//var_dump($request);# 如果你是get的,就改成get,可以用dump看看$request$param = $request->post;$data = [];$data['code'] = 3;$data['user_nice'] = '系统通知';$data['content'] = $param['content'];# 下面我们来广播消息if (empty($param['user_id'])) {# 群发$this->broadcast($this->_ws, $this->json($data));# 返回消息$this->endRequest('200', '发送成功', $request, $response);} else {# 单发if (empty($this->_ws->user[$param['user_id']]['fd'])) {# 返回消息$this->endRequest('500', '客户不存在', $request, $response);} else {$user = $this->_ws->user[$param['user_id']];if ($user['status'] == 0) {# 返回消息$this->endRequest('500', '客户已下线', $request, $response);} else {$this->_ws->push($user['fd'], $this->json($data));# 返回消息$this->endRequest('200', '发送成功', $request, $response);}}}});}/*** request事件返回值*/private function endRequest($code, $msg, $request, $response) {$json = ['code' => "$code",'msg' => "$msg",];# 输出响应$return = json_encode($json, JSON_UNESCAPED_UNICODE);# 需要end事件,否注会报500错误,并无结果返回# 不知道为啥,CLI模式下这个事件一次请求会有2次监听,但发现最后一次其中的server->request_uri会有个/favicon.ico参数# 所以凭借这个参数,我们可以做判断,放弃掉第一次监听返回# 还有,如果我们直接在onRequest中过滤掉第一次监听,那第二次监听就不会执行,也会报500错误# 所以我们只能在返回的时候做下手脚//if($request->server['request_uri'] == '/favicon.ico') {$response->end($return);//}# 而且我发现经过这样处理,onRequest事件那边也只会有一次请求了,特别奇怪。# 而且这样返回之后,浏览器直接请求还是报500错误。# 熟悉Swoole的朋友可以在下方留言,指教下我的疑惑。}/*** 广播消息* @todo 无* @author 小黄牛* @version v1.0.0.1 + 2018.11.12* @deprecated 暂不弃用* @global 无* @param object $wx 实例* @param string $content 广播内容* @param string $id 用户的userid* @param bool $status 是否做心跳限制 * @return void*/private function broadcast($ws, $content, $id=null, $status=false) {# 向所有人广播foreach ($this->_ws->user as $k=>$v) {# 不向自己广播,并且要在线的# 注意,这里一定要有上线状态的限制,否则假设用户已经退出,但你的进程还开着,实际上已经关闭,这时候push就会报错# 只有正常在线的用户才能接收到广播# 加入心跳检测限制if ($k != $id && $v['status'] == 1 && $status == true) {$ws->push($v['fd'], $content);} else if ($v['user_id'] != $id && $v['status'] == 1 && $v['heartbeat'] == 0) {$ws->push($v['fd'], $content);}}}/*** 数组转json* @todo 无* @author 小黄牛* @version v1.0.0.1 + 2018.11.08* @deprecated 暂不弃用* @global 无* @param array $array 数组* @return json*/private function json($array) {return json_encode($array, JSON_UNESCAPED_UNICODE);}/*** 服务端定时强制心跳检测* @todo 无* @author 小黄牛* @version v1.0.0.1 + 2018.11.08* @deprecated 暂不弃用* @global 无* @return void*/private function timer() {# 注意强制心跳触发器不能放在open事件里,因为那时候用户还没有提交登录请求,是还没有userID的# 还有,强制心跳定时器只能触发一次,否则会出现生成多个定时器的情况if ($this->_status) {$this->_status = false;/*** ⑤服务端强制心跳检测* 每隔1分钟发送1次,如果连续3次强制心跳检测未通过,服务端将强制断开连接*/$obj = $this;swoole_timer_tick(60000, function ($timer_id) use (&$obj) {# 广播消息$obj->broadcast($obj->_ws, $obj->json(['code' => 5]), null, true);# 所有人的心跳次数+1foreach ($this->_ws->user as $k=>$v) {if (empty($v['heartbeat'])) {# 重置心跳次数$this->_ws->user->set($v['user_id'], ['heartbeat' => 0,]);}# 心跳次数累加$this->_ws->user->set($v['user_id'], ['heartbeat' => $v['heartbeat']+1]);# 心跳次数大于等于_max && 在线的 的连接关闭if ($v['heartbeat'] >= $obj->_max && $v['status'] == 1) {$data = $v;# 发送强制掉线广播$data['code'] = 6;$data['content'] = '【'.$data['user_nice'].'】已被服务端强制下线!';$obj->broadcast($obj->_ws, $obj->json($data), null, true);# 这里不需要unset连接,因为在close事件中,已经将这个连接设置为离线了# 主动关闭连接k$obj->_ws->close($v['fd']);}}});}}
}
$socketServer = new Server();
$socketServer->run();
然后再看如何给websocket主动推送的client.php
端代码:
<?php
// +----------------------------------------------------------------------
// 小黄牛blog - websocket - http发包给TCP
// +----------------------------------------------------------------------
// Copyright (c) 2018 https://xiuxian.junphp.com All rights reserved.
// +----------------------------------------------------------------------
// Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// Author: 小黄牛 <1731223728@qq.com>
// +----------------------------------------------------------------------
function https_request($url, $data = null){# 初始化一个cURL会话$curl = curl_init(); //设置请求选项, 包括具体的urlcurl_setopt($curl, CURLOPT_URL, $url);curl_setopt($curl, CURLOPT_SSL_VERIFYPEER, FALSE); //禁用后cURL将终止从服务端进行验证curl_setopt($curl, CURLOPT_SSL_VERIFYHOST, FALSE);if (!empty($data)){curl_setopt($curl, CURLOPT_POSTFIELDS, $data); //设置具体的post数据}curl_setopt($curl, CURLOPT_RETURNTRANSFER, 1); $response = curl_exec($curl); //执行一个cURL会话并且获取相关回复//$httpCode = curl_getinfo($curl,CURLINFO_HTTP_CODE); //echo $httpCode;curl_close($curl); //释放cURL句柄,关闭一个cURL会话return $response;
}
var_dump(https_request('http://IP:端口', ['user_id' => '用户ID',// 为空群发'content'=> '测试内容'
]));
我们只需要通过访问client.php
,就能给指定用户推送消息拉。
同时我们需要注意,在真正开发中,我们还需要对onRequest
事件的请求进行加密跟鉴权处理,否注很容易被竞争对手恶意攻击。
而且服务端还可以通过onRequest
事件拉取到所有的在线用户消息,更多相关的功能都可自行扩展。
最后推荐大家可以用下我开源的一个基于Swoole4.5+研发的PHP框架。该框架基于注解实现了很多好玩的功能,很适合新人快速上手Swoole扩展。
SW-X框架-专注高性能便捷开发而生的PHP-SwooleX框架www.sw-x.cn