协程:Channel 热流
1、Channel是什么?
- 生产者:多个协程
- 消费者:多个协程
- 中间:Channel 管道 并发安全队列
- 发送send
- 接收recv
协程间通信
1、Channel可以用于协程间通信
// 通道Channelval channel = Channel<Int>()// 生产者launch{(1..6).forEach {delay(1000L)println("我生产了一个:$it")channel.send(it)}}// 消费者launch{(1..6).forEach {val r= channel.receive()println("消费了一个:$r")}}
capacity
1、生产速度>消费速度
- 如果缓冲区满了,send会挂起,消费完后再生产
- capacity,默认容量,0
UNLIMITED:send不再挂起
- 容量接近于无限
- 容量不满就不会挂起
// 通道Channelval channel = Channel<Int>(Channel.UNLIMITED)
消费方式
// 第一种发方式 消费(1..8).forEach {delay(2000L)val r= channel.receive()println("消费了一个:$r")}
iterator
// 第二种发方式 消费val it = channel.iterator()while (it.hasNext()) {val item = it.next()delay(2000L)println("消费了一个:$item")}
item in channel
// 第三种发方式 消费for (item in channel) {delay(2000L)println("消费了一个:$item")}
快捷方式
produce和ReceiveChannel
- produce快速构建消费者
// 生产者的快捷方式val produce = produce {(1..20).forEach { delay(2000L) ; send(it) }}// 普通的消费launch {for (item in produce) {println("消费了一个:$item")}}// receive()接收数据,有数据没有消费,send会一直阻塞launch {println("消费了一个:${produce.receive()}")delay(2000)println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")}
produce(capacity = 100),会增加缓冲区,只要没有放满send不会再阻塞。
actor和SendChannel
- actor快速构建消费者
// 消费者的快捷方式val consumer = actor<Int> {(1..20).forEach {println("消费了一个:${receive()}")}}// 普通的生成launch {(1..20).forEach { delay(2000L) ; consumer.send(it) }}
close
1、channel.close
- 关闭
- 一般是生产者去close
isClosedForSend
channel.close() 之前 isClosedForSend == false
channel.close() 之后 isClosedForSend == true
// 生产者launch {(1..6).forEach {if (!channel.isClosedForSend) {channel.send(it)println("我生产了一个$it")// if (it == 3) channel.close() // 大部分情况下,是生产者 去close}}println("close前 isClosedForSend:${channel.isClosedForSend} " +" isClosedForReceive:${channel.isClosedForReceive}")channel.close()println("close后 isClosedForSend:${channel.isClosedForSend} " +" isClosedForReceive:${channel.isClosedForReceive}")}
isClosedForReceive
如果消费完了 isClosedForReceive == true, 否则就是false
如果缓冲区里面还有内容,没有消费完 也是 false
// 消费者launch {try {for (i in channel) {delay(2000L)println("我消费了一个:$i")}}finally {println("finally isClosedForSend:${channel.isClosedForSend} " +" isClosedForReceive:${channel.isClosedForReceive}")}}
BroadcastChannel
1、广播给所有消费者,多个地方可以接收到
- 创建
val channel = Channel<Int>()val broadcastChannel = channel.broadcast(Channel.BUFFERED)
- 生产
// 生产者launch {repeat(8) {delay(1000L)broadcastChannel.send(it + 100001) // 发送}broadcastChannel.close() // 关闭}
openSubscription
- 消费
repeat(8) {// 消费者launch {val r = broadcastChannel.openSubscription()for (i in r) {println("协程$it ---- 消费者 ${i}")}}}
select
1、select: 择优选择数据,谁先返回用谁的
- 加载首页数据,可以作缓存
- 缓存有用缓存,缓存不存在去请求
- "慢的不会再执行"会被cancel
2、select 是一个用于多路选择的结构,可以同时等待多个挂起函数或通道的操作完成。它类似于 switch 或 if-else 的多路分支语句,但是它是用于协程的异步操作。
suspend fun selectExample() {select<Unit> {someChannel.onReceive { value ->// 处理从通道接收到的值}someDeferred.onAwait { result ->// 处理异步操作完成后的返回值}onTimeout(1000) {// 在指定时间内没有任何操作完成时执行}}
}
3、select可以用于上游,也可以用于下游
onAwait
- async有onAwait
data class Home(val info1: String, val info2: String)data class HomeRequestResponseResultData(val code: Int, val msg: String, val home: Home)// 请求本地加载首页数据
fun CoroutineScope.getHomeLocalData() = async (Dispatchers.IO) {delay(3000)Home("数据1...", "数据1...")
}// 请求网络服务器加载首页数据
fun CoroutineScope.getHomeRemoteData() = async (Dispatchers.IO) {delay(6000)Home("数据3...", "数据4...")
}
launch {val localRequestAction = getHomeLocalData()val remoteRequestAction = getHomeRemoteData()val resultResponse = select<HomeRequestResponseResultData> {localRequestAction.onAwait {// 做校验 工作// ...// 省略1000行代码HomeRequestResponseResultData(200, "恭喜你,请求成功", it) // 最后一行作为返回值}remoteRequestAction.onAwait {// 做校验 工作// ...// 省略1000行代码HomeRequestResponseResultData(200, "恭喜你,请求成功", it) // 最后一行作为返回值}}println("resultResponse:$resultResponse")}
2、async需要在调用的CoroutineScope中执行
fun CoroutineScope.getHomeLocalData() = async (Dispatchers.IO) {delay(3000)Home("数据1...", "数据1...")
}
// 对CoroutineScope扩展
channel数组
- 哪个更快选择哪个Channel
onReceive
- onReceive: 接收数据后的回调
val channels = arrayOf(Channel<String?>(), Channel<String?>())launch {delay(6000)channels[0].send("login successful")}launch {delay(8000)channels[1].send("register successful")}val receiveResult = select<String ?> {for (channel in channels) {channel.onReceive {// 做校验 工作// ...// 省略1000行代码"[$it]" // 最后一行作为返回值}}}println(receiveResult)
onJoin
launch无返回值,但想看谁执行的最快
val job1 = launch {println("launch1 run")} // 无返回值val job2 = launch {println("launch2 run")} // 无返回值select<Unit> {job1.onJoin { println("launch1 执行完成了 很快") }job2.onJoin { println("launch2 执行完成了 很快") }}
onSend
发送数据,并且显示回调的内容(上游)
// 准备Channel数组val channels = arrayOf(Channel<Char>(), Channel<Char>())// 协程一:Channel 的 发射源launch(Dispatchers.Default) {select<Unit> {// 并行干活,sendlaunch {channels[0].onSend('女') {println("channels[0].onSend('女') { $it }")}}// 并行干活,sendlaunch {channels[1].onSend('男') {println("channels[1].onSend('男') { $it }")}}}}// 协程二:下游 接收阶段launch { println("channel1 下游接收 ${channels[0].receive()}") }launch { println("channel2 下游接收 ${channels[1].receive()}") }输出:
channel1 下游接收 女
channels[0].onSend('女') { RendezvousChannel@34206005{EmptyQueue} }
// 1. onSend先发送消息
// 2. 下游接收到
// 3. onSend回调打印消息