Flutter中stream学习
- 概述
- Stream的基础概念
- stream的常用方法
- Stream.fromFuture(Future<T> future)
- Stream.fromFutures(Iterable<Future<T>> futures)
- Stream.fromIterable(Iterable<T> elements)
- Stream.periodic(Duration period, [T computation(int computationCount)?])
- Stream<T> take(int count)
- Stream<T> takeWhile(bool test(T element))
- Stream<T> where(bool test(T event))
- Stream<T> distinct([bool equals(T previous, T next)])
- Stream<T> skip(int count)
- Stream<T> skipWhile(bool test(T element))
- Stream<S> map<S>(S convert(T event))
- Stream<S> expand<S>(Iterable<S> convert(T element))
- Stream的分类
- 单订阅
- 广播订阅
概述
Stream 主要应用于 Flutter 的异步操作,在其他编程语言中也存在;Stream 提供了一种接受事件队列的方法,可通过 listen 进行数据监听,通过 error 接收失败状态,通过 done 来接收结束状态;
Stream的基础概念
Stream
:表示一个可以接收异步事件的数据源。可以生成一个或多个值。StreamController
:控制Stream,可以向其添加事件、错误以及关闭它。StreamSubscription
:表示对Stream的监听,可以用来取消订阅。Sink
:用来向Stream添加数据、错误、以及关闭。
stream的常用方法
Stream.fromFuture(Future future)
Stream
通过Future
对象创建新的单订阅流, 当Future
对象完成时会触发 data / error,
然后已done
事件结束
Future<String> getDate() async {await Future.delayed(const Duration(seconds: 3));return "当前时间为${DateTime.now()}";}void testStreamFromFuture() {Stream.fromFuture(getDate()).listen((event) {print("testStreamFromFuture============$event");}).onDone(() {print("testStreamFromFuture==========done 结束");});}
输出结果:
Stream.fromFutures(Iterable<Future> futures)
Stream 通过一系列的 Future 创建新的单订阅流,每个 Future 都会有自身的 data / error 事件, 当这一系列的 Future 均完成时,Stream 以 done 事件结束;若 Futures 为空,则 Stream 会立刻关闭;其分析源码,很直接的看到是将每一个 Future 事件监听完之后才会执行的微事件结束;
源码代码:
factory Stream.fromFutures(Iterable<Future<T>> futures) {_StreamController<T> controller =new _SyncStreamController<T>(null, null, null, null);int count = 0;// Declare these as variables holding closures instead of as// function declarations.// This avoids creating a new closure from the functions for each future.void onValue(T value) {if (!controller.isClosed) {controller._add(value);if (--count == 0) controller._closeUnchecked();}}void onError(Object error, StackTrace stack) {if (!controller.isClosed) {controller._addError(error, stack);if (--count == 0) controller._closeUnchecked();}}// The futures are already running, so start listening to them immediately// (instead of waiting for the stream to be listened on).// If we wait, we might not catch errors in the futures in time.for (var future in futures) {count++;future.then(onValue, onError: onError);}// Use schedule microtask since controller is sync.if (count == 0) scheduleMicrotask(controller.close);return controller.stream;}
示例代码:
var datas = [getDate(), getDate(), getDate()];Stream.fromFutures(datas).listen((event) {print("testStreamFromFutures============$event");}).onDone(() {print("testStreamFromFutures==========done 结束");});
输出结果:
Stream.fromIterable(Iterable elements)
Stream 通过数据集合中获取并创建单订阅流,通过 listen 监听迭代器中每一个子 element,当 Stream 监听到取消订阅或 Iterator.moveNext 返回 false / throw 异常 时停止迭代;
void testStreamFromIterable() {var datas = [1, 2, "5.toStroing", false, 9];Stream.fromIterable(datas).listen((event) {print("testStreamFromIterable============$event");}).onDone(() {print("testStreamFromIterable==========done 结束");});}
输出结果:
Stream.periodic(Duration period, [T computation(int computationCount)?])
Stream 通过 Duration 对象作为参数创建一个周期性事件流,其中若不设置 computation 时 onData 获取数据为 null;若没有事件结束则会一直周期性执行; 因为 computation 函数是返回流的结果
void testStreamPeriodic() {Stream.periodic(const Duration(seconds: 1)).listen((event) {print("testStreamPeriodic===没有computation==================$event");});Stream.periodic(const Duration(seconds: 1), (x) => x).listen((event) {print("testStreamPeriodic---- listen========$event");}).onDone(() {print("testStreamPeriodic==========done 结束");});}
输出结果:
Stream take(int count)
take() 对于单订阅方式,可以提供 take 设置之前的 Stream 订阅数据,例如设置中断 Stream.periodic 周期展示次数;小菜粗略理解为 take 可以作为中断订阅, 如果 take 设置次数大于 onDone 之前的订阅数据次数,Stream 依旧获取所有 onDone 之前的订阅数据
void testStreamTake() {Stream.periodic(const Duration(seconds: 1), (x) => x).take(5) // 如果不设置这个, 这个流将一直会执行, 但是设置之后只会执行设置的数的次数.listen((event) {print("testStreamTake===========$event");}).onDone(() {print("testStreamTake==============done 结束");});}
输出结果:
Stream takeWhile(bool test(T element))
takeWhile
也可以实现上述take
方法相同效果, 返回一个 boolean 类型,如果为 false 则中断订阅
void testStreamTakeWhile() {Duration interval = const Duration(seconds: 1);Stream<int> streamData = Stream<int>.periodic(interval, (data) => data);streamData.takeWhile((element) {print('Stream.periodic.takeWhile -> $element');return element < 5;}).listen((event) {print('Stream.periodic -> $event');}).onDone(() {print('Stream.periodic -> done 结束');});}
输出结果:
Stream where(bool test(T event))
where 用于在当前 Stream 中创建一个新的 Stream 用来丢弃不符合 test 的数据;简单理解为类似数据库查询一样,仅过滤符合需求的数据流;且 where 可以设置多次
void testStreamWhere() {Stream.periodic(const Duration(seconds: 1), (data) => data).takeWhile((element) => element <= 5).where((event) {print('Stream.periodic.where -> $event');return event > 3;}).listen((event) {print("testStreamWhere==================$event");}).onDone(() {print("testStreamWhere===================== done 结束");});}
输出结果:
Stream distinct([bool equals(T previous, T next)])
作用:相邻的两个数据去重哈
void testStreamDistinct() {var datas = [1, 2, '3.toString()', true, true, false, true, 6];Stream.fromIterable(datas).distinct().listen((event) {print("testStreamDistinct===========================$event");}).onDone(() {print('testStreamDistinct============================ done 结束');});}
输出结果:
Stream skip(int count)
作用: skip 用于跳过符合条件的订阅数据次数 count: 跳过的次数;
void testStreamSkip() {Stream<int> streamData =Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);streamData.takeWhile((element) {print('Stream.periodic.takeWhile -> $element');return element <= 6;}).where((event) {print('Stream.periodic.where -> $event');return event > 2;}).skip(2).listen((event) {print('Stream.periodic -> $event');}).onDone(() {print('Stream.periodic -> done 结束');});}
输出结果 :
Stream skipWhile(bool test(T element))
skipWhile 用于跳过在 where 符合条件下满足设置 条件的订阅数据;即当 返回 为 true 时跳过当前订阅数据监听;
void testSkipWhile() {Stream.periodic(const Duration(seconds: 1), (data) => data).takeWhile((element) => element < 5).skipWhile((element) => element < 3).listen((event) {print("testSkipWhile=========$event");}).onDone(() {print("testSkipWhile========done 结束");});}
输出 结果:
Stream map(S convert(T event))
在当前 Stream 基础上创建一个新的 Stream 并对当前 Stream 进行数据操作,onData 监听到的是 map 变更后的新的数据流;
void testStreamMap() {// 创还能一个stream刘Stream<int> streamData =Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);streamData.takeWhile((element) {print('Stream.periodic.takeWhile -> $element');return element < 5;}).map((event) {print('Stream.periodic.map -> $event -> ${event * 100}');return event * 100;}).listen((event) {print('Stream.periodic -> $event');}).onDone(() {print('Stream.periodic -> done 结束');});}
输出结果:
Stream expand(Iterable convert(T element))
在当前 Stream 基础上创建新的 Stream 并将当前订阅数据转为新的订阅数据组,onData 监听 数据组 中每个新的订阅数据元素;
void testStreamExpand() {Stream<int> streamData =Stream<int>.periodic(const Duration(seconds: 1), (data) => data + 1);streamData.takeWhile((element) {print('Stream.periodic.takeWhile -> $element');return element <= 6;}).expand((element) {print('Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');return [element, element * 10, element * 100];}).listen((event) {print('Stream.periodic -> $event');}).onDone(() {print('Stream.periodic -> done 结束');});}
输出结果:
Stream的分类
单订阅
默认情况下Streams会被设置成单订阅,点订阅会保持当前的值,直到有其它的订阅。
单订阅Stream(Single-Subscription Stream)一次只能有一个监听器(listener),当我们对单订阅进行监听的时候,程序会被错。通常用于一次性事件
void testStreamController() {// 使用streamController创建一个streamfinal streamController = StreamController<int>();// 获取stream流final stream = streamController.stream;// 监听streamstream.listen((event) {print("testStreamController========================$event");});// stream.listen((event) {// print("testStreamController=11111=======================$event");// });// 添加测试数据到streamstreamController.sink.add(1);streamController.sink.add(2);streamController.sink.add(3);// 关闭stream流streamController.close();}
如果我打开上述注释掉的监听, 对一个单订阅的stream进行多次监听会报如下错误:
广播订阅
广播(Broadcast Stream)允许多个监听器,可以同时向多个订阅者推送数据。 这种类型适合用于事件广播,比如用户操作、全局数据推送等。
void testStreamBoardcast() {final streamController = StreamController<int>.broadcast();final stream = streamController.stream;stream.listen((event) {print("testStreamBoardcast==============$event");});stream.listen((event) {print("testStreamBoardcast111111111===================$event");});streamController.sink.add(1);streamController.sink.add(2);streamController.sink.add(3);streamController.close();}
输出结果:
除此之外Flutter官方还提供了StreamBuilder
这种专门用于监听Stream
并根据数据变化更新UI的Widget
。具体用法可以参考官方文档。