【Kotlin精简】第9章 Kotlin Flow

1 前言

上一章节我们学习了Kotlin的协程【Kotlin精简】第8章 协程,我们知道 协程实质是对线程切换的封装,能更加安全实现异步代码同步化,本质上协程、线程都是服务于并发场景下,其中协程是协作式任务,线程是抢占式任务。默认协程用来处理实时性不高的数据,请求到结果后整个协程就结束了,即它是一锤子买卖。

本章节我们来学习一下依赖Kotlin协程实现的Flow数据流

2 Flow简介

2.1 Flow是什么

Flowgoogle官方提供的一套基于Kotlin协程响应式编程模型,它与RxJava的使用类似,但相比之下Flow使用起来更简单,另外Flow作用在协程内,可以与协程的生命周期绑定,当协程取消时,Flow也会被取消,避免了内存泄漏风险。
FlowKotlin提供的一个工具,使用协程封装成生产者-消费者模式
上游来负责生产、中介进行数据加工(可选)、下游来接收消耗

官方对数据流三个成员的定义:

  1. 上游 - 提供方(生产者):会生成添加到数据流中的数据。通过协程,数据流还可以异步生成数据。
  2. 中介 - 数据加工 (可选):修改发送到数据流的值,或修正数据流本身。
  3. 下游 - 使用方(消费者):使用或接收数据流中的值。
    在这里插入图片描述
    Flow数据流中,api使用emit()生产collect()消费
    在这里插入图片描述

2.2 Flow 特性

flow{}构建块中的代码可以使用挂起函数
Flow构建器函数可以不用supend修饰符
流的每次单独收集都是按顺序执行的,除非使用特殊操作符
Flow是一种类似序列的冷流,flow构建器中代码直到流被收集的时候才运行

2.3 冷流热流

flow{}会创建一个数据流,并且这个数据流默认是冷流。下面是冷流和热流的区别:

  1. 冷流:当执行订阅的时候,上游发布者才开始发射数据流。订阅者与发布者是一一对应的关系,即当存在多个订阅者时,每个新的订阅者都会重新收到完整的数据。主动需要即是主动收集才会提供发射数据,即有消费collect才会触发
  2. 热流:不管是否被订阅,上游发布者都会发送数据流到内存中。订阅者与发布者是一对多的关系,当上游发送数据时,多个订阅者都会收到消息。不管你需不需要一上来数据全都发射给你,不管是否消费collect都会触发

3 Flow使用

Flow官方文档可以参考一下,我们这里简单介绍一些Flow常用的流创建方式、操作符等。

Flow流使用步骤

  1. 创建流:flow { ... }flowOf{ ... }
  2. 使用操作符修改、加工流数据
  3. 发射流:collect

3.1 创建流

  1. flow用于创建从顺序调用到发出函数的任意流。
  2. flowOf()函数根据一组固定的值创建流。
  3. asFlow()扩展函数可以将各种类型的函数转换为流。
  4. channelFlow创建从潜在并发调用到send函数的任意流。
  5. MutableStateFlowMutableSharedFlow创建可直接更新的热流。

suspend fun main() {//1.不需要用挂起函数修饰符flow {for (i in 1..3) {delay(1000)//可以使用挂起函数emit(i)//发射元素}}.collect {println("yvan flow:${it}")}
//    yvan flow:1
//    yvan flow:2
//    yvan flow:3// 2.flowOf 不需要挂起函数修饰符 flowOf自动实现发射元素flowOf(1, 2, 3, 4, 5).collect {println("yvan flowOf:${it}")}
//    yvan flowOf:1
//    yvan flowOf:2
//    yvan flowOf:3
//    yvan flowOf:4
//    yvan flowOf:5// 3.asFlow 不需要挂起函数修饰符 flowOf自动实现发射元素(5..10).asFlow().collect {println("yvan asFlow:${it}")}
//    yvan asFlow:5
//    yvan asFlow:6
//    yvan asFlow:7
//    yvan asFlow:8
//    yvan asFlow:9
//    yvan asFlow:10// 4.从潜在并发调用到send函数的任意流channelFlow {for (i in 1..3) {delay(1000)//可以使用挂起函数send(i)//发射元素}}.collect{println("yvan channelFlow:${it}")}
//    yvan channelFlow:1
//    yvan channelFlow:2
//    yvan channelFlow:3// 5.热流SharedFlow的创建runBlocking {val sharedFlow = MutableSharedFlow<Int>(// 相当于粘性次数replay = 2,// 接受得慢时候,发送入栈extraBufferCapacity = 1,onBufferOverflow = BufferOverflow.SUSPEND)launch {sharedFlow.collect {println("yvan MutableSharedFlow:${it}")}}sharedFlow.emit(1)sharedFlow.emit(2)}
//    yvan MutableSharedFlow:1
//    yvan MutableSharedFlow:2// 6.热流StateFlow的创建runBlocking {val stateFlow = MutableStateFlow(0)launch {stateFlow.collect {println("yvan MutableStateFlow:${it}")}}stateFlow.emit(1)stateFlow.emit(2)}
//    yvan MutableStateFlow:2
}

3.2 常用操作符

除上面3.1创建流前面三种创建操作符外,还有回调操作符变换操作符过滤操作符组合操作符功能性操作符末端操作符

3.2.1 创建操作符

上面3.1有介绍

  1. flow:创建Flow的操作符。
  2. flowOf:构造一组数据的Flow进行发送。
  3. asFlow:将其他数据转换成Flow,一般是集合向Flow的转换,如listOf(1,2,3).asFlow()
  4. callbackFlow:将基于回调的 API 转换为Flow数据流

3.2.2 回调操作符

我们先来看一个简单的例子

   flow {println("yvan start emit hello")emit("hello") //发送数据println("yvan start emit world")emit("world") //发送数据println("yvan end emit")}.flowOn(Dispatchers.IO).onEmpty { println("yvan onEmpty") }.onStart { println("yvan onStart") }.onEach { println("yvan onEach: $it") }.onCompletion { println("yvan onCompletion") }.catch { exception -> exception.message?.let { println("yvan catch exception:$it") } }.collect {//接收数据流println("yvan collect: $it")}
//        yvan onStart
//        yvan start emit hello
//        yvan onEach: hello
//        yvan collect: hello
//        yvan start emit world
//        yvan onEach: world
//        yvan collect: world
//        yvan end emit
//        yvan onCompletion
  1. onStart:上游flow{}开始发送数据之前执行
  2. onEach:上游向下游发送数据之前调用,每一个上游数据发送后都会经过onEach()
  3. onEmpty:当流完成却没有发出任何元素时执行。如emptyFlow().onEmpty {}
  4. onCompletionflow数据流取消或者结束时执行
  5. onSubscriptionSharedFlow 专用操作符,建立订阅之后回调。和onStart的区别:因为SharedFlow是热流,因此如果在onStart发送数据,下游可能接收不到,因为提前执行了。

3.2.3 变换操作符

  1. map:对上游发送的数据进行变换,collect最后接收的是变换之后的值
    flow {//发送数据emit("hello")}.map {"$it world"}.collect {//接收数据流println("yvan collect: $it")}// 输出:yvan collect: hello world
  1. mapLatest:类似于collectLatest,当emit发送新值,会取消掉map上一次转换还未完成的值。
    flow {//发送数据repeat(10){delay(10)emit(it)}}.mapLatest {delay(15)it}.collect {//接收数据流println("yvan collect: $it")}//输出:yvan collect: 9
  1. mapNotNull:仅发送map之后不为空的值。
 flow {//发送数据repeat(10){delay(10)emit(it)}}.mapNotNull {delay(15)if (it > 5){it}else{null}}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 6//yvan collect: 7//yvan collect: 8//yvan collect: 9
  1. transform:对发出的值进行变换 。不同于map的是,经过transform之后可以重新发送数据,甚至发送多个数据,因为transform内部又重新构建了flow。
    flow {//发送数据repeat(10){delay(10)emit(it)}}.transform {delay(100)emit(it*10)}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 0//yvan collect: 10//yvan collect: 20//yvan collect: 30//yvan collect: 40//yvan collect: 50//yvan collect: 60//yvan collect: 70//yvan collect: 80//yvan collect: 90
  1. transformLatest:类似于mapLatest,当有新值发送时,会取消掉之前还未转换完成的值。
    flow {//发送数据repeat(10){delay(10)emit(it)}}.transformLatest {delay(100)emit(it*10)}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 90
  1. transformWhile:返回值是一个Boolean,当为true时会继续往下执行;反之为false,本次发送的流程会中断。
    flow {//发送数据repeat(10){delay(10)emit(it)}}.transformWhile {emit(it)it != 2}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 0//yvan collect: 1//yvan collect: 2
  1. asSharedFlow:MutableStateFlow 转换为 StateFlow ,即从可变状态变成不可变状态。
  2. asStateFlow:MutableSharedFlow 转换为 SharedFlow ,即从可变状态变成不可变状态。
  3. receiveAsFlow:Channel 转换为Flow ,上游与下游是一对一的关系。如果有多个下游观察者,可能会轮流收到值。
  4. consumeAsFlow:Channel 转换为Flow ,有多个下游观察者时会crash。
  5. withIndex:将数据包装成IndexedValue类型,内部包含了当前数据的Index。
  6. scan(initial: R, operation: suspend (accumulator: R, value: T) -> R):把initial初始值和每一步的操作结果发送出去。
  7. produceIn:转换为Channel的 ReceiveChannel
  8. runningFold(initial, operation: (accumulator: R, value: T) -> R):initial值与前面的流共同计算后返回一个新流,将每步的结果发送出去。
  9. runningReduce:返回一个新流,将每步的结果发送出去,默认没有initial值。
  10. shareIn:flow 转化为热流SharedFlow,后面会详细介绍。
  11. stateIn:flow转化为热流StateFlow,后面会详细介绍。

3.2.4 过滤操作符

  1. filter:筛选符合条件的值,返回true继续往下执行。
    flow {//发送数据repeat(10){delay(10)emit(it)}}.filter {it < 2}.collect {//接收数据流println("yvan collect: $it")}//输出://yvan collect: 0//yvan collect: 1
  1. filterNot:与filter相反,筛选不符合条件的值,返回false继续往下执行。
  2. filterNotNull:筛选不为空的值。
    flow {//发送数据emit(1)emit(null)emit(2)}.filterNotNull().collect {//接收数据流println("yvan collect: $it")}// 输出://yvan collect: 1//yvan collect: 2
  1. filterInstance:筛选对应类型的值,如.filterIsInstance()用来过滤String类型的值
    flow {//发送数据emit("1")emit(null)emit(2)}.filterIsInstance<String>().collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: 1
  1. drop:drop(count: Int)参数为Int类型,意为丢弃掉前count个值。
    flow {//发送数据emit("1")emit(null)emit(2)}.drop(1).collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: null// yvan collect: 2
  1. dropWhile:找到第一个不满足条件的值,返回其和其后所有的值。
    flow {//发送数据emit("1")emit(null)emit(2)}.dropWhile { it != null }.collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: null// yvan collect: 2
  1. take:与drop()相反,意为取前n个值。
  2. takeWhile:与dropWhile()相反,找到第一个不满足条件的值,返回其前面所有的值。
  3. debounce:debounce(timeoutMillis: Long)指定时间内只接收最新的值,其他的过滤掉。防抖动,指定数据接收的间隔时间。
    flow {//发送数据emit(1)delay(900)emit(2)delay(800)emit(3)delay(1000)emit(4)delay(700)emit(5)}.debounce(1000).collect {//接收数据流println("yvan collect: $it")}// 由于1、2、4的时间间隔小于1000所以被过滤,3时间1000,5是最后一个,所以输出是3和5//输出:// yvan collect: 3// yvan collect: 5
  1. sample:sample(periodMillis: Long)在指定周期内,获取最新发出的值,定时周期接收。如:
    flow {//发送数据emit(1)delay(50)emit(2)delay(100)emit(3)delay(200)emit(4)delay(50)emit(5)}.sample(200).collect {//接收数据流println("yvan collect: $it")}// 输出:// yvan collect: 3// yvan collect: 4
  1. distinctUntilChangedBy:判断两个连续值是否重复,可以设置是否丢弃重复值。
    flowOf(1, 2, 1, 2).distinctUntilChanged().collect {println("yvan distinctUntilChangedBy = $it")}//输出://yvan distinctUntilChangedBy = 1//yvan distinctUntilChangedBy = 2
  1. distinctUntilChanged:若连续两个值相同,则跳过后面的值。
    flowOf(1, 1, 2, 2).distinctUntilChanged().collect {println("yvan distinctUntilChanged = $it")}//输出://yvan distinctUntilChanged = 1//yvan distinctUntilChanged = 2//yvan distinctUntilChanged = 1//yvan distinctUntilChanged = 2

3.2.5 组合操作符

  1. combine:组合两个Flow流最新发出的数据,直到两个流都结束为止。扩展:在kotlinx-coroutines-core-jvm中的FlowKt中,可以将更多的flow结合起来返回一个Flow,典型应用场景:多个筛选条件选中后,展示符合条件的数据。如果后续某个筛选条件发生了改变,只需要通过发生改变的Flow的flow.value = newValue重新发送,combine就会自动构建出新的Flow,这样UI层会接收到新的变化条件进行刷新即可。
  val flow1 = flow {emit("A")delay(100)emit("B")}val flow2 = flow {emit(1)delay(50)emit(2)}flow1.combine(flow2){ it1,it2->return@combine it1 to it2}.collect{println("yvan $it")}//输出://yvan (A, 1)//yvan (A, 2)//yvan (B, 2)
  1. combineTransform: combine + transform操作
  2. merge:listOf(flow1, flow2).merge(),多个流合并为一个流。zip是提供高级函数组合流的数据,而merage则是之间将两个流在一个collect里输出
    val flow1 = flowOf("A", "B", "C")val flow2 = flowOf(1, 2, 3, 4)merge(flow1, flow2).collect {println("yvan $it")}//输出://yvan A//yvan B//yvan C//yvan 1//yvan 2//yvan 3//yvan 4
  1. flattenConcat:以顺序方式将给定的流展开为单个流 。示例如下:
    flow {emit(flowOf(1,2))emit(flowOf(3,4))} .flattenConcat().collect {println("yvan $it")}//输出://yvan 1//yvan 2//yvan 3//yvan 4
  1. flattenMerge:作用和 flattenConcat 一样,将多个flow流按并发后输出,但是可以设置并发收集流的数量。
    flow{emit(flowOf(1,2,3))emit(flowOf("A","B","C"))}.flattenMerge().collect{println("yvan $it")}//输出://yvan 1//yvan A//yvan B//yvan C//yvan 2//yvan 3
  1. flatMapContact:相当于 map + flattenConcat , 通过 map 转成一个流,在通过 flattenConcat发送。
  2. flatMapLatest:当有新值发送时,会取消掉之前还未转换完成的值。
  3. flatMapMerge:相当于map + flattenMerge ,参数concurrency: Int 来限制并发数。
  4. zip:组合两个Flow流最新发出的数据,上游流在同一协程中顺序收集,没有任何缓冲。不同于combine的是,当其中一个流结束时,另外的Flow也会调用cancel,生成的流完成。
    val flow = flowOf(1, 2, 3).onEach { delay(50) }val flow2 = flowOf("a", "b", "c", "d").onEach { delay(150) }val startTime = System.currentTimeMillis() // 记录开始的时间flow.zip(flow2) { i, s -> i.toString() + s }.collect {// Will print "1a 2b 3c"println("yvan $it 耗时 ${System.currentTimeMillis() - startTime} ms")}//输出(flow已经执行完,所以flow2中的d被cancel了)://yvan 1a 耗时 156 ms//yvan 2b 耗时 307 ms//yvan 3c 耗时 459 ms//如果换做combine,执行结果如下(组合的是最新发出的数据)://yvan 2a 耗时 156 ms//yvan 3a 耗时 159 ms//yvan 3b 耗时 311 ms//yvan 3c 耗时 466 ms//yvan 3d 耗时 620 ms//注:上面combine多次执行的结果可能不一致,但每次组合的是最新发出的数据

3.2.6 功能性操作符

  1. cancellable:判断当前协程是否被取消 ,如果已取消,则抛出异常
  2. catch:对此操作符之前的流发生的异常进行捕获,对此操作符之后的流无影响。当发生异常时,默认collect{}中lambda将不会再执行。当然,可以自行通过emit()继续发送。
  3. retry:流发生异常时的重试机制。如果是无限重试,直接调用retry()默认方法即可,retry()最终调用的也是retryWhen()方法。
public fun <T> Flow<T>.retry(retries: Int = Int.MAX_VALUE, //指定重试次数predicate: (Throwable) -> Boolean = { true } //返回true且满足retries次数要求,继续重试;false停止重试
): Flow<T> {require(retries > 0) { "Expected positive amount of retries, but had $retries" }return retryWhen { cause, attempt -> predicate(cause) && attempt < retries }
}
  1. retryWhen:流发生异常时的重试机制。
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> = { ...... }

有条件的进行重试 ,lambda 中有两个参数: cause是 异常原因,attempt是当前重试的位置,lambda返回true时继续重试; 反之停止重试。

  1. buffer:流执行总时间就是所有运算符执行时间之和。如果上下游运算符都比较耗时,可以考虑使用buffer()优化,该运算符会在执行期间为流创建一个单独的协程。
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {......}
    flowOf("A", "B", "C").onEach  { println("yvan 1$it") }.buffer()  // <--------------- buffer between onEach and collect.collect { println("yvan 2$it") }// 输出:// yvan 1A// yvan 1B// yvan 1C// yvan 2A// yvan 2B// yvan 2C

上述代码将在两个协程中执行,其中buffer()以上还是在协程P中执行,而buffer()下面的collect()会在协程Q中执行,数据通过Channel进行传递,从而减少了执行的总时间。

  1. conflate:仅保留最新值, 内部实现是 buffer(CONFLATED)
    flowOf("A", "B", "C").onEach  { println("yvan 1$it") }.conflate()  // <--------------- buffer between onEach and collect.collect { println("yvan 2$it") }// 输出:// yvan 1A// yvan 1B// yvan 1C// yvan 2C
  1. flowOn:flowOn 会更改上游数据流的 CoroutineContext,且只会影响flowOn之前(或之上)的任何中间运算符。下游数据流(晚于 flowOn 的中间运算符和使用方)不会受到影响。如果有多个 flowOn 运算符,每个运算符都会更改当前位置的上游数据流。
  flow {//这里的线程应该是跟随创建线程println("yvan 1 thread:${Thread.currentThread().name}")emit("flowOnTest")}.flowOn(Dispatchers.IO).map {println("yvan 2 thread:${Thread.currentThread().name}")return@map it}.flowOn(Dispatchers.Default).collect {println("yvan 3 thread:${Thread.currentThread().name}  结果:$it ")}//输出://yvan 1 thread:DefaultDispatcher-worker-2//yvan 2 thread:DefaultDispatcher-worker-1//yvan 3 thread:DefaultDispatcher-worker-1  结果:flowOnTest 

3.2.7 末端操作符

  1. collect:数据收集操作符,默认的flow是冷流,即当执行collect时,上游才会被触发执行。
  2. collectIndexed:带下标的收集操作,如collectIndexed{ index, value -> }。
  3. collectLatest:与collect的区别:当新值从上游发出时,如果上个收集还未完成,会取消上个值得收集操作。
  4. toCollection、toList、toSet:将flow{}结果转化为集合。

注:还有很多操作符没有列出来~

3.3 发射流

发射流即上面3.2.7的末端操作符collect

4 热流

4.1 SharedFlow

我们知道flow{}构建的是冷流,而SharedFlow(共享Flow)默认是热流,发送器与收集器是一对多的关系。

public fun <T> MutableSharedFlow(// replay:重播给新订阅者时缓存数据的个数,默认是0。// 当新订阅者collect时,会先尝试获取上游replay个数据,为0时则不会获取之前的数据。// replay缓存是针对后续所有的订阅者准备的。replay: Int = 0,// extraBufferCapacity:除了replay外,缓冲值的数量。// 当有剩余的缓冲区空间时,Emit不挂起(可选,不能为负,默认为零) 。// extraBufferCapacity是为上游快速发射器及下游慢速收集器这种场景提供缓冲的,// 有点类似于线程池中的存储队列。// 要注意,replay保存的是最新值,而extraBufferCapacity保存的是最先发送的一个或多个值。extraBufferCapacity: Int = 0,// 配置缓冲区溢出的操作(可选,默认为SUSPEND,暂停尝试发出值),// 可选值有:SUSPEND-暂停发送、DROP_OLDEST-丢弃队列中最老的、DROP_LATEST-丢弃队列中最新的。onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

上面操作符中提到shareIn,其作用是将普通flow转化为SharedFlow

public fun <T> Flow<T>.shareIn(// 协程作用域范围scope: CoroutineScope,// 控制共享的开始、结束策略。一共有三种,// Eagerly:马上开始,在scope作用域结束时终止// Lazily:当订阅者出现时开始,在scope作用域结束时终止// WhileSubscribed(stopTimeoutMillis: Long = 0,replayExpirationMillis: Long = Long.MAX_VALUE):// 其中stopTimeoutMillis:表示最后一个订阅者结束订阅与停止上游流的时间差,默认值为0(立即停止上游流),replayExpirationMillis:数据重播的超时时间。started: SharingStarted,// 重播给新订阅者的数量replay: Int = 0
): SharedFlow<T> 

举例:

 //ViewModel中 普通flow通过shareIn转化为SharedFlowval flowConvertSharedFlow by lazy {flow {emit("1、flow")emit("2、convert")emit("3、SharedFlow")}.shareIn(//协程作用域范围viewModelScope, //立即开始SharingStarted.Eagerly, //重播给新订阅者的数量replay = 3 ).onStart { println("yvan onStart") }}//Activity中mBtnConvertF.setOnClickListener {val builder: StringBuilder = StringBuilder()lifecycleScope.launch {mFlowModel.flowConvertSharedFlow.collect {println(it)builder.append(it).append("\n")mTvConvertF.text = builder.toString()}}}//输出://yvan onStart//1、flow//2、convert//3、SharedFlow

4.2 StateFlow

StateFlow可以认为是一个replay为1,且没有缓冲区SharedFlow,所以新订阅者collect时会先获取一个默认值,构造函数如下:

//MutableStateFlow构造函数
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)//MutableStateFlow接口继承了MutableSharedFlow接口
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {public override var value: Tpublic fun compareAndSet(expect: T, update: T): Boolean
}

StateFlow有自动去重的功能,即如果上游连续发送的value重复时,下游的接收方只会接收第一次的值,后续的重复值不会再接收,可以通过StateFlow.value获取发送的值。

上面操作符中提到的stateIn作用是将普通flow转化为StateFlow

public fun <T> Flow<T>.stateIn(// 协程作用域范围scope: CoroutineScope,// 控制共享的开始、结束策略。一共有三种,// Eagerly:马上开始,在scope作用域结束时终止// Lazily:当订阅者出现时开始,在scope作用域结束时终止// WhileSubscribed(stopTimeoutMillis: Long = 0,replayExpirationMillis: Long = Long.MAX_VALUE):// 其中stopTimeoutMillis:表示最后一个订阅者结束订阅与停止上游流的时间差,默认值为0(立即停止上游流),replayExpirationMillis:数据重播的超时时间。started: SharingStarted,// 默认StateFlow的初始值,会发送到下游initialValue: T
): StateFlow<T> {//这里设置的replay是1 及重播给新订阅者的缓存为1val config = configureSharing(1)......}

使用举例:

 //ViewModel中val flowConvertStateFlow by lazy {flow {//转化为StateFlow是 emit()可以是0个或1个 或多个,// 当是多个时,新订阅者collect只会收到最后一个值(replay为1)emit("1、flow convert StateFlow")}.stateIn(//协程作用域范围viewModelScope, //立即开始SharingStarted.Eagerly, // 默认StateFlow的初始值,会发送到下游"0、initialValue" ).onStart { println("yvan onStart") }}//Activity中mBtnConvertSF.setOnClickListener {lifecycleScope.launch {val builder = StringBuilder()mFlowModel.flowConvertStateFlow.collect {println(it)builder.append(it).append("\n")mTvConvertSF.text = builder.toString()}}}//输出://yvan onStart//0、initialValue//1、flow convert StateFlow

注:在UI层使用Lifecycle.repeatOnLifecycle 配合上游的SharingStarted.WhileSubscribed一块使用是一种更安全、性能更好的流收集方式。

4.3 StateFlow和LiveData的异同点

我们知道通过LiveData可以让数据被观察,且具备生命周期感知能力,但LiveData的缺点也很明显:

  1. LiveData的接收只能在主线程;
  2. LiveData发送数据是一次性买卖,不能多次发送;
  3. LiveData发送数据的线程是固定的,不能切换线程,setValue/postValue本质上都是在主线程上发送的。当需要来回切换线程时,LiveData就显得无能为力了。

StateFlowLiveData 具有相似之处。两者都是可观察的数据容器类,并且在应用架构中使用时,两者都遵循相似模式。但两者还是有不同之处的:

  1. StateFlow 需要将初始状态传递给构造函数,而 LiveData 不需要。
  2. View 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,而从 StateFlow 或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,需要从 Lifecycle.repeatOnLifecycle 块收集数据流。

4.4 StateFlow、SharedFlow 和 Channel对比

Flow底层使用的Channel机制实现,StateFlowSharedFlow都是一对多的关系,如果上游发送者与下游UI层的订阅者是一对一的关系,可以使用Channel来实现,Channel默认是粘性的

Channel使用场景:一次性消费场景,比如弹窗,需求是在UI层只弹一次,即使App切到后台再切回来,也不会重复订阅(不会多次弹窗);

如果使用SharedFlow/StateFlow,UI层使用的lifecycle.repeatOnLifecycleFlow.flowWithLifecycle,则在App切换前后台时,UI层会重复订阅,弹窗事件可能会多次执行,不符合要求。

Channel使用特点:

  1. 每个消息只有一个订阅者可以收到,用于一对一的通信
  2. 第一个订阅者可以收到collect之前的事件,即粘性事件

Channel使用举例:

//viewModel中
private val _loadingChannel = Channel<Boolean>()
val loadingFlow = _loadingChannel.receiveAsFlow()private suspend fun loadStart() {_loadingChannel.send(true)
}private suspend fun loadFinish() {_loadingChannel.send(false)
}//UI层接收Loading信息mViewModel.loadingFlow.flowWithLifecycle2(this, Lifecycle.State.STARTED) { isShow ->mStatusViewUtil.showLoadingView(isShow)}

5 扩展

在新项目或者新需求中,我们可以直接使用协程来替代之前的多线程场景的使用方式,如可以通过withContext(Dispatchers.IO)在协程中来回切换线程且能在线程执行完毕后自动切回当前线程,避免使用接口回调的方式导致逻辑可读性变差。然而,如果我们是在现有项目中开发或者网络框架就是回调方式使用时,没有办法直接使用协程,但是可以通过suspendCancellableCoroutinecallbackFlow将接口回调转化成协程:

suspendCancellableCoroutine等待单次回调API的结果时挂起协程,并将结果返回给调用者;如果需要返回Flow数据流,可以使用callbackFlow

5.1 suspendCancellableCoroutine

使用举例:

   //ViewModel中/*** suspendCancellableCoroutine将回调转化为协程使用*/suspend fun suspendCancelableData(): String {return try {getSccInfo()} catch (e: Exception) {"error: ${e.message}"}}/*** suspendCancellableCoroutine将回调转化为协程使用*/private suspend fun getSccInfo(): String = suspendCancellableCoroutine { continuation ->val callback = object : ICallBack {override fun onSuccess(sucStr: String?) {//1、返回结果 将结果赋值给getSccInfo()挂起函数的返回值//2、如果调用了continuation.cancel(),resume()的结果将不会返回了,因为协程取消了continuation.resume(sucStr ?: "empty")}override fun onError(error: Exception) {//这里会将异常抛给上层 需要上层进行处理continuation.resumeWithException(error)}}continuation.invokeOnCancellation {//协程取消时调用,可以在这里进行解注册println("invokeOnCancellation")}//模拟网络请求 此时协程被suspendCancellableCoroutine挂起,直到触发回调Thread {Thread.sleep(500)//模拟Server返回数据callback.onSuccess("getServerInfo")//模拟抛异常//callback.onError(IllegalArgumentException("server error"))}.start()//模拟取消协程//continuation.cancel()}//Activity中mBtnScc.setOnClickListener {lifecycleScope.launch {val result = mFlowModel.suspendCancelableData()println(result)}}//输出:// getServerInfo

suspendCancellableCoroutine声明了作用域,并且传入一个CancellableContinuation参数,它可以调用resumeresumeWithException来处理对应的成功、失败回调,还可以调用cancel()方法取消协程的执行(抛出CancellationException异常,但程序不会崩溃,当然也可以通过catch抓住该异常进行处理)。

上面例子中,当开始执行时会将suspendCancellableCoroutine作用域内协程挂起,如果成功返回数据,会回调continuation.resume()方法将结果返回;如果出现异常,会回调continuation.resumeWithException()将异常抛到上层。这样整个函数处理完后,上层会从挂起点恢复并继续往下执行。

5.2 callbackFlow

callbackFlow相对于suspendCancellableCoroutine,对接口回调封装以后返回的是Flow数据流,后续就可以对数据流进行一系列操作。

callbackFlow中的几个重要方法:

  1. trySend/offer:在接口回调中使用,用于上游发射数据,类似于flow{}中的emit(),kotlin 1.5.0以下使用offer,1.5.0以上推荐使用trySend()
  2. awaitClose:写在最后,这是一个挂起函数, 当 flow 被关闭的时候 block 中的代码会被执行 可以在这里取消接口的注册等。

使用举例,比如当前有个场景:去某个地方,需要先对目的地进行搜索,再出发到达目的地,假设搜索、到达目的地两个行为都是使用回调来执行的,我们现在使用callbackFlow对他们进行修改:

ViewModel中,搜索目的地:

fun getSearchCallbackFlow(): Flow<Boolean> = callbackFlow {val callback = object : ICallBack {override fun onSuccess(sucStr: String?) {//搜索目的地成功trySend(true)}override fun onError(error: Exception) {//搜索目的地失败trySend(false)}}//模拟网络请求Thread {Thread.sleep(500)//模拟Server返回数据callback.onSuccess("getServerInfo")}.start()//这是一个挂起函数, 当 flow 被关闭的时候 block 中的代码会被执行 可以在这里取消接口的注册等awaitClose { println("awaitClose") }
}

ViewModel中,前往目的地:

fun goDesCallbackFlow(isSuc: Boolean): Flow<String?> = callbackFlow {val callback = object : ICallBack {override fun onSuccess(sucStr: String?) {trySend(sucStr)}override fun onError(error: Exception) {trySend(error.message)}}//模拟网络请求Thread {Thread.sleep(500)if (isSuc) {//到达目的地callback.onSuccess("arrive at the destination")} else {//发生了错误callback.onError(IllegalArgumentException("Not at destination"))}}.start()awaitClose { println("awaitClose") }}
Activity中,使用Flow.flatMapConcat对两者进行整合:mBtnCallbackFlow.setOnClickListener {lifecycleScope.launch {//将两个callbackFlow串联起来 先搜索目的地,然后到达目的地mFlowModel.getSearchCallbackFlow().flatMapConcat {mFlowModel.goDesCallbackFlow(it)}.collect {mTvCallbackFlow.text = it ?: "error"}}}//输出:// arrive at the destination

以下结论摘自官网:
flow 构建器不同,callbackFlow 允许通过 send 函数从不同 CoroutineContext 发出值,或者通过 offer/trySend 函数在协程外发出值。
在协程内部,callbackFlow 会使用通道,它在概念上与阻塞队列非常相似。通道都有容量配置,限定了可缓冲元素数的上限。在 callbackFlow 中所创建通道的默认容量为 64 个元素。当您尝试向完整通道添加新元素时,send 会将数据提供方挂起,直到新元素有空间为止,而 offer 不会将相关元素添加到通道中,并会立即返回 false

4 总结

FlowKotlin 提供的解决复杂异步场景的方案,Flow本质是挂起函数,主要用于构建类似生产者-中介-消费者模型,可对流进行处理过滤变换组合重试异常捕获线程切换等,便于开发者进行流的创建处理消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/167300.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

保姆级 ARM64 CPU架构下安装部署Docker + rancher + K8S 说明文档

1 K8S 简介 K8S是Kubernetes的简称&#xff0c;是一个开源的容器编排平台&#xff0c;用于自动部署、扩展和管理“容器化&#xff08;containerized&#xff09;应用程序”的系统。它可以跨多个主机聚集在一起&#xff0c;控制和自动化应用的部署与更新。 K8S 架构 Kubernete…

从Redis反序列化UserDetails对象异常后中发现FastJson序列化的一些问题

最近在使用SpringSecurityJWT实现认证授权的时候&#xff0c;出现Redis在反序列化userDetails的异常。通过实践发现&#xff0c;使用不同的序列化方法和不同的fastJson版本&#xff0c;异常信息各不相同。所以特地记录了下来。 一、项目代码 先来看看我项目中redis相关配置信息…

VMware Workstation 17 虚拟机自启动失效 解决脚本

VMware Workstation17新增加了虚拟机自启配置 但是很奇怪在我的一台计算机上能够自启&#xff0c;在另一台计算机上就失效 编写脚本 以命令方式完成虚拟机开机自启 #虚拟机自启.batif "%1""hide" goto CmdBegin start mshta vbscript:createobject("w…

缓存组件状态,提升用户体验:探索 keep-alive 的神奇世界

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

Day31| Leetcode 455. 分发饼干 Leetcode 376. 摆动序列 Leetcode 53. 最大子数组和

进入贪心了&#xff0c;我觉得本专题是最烧脑的专题 Leetcode 455. 分发饼干 题目链接 455 分发饼干 让大的饼干去满足需求量大的孩子即是本题的思路&#xff1a; class Solution { public:int findContentChildren(vector<int>& g, vector<int>& s) {…

仿ChatGPT对话前端页面(内含源码)

仿ChatGPT对话前端页面&#xff08;内含源码&#xff09; 前言布局样式和Js部分关键点全部源码 前言 本文主要讲解如何做出类似ChatGPT的前端页面。具体我们的效果图是长这样&#xff0c;其中除了时间是动态的之外&#xff0c;其他都是假数据。接下来让我们从布局和样式的角度…

Android Tombstone 与Debuggerd 原理浅谈

一、前言 Android系统类问题主要有stability、performance、power、security。Android集成一个守护进程tombstoned是android平台的一个守护进程&#xff0c;它注册成3个socket服务端&#xff0c;客户端封装在crash_dump和debuggerd_client。 crash_dump用于跟踪定位C crash&am…

前端入门(三)Vue生命周期、组件技术、事件总线、

文章目录 Vue生命周期Vue 组件化编程 - .vue文件非单文件组件组件的注意点组件嵌套Vue实例对象和VueComponent实例对象Js对象原型与原型链Vue与VueComponent的重要内置关系 应用单文件组件构建 Vue脚手架 - vue.cli项目文件结构refpropsmixin插件scoped样式 Vue生命周期 1、bef…

cineSync 3.3新功能: 深入iconik集成、激光工具、OTIOZ支持等

cineSync 3.3为大家带来了灵活性和精准度&#xff0c;使连接审阅会话与iconik中的媒体管理和存储更加容易&#xff0c;并且引入了颜色配置文件以快速测试颜色配置&#xff0c;还有通过激光指针等新工具带来新的可能性。 在ftrack&#xff0c;我们意识到当今的远程创意工作流比以…

反爬虫机制与反爬虫技术(二)

反爬虫机制与反爬虫技术二 1、动态页面处理与验证码识别概述2、反爬虫案例:页面登录与滑块验证码处理2.1、用例简介2.2、库(模块)简介2.3、网页分析2.4、Selenium准备操作2.5、页面登录2.6、模糊移动滑块测试3、滑块验证码处理:精确移动滑块3.1、精确移动滑块的原理3.2、滑…

PyQt6简介

锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计12条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话版…

企业远程访问业务系统:对比MPLS专线,贝锐蒲公英为何更优优势?

如今&#xff0c;企业大多都会采用OA、ERP、CRM等各种数字化业务系统。 私有云、公有云混合架构也变得越来越常见。 比如&#xff1a;研发系统部署在公司本地私有云、确保数据安全&#xff0c;OA采用公有云方案、满足随时随地访问需求。 如此一来&#xff0c;也产生了远程访问…

js实现图片懒加载

方式一&#xff1a;html实现 在img标签加上 loading"lazy" 方式二&#xff1a;js实现 通过js监听页面的滚动&#xff0c;实现的原理主要是判断当前图片是否到了可视区域&#xff1a; 拿到所有的图片 dom 。遍历每个图片判断当前图片是否到了可视区范围内。如果到了…

Maven项目下详细的SSM整合流程

文章目录 &#x1f389;SSM整合流程一、两个容器整合✨ 1、先准备好数据库config.properties连接、mybatis-config.xml&#x1f38a; 2、容器一&#xff1a;优先配置spring.xml文件&#x1f38a; 3、容器二&#xff1a;配置springMVC.xml文件&#x1f38a; 4、Tomcat整合spring…

解释PCIe MSI 中断要求中断向量连续?PCIe 规范里并没有明确指出

MSI 向量必须连续&#xff1f; 前言 MSI 物理条件&#xff0c;MSI 中断产生的逻辑是RC初始化的时候&#xff0c;由软件将配置写入到 EP 的 2 个寄存器中&#xff0c;这两个寄存器一个指示的是地址 Message Address&#xff0c;一个指示的是数据 Message Data。当 EP 试图触发…

你再不学Git就来不及了!!!

其他系列文章导航 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 版本控制 什么是版本控制 为什么要版本控制 一、认识 Git 1.1Git 简史 1.2Git 与其他版本管理系统的主要区别 1.3Git 的三种状态 二、Git 使用快速入门 2.1获…

springboot使用redis缓存乱码(key或者 value 乱码)一招解决

如果查看redis中的值是这样 创建一个配置类就可以解决 package com.deka.config;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; i…

CPU+GPU多样化算力,ZStack Cloud助力游戏精酿核心业务上云

游戏精酿通过ZStack Cloud云平台提供高性能、高可用的云主机、云存储和云网络&#xff1b;前期通过超融合架构快速构建云基础设施&#xff0c;来支持Jira、Redis等关键业务&#xff1b;并实现对原有私有云平台业务的替代&#xff0c;按需将原有私有云业务滚动迁移到ZStack Clou…

深入理解Spring AOP的工作流程

文章目录 引言什么是AOP&#xff1f;Spring AOP的工作原理1. JDK动态代理2. CGLIB代理 Spring AOP的注解方式Aspect注解EnableAspectJAutoProxy注解 Spring AOP的工作流程拓展应用1. 自定义注解2. 异常处理3. 切面优先级 结论 &#x1f389;深入理解Spring AOP的工作流程 ☆* o…

关于运行软件程序出现vcruntime140.dll丢失的修复教程-解决方案

vcruntime140.dll是Microsoft Visual C库文件的一部分&#xff0c;用于支持Windows操作系统上的应用程序。如果找不到或丢失了这个文件&#xff0c;可能会导致某些应用程序无法正常运行。下面是关于vcruntime140.dll丢失的5个修复方法&#xff0c;以及vcruntime140.dll文件属性…