1 前言
Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。
当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。
Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。
2 Channel 中 send 和 receive 案例
2.1 capacity 为 0
fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2
说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。
2.2 capacity 大于 0
fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {delay(10)println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {delay(100)var element = channel.receive()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2
说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。
3 Channel 中迭代器
3.1 iterator
fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者var iterator = channel.iterator()while (iterator.hasNext()) {var element = iterator.next()println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2
3.2 for in
fun main() {var channel = Channel<Int>()CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2
4 Channel 中 produce 和 actor
produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。
4.1 produce
fun main() {var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者repeat(3) {println("send: $it")send(it)}}CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in receiveChannel) {println("receive: $element")}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2
4.2 actor
fun main() {var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者repeat(3) {var element = receive()println("receive: $element")}}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {println("send: $it")sendChannel.send(it)}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2
5 Channel 的关闭
对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。
fun main() {var channel = Channel<Int>(3)CoroutineScope(Dispatchers.Default).launch { // 生产者repeat(3) {println("send: $it")channel.send(it)}channel.close()println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}CoroutineScope(Dispatchers.Default).launch { // 消费者repeat(3) {var element = channel.receive()println("receive: $element")}println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true
6 BroadcastChannel
Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。
6.1 Channel 中多个消费者
fun main() {var channel = Channel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")channel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者for (element in channel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1
说明:结果表明,Channel 中同一个元素只会被一个消费者读到。
6.2 BroadcastChannel 中多个消费者
6.2.1 BroadcastChannel
fun main() {var broadcastChannel = BroadcastChannel<Int>(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2
说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。
6.2.2 broadcast
fun main() {var channel = Channel<Int>()var broadcastChannel = channel.broadcast(2)CoroutineScope(Dispatchers.Default).launch { // 生产者delay(10)repeat(3) {println("send: $it")broadcastChannel.send(it)}}repeat(2) { index ->CoroutineScope(Dispatchers.Default).launch { // 消费者var receiveChannel = broadcastChannel.openSubscription()for (element in receiveChannel) {println("receive-$index: $element")}}}Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}
打印如下。
send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2