原因
进程作为程序执行过程中资源分配的基本单位,拥有独立的地址空间,同一进程的线程可以共享本进程的全局变量,静态变量等数据和地址空间,但进程之间资源相互独立。
由于PHP语言不支持多线程,因此Swoole使用多进程模式,再多进程模式下就存在进程内存隔离,进程间通信与数据共享问题。
swoole中master主进程
会创建manager管理进程
和reactor线程
,真正的工作进程为worker进程
。
manager
是创建和管理worker
进程,reactor
进程测试监听socket
,接受数据任务,发送给worker
进程去工作,因此所有业务逻辑最终都是在worker
进程中进行,worker
进程之间的数据共享与通信必不可少。
swoole中设置选项worker_num
启动的worker
进程数,默认设置为CPU核数
。
例如:
$server = new swoole_server('127.0.0.1',9898);
$server->set(array('worker_num' => 4, //设置启动的Worker进程数。
));
如上面说描述,进程存在进程隔离:
$fds = array();
$server->on('connect', function ($server, $fd){echo "connection open: {$fd}n";global $fds;$fds[] = $fd;var_dump($fds);
});
$fds
虽然是全局变量,但是只在但前的进程内有效,swoole服务器底层会创建多个worker
进程,此处打印出来的只有部分连接的fd
。
解决方法:
swoole为我们提供了两种有效的解决方法,都是基于多进程内存型数据库,代替单进程PHP变量来存储fd
。
第一种为:swoole_redis
,特点是使用简单,跟PHP原生的redis
用法几乎一致。
第二种为:swoole_table
,这是swoole官方研制的一款内存型数据库,比redis
的可扩展性要强许多,单机器的情况下牛牛推荐大家使用这种方法。
Swoole_Tbale
swoole_redis
没什么好谈的,因为redis
都是个老活了,用法都一个鸟样。
下面我们就重点搞下swoole_table
的用法。
一波官方说明袭来:
Table
一个基于共享内存和锁实现的超高性能,并发数据结构。用于解决多进程/多线程数据共享和同步加锁问题。
请谨慎使用数组方式读写Table, 建议使用文档中提供的API来进行操作
数组方式取出的SwooleTableRow对象为一次性对象, 请勿依赖其进行过多操作
优势
性能强悍,单线程每秒可读写200万次
应用代码无需加锁,Table内置行锁自旋锁,所有操作均是多线程/多进程安全。用户层完全不需要考虑数据同步问题。
支持多进程,Table可以用于多进程之间共享数据
使用行锁,而不是全局锁,仅当2个进程在同一CPU时间,并发读取同一条数据才会进行发生抢锁
Table的内存容量不受PHP的memory_limit控制
官方文档地址:https://wiki.swoole.com/wiki/page/p-table.html
多进程数据共享的WebSocket例子:
<?php
// +----------------------------------------------------------------------
// 小黄牛blog - Swoole 即时通讯交互处理
// +----------------------------------------------------------------------
// Copyright (c) 2018 https://xiuxian.junphp.com All rights reserved.
// +----------------------------------------------------------------------
// Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// Author: 小黄牛 <1731223728@qq.com>
// +----------------------------------------------------------------------
class Server{/*** 客户端身份存储器*/private $_table = []; /*** WS的启动实例*/private $_ws;/*** host-IP,0.0.0.0表示允许接收所有请求*/private $_host = '0.0.0.0';/*** 端口号*/private $_port = '9502';/*** 最大服务端心跳重连次数*/private $_max = 3;/*** 强制心跳重连启动状态*/private $_status = true;/*** 这是启动服务端的入口*/public function run() { $this->start_service(); $this->start_table(); $this->start_handshake();$this->start_message();$this->end();}/*** ①启动websocker服务*/private function start_service() {# 创建websocket服务器对象,监听0.0.0.0:9502端口$this->_ws = new swoole_websocket_server($this->_host, $this->_port);$this->_ws->set(['worker_num' => 4,// 开4个工作进程]);}/*** ①创建Table服务*/private function start_table() {# 创建最大只能存储1024个用户的数据$this->_table = new swoole_table(1024);# 创建字段$this->_table->column('fd', swoole_table::TYPE_INT, 8); // FD$this->_table->column('status', swoole_table::TYPE_INT, 8); // 离线状态$this->_table->column('heartbeat', swoole_table::TYPE_INT, 8); // 心跳重连数$this->_table->column('user_id', swoole_table::TYPE_STRING, 32); // 会员ID$this->_table->column('user_nice', swoole_table::TYPE_STRING, 32); // 会员名称$this->_table->create();# 将表附加到ws实例里,方便后续使用$this->_ws->user = $this->_table;}/*** ②监听WebSocket握手申请*/private function start_handshake() {# 监听WebSocket连接打开事件$this->_ws->on('open', function ($ws, $request){# 这里可以做些鉴权验证之类的});}/*** ③监听客户端消息发送请求*/private function start_message() {# 监听WebSocket消息事件$this->_ws->on('message', function ($ws, $frame) {$data = json_decode($frame->data, true);$user_id = $data['user_id'];# 加入存储器$this->_ws->user->set($user_id, ['fd' => $frame->fd, # FD'status' => 1, # 设置上线状态'heartbeat' => 0, # 重置心跳重连数'user_id' => $data['user_id'], # 用户ID'user_nice' => $data['user_nice'], # 用户昵称]);# 登录广播处理if ($data['code'] == 1) {# 发送广播上线消息$data['content'] = '【'.$data['user_nice'].'】骑着小黄牛上线啦~!';$this->broadcast($ws, $this->json($data), $user_id);# 心跳重连检测} else if ($data['code'] == 4) {$this->broadcast($ws, $frame->data, $user_id);$this->timer();# 其他请求} else {# 广播消息$this->broadcast($ws, $frame->data, $user_id);}});} /*** ④监听客户端退出事件*/private function end() {# 这里加入了unset,清除open存储器,防止存储器无限增大# 监听WebSocket连接关闭事件$this->_ws->on('close', function ($ws, $fd) {# 这块提取用户信息还有优化空间,实际开发中这样for会消耗内存$user = null;foreach ($this->_ws->user as $k=>$v) {if ($v['fd'] == $fd) {$user = $v;break;}}# 获取用户ID$user_id = $user['user_id'];# 获取用户nice$user_nice = $user['user_nice'];# 设置离线状态$this->_ws->user->set($user_id, ['status' => 0, # 设置离线状态]);$data = ['code' => 2,'user_id' => $user_id,'user_nice' => $user_nice,'content' => '【'.$user_nice.'】骑着小扫帚灰溜溜的走了~~!'];# 广播消息$this->broadcast($ws, $this->json($data));});$this->_ws->start();}/*** 广播消息* @todo 无* @author 小黄牛* @version v1.0.0.1 + 2018.11.12* @deprecated 暂不弃用* @global 无* @param object $wx 实例* @param string $content 广播内容* @param string $id 用户的userid* @param bool $status 是否做心跳限制 * @return void*/private function broadcast($ws, $content, $id=null, $status=false) {# 向所有人广播foreach ($this->_ws->user as $k=>$v) {# 不向自己广播,并且要在线的# 注意,这里一定要有上线状态的限制,否则假设用户已经退出,但你的进程还开着,实际上已经关闭,这时候push就会报错# 只有正常在线的用户才能接收到广播# 加入心跳检测限制if ($k != $id && $v['status'] == 1 && $status == true) {$ws->push($v['fd'], $content);} else if ($v['user_id'] != $id && $v['status'] == 1 && $v['heartbeat'] == 0) {$ws->push($v['fd'], $content);}}}/*** 数组转json* @todo 无* @author 小黄牛* @version v1.0.0.1 + 2018.11.08* @deprecated 暂不弃用* @global 无* @param array $array 数组* @return json*/private function json($array) {return json_encode($array, JSON_UNESCAPED_UNICODE);}/*** 服务端定时强制心跳检测* @todo 无* @author 小黄牛* @version v1.0.0.1 + 2018.11.08* @deprecated 暂不弃用* @global 无* @return void*/private function timer() {# 注意强制心跳触发器不能放在open事件里,因为那时候用户还没有提交登录请求,是还没有userID的# 还有,强制心跳定时器只能触发一次,否则会出现生成多个定时器的情况if ($this->_status) {$this->_status = false;/*** ⑤服务端强制心跳检测* 每隔1分钟发送1次,如果连续3次强制心跳检测未通过,服务端将强制断开连接*/$obj = $this;swoole_timer_tick(60000, function ($timer_id) use (&$obj) {# 广播消息$obj->broadcast($obj->_ws, $obj->json(['code' => 5]), null, true);# 所有人的心跳次数+1foreach ($this->_ws->user as $k=>$v) {if (empty($v['heartbeat'])) {# 重置心跳次数$this->_ws->user->set($v['user_id'], ['heartbeat' => 0,]);}# 心跳次数累加$this->_ws->user->set($v['user_id'], ['heartbeat' => $v['heartbeat']+1]);# 心跳次数大于等于_max && 在线的 的连接关闭if ($v['heartbeat'] >= $obj->_max && $v['status'] == 1) {$data = $v;# 发送强制掉线广播$data['code'] = 6;$data['content'] = '【'.$data['user_nice'].'】已被服务端强制下线!';$obj->broadcast($obj->_ws, $obj->json($data), null, true);# 这里不需要unset连接,因为在close事件中,已经将这个连接设置为离线了# 主动关闭连接k$obj->_ws->close($v['fd']);}}});}}
}
$socketServer = new Server();
$socketServer->run();
最后推荐大家可以用下我开源的一个基于Swoole4.5+研发的PHP框架。该框架基于注解实现了很多好玩的功能,很适合新人快速上手Swoole扩展。
SW-X框架-专注高性能便捷开发而生的PHP-SwooleX框架www.sw-x.cn