本文首发于公众号“AntDream”,欢迎微信搜索“AntDream”或扫描文章底部二维码关注,和我一起每天进步一点
于冷流不同,在垃圾回收之前,flow里的值都是存在内存之中,并且处于活跃状态
StateFlow
StateFlow是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其value属性读取当前的状态值
- 和livedata比较像,有新数据可以通知collect的一方
- 同时又具有flow的所有特点,比如可以挂起,切换线程
SharedFlow
SharedFlow会向其中收集值得所有使用方发出数据
- 也就是一对多的关系,可以有多个collector
- 同时又具有flow的所有特点,比如可以挂起,切换线程
- 和上面的StateFlow不同的是,这个不能主动通知collect方,需要不断emit元素,也就是利用了flow的功能
channel
定义概念
- channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信
- channel实际上就是一个队列,队列中一定存在缓冲区,那么这个缓冲区一旦满了,并且也一直没有人调用receive并取走函数,send就需要挂起。
- 默认缓冲区大小是0
val channel = Channel<Int>()@Test
fun `test channel` ()= runBlocking<Unit>{val producer = GlobalScope.launch { var i = 0;while (true){delay(1000)channel.send(++i)println("send $i")}}val consumer = GlobalScope.launch { while (true){val element = channel.receive()println("receive $element")}}joinAll(producer, consumer)
}
- 在读取channel时可以直接获取一个channel的iterator迭代器
val iterator = channel.iterator()
while (iterator.hasNext()){val element = iterator.next()
}//也可以这样
for (element in channel){}
produce与actor
- 构造生产者和消费者的便捷方法
- 可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程就可以用这个Channel来接受数据。反过来,我们可以用actor启动一个消费者协程。
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {repeat(100){delay(1000)send(it)}
}
val consumer = GlobalScope.launch {for (i in receiveChannel){println("receive $i")}
}val sendChannel:SendChannel<Int> = GlobalScope.actor {while (true){val element = receive()println(element)}
}
val producer = GlobalScope.launch {for (i in 1..3){sendChannel.send(i)}
}
channel的关闭
- produce与actor返回的channel都会随着对应的协程执行完毕而关闭
- 对于一个channel,如果我们调用了它的close方法,它会立即停止接受新元素,也就是说这时它的isClosedForSend会立即返回true。而由于channel缓冲区的存在,这时候可能还有元素没有被处理完,因此要等所有的元素都被读取之后isClosedForReceive才会返回true
- channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭
BroadcastChannel
发送端和接收端在Channel中存在一对多的情形,从数据处理本身来说,虽然有多个接收端,但是同一个元素只会被一个接收端读到。广播则不然,多个接收端不存在互斥行为
欢迎关注我的公众号查看更多精彩文章!