Hyperf 如何做到用两个端口 9501/9502 都能连接 Websocket 服务以及多 Worker 协作实现聊天室功能

为何 Hyperf 能够在两个端口上监听 WebSocket 连接?

源码角度来看,在配置了多个 Servers 时,实际上,只启动了一个 Server

注:我之前接触的代码都是启动一个服务绑定一个端口,之前也看过 swoole 扩展的文档,但是没留意服务和监听端口也是分离的,这启发了我一种思维,代码凡是能继续拆分的,就继续拆分,这样代码就会有更多的灵活,每个功能都能进行扩展,将服务和端口进行拆分之后,就可以在一个 Server 绑定多个 Port,每个 Port 又能有独立的事件。

/*** @param Port[] $servers* @return Port[]*/
protected function sortServers(array $servers): array
{$sortServers = [];foreach ($servers as $server) {switch ($server->getType()) {case ServerInterface::SERVER_HTTP:$this->enableHttpServer = true;if (! $this->enableWebsocketServer) {array_unshift($sortServers, $server);} else {$sortServers[] = $server;}break;case ServerInterface::SERVER_WEBSOCKET:$this->enableWebsocketServer = true;array_unshift($sortServers, $server);break;default:$sortServers[] = $server;break;}}return $sortServers;
}

从源码看,排在第一个的服务配置,会被创建服务,之后都是增加监听

protected function initServers(ServerConfig $config)
{$servers = $this->sortServers($config->getServers());foreach ($servers as $server) {$name = $server->getName();$type = $server->getType();$host = $server->getHost();$port = $server->getPort();$sockType = $server->getSockType();$callbacks = $server->getCallbacks();if (! $this->server instanceof SwooleServer) {$this->server = $this->makeServer($type, $host, $port, $config->getMode(), $sockType);$callbacks = array_replace($this->defaultCallbacks(), $config->getCallbacks(), $callbacks);$this->registerSwooleEvents($this->server, $callbacks, $name);$this->server->set(array_replace($config->getSettings(), $server->getSettings()));ServerManager::add($name, [$type, current($this->server->ports)]);if (class_exists(BeforeMainServerStart::class)) {// Trigger BeforeMainServerStart event, this event only trigger once before main server start.$this->eventDispatcher->dispatch(new BeforeMainServerStart($this->server, $config->toArray()));}} else {/** @var bool|\Swoole\Server\Port $slaveServer */$slaveServer = $this->server->addlistener($host, $port, $sockType);if (! $slaveServer) {throw new \RuntimeException("Failed to listen server port [{$host}:{$port}]");}$server->getSettings() && $slaveServer->set(array_replace($config->getSettings(), $server->getSettings()));$this->registerSwooleEvents($slaveServer, $callbacks, $name);ServerManager::add($name, [$type, $slaveServer]);}// Trigger beforeStart event.if (isset($callbacks[Event::ON_BEFORE_START])) {[$class, $method] = $callbacks[Event::ON_BEFORE_START];if ($this->container->has($class)) {$this->container->get($class)->{$method}();}}if (class_exists(BeforeServerStart::class)) {// Trigger BeforeServerStart event.$this->eventDispatcher->dispatch(new BeforeServerStart($name));}}
}

从makeServer函数来看,如果服务中有SERVER_WEBSOCKET,则这个会被作为主服务启动,new SwooleWebSocketServer

protected function makeServer(int $type, string $host, int $port, int $mode, int $sockType): SwooleServer
{switch ($type) {case ServerInterface::SERVER_HTTP:return new SwooleHttpServer($host, $port, $mode, $sockType);case ServerInterface::SERVER_WEBSOCKET:return new SwooleWebSocketServer($host, $port, $mode, $sockType);case ServerInterface::SERVER_BASE:return new SwooleServer($host, $port, $mode, $sockType);}throw new RuntimeException('Server type is invalid.');
}

$this->registerSwooleEvents($this->server, $callbacks, $name); 这句代码会将 Websocket 的各种事件都注册进去,于是主服务器拥有 websocket 的各种事件,而后 http 服务器挂载 9501 端口上,绑定了 onrequest 事件,但是如果有 websocket 连接9501 端口上时,默认该服务器是自动开启 websocket 自动升级的,又因为监听 9501 端口绑定的主服务器是 WebSocketServer,因此,WebSocketServer 默认的 onmessage,onopen事件就会被拿来用。

推测,如果开启 9503 WebSocket服务器,那么理论上用 WebSocket连接 9501 端口,应该就是连接的 9503 的回调事件。如果不想让 http 监听端口自动开启 websocket 协议,则将open_websocket_protocol=false

<?php
return [
// 这里省略了该文件的其它配置
'servers' => [['name' => 'http','type' => Server::SERVER_HTTP,'host' => '0.0.0.0','port' => 9501,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [Event::ON_REQUEST => [Hyperf\HttpServer\Server::class,'onRequest'],],'settings' => ['open_websocket_protocol' => false,]],]];

关于很多SWOOLE中出现的常量来看,这些东西在执行脚本时,会被自动设置好,通过实际代码运行发现

define('SWOOLE_HTTP2_ERROR_COMPRESSION_ERROR', 9);
define('SWOOLE_HTTP2_ERROR_CONNECT_ERROR', 10);
define('SWOOLE_HTTP2_ERROR_ENHANCE_YOUR_CALM', 11);
define('SWOOLE_HTTP2_ERROR_INADEQUATE_SECURITY', 12);
define('SWOOLE_BASE', 1);
define('SWOOLE_PROCESS', 2);
define('SWOOLE_IPC_UNSOCK', 1);
define('SWOOLE_IPC_MSGQUEUE', 2);
define('SWOOLE_IPC_PREEMPTIVE', 3);

以下,随便设置的一个test.php中输出SWOOLE_BASE,都能输出1,我之前都以为这些常量是运行时设置的呢,看来这种理解是错误的。

<?php
echo SWOOLE_BASE."\n";
echo "hello\n";// 输出
1
hello

Hyperf-skeleton 给的默认配置就是进程模式,这个竟然没有发现,这样就比较明确了,使用的都是PROCESS模式,那么在websocket连接时,所有的连接都是由Manager来控制

SWOOLE_PRECESS 和 SWOOLE_BASE 两种模式

Server 的两种运行模式介绍

在 Swoole\Server 构造函数的第三个参数,可以填 2 个常量值 -- SWOOLE_BASE或 SWOOLE_PROCESS,下面将分别介绍这两个模式的区别以及优缺点

SWOOLE_PROCESS

SWOOLE_PROCESS 模式的 Server 所有客户端的 TCP 连接都是和主进程建立的,内部实现比较复杂,用了大量的进程间通信、进程管理机制。适合业务逻辑非常复杂的场景。Swoole 提供了完善的进程管理、内存保护机制。 在业务逻辑非常复杂的情况下,也可以长期稳定运行。

Swoole 在 Reactor线程中提供了 Buffer 的功能,可以应对大量慢速连接和逐字节的恶意客户端。

进程模式的优点:

  • 连接与数据请求发送是分离的,不会因为某些连接数据量大某些连接数据量小导致 Worker 进程不均衡

  • Worker 进程发生致命错误时,连接并不会被切断

  • 可实现单连接并发,仅保持少量 TCP 连接,请求可以并发地在多个 Worker 进程中处理

进程模式的缺点:

  • 存在 2 次 IPC 的开销,master 进程与 worker 进程需要使用 unixSocket进行通信

  • SWOOLE_PROCESS 不支持 PHP ZTS,在这种情况下只能使用 SWOOLE_BASE 或者设置 single_thread为 true

SWOOLE_BASE

SWOOLE_BASE 这种模式就是传统的异步非阻塞 Server。与 Nginx 和 Node.js 等程序是完全一致的。

worker_num参数对于 BASE 模式仍然有效,会启动多个 Worker 进程。

当有 TCP 连接请求进来的时候,所有的 Worker 进程去争抢这一个连接,并最终会有一个 worker 进程成功直接和客户端建立 TCP 连接,之后这个连接的所有数据收发直接和这个 worker 通讯,不经过主进程的 Reactor 线程转发。

  • BASE 模式下没有 Master 进程的角色,只有 Manager进程的角色。

  • 每个 Worker 进程同时承担了 SWOOLE_PROCESS模式下 Reactor线程和 Worker 进程两部分职责。

  • BASE 模式下 Manager 进程是可选的,当设置了 worker_num=1,并且没有使用 Task 和 MaxRequest 特性时,底层将直接创建一个单独的 Worker 进程,不创建 Manager 进程

BASE 模式的优点:

  • BASE 模式没有 IPC 开销,性能更好

  • BASE 模式代码更简单,不容易出错

BASE 模式的缺点:

  • TCP 连接是在 Worker 进程中维持的,所以当某个 Worker 进程挂掉时,此 Worker 内的所有连接都将被关闭

  • 少量 TCP 长连接无法利用到所有 Worker 进程

  • TCP 连接与 Worker 是绑定的,长连接应用中某些连接的数据量大,这些连接所在的 Worker 进程负载会非常高。但某些连接数据量小,所以在 Worker 进程的负载会非常低,不同的 Worker 进程无法实现均衡。

  • 如果回调函数中有阻塞操作会导致 Server 退化为同步模式,此时容易导致 TCP 的 backlog队列塞满问题。

BASE 模式的适用场景:

如果客户端连接之间不需要交互,可以使用 BASE 模式。如 Memcache、HTTP 服务器等。

BASE 模式的限制:

在 BASE 模式下,Server 方法除了 send和 close以外,其他的方法都不支持跨进程执行。

Reactor 线程和 Worker 进程

Reactor 线程

  • Reactor 线程是在 Master 进程中创建的线程

  • 负责维护客户端 TCP 连接、处理网络 IO、处理协议、收发数据

  • 不执行任何 PHP 代码

  • 将 TCP 客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包

Worker 进程

  • 接受由 Reactor 线程投递的请求数据包,并执行 PHP 回调函数处理数据

  • 生成响应数据并发给 Reactor 线程,由 Reactor 线程发送给 TCP 客户端

  • 可以是异步非阻塞模式,也可以是同步阻塞模式

  • Worker 以多进程的方式运行

他们之间的关系可以理解为 Reactor 就是 nginx,Worker 就是 PHP-FPM。Reactor 线程异步并行地处理网络请求,然后再转发给 Worker 进程中去处理。Reactor 和 Worker 间通过 unixSocket进行通信。

在 PHP-FPM 的应用中,经常会将一个任务异步投递到 Redis 等队列中,并在后台启动一些 PHP 进程异步地处理这些任务。Swoole 提供的 TaskWorker 是一套更完整的方案,将任务的投递、队列、PHP 任务处理进程管理合为一体。通过底层提供的 API 可以非常简单地实现异步任务的处理。另外 TaskWorker 还可以在任务执行完成后,再返回一个结果反馈到 Worker。

Swoole 的 Reactor、Worker、TaskWorker 之间可以紧密的结合起来,提供更高级的使用方式。

一个更通俗的比喻,假设 Server 就是一个工厂,那 Reactor 就是销售,接受客户订单。而 Worker 就是工人,当销售接到订单后,Worker 去工作生产出客户要的东西。而 TaskWorker 可以理解为行政人员,可以帮助 Worker 干些杂事,让 Worker 专心工作。

结论

  1. SWOOLE_PRECESS 模式下,Websocket 的连接对象都是由 Server 来控制,创建一个 Reactor 后,将该连接交付给 Reactor 来管理,Reactor 会将请求分配给 Worker 处理,Worker 处理完后,再把消息发给Server,Server 将消息压入队列等具体的 Reactor 发送出去

  2. hyperf 中有一处进程间通信,目前还不清楚,这里为何要向其他进程求助,以及其他进程监听到消息后会不会多发,等问了 hyperf 开发者们再补充

  3. 也就是说分布式 websocket 必须借助进程间通信这种方式来实现,否则分布式服务器无法给聊天室内的所有人发消息

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/47777.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot 配置优先级

一般而言&#xff0c;SpringBoot支持配置文件进行配置&#xff0c;即在resources下的application.properties或application.yml。 关于配置优先级而言&#xff0c; application.properties>application.yml>application.yaml 另外JAVA程序程序还支持java系统配置和命令行…

PHP实现轻量级WEB服务器接收HTTP提交的RFID刷卡信息并回应驱动读卡器显示播报语音

本示例使用的读卡器&#xff1a;RFID网络WIFI无线TCP/UDP/HTTP可编程二次开发读卡器POE供电语音-淘宝网 (taobao.com) <?php mb_http_output(utf-8); $port88; $socket socket_create(AF_INET, SOCK_STREAM, SOL_TCP); $bool socket_bind($socket, "0.0.0.0",…

html表格中加入斜线,使用css给table表格表头单元格添加斜线

背景&#xff1a;业务给了90张word电子表格&#xff0c;需要用html设计出来。 如图所示&#xff0c;红色区域的下斜线如何实现&#xff1f; 先说结论&#xff1a;html中table没有直接的斜线表头标签&#xff0c;但结合css、svg之类的可以实现。 #lineTd{ background:#FFFFFF u…

卷积神经网络——上篇【深度学习】【PyTorch】【d2l】

文章目录 5、卷积神经网络5.1、卷积5.1.1、理论部分5.1.2、代码实现5.1.3、边缘检测 5.2、填充和步幅5.2.1、理论部分5.2.2、代码实现 5.3、多输入多输出通道5.3.1、理论部分5.3.2、代码实现 5.4、池化层 | 汇聚层5.4.1、理论部分5.4.2、代码实现 5、卷积神经网络 5.1、卷积 …

Elasticsearch学习

1、什么是Elasticsearch? Elaticsearch&#xff0c;简称为 ES&#xff0c; ES 是一个开源的高扩展的分布式全文搜索引擎&#xff0c; 是整个 ElasticStack 技术栈的核心。可以帮助我们从海量数据中快速找到需要的内容。 elasticsearch结合kibana、Logstash、Beats&#xff0…

Hope.money:新兴DeFi项目如何重新定义稳定币生态的未来?

联储加息导致金融市场紧缩&#xff0c;Terra、3AC、FTX等知名中心化机构未能妥善应对而暴雷&#xff0c;并重创了整个加密货币市场&#xff0c;导致参与者损失惨重。这些事件揭示了中心化机构的局限&#xff0c;投资者对其资产掌控权的担忧愈发强烈。 自2018年首个DeFi协议Com…

简单介绍 CPU 的工作原理

内部架构 CPU 的根本任务就是执行指令&#xff0c;对计算机来说最终都是一串由 0 和 1 组成的序列。CPU 从逻辑上可以划分成 3 个模块&#xff0c;分别是控制单元、运算单元和存储单元 。其内部架构如下&#xff1a; 【1】控制单元 控制单元是整个CPU的指挥控制中心&#xff…

2023年Java核心技术面试第二篇(篇篇万字精讲)

目录 四. 强引用&#xff0c;软引用&#xff0c;弱引用&#xff0c;幻象引用之间的区别&#xff1f; 1.前言 2.强引用 2.1 强引用赋值为null 2.2 超过了引用的作用域 2.2.1 描述&#xff1a; 2.2.2 作用域内&#xff1a; 2.2.3 不在作用域内: 3. 软引用&#xff08;SoftRefere…

华为云classroom赋能--Toolkit系列插件DevSecOps助力开发者提速

一、前言 DevOps的概念想必大家都不陌生&#xff0c;它是一组过程、方法与系统的统称&#xff0c;通过它可以对交付速率、协作效率、部署频率速率、质量、安全和可靠性等进行提升改善。相比传统的软件开发模式&#xff0c;它是一种工作方式和文化的转变&#xff0c;把开发者和…

C++笔记之注册的含义

C笔记之注册的含义 code review! 文章目录 C笔记之注册的含义1.注册对象到Qt的信号槽系统中2.注册函数到Qt的元对象系统中元对象系统例1例2 3.注册自定义类型到C STL容器中4.将函数指针传递给另一个类&#xff0c;注册回调函数class ICallback存在的意义例1&#xff0c;用于说…

Python Opencv实践 - 图像中值滤波

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/pomeranian.png", cv.IMREAD_COLOR) print(img.shape) pixel_count img.shape[0] * img.shape[1] print(pixel_count)#为图像添加椒盐噪声 #参考资料&#xf…

Java中异常的详细讲解与细节讨论

用一个代码引出异常为什么要使用异常 代码&#xff1a; public static void main(Sting args[]){int n1 1;int n2 0;System.out.println(n1/n2);//这里会抛ArihmaticException,因为除数不能为0,若未用异常处理机制则程序直接结束&#xff0c;后面的代码将不执行。这样很不好…

windows ipv4 多ip地址设置,默认网关跃点和自动跃点是什么意思?(跃点数)

文章目录 Windows中的IPv4多IP地址设置以及默认网关跃点和自动跃点的含义引言IPv4和IPv6&#xff1a;简介多IP地址设置&#xff1a;Windows环境中的实现默认网关跃点&#xff1a;概念和作用自动跃点&#xff1a;何时使用&#xff1f;关于“跃点数”如何确定应该设置多少跃点数&…

我的创作纪念日(C++修仙练气期总结)

分享自己最喜欢的一首歌&#xff1a;空想フォレスト—伊東歌詞太郎 机缘 现在想想自己在CSDN创作的原因&#xff0c;一开始其实就是想着拿着博客当做自己的学习笔记&#xff0c;笔记嘛&#xff0c;随便写写&#xff0c;自己看得懂就ok了的态度凸(艹皿艹 )。也是用来作为自己学习…

vue3+element下拉多选框组件

<!-- 下拉多选 --> <template><div class"select-checked"><el-select v-model"selected" :class"{ all: optionsAll, hidden: selectedOptions.data.length < 2 }" multipleplaceholder"请选择" :popper-app…

vite 项目搭建

1. 创建 vite 项目 npm create vite@latest 2. 安装sass/less ( 一般我使用sass ) cnpm add -D sasscnpm add -D less 3. 自动导入 两个插件 使用之后,不用导入vue中hook reactive ref cnpm install -D unplugin-vue-components unplugin-auto-import 在 vite.config.…

STM32设置为I2C从机模式(HAL库版本)

STM32设置为I2C从机模式&#xff08;HAL库版本&#xff09; 目录 STM32设置为I2C从机模式&#xff08;HAL库版本&#xff09;前言1 硬件连接2 软件编程2.1 步骤分解2.2 测试用例 3 运行测试3.1 I2C连续写入3.2 I2C连续读取3.3 I2C单次读写测试 4 总结 前言 我之前出过一篇关于…

Claude 2 国内镜像站

Claudeai是什么&#xff1f; Claude 2被称为ChatGPT最强劲的竞争对手&#xff0c;支持100K上下文对话&#xff0c;并且可以同时和5个文档进行对话&#xff0c;不过国内目前无法正常实用的&#xff0c;而claudeai是一个Claude 2 国内镜像站&#xff0c;并且免翻可用&#xff0…

实验三 HBase1.2.6安装及配置

系列文章目录 文章目录 系列文章目录前言一、HBase1.2.6的安装二、HBase1.2.6的配置2.1 单机模式配置2.2 伪分布式模式配置 总结参考 前言 在安装HBase1.2.6之前&#xff0c;需要安装好hadoop2.7.6。 本篇文章参考&#xff1a;HBase2.2.2安装和编程实践指南 一、HBase1.2.6的安…

Android---- 一个完整的小项目(消防app)

前言&#xff1a; 针对不同群体的需求&#xff0c;想着应该拓展写方向。医疗app很受大家喜欢&#xff0c;就打算顺手写个消防app&#xff0c;里面基础框架还是挺简洁 规整的。登陆注册和本地数据库写的便于大家理解。是广大学子的毕设首选啊&#xff01; 此app主要为了传递 消防…