1.引入相关插件库
# websocket
web_socket_channel: ^2.4.0
# 引入 rxdart,解决 "Bad state: Stream has already been listened to." 的报错问题
rxdart: ^0.27.7
# 状态管理
provider: ^6.0.5
2.代码编写及封装
import 'dart:async';import 'package:rxdart/subjects.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;

/// Signature of the callback that is handed each message received from the
/// websocket.
typedef WebsocketMessageCallback = void Function(dynamic message);

/// The pages in which a stream controller can be registered.
///
/// There are currently three kinds:
///
/// 1. [customerLoginPage]: guest mode, i.e. the user has not logged in yet
///    and is on a login-related page.
/// 2. [customerMainPage]: the user is logged in and is on the home page or
///    another post-login page.
/// 3. [chatRoomPage]: the user is inside a chat room (the online
///    customer-service chat as a guest or as a logged-in user, or the chat
///    between a buyer and a seller).
enum StreamControllerNameEnum {
  /// Guest mode: the user is on a login-related page and not logged in yet.
  customerLoginPage,

  /// The user is logged in and on the home page or another post-login page.
  customerMainPage,

  /// The user is inside a chat room.
  chatRoomPage;
}

/// Process-wide singleton that owns one websocket connection.
///
/// Typical usage is [initSocket] followed by [listen]; pages obtain their
/// own message stream through [setNewStreamController] and
/// [streamControllerList], send with [sendMessage], and tear everything down
/// with [disconnect].
class WebsocketHelper {
  WebsocketHelper._();

  static WebsocketHelper? _singleton;

  /// Returns the shared instance, creating it on first use.
  factory WebsocketHelper() => _singleton ??= WebsocketHelper._();

  /// Delay between losing the connection and trying to re-establish it.
  static const Duration _reconnectDelay = Duration(seconds: 3);

  /// Interval between two heartbeat messages.
  static const Duration _heartBeatInterval = Duration(seconds: 30);

  /// The URI used to open the websocket connection.
  Uri? wsUri;

  /// The currently connected channel, or `null` when disconnected.
  WebSocketChannel? _webSocketChannel;

  /// The channel whose stream [listen] has already subscribed to.
  ///
  /// A websocket stream is single-subscription, so it must never be listened
  /// to twice.
  WebSocketChannel? _listenedChannel;

  /// Per-page subjects that incoming messages are fanned out to, keyed by
  /// [StreamControllerNameEnum] name.
  Map<String, BehaviorSubject<String>>? streamControllerList;

  /// Whether a heartbeat is sent once the connection is ready.
  bool isOpenHeartBeat = true;

  /// Subscription driving the periodic heartbeat, `null` while stopped.
  StreamSubscription<String>? _subscription;

  /// Whether the connection was closed on purpose by the user.
  ///
  /// When `true`, [reConnect] does nothing.
  bool isDisconnectByUser = false;

  /// Extra callback invoked with every non-empty incoming message.
  WebsocketMessageCallback? messageCallback;

  /// Callback invoked when the connection is closed.
  Function()? onDone;

  /// Callback invoked with `(error, stackTrace)` when the stream reports an
  /// error.
  Function? onError;

  /// Step one: opens the connection to [wsPath] (e.g. `ws://localhost:1234`).
  ///
  /// Does nothing if a channel already exists or if the resulting URI cannot
  /// be parsed. [isOpenHeartBeat] controls whether a heartbeat is started
  /// once the connection is ready.
  void initSocket({required String wsPath, bool isOpenHeartBeat = true}) {
    if (_webSocketChannel != null) {
      print("socket实例已存在, 请勿重复创建");
      return;
    }
    // The backend of the original project expects the login token as a query
    // parameter; this is a placeholder for the token obtained after login.
    var authorization = "登录后的token";
    wsUri = Uri.tryParse("$wsPath?Authorization=$authorization");
    // wsUri = Uri.tryParse(wsPath);
    if (wsUri == null) return;
    this.isOpenHeartBeat = isOpenHeartBeat;
    _connectWebsocket();
  }

  /// Connects to [wsUri]; does nothing when no URI has been set.
  ///
  /// [isRunForReConnect] tells whether the call comes from the reconnect
  /// mechanism; only a fresh connect resets [isDisconnectByUser].
  void _connectWebsocket({bool isRunForReConnect = false}) {
    final uri = wsUri;
    if (uri == null) return;
    _webSocketChannel = WebSocketChannel.connect(uri);
    if (!isRunForReConnect) {
      isDisconnectByUser = false;
    }
  }

  /// Step two: stores the callbacks and subscribes to the current channel.
  ///
  /// Calling this again for a channel that is already being listened to only
  /// replaces the callbacks; it does not subscribe a second time.
  void listen({
    WebsocketMessageCallback? messageCallback,
    Function()? onDone,
    Function? onError,
  }) {
    this.messageCallback = messageCallback;
    this.onDone = onDone;
    this.onError = onError;
    streamControllerList ??= <String, BehaviorSubject<String>>{
      // StreamControllerNameEnum.customerLoginPage.name: BehaviorSubject(),
      // StreamControllerNameEnum.customerMainPage.name: BehaviorSubject(),
      // StreamControllerNameEnum.chatRoomPage.name: BehaviorSubject()
    };

    final channel = _webSocketChannel;
    // The handlers below read the callbacks from the fields, so when this
    // channel is already subscribed, updating the fields above is enough.
    if (channel == null || identical(channel, _listenedChannel)) return;
    _listenedChannel = channel;

    // Observe incoming messages, connection close and connection errors.
    channel.stream.listen((message) {
      print("websocket onData message = $message, type = ${message.runtimeType}");
      if (message is String && message.isEmpty) {
        // Empty message (possibly a heartbeat reply or similar).
        return;
      }
      // Fan the message out to the page subjects. The subjects carry
      // strings, so binary frames only go to the raw callback.
      if (message is String) {
        _broadcast(message);
      }
      this.messageCallback?.call(message);
    }, onDone: () {
      print("websocket onDone ...");
      this.onDone?.call();
      // Connection dropped: try to reconnect.
      reConnect();
    }, onError: (Object error, StackTrace stackTrace) {
      print("websocket onError error = $error, stackTrace = $stackTrace");
      showToast(msg: "连接服务器失败!");
      this.onError?.call(error, stackTrace);
    }, cancelOnError: false);

    // Notified once the connection is established; start the heartbeat here.
    channel.ready.then((_) {
      // Ignore a channel that was replaced or closed in the meantime.
      if (!identical(channel, _webSocketChannel)) return;
      print("webSocket ready");
      isDisconnectByUser = false;
      if (isOpenHeartBeat) {
        _startHeartBeat();
      }
    }, onError: (Object error, StackTrace stackTrace) {
      // Handled here only so a failed connect is not an uncaught async
      // error; user-facing reporting stays in the stream's onError handler.
      print("websocket ready error = $error");
    });
  }

  /// Schedules a reconnect unless the user closed the connection on purpose.
  void reConnect() {
    if (isDisconnectByUser) return;
    Future.delayed(_reconnectDelay, () {
      // The user may have disconnected while the delay was pending.
      if (isDisconnectByUser) return;
      _subscription?.cancel();
      _subscription = null;
      // 1006 (abnormalClosure) is reserved by RFC 6455 and must not be sent
      // by an endpoint, so the stale channel is closed with a normal code.
      _webSocketChannel?.sink.close(status.normalClosure, "掉线重连");
      _webSocketChannel = null;
      _listenedChannel = null;
      _connectWebsocket(isRunForReConnect: true);
      listen(messageCallback: messageCallback, onDone: onDone, onError: onError);
    });
  }

  /// Sends [message] over the socket.
  ///
  /// When [needDisplayMsg] is `true` the message is also echoed to every open
  /// page subject.
  void sendMessage({required String message, bool needDisplayMsg = true}) {
    print("websocket sendMessage message = $message");
    if (needDisplayMsg) {
      _broadcast(message);
    }
    _webSocketChannel?.sink.add(message);
  }

  /// Adds [message] to every page subject that is still open.
  void _broadcast(String message) {
    final subjects = streamControllerList;
    if (subjects == null) return;
    for (final subject in subjects.values) {
      if (!subject.isClosed) {
        subject.sink.add(message);
      }
    }
  }

  /// Starts the periodic heartbeat unless one is already running.
  ///
  /// The subscription is stored synchronously so that [disconnect] and
  /// [reConnect] can always cancel it, even before the first beat is sent.
  void _startHeartBeat() {
    if (_subscription != null) {
      print("websocket startHeartBeat _subscription != null");
      return;
    }
    _subscription =
        Stream<String>.periodic(_heartBeatInterval, (_) => "").listen((_) {
      sendMessage(message: "heart beat", needDisplayMsg: false);
    });
  }

  /// Stops the heartbeat, closes every page subject and closes the socket.
  ///
  /// Pass `isDisconnectByUser: true` to suppress the automatic reconnect that
  /// the close would otherwise trigger through the `onDone` handler.
  void disconnect({bool isDisconnectByUser = false}) {
    this.isDisconnectByUser = isDisconnectByUser;
    _subscription?.cancel();
    _subscription = null;
    final subjects = streamControllerList;
    if (subjects != null) {
      for (final subject in subjects.values) {
        subject.close();
      }
      subjects.clear();
    }
    _webSocketChannel?.sink.close(status.normalClosure, "用户退出聊天界面,聊天关闭");
    _webSocketChannel = null;
    _listenedChannel = null;
  }

  /// Registers a fresh subject for [streamControllerName], closing any
  /// previous subject stored under the same name.
  ///
  /// The map is created on demand, so this also works before [listen] has
  /// been called.
  void setNewStreamController(StreamControllerNameEnum streamControllerName) {
    final subjects =
        streamControllerList ??= <String, BehaviorSubject<String>>{};
    subjects[streamControllerName.name]?.close();
    subjects[streamControllerName.name] = BehaviorSubject<String>();
  }
}
3.提供一个轮询工具类StreamTool
import 'dart:async';

/// Signature of a function that starts one poll and returns its result.
typedef FutureGenerator<T> = Future<T> Function();

/// Helper for building polling streams.
class StreamTool {
  /// Returns a single-subscription stream that calls [future] repeatedly and
  /// emits each result.
  ///
  /// Polling starts when the stream is listened to. After each poll completes
  /// the next one is scheduled [interval] later. The stream closes once
  /// [future] has been called [maxCount] times. Pausing or cancelling the
  /// subscription stops the polling and resuming restarts it immediately.
  ///
  /// An exception thrown by [future] is forwarded as a stream error and does
  /// not stop the polling.
  Stream<T> timedPolling<T>(
    Duration interval,
    FutureGenerator<T> future, [
    int maxCount = 1,
  ]) {
    late final StreamController<T> controller;
    var counter = 0;
    // Bumped on every start/stop. A tick scheduled under an older generation
    // is stale and must neither poll nor reschedule, otherwise a quick
    // pause/resume would leave two polling chains running in parallel.
    var generation = 0;

    void stopTimer() {
      generation++;
    }

    Future<void> tick(int ownGeneration) async {
      if (ownGeneration != generation || controller.isClosed) return;
      counter++;
      try {
        final result = await future();
        if (!controller.isClosed) {
          controller.add(result);
        }
      } catch (error, stackTrace) {
        // Surface the failure to the listener instead of letting it escape
        // as an uncaught asynchronous error.
        if (!controller.isClosed) {
          controller.addError(error, stackTrace);
        }
      }
      if (counter == maxCount) {
        stopTimer();
        if (!controller.isClosed) {
          unawaited(controller.close());
        }
      } else if (ownGeneration == generation) {
        Timer(interval, () => tick(ownGeneration));
      }
    }

    void startTimer() {
      final current = ++generation;
      unawaited(tick(current));
    }

    // Wiring the callbacks makes pause/resume/cancel on the subscription
    // control the polling loop as well.
    controller = StreamController<T>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer,
    );
    return controller.stream;
  }
}
4.新建全局的ChangeNotifier -> GlobalWebsocketVM
/// App-wide view model that exposes the shared [WebsocketHelper] to pages.
class GlobalWebsocketVM extends ChangeNotifier {
  /// Initialises the shared socket (heartbeat disabled) and starts listening.
  void startWebSocket() {
    final helper = WebsocketHelper();
    helper.initSocket(wsPath: Api.wsUrlPath, isOpenHeartBeat: false);
    helper.listen(
      messageCallback: (message) {
        // Wait 500 ms, then scroll the list view to the bottom.
        // gotoListBottom();
      },
      onDone: () {},
    );
  }

  /// Returns the live message stream registered under [streamControllerName].
  ///
  /// A new stream controller is bound on every call so that message streams
  /// of different consumers do not get mixed up.
  Stream<String>? getMessageStream(
    StreamControllerNameEnum streamControllerName,
  ) {
    final helper = WebsocketHelper();
    helper.setNewStreamController(streamControllerName);
    final subject = helper.streamControllerList?[streamControllerName.name];
    return subject?.stream;
  }
}
5.在入口类main.dart中MaterialApp中使用全局GlobalWebsocketVM
/// The app-wide websocket view model shared with every page via provider.
late GlobalWebsocketVM socketVM;

/// Creates the view model that is later handed to the provider tree.
@override
void initState() {
  // State.initState is annotated @mustCallSuper.
  super.initState();
  socketVM = GlobalWebsocketVM();
}

// NOTE(review): excerpt only; presumably this is the widget returned from
// the surrounding build method. Confirm against the full main.dart.
MaterialApp.router(
  debugShowCheckedModeBanner: false,
  onGenerateTitle: (context) => S.current.appName,
  theme: ThemeData(
    useMaterial3: true,
    colorScheme: ColorScheme.fromSeed(seedColor: Colors.white),
    appBarTheme: const AppBarTheme(
      color: Colors.white,
      surfaceTintColor: Colors.white,
    ),
    bottomAppBarTheme: BottomAppBarTheme.of(context).copyWith(
      color: Colors.white,
      surfaceTintColor: Colors.white,
    ),
    scaffoldBackgroundColor: Colors.grey[200],
    cardTheme: const CardTheme(
      color: Colors.white,
      surfaceTintColor: Colors.white,
    ),
    progressIndicatorTheme:
        const ProgressIndicatorThemeData(color: AppColor.appThemeColor),
    // Use one colour for the text-field cursor and the text selection.
    textSelectionTheme: const TextSelectionThemeData(
      cursorColor: AppColor.appThemeColor,
      selectionColor: AppColor.appThemeColor,
      selectionHandleColor: AppColor.appThemeColor,
    ),
    // Theme colour for iOS (Cupertino) widgets.
    cupertinoOverrideTheme:
        const CupertinoThemeData(primaryColor: AppColor.appThemeColor),
    iconButtonTheme: IconButtonThemeData(
      style: AppButtonStyle.stGlobalDefaultBtn,
    ),
    textButtonTheme: TextButtonThemeData(
      style: AppButtonStyle.stGlobalDefaultBtn,
    ),
    // primarySwatch: themeVM.theme,
  ),
  // locale: localeVM.getLocale(),
  builder: FlutterSmartDialog.init(
    builder: (context, mChild) {
      return MultiProvider(
        providers: [
          ChangeNotifierProvider<UserVM>(
            create: (_) => userVM,
          ),
          // Makes GlobalWebsocketVM available to every page.
          ChangeNotifierProvider<GlobalWebsocketVM>(
            create: (_) => socketVM,
          ),
        ],
        builder: (context, child) => mChild ?? const SizedBox.shrink(),
      );
      /*return ChangeNotifierProvider<UserVM>(
        create: (_) => userVM,
        builder: (context, child) => mChild ?? const SizedBox.shrink(),
      );*/
    },
  ),
  localizationsDelegates: const [
    S.delegate,
    GlobalMaterialLocalizations.delegate,
    GlobalWidgetsLocalizations.delegate,
    GlobalCupertinoLocalizations.delegate,
  ],
  supportedLocales: S.delegate.supportedLocales,
  // localeResolutionCallback: (_locale, supportedLocales) {
  //   if (localeVM.getLocale() != null) {
  //     // A language has been chosen explicitly: do not follow the system.
  //     return localeVM.getLocale();
  //   } else {
  //     // Follow the system language.
  //     Locale locale;
  //     if (supportedLocales.contains(_locale)) {
  //       locale = _locale!;
  //     } else {
  //       // Fall back to US English when the system language is neither
  //       // Simplified Chinese nor US English.
  //       locale = const Locale('en', 'US');
  //     }
  //     return locale;
  //   }
  // },
  routerConfig: RouterHelper.router,
);
6.页面中调用,在initState方法中建立连接,在build中使用StreamBuilder进行消息监听
/// Message stream of this chat page.
///
/// Obtained once in [initState]: `getMessageStream` registers a new stream
/// controller on every call, so calling it from [build] would replace the
/// stream on each rebuild.
Stream<String>? _messageStream;

/// Opens the global websocket connection and binds this page's stream.
@override
void initState() {
  super.initState();
  final socketVM = context.read<GlobalWebsocketVM>();
  // Establish the global websocket connection.
  socketVM.startWebSocket();
  _messageStream =
      socketVM.getMessageStream(StreamControllerNameEnum.chatRoomPage);
}

/// Builds the chat page and reacts to incoming chat messages.
@override
Widget build(BuildContext context) {
  return Scaffold(
    resizeToAvoidBottomInset: false,
    appBar: TitleBar.build(title: "正与${model.titleContent}沟通"),
    // Listen for chat messages and refresh the chat list.
    body: StreamBuilder<String>(
      stream: _messageStream,
      builder: (context, snapshot) {
        final data = snapshot.data;
        // NOTE(review): this runs on every rebuild while the snapshot is
        // unchanged, so the same message can be handled more than once.
        if (snapshot.connectionState == ConnectionState.active &&
            data != null &&
            data.isNotEmpty) {
          addMessageAndRefreshUI("orderNo", data);
        }
        return const SizedBox.shrink();
      },
      // catchError: (context, error) => error.toString(),
    ),
  );
}

/// Handles one incoming chat [message]; currently it is only logged.
void addMessageAndRefreshUI(String tag, String message) {
  print("收到聊天消息:$message");
}
7.发送消息
/// Sends a chat message; call it from a suitable place such as the send
/// button's tap handler.
void sendChatMessage() {
  final socket = WebsocketHelper();
  socket.sendMessage(
    needDisplayMsg: false,
    message: "我发送一条消息",
  );
}
8.退出app断开websocket清理内存(可以在任何想断开websocket的地方调用销毁)
/// Releases the scanner and closes the websocket as a user-initiated
/// disconnect.
///
/// Usually done in `dispose`, but the disconnect may be triggered from any
/// place where the websocket should be torn down.
@override
void dispose() {
  final scanner = ScanHelper();
  scanner.dispose();
  final socket = WebsocketHelper();
  socket.disconnect(isDisconnectByUser: true);
  super.dispose();
}