Laravel+swoole 实现websocket长链接

需要使用 swoole 扩展

我使用的是 swoole 5.x

start 方法启动服务 和 定时器

调整 listenQueue 定时器可以降低消息通讯延迟

定时器会自动推送队列里面的消息

testMessage 方法测试给指定用户推送消息

使用 laravel console 启动

<?phpnamespace App\Console\Commands;use App\Services\SocketService;
use Illuminate\Console\Command;class WsServer extends Command
{/*** The name and signature of the console command.** @var string*/protected $signature = 'app:wsServer';/*** The console command description.** @var string*/protected $description = 'Command description';/*** Execute the console command.*/public function handle(){$SocketService = new SocketService();$SocketService->start();}
}

socket 服务实现代码

<?phpnamespace App\Services;use Swoole\WebSocket\Server;
use Swoole\Timer;
use Illuminate\Support\Facades\Redis;
use RedisException;
use Swoole\Http\Request;class SocketService
{public $port = 9501;public $server;public $links;public $cmds = [];public function __construct (){$this->links = collect([]);$this->server = new Server("0.0.0.0", env('APP_SOCKET_PORT', $this->port ));$this->server->on( 'open', function (Server $server, Request $request){$this->open( $server, $request );} );$this->server->on( 'message', function (Server $server, $frame){$this->message( $server, $frame );} );$this->server->on( 'close', function (Server $server, $fd){$this->close( $server, $fd );} );}public function start(){$this->linkManage();$this->listenQueue();$this->server->start();}public function print( $message, $level = 'info' ){if( is_array($message) || is_object($message) ){$message = json_encode($message, 320);}print_r( "[". date("Y-m-d H:i:s") ."] " . $level . ' ' . $message . "\n" );}public function linkManage(){Timer::tick( 100, function (){//var_dump( "listenQueue while: " . json_encode($this->cmds, 320) );$cmd = array_shift( $this->cmds );if( $cmd ){switch ( $cmd['operate'] ){case 'open':// 活跃$this->links->push( [ "fd" => $cmd['fd'], "user_id" => intval($cmd['user_id']??0), 'updated_at' => date("Y-m-d H:i:s") ] );$this->print( "添加客户端:fd = " . json_encode($cmd, 320)  );break;case 'close':$newLinks = [];foreach ( $this->links as $link ){if( $link['fd'] == $cmd['fd'] ){continue;}$newLinks[] = $link;}$this->links = collect( $newLinks );$this->print( "删除客户端:fd = " . json_encode($cmd, 320) );break;case 'heartbeat':$newLinks = [];foreach ( $this->links as $link ){if( $link['fd'] == $cmd['fd'] ){$link['updated_at'] = date("Y-m-d H:i:s");}$newLinks[] = $link;}$this->links = collect( $newLinks );break;}// $this->print( "连接数量是:" . $this->links->count() );// $this->print( "连接数量是:" . $this->links->toJson() );}$newLinks = [];foreach ( $this->links as $link ){if( strtotime( $link['updated_at'] ) < (time() - 60) ){$this->print( "长时间未心跳,删除客户端:fd = " . json_encode($link, 320) );if( $this->server->isEstablished( $link['fd'] ) ){$this->disconnect( $link['fd'], '未进行心跳' );}continue;}$newLinks[] = $link;}$this->links = collect( $newLinks );} );}public function listenQueue(){Timer::tick( 1000, function (){// Redis::rpush( "SocketService:listenQueue", serialize(["hahah"]) )try{$element = Redis::lpop('SocketService:listenQueue');if( $element ){$this->print( "listenQueue 有新的信息哦:" . $element );$data = unserialize($element);if( ! empty( $data['user_id']) ){$links = $this->links->where( "user_id", $data['user_id'] )->values()->all();if( empty($links) ){$this->print( "没有在线用户:user_id = " . json_encode($data, 320) );//var_export( $this->links );//var_export( $links );}foreach ( $links as $link ){if( ! $this->server->isEstablished( $link['fd'] ) ){array_push( $this->cmds, [ 'operate' => 'close', 'fd' => $link['fd'] ] );continue;}try{// 生成消息数据$message = $this->makeMessage( $data['data'], $data['type'], $data['message'] );// 开始推送$this->runPush( $link['fd'], $message );}catch (\Throwable $e){$this->print( "数据推送异常:" . json_encode([ $e->getMessage(),$e->getLine(), $e->getFile() ], 320) );}}}}}catch (RedisException $e){Redis::connect();}});}public function open( Server $server, Request $request ){$params = $request->get;if( empty( $params['user_id'] ) ){$this->disconnect( $request->fd, '缺少用户信息' );return true;}array_push( $this->cmds, [ 'operate' => 'open', 'fd' => $request->fd, 'user_id' => $params['user_id'] ] );// 生成消息数据$message = $this->makeMessage( [ 'fd' => $request->fd ], "connectionSuccessful", "连接成功" );// 开始推送$this->runPush( $request->fd, $message );$this->print( "server: handshake success with fd{$request->fd} " );}public function message( Server $server, $frame ){//$data = json_decode( $frame->data, true );if( is_array( $data ) ){if( $data['type'] == "ping" ){array_push( $this->cmds, [ 'operate' => 'heartbeat', 'fd' => $frame->fd ] );$this->server->push( $frame->fd, json_encode( [ "type" => "pong" ] , 320 ) );}else{$this->print( "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish} " );}}}public function close(Server $server, $fd){array_push( $this->cmds, [ 'operate' => 'close', 'fd' => $fd ] );$this->print( "client {$fd} closed " );}public function push( $fd, string $data ){$this->server->push($fd, $data);}public function disconnect(int $fd, string $reason = '', int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL){$this->server->disconnect($fd, $code, $reason);}public function makeMessage( array $data, $type = "", $message = "" ){return [ 'type' => $type, "message" => $message, "data" => $data ];}public function runPush( $fd, $message ){$this->print( "推送消息: {$fd} - " .  json_encode(  $message, 320 ) );$this->server->push( $fd, json_encode( $message , 320 ) );}/*** App\Services\SocketService::testMessage( 92 )* @param $user_id* @return void*/public static function testMessage( $user_id ){Redis::rpush( "SocketService:listenQueue", serialize(["user_id" => $user_id,"type" =>  "testMessage", "message" => "测试消息", "data" => ["hello world!"],]) );}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47360.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华纳云:查看服务器磁盘I/O性能的工具和方法

要查看服务器硬盘的I/O性能&#xff0c;可以使用多种工具和方法。以下是一些常用的命令和工具&#xff1a; 1. 使用 dd 命令 dd 是一个强大的工具&#xff0c;可以用来测试硬盘的读写速度。 测试写性能 dd if/dev/zero of/path/to/testfile bs1M count1024 convfdatasync 测试读…

k8s核心操作_存储抽象_K8S中使用ConfigMap抽取配置_实现配置热更新---分布式云原生部署架构搭建032

现在有个问题,是上面我们利用pv和pvc 就是持久卷 以及 持久卷申请,实现了对存储的,pod删除以后,对其使用的存储空间也进行了删除,那么还有个问题,对于redis这种我们希望,他的配置也管理起来. 比如这个redis的配置文件. 以后其他的配置文件也是这样. 使用配置文件的存储在k8s中…

Spring Boot 中使用 Resilience4j 实现弹性微服务的简单了解

1. 引言 在微服务架构中&#xff0c;服务的弹性是非常重要的。Resilience4j 是一个轻量级的容错库&#xff0c;专为函数式编程设计&#xff0c;提供了断路器、重试、舱壁、限流器和限时器等功能。 这里不做过多演示&#xff0c;只是查看一下官方案例并换成maven构建相关展示&…

【Go系列】Go语言的测试

承上启下 在Go语言中&#xff0c;我们写了代码之后经常就要进行测试。我们可以直接在go函数中调用具体的函数&#xff0c;从而实现测试的目的。但是一旦系统复杂的情况下&#xff0c;我们频繁修改main调用函数就显得不太正常了。那么是不是存在一种方法&#xff0c;让我们可以虚…

代码随想录学习 54day 图论 Bellman_ford 队列优化算法(又名SPFA) 学习

Bellman_ford 队列优化算法&#xff08;又名SPFA&#xff09; 卡码网&#xff1a;94. 城市间货物运输 I 题目描述 某国为促进城市间经济交流&#xff0c;决定对货物运输提供补贴。共有 n 个编号为 1 到 n 的城市&#xff0c;通过道路网络连接&#xff0c;网络中的道路仅允许从…

Hadoop3:RPC通信原理及简单案例实现

一、场景介绍 我们知道&#xff0c;Hadoop中存在多种服务&#xff0c;那么&#xff0c;服务之间是如何通信的了&#xff1f; 比如&#xff0c;DN和NN之间如何通信&#xff1f; 这里&#xff0c;实际上是通过RPC实现进程间通信的了。 RPC属于Java网络编程范畴 需要编写客户端和…

自用自用自用,持续更新,记录部分CPU,显卡,部分跑分软件,游戏帧数参考,自用

自用自用自用&#xff0c;持续更新&#xff0c;记录部分CPU&#xff0c;显卡&#xff0c;部分跑分软件&#xff0c;游戏帧数参考&#xff0c;自用 CPU跑分显卡游戏 CPU跑分 CPUZ单核CPUZ多核R23单核R23多核5800h576.85860.014311270212100f644.43258.31576804313500h763.36658…

AAD Connect自定义同步用户上云

使用场景&#xff1a;我想同步本地AD域的那些用户信息、账号上云端做SSO登录和权限管控&#xff0c;但是不希望使用快速上传一股脑传上去&#xff0c;所以使用自定义同步功能上传&#xff0c;这是一篇对AAD CONNECT这个应用的详解和配置步骤推荐 AD Connect如何自定义配置&…

easyswoole/Hyperf开发的php系统 cpu超负荷定位排查

EasySwoole EasySwoole是一个高性能的PHP协程框架&#xff0c;它利用了协程的特性来提高PHP应用的性能。当使用EasySwoole开发的PHP系统遇到CPU超负荷的问题时&#xff0c;可以从以下几个方面进行全方位排查和优化&#xff1a; 1. 监控系统资源 使用top、htop、vmstat、iost…

隐性行为克隆——机器人的复杂行为模仿学习的新表述

介绍 论文地址&#xff1a;https://arxiv.org/pdf/2109.00137.pdf 源码地址&#xff1a;https://github.com/opendilab/DI-engine.git 近年来&#xff0c;人们对机器人学习进行了大量研究&#xff0c;并取得了许多成果。其中&#xff0c;模仿学习法尤其受到关注。这是一种从人…

iOS ------ 消息传递和消息转发

一&#xff0c;消息传递 在OC中&#xff0c;传递消息就是在对象上调用方法。 相对于C语言的方法就“静态绑定”的函数&#xff0c;在编译器就决定了运行时所要调用的函数。在OC中&#xff0c;如果向某对象传递消息&#xff0c;就会使用动态绑定机制来决定需要调用那个方法。调…

全球风味:红酒中的地域风情与特色

在红酒的世界里&#xff0c;每一滴琼浆玉液都承载着地域的风情与特色。它们不仅仅是葡萄酒&#xff0c;更是大自然的恩赐&#xff0c;是时间的馈赠&#xff0c;是人类智慧的结晶。今天&#xff0c;就让我们一起走进红酒的世界&#xff0c;感受那些来自不同地域的风情与魅力。 …

前端面试题日常练-day91 【Less】

题目 希望这些选择题能够帮助您进行前端面试的准备&#xff0c;答案在文末 在Less中&#xff0c;以下哪种语法适用于创建混合器&#xff08;Mixin&#xff09;&#xff1f; a) mixin b) #mixin c) .mixin d) extend Less中的子元素选择器是用什么符号表示的&#xff1f; a) &…

【Vue】RouterLink的replace属性

1、作用&#xff1a;控制路由跳转时操作浏览器历史记录的模式&#xff1b; 2、浏览器的历史记录有两种写入方式&#xff1a;分别为push和replace&#xff0c;push是追加历史记录&#xff0c;replace是替换当前记录。路由跳转时候默认为push&#xff1b; 3、如何开启replace模式…

ROS2入门到精通—— 2-6 ROS2实战:可调节纯跟踪算法(局部规划)

1 Regulated Pure Pursuit 纯追踪算法变体&#xff1a;调节纯追踪算法 将自适应纯追踪&#xff08;Adaptive Pure Pursuit&#xff09;算法的特性与围绕线性速度的规则相结合&#xff0c;重点关注消费类、工业和服务型机器人的需求。我们还实现了几种常识性的安全机制&#xf…

业务终端动态分配IP-DHCP技术、DHCP中继技术

一、为什么需要DHCP? 1、许多设备(主机、无线WiFi终端等)需要动态地址的分配; 2、人工手工配置任务繁琐、容易出错,比如:IP地址冲突; 3、网络规模扩大、复杂度提高,网络配置越来越复杂,计算机的位置变化和数量超过可分配IP地址的数量,造成IP地址变法频繁以及IP地址…

Monaco 使用 DocumentHighlightProvider

Monaco 中有一个文字高亮的功能&#xff0c;就是选中一个单词&#xff0c;会高亮文字文档中所有出现该单词的位置&#xff0c;效果如下&#xff1a; Monaco 默认就有这个功能&#xff0c;可以根据具体需求进行定制。通过 registerDocumentHighlightProvider 进行注册 实现 pro…

java包装类 及其缓存

Java 包装类&#xff08;Wrapper Class&#xff09;是将基本数据类型转换为对象的方式&#xff0c;每个基本数据类型在 java.lang 包中都有一个相应的包装类&#xff1a; Boolean 对应基本类型 boolean Character 对应基本类型 char Integer 对应基本类型 int Float 对应基本…

Java时间练习(8) (2024.7.17)

Duration、Period、ChronoUnit类 package DurationPeriodChronoUnitExercise20240717; import java.time.*; import java.time.temporal.ChronoUnit;// ChronoUnit是用来得到时间间隔的类&#xff0c;涵盖了所有时间的单位&#xff0c;Duration和Period用法和其一致&#xff0…

【Java数据结构】初始线性表之一:链表

为什么要有链表 上一节我们描述了顺序表&#xff1a;【Java数据结构】初识线性表之一&#xff1a;顺序表-CSDN博客 并且进行了简单模拟实现。通过源码知道&#xff0c;ArrayList底层使用数组来存储元素。 由于其底层是一段连续空间&#xff0c;当在ArrayList任意位置插入或者…