在很早很早以前,WebSocket 协议还没有被发明的时候,人们在 Web 端制作类实时数据动态更新时,一般采用轮询、 长连接 (Long Polling) 来实现。大概就是:
轮询:客户端不停发送 HTTP 请求给服务端,服务端返回最新数据
长连接:客户端发送一条 HTTP 请求给服务端,服务端 HOLD 连接直到有新数据再返回
当时的应用有 WebQQ、FaceBook IM 等
但是这样的实现有一个非常大的缺陷,HTTP 请求是半双工 (Half Duplex) 的,只能由客户端发送请求到服务端返回。大量的请求可能会导致 CPU 资源占用、内存溢出等问题。于是 WebSocket 协议被发明了,与 HTTP 协议类似,地址为:ws:// (HTTP 页面) 或 wss:// (HTTPS 页面)。
WebSocket 是全双工 (Full Duplex) 的,也就是说服务端也可以发送数据到客户端了。那比如在聊天时,就可以省去客户端的请求,对方客户端有数据提交到服务端时直接由服务端发送至当前客户端。
比较知名的 WebSocket 框架有 http://Socket.io (node.js)、Workerman (PHP)、Swoole (PHP) 等 (我只尝试过前两个)
Pokers 的群聊功能就是轮询实现的,但是我的 1H1M1G 的小水管服务器是承受不住持续增长的用户量的,必须尝试用 WebSocket 来实现了
<?php//引入 composer
require '../vendor/autoload.php';
require_once '../vendor/workerman/workerman/Autoloader.php';
require_once '../vendor/workerman/channel/src/Server.php'; //Workerman 分组发送
require_once '../vendor/workerman/channel/src/Client.php'; //Workerman 分组发送
define('LAZER_DATA_PATH', dirname(dirname(__FILE__)) . '/data/'); //Pokers 使用的 json 数据库use Lazer\Classes\Database as Lazer;
use Workerman\Worker;
use Workerman\Lib\Timer;$channel_server = new Channel\Server('0.0.0.0', 2206); //分组服务器地址
$worker = new Worker('websocket://0.0.0.0:2000'); //WebSocket 地址
$worker->count = 2; //Workerman 进程数
// 全局群组到连接的映射数组
$group_con_map = array();$worker->onWorkerStart = function ($worker) {// Channel客户端连接到Channel服务端Channel\Client::connect('0.0.0.0', 2206);// 监听全局分组发送消息事件Channel\Client::on('send', function ($event_data) {$thread = $event_data['thread_id'];$con_id = $event_data['con_id'];$mes_id = $event_data['mes_id'];$speaker = $event_data['speaker'];$class = $event_data['class_id'];$array = Lazer::table('messages')->limit(1)->where('id', '=', (int) $mes_id)->andWhere('speaker', '=', (int) $speaker)->andWhere('belong_class', '=', (int) $class)->find()->asArray();if (!!$array[0]['speaker']) {global $group_con_map;if (isset($group_con_map[$thread])) {foreach ($group_con_map[$thread] as $con) {$con->send(json_encode($array[0])); //发送数据到群组每位成员}}} else {$array = ['op' => 'sent','status' => false,'code' => 108,'msg' => 'Illegal Request'];global $group_con_map;$group_con_map[$thread][$con_id]->send(json_encode($array));}});//心跳计时Timer::add(55, function () use ($worker) {foreach ($worker->connections as $connection) {$array = ['op' => 'keep'];$connection->send(json_encode($array));}});
};//发送消息
$worker->onMessage = function ($con, $data) {$data = json_decode($data, true);$cmd = $data['action'];$thread = $data['thread_id'];$class = $data['class_id'];$user = $data['speaker'];$user_name = $data['speaker_name'];@$mes_id = $data['mes_id'];if (!empty($user_name) && !empty($thread) && !empty($class) && !empty($user)) {$array = Lazer::table('classes')->limit(1)->where('id', '=', (int) $class)->find()->asArray();if (!!$array) {$array = Lazer::table('threads')->limit(1)->where('id', '=', (int) $thread)->andWhere('belong_class', '=', (int) $class)->find()->asArray();if (!!$array) {$array = Lazer::table('users')->limit(1)->where('id', '=', (int) $user)->andWhere('name', '=', (string) $user_name)->find()->asArray();if (!!$array && in_array((string) $class, explode(',', $array[0]['class']))) { //判断用户存在switch ($cmd) {case "join": //客户端加入群组global $group_con_map;// 将连接加入到对应的群组数组里$group_con_map[$thread][$con->id] = $con;$array = ['op' => 'join','thread' => $thread,'status' => true,'code' => 100];break;case "send": //客户端发送内容Channel\Client::publish('send', array('thread_id' => $thread,'class_id' => $class,'speaker' => $user,'speaker_name' => $user_name,'con_id' => $con->id,'mes_id' => $mes_id));$array = ['op' => 'send','status' => true,'code' => 105];break;default:$array = ['op' => 'send','status' => false,'code' => 101,'msg' => 'Illegal request'];break;}} else {$array = ['op' => 'send','status' => false,'code' => 107,'msg' => 'User does not exist or not in the class'];}} else {$array = ['op' => 'send','status' => false,'code' => 102,'msg' => 'Thread does not exist'];}} else {$array = ['op' => 'send','status' => false,'code' => 103,'msg' => 'Class does not exist'];}} else {$array = ['op' => 'send','status' => false,'code' => 104,'msg' => 'Illegal request'];}$con->send(json_encode($array));
};// 这里很重要,连接关闭时把连接从全局群组数据中删除,避免内存泄漏
$worker->onClose = function ($con) {global $group_con_map;if (isset($con->group_id)) {unset($group_con_map[$con->group_id][$con->id]);if (empty($group_con_map[$con->group_id])) {unset($group_con_map[$con->group_id]);}}
};$worker->onConnect = function ($con) {$array = ['op' => 'connect','status' => true];$con->send(json_encode($array));
};Worker::runAll();
前端js
//websocket 连接this.ws = new WebSocket('wss://pokers.zeo.im/wss');this.ws.onmessage = function (data) {var re = eval('(' + data.data + ')');switch (re.op) {case 'send':if (!re.status) {antd.$message.error('Service Unavailable');}break;case 'connect':console.log('Connected to Pokers Server');break;case 'join':if (!re.status) {antd.$message.error('Service Unavailable');}break;case 'keep':break;default://在内容段后添加一段antd.opened_mes_info.meses.push(re);if (parseInt(re.speaker) !== parseInt(antd.user.id)) {if ($(window).height() + $('#mes-container').scrollTop() >= $('#mes-inner').height()) {//当前窗口可视区域+滑动距离大于总可滑动高度,有更新直接到底部antd.bottom_mes();} else {antd.unread.visible = true;setTimeout(function () {antd.unread.visible = false;}, 1000);}}antd.update_mes();break;}};
JavaScript 连接代码
//广播全 thread 在线用户
this.ws.send('{"action":"send", "thread_id":' + antd.opened_mes_info.thread_id + ', "class_id":' + antd.opened_mes_info.class_id + ', "speaker":' + antd.user.id + ',"speaker_name":"' + antd.user.info.name + '","mes_id":' + res.data.code + '}');
JavaScript 发送代码
//加入当前 Thread
this.ws.send('{"action":"join", "thread_id":' + id + ', "class_id":' + belong_class + ', "speaker":' + antd.user.id + ',"speaker_name":"' + antd.user.info.name + '"}');