Flutter: Websocket的使用与封装

1.引入相关插件库

  # websocketweb_socket_channel: ^2.4.0# 引入rxdart 解决Bad state: Stream has already been listened to.的报错问题rxdart: ^0.27.7# 状态管理*provider: ^6.0.5

2.代码编写及封装

import 'dart:async';import 'package:rxdart/subjects.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;typedef WebsocketMessageCallback = void Function(dynamic message);/// 注册流控制器需要在哪些页面使用
///
/// 目前分三种类型:
///
/// 1.[customerLoginPage]游客模式下也就是在未登录时候(用户处于登录相关页面)
/// 2.[customerMainPage]用户已登录,处于主页及其他登录后的页面下
/// 3.[chatRoomPage]用户处在聊天室里(游客下的在线客服聊天室、用户已登录下的在线客服聊天室、买卖用户之间的聊天室)
enum StreamControllerNameEnum {customerLoginPage,customerMainPage,chatRoomPage;
}class WebsocketHelper {WebsocketHelper._();static WebsocketHelper? _singleton;factory WebsocketHelper() => _singleton ??= WebsocketHelper._();/// 用于连接websocket的链接uriUri? wsUri;/// websocket连接后的对象WebSocketChannel? _webSocketChannel;/// 指定的stream流控制器存放mapMap<String, BehaviorSubject<String>>? streamControllerList;/// 是否开启心跳bool isOpenHeartBeat = true;/// 用于控制心跳轮询StreamSubscription<String>? _subscription;/// 是否是用户主动触发关闭连接bool isDisconnectByUser = false;/// 另辟一个单独的消息回调函数WebsocketMessageCallback? messageCallback;/// 连接断开回调Function()? onDone;/// 连接出错回调Function? onError;/// step one - ex: ws://localhost:1234initSocket({required String wsPath, bool isOpenHeartBeat = true}) {if (_webSocketChannel != null) {print("socket实例已存在, 请勿重复创建");return;}// 自己项目中后端需要前端拼一个登录令牌用于控制后端逻辑处理,这里使用的是登录后的tokenvar authorization = "登录后的token";wsUri = Uri.tryParse("$wsPath?Authorization=$authorization");// wsUri = Uri.tryParse(wsPath);if (wsUri == null) return;this.isOpenHeartBeat = isOpenHeartBeat;_connectWebsocket();}/// [isRunForReConnect] 是否是由重连机制触发的此方法void _connectWebsocket({bool isRunForReConnect = false}) {_webSocketChannel = WebSocketChannel.connect(wsUri!);if (!isRunForReConnect) {isDisconnectByUser = false;}}/// step two - listenvoid listen({WebsocketMessageCallback? messageCallback,Function()? onDone,Function? onError}) {this.messageCallback = messageCallback;this.onDone = onDone;this.onError = onError;streamControllerList ??= <String, BehaviorSubject<String>>{// StreamControllerNameEnum.customerLoginPage.name: BehaviorSubject(),// StreamControllerNameEnum.customerMainPage.name: BehaviorSubject(),// StreamControllerNameEnum.chatRoomPage.name: BehaviorSubject()};// 监听一系列连接情况(如收到消息、onDone:连接关闭、onError:接连异常)_webSocketChannel?.stream.listen((message) {print("websocket onData message = ${message.toString()}, type = ${message.runtimeType}");if (message is String && message.isEmpty) {// 消息为空(可能得情况:心跳 or another)return;}// 通过流控制器把消息分发出去,在需要的页面监听此流的消息streamControllerList?.forEach((key, value) {// print("key = $key, value.isClosed = ${value.isClosed}");if (!value.isClosed) {value.sink.add(message);}});this.messageCallback?.call(message);}, onDone: () {print("websocket onDone ...");this.onDone?.call();// 掉线重连reConnect();}, onError: (Object error, StackTrace stackTrace) {print("websocket onError error = ${error.toString()}, stackTrace = ${stackTrace.toString()}");showToast(msg: "连接服务器失败!");this.onError?.call(error, stackTrace);}, cancelOnError: false);// 连接建立成功时的回调通知,可在此做心跳操作_webSocketChannel?.ready.then((value) {print("webSocket ready");isDisconnectByUser = false;if (isOpenHeartBeat) {// 收到连接成功的回馈,开始执行心跳操作_startHeartBeat();}});}/// 掉线重连void reConnect() {if (isDisconnectByUser) return;Future.delayed(const Duration(seconds: 3),() {// disconnect();_subscription?.cancel();_subscription = null;_webSocketChannel?.sink.close(status.abnormalClosure, "掉线重连");_webSocketChannel = null;_connectWebsocket();listen(messageCallback: messageCallback, onDone: onDone, onError: onError);},);}/// 发送消息void sendMessage({required String message, bool needDisplayMsg = true}) {print("websocket sendMessage message = $message");if (needDisplayMsg) {streamControllerList?.forEach((key, value) {if (!value.isClosed) {value.sink.add(message);}});}_webSocketChannel?.sink.add(message);}/// 开启心跳void _startHeartBeat() {if (_subscription != null) {print("websocket startHeartBeat _subscription != null");return;}Future.delayed(const Duration(seconds: 30),() {var pollingStream = StreamTool().timedPolling(const Duration(seconds: 30), () => Future(() => ""), 100000000);//进行流内容监听_subscription = pollingStream.listen((result) {sendMessage(message: "heart beat", needDisplayMsg: false);});},);}/// 断开连接并销毁void disconnect({bool isDisconnectByUser = false}) {this.isDisconnectByUser = isDisconnectByUser;_subscription?.cancel();_subscription = null;streamControllerList?.forEach((key, value) {value.close();});streamControllerList?.clear();_webSocketChannel?.sink.close(status.normalClosure, "用户退出聊天界面,聊天关闭");_webSocketChannel = null;}/// 新建指定stream流控制器进行消息流回调setNewStreamController(StreamControllerNameEnum streamControllerName) {if (streamControllerList?.containsKey(streamControllerName.name) ?? false) {streamControllerList?[streamControllerName.name]?.close();}streamControllerList?[streamControllerName.name] = BehaviorSubject();}
}

3.提供一个轮询工具类StreamTool

import 'dart:async';typedef FutureGenerator<T> = Future<T> Function();class StreamTool {/// interval 轮询时间间隔/// maxCount 最大轮询数Stream<T> timedPolling<T>(Duration interval, FutureGenerator<T> future,[int maxCount = 1]) {StreamController<T>? controller;int counter = 0;bool polling = true;void stopTimer() {polling = false;}void tick() async {counter++;T result = await future();if (controller != null && !controller.isClosed) {controller.add(result);}if (counter == maxCount) {stopTimer();controller?.close();} else if (polling) {Future.delayed(interval, tick);}}void startTimer() {polling = true;tick();}//StreamSubscription调用pause,cancel时,stream里面的轮询也能响应暂停或取消controller = StreamController<T>(onListen: startTimer,onPause: stopTimer,onResume: startTimer,onCancel: stopTimer,);return controller.stream;}
}

4.新建全局的ChangeNotifier -> GlobalWebsocketVM

class GlobalWebsocketVM extends ChangeNotifier {
void startWebSocket() {WebsocketHelper()..initSocket(wsPath: Api.wsUrlPath, isOpenHeartBeat: false)..listen(messageCallback: (message) {// 延迟500毫秒,使listview进行滑动到底部// gotoListBottom();},onDone: () {},);}/// 获取socket实时数据流////// 每次都需要新绑定一个StreamController,避免数据流出现错乱情况Stream<String>? getMessageStream(StreamControllerNameEnum streamControllerName) =>(WebsocketHelper()..setNewStreamController(streamControllerName)).streamControllerList?[streamControllerName.name]?.stream;
}

5.在入口类main.dart中MaterialApp中使用全局GlobalWebsocketVM

late GlobalWebsocketVM socketVM;@override
void initState() {socketVM = GlobalWebsocketVM();
}MaterialApp.router(debugShowCheckedModeBanner: false,onGenerateTitle: (context) => S.current.appName,theme: ThemeData(useMaterial3: true,colorScheme: ColorScheme.fromSeed(seedColor: Colors.white),appBarTheme: const AppBarTheme(color: Colors.white, surfaceTintColor: Colors.white),bottomAppBarTheme: BottomAppBarTheme.of(context).copyWith(color: Colors.white, surfaceTintColor: Colors.white),scaffoldBackgroundColor: Colors.grey[200],cardTheme: const CardTheme(color: Colors.white, surfaceTintColor: Colors.white),progressIndicatorTheme:const ProgressIndicatorThemeData(color: AppColor.appThemeColor),// 统一修改输入框光标颜色、文本选中颜色textSelectionTheme: const TextSelectionThemeData(cursorColor: AppColor.appThemeColor,selectionColor: AppColor.appThemeColor,selectionHandleColor: AppColor.appThemeColor,),// ios主题色设置cupertinoOverrideTheme:const CupertinoThemeData(primaryColor: AppColor.appThemeColor),iconButtonTheme: IconButtonThemeData(style: AppButtonStyle.stGlobalDefaultBtn,),textButtonTheme: TextButtonThemeData(style: AppButtonStyle.stGlobalDefaultBtn,),// primarySwatch: themeVM.theme,),// locale: localeVM.getLocale(),builder: FlutterSmartDialog.init(builder: (context, mChild) {return MultiProvider(providers: [ChangeNotifierProvider<UserVM>(create: (_) => userVM,),// 在这里为每个页面添加GlobalWebsocketVM绑定ChangeNotifierProvider<GlobalWebsocketVM>(create: (_) => socketVM,),],builder: (context, child) => mChild ?? const SizedBox.shrink(),);/*return ChangeNotifierProvider<UserVM>(create: (_) => userVM,builder: (context, child) =>mChild ?? const SizedBox.shrink(),);*/},),localizationsDelegates: const [S.delegate,GlobalMaterialLocalizations.delegate,GlobalWidgetsLocalizations.delegate,GlobalCupertinoLocalizations.delegate,],supportedLocales: S.delegate.supportedLocales,// localeResolutionCallback: (_locale, supportedLocales) {//   if (localeVM.getLocale() != null) {//     //如果已经选定语言,则不跟随系统//     return localeVM.getLocale();//   } else {//     //跟随系统//     Locale locale;//     if (supportedLocales.contains(_locale)) {//       locale = _locale!;//     } else {//       //如果系统语言不是中文简体或美国英语,则默认使用美国英语//       locale = const Locale('en', 'US');//     }//     return locale;//   }// },routerConfig: RouterHelper.router,);

6.页面中调用,在initState方法中建立连接,在build中使用StreamBuilder进行消息监听

@overridevoid initState() {super.initState();// 建立连接ws global initcontext.read<GlobalWebsocketVM>().startWebSocket();}@overrideWidget build(BuildContext context) {return Scaffold(resizeToAvoidBottomInset: false,appBar: TitleBar.build(title: "正与${model.titleContent}沟通"),// 监听聊天消息并刷新聊天列表body: StreamBuilder<String>(stream: context.read<GlobalWebsocketVM>().getMessageStream(StreamControllerNameEnum.chatRoomPage),builder: (context, snapshot) {if (snapshot.connectionState == ConnectionState.active) {if (snapshot.data?.isEmpty ?? true) {return const SizedBox.shrink();}addMessageAndRefreshUI("orderNo", snapshot.data!);return const SizedBox.shrink();}return const SizedBox.shrink();},// catchError: (context, error) => error.toString(),),,);
}void addMessageAndRefreshUI(String tag, String message) {print("收到聊天消息:" + message);
}

7.发送消息

/// 在合适的地方(比如发送按钮点击发送聊天消息)
void sendChatMessage() {
WebsocketHelper().sendMessage(message: "我发送一条消息",needDisplayMsg: false,);
}

8.退出app断开websocket清理内存(可以在任何想断开websocket的地方调用销毁)

/// 通常在dispose中调用销毁,可以在任何想断开websocket的地方调用销毁
@overridevoid dispose() {ScanHelper().dispose();WebsocketHelper().disconnect(isDisconnectByUser: true);super.dispose();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/194412.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

西北大学计算机844考研-最后20天复习重点

西北大学计算机844考研-最后20天复习重点 ​ 我做844辅导超过400小时&#xff0c;人数超过20余人&#xff0c; 现在是2023年12月03日晚上22:33&#xff0c;这篇文章旨在帮助844众多考研学子在最后20天稳住心态&#xff0c;科学备考&#xff0c;争取获得专业课高分&#xff0c;…

使用求2个字符串最短编辑距离动态规划算法实现 git diff 算法 java 实现

测试类 MyDiffTest.java&#xff1a; import java.io.BufferedReader; import java.io.FileReader; import java.util.ArrayList; import java.util.List;public class MyDiffTest {private static String path "\\xxx\\";private static List<String> lines…

Springboot启动原理解析

我们开发任何一个Spring Boot项目&#xff0c;都会用到如下的启动类 SpringBootApplication public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);} } 从上面代码可以看出&#xff0c;Annotation定义&#x…

Янгель杨格尔年谱

米哈伊尔 库兹米其 杨格尔 生于1911年11月7日&#xff08;旧历10月25日&#xff09;&#xff0c;沙俄伊尔库斯克省。逝世于1971年10月25日。 出生于农民家庭&#xff0c;有11个兄弟姐妹。 1926年6年级结束后前往莫斯科投奔哥哥。 1929年在学校学徒毕业&#xff08;&#xf…

网络运维与网络安全 学习笔记2023.12.3

网络运维与网络安全 学习笔记 第三十三天 今日目标 目录-文件基本管理、vim文本编辑、用户账号管理 组账号管理、归属控制、权限控制 目录-文件基本管理 ls 列目录及文档属性 ls - List 格式:ls[选项]…[目录或文件路径] 1.如果不以/开始,表示相对路径(省略了当前所在位置…

go并发编程(中)

目录 一、并发安全性 1.1 变量并发安全性 1.2 容器并发安全性 二、多路复用 三、协程常见的面试题 3.1交替打印奇数偶数 一、并发安全性 1.1 变量并发安全性 这个和C中并发安全是一样的&#xff0c;主要是多个线程对临界资源的同时访问&#xff0c;最经典的就是 n操作…

智慧公厕新风系统是什么?具体作用?

大家好&#xff0c;你们可曾在公厕里遇到那个臭味怪兽&#xff0c;闻得让人头晕目眩&#xff1f;别怕&#xff0c;我们有一把利剑&#xff0c;叫做“智慧公厕新风系统”&#xff01;不仅是空气净化器的升级版&#xff0c;还有一大堆高级功能等着你来领略&#xff01; 1. 风清气…

Node.js版本管理工具NVM(Node Version Manager)的使用

nvm简介 nvm&#xff08;Node Version Manager&#xff09;是一个用于管理 Node.js 版本的工具。它可以让你在同一台计算机上安装并切换多个 Node.js 版本&#xff0c;非常方便。 如何安装 nvm 下载 nvm 安装包&#xff1a;访问 https://github.com/nvm-sh/nvm#installing-a…

Kettle 安装配置

文章目录 Kettle 安装配置Kettle 安装Kettle 配置连接 Hive Kettle 安装配置 Kettle 安装 在安装Kettle之前&#xff0c;需要确定已经安装Java运行环境。Kettle需要Java的支持才能运行&#xff0c;JDK的版本最好是8.x的太新的也会出现bug。Kettle的7.1版本的太旧了&#xff0…

stm32 RTC时钟设置能不能用毫秒

stm32 RTC时钟设置能不能用毫秒 具体的程序里面写的是 RTC_SetAlarm(SENDTIMERTC_GetCounter()); 进入原函数看看发现是&#xff1a; void RTC_SetAlarm(uint32_t AlarmValue) {RTC_EnterConfigMode();/* Set the ALARM MSB word */RTC->ALRH AlarmValue >> 16;/* …

一段代码隐藏WordPress后台登录地址

WordPress作为全球知名内容管理系统&#xff0c;有太多的人和网站在使用&#xff0c;WordPress本身在安全方面已经是做的非常出色的&#xff0c;官方和社区也非常的活跃&#xff0c;但也并不是没有漏洞可钻&#xff0c;比如后台爆破。 正因为有太多的人在使用WordPress&#xf…

vue el-button 封装及使用

使用了 Element UI 中的 el-button 组件&#xff0c;并对其进行了封装和定制。 创建组件index.vue (src/common-ui/button/index.vue) <template><el-buttonclass"h-button":type"type":icon"hIcon":disabled"disabled"clic…

MQ - KAFKA 基础篇

##1、KAFKA的核心组件/API Producer API&#xff0c;它允许应用程序向一个或多个 topics 上发送消息记录 Consumer API&#xff0c;允许应用程序订阅一个或多个 topics 并处理为其生成的记录流 Streams API&#xff0c;它允许应用程序作为流处理器&#xff0c;从一个或多个主…

面试篇:算法(二:二叉树)

一:创建节点 class Node(public int idata; --节点public int Ddata; --节点数据public Node LeftChild; --左节点public Node RightChild; --右节点public void prints(System.out.print(Data);)二&#xff1a;遍历 1. 前序遍历。 public void preOrder&#xff08;Node n…

【springboot】启动失败 Failed to start bean ‘webServerStartStop‘

lsof -i&#xff1a;xxx 发现端口被占用 kill掉该进程

代码随想录算法训练营第五十三天【动态规划part14】 | 1143.最长公共子序列、1035.不相交的线、53. 最大子序和

1143.最长公共子序列 题目链接 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 求解思路 动规五部曲 1.确定dp数组及其下标含义&#xff1a; dp[i][j]&#xff1a;长度为[0, i - 1]的字符串text1与长度为[0, j - 1]的字符串text2的最长公共子序…

OpenWrt作为旁路由(网关)配置

目录 背景前提条件环境操作步骤物理层连接设置与主路由同一网段禁用IPv6取消LAN接口桥接防火墙配置 背景 本文简介如何配置OpenWrt&#xff0c;使其作为旁路由&#xff08;网关&#xff09;运行。 旁路由大概有以下这几种工作方式&#xff1a; 主路由开DHCP&#xff0c;网关未…

QToolTip 是 Qt 框架中用于显示工具提示(Tooltip)的类

QToolTip 是 Qt 框架中用于显示工具提示&#xff08;Tooltip&#xff09;的类。 工具提示是一种小窗口&#xff0c;通常在用户将鼠标悬停在控件上时显示&#xff0c;提供有关该控件的额外信息或说明。QToolTip 类提供了设置和管理工具提示的方法。 以下是 QToolTip 常见的用法…

zephir 实现PHP封装成C语言扩展文件so实现demo简单案例【菜鸟级教程】

从github 安装 zephir.phar 最新网址 https://github.com/zephir-lang/zephir/releases 将文件改名 zephir.phar 改名为 zephir 放到 /bin 目录下 查看是否安装 zephir help安装 zephir_parser pecl install zephir_parser增加扩展到 php.ini .重新加载 extensionzephir_pars…

qt treeview 控制节点收缩

在Qt中&#xff0c;可以使用QTreeView控件来显示树形结构的数据。要控制节点&#xff08;树形结构中的项&#xff09;的展开和收缩&#xff0c;您可以使用QTreeView的一些方法来实现这些操作。 下面是一些常用的方法&#xff1a; 展开节点&#xff1a;使用expand方法展开一个…