Kotlin Channel 热流

协程:Channel 热流

1、Channel是什么?

  1. 生产者:多个协程
  2. 消费者:多个协程
  3. 中间:Channel 管道 并发安全队列
  4. 发送send
  5. 接收recv

协程间通信

1、Channel可以用于协程间通信

    // 通道Channelval channel = Channel<Int>()// 生产者launch{(1..6).forEach {delay(1000L)println("我生产了一个:$it")channel.send(it)}}// 消费者launch{(1..6).forEach {val r=  channel.receive()println("消费了一个:$r")}}

capacity

1、生产速度>消费速度

  1. 如果缓冲区满了,send会挂起,消费完后再生产
  2. capacity,默认容量,0

UNLIMITED:send不再挂起

  1. 容量接近于无限
  2. 容量不满就不会挂起
    // 通道Channelval channel = Channel<Int>(Channel.UNLIMITED)

消费方式

        // 第一种发方式 消费(1..8).forEach {delay(2000L)val r=  channel.receive()println("消费了一个:$r")}

iterator

        // 第二种发方式 消费val it = channel.iterator()while (it.hasNext()) {val item = it.next()delay(2000L)println("消费了一个:$item")}

item in channel

        // 第三种发方式 消费for (item in channel) {delay(2000L)println("消费了一个:$item")}

快捷方式

produce和ReceiveChannel

  1. produce快速构建消费者
// 生产者的快捷方式val produce = produce {(1..20).forEach { delay(2000L) ; send(it) }}// 普通的消费launch {for (item in produce) {println("消费了一个:$item")}}// receive()接收数据,有数据没有消费,send会一直阻塞launch {println("消费了一个:${produce.receive()}")delay(2000)println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")println("消费了一个:${produce.receive()}")}

produce(capacity = 100),会增加缓冲区,只要没有放满send不会再阻塞。

actor和SendChannel

  1. actor快速构建消费者
    // 消费者的快捷方式val consumer = actor<Int> {(1..20).forEach {println("消费了一个:${receive()}")}}// 普通的生成launch {(1..20).forEach { delay(2000L) ; consumer.send(it) }}

close

1、channel.close

  1. 关闭
  2. 一般是生产者去close

isClosedForSend

channel.close() 之前 isClosedForSend == false
channel.close() 之后 isClosedForSend == true

    // 生产者launch {(1..6).forEach {if (!channel.isClosedForSend) {channel.send(it)println("我生产了一个$it")// if (it == 3) channel.close() // 大部分情况下,是生产者 去close}}println("close前 isClosedForSend:${channel.isClosedForSend} " +" isClosedForReceive:${channel.isClosedForReceive}")channel.close()println("close后 isClosedForSend:${channel.isClosedForSend} " +" isClosedForReceive:${channel.isClosedForReceive}")}

isClosedForReceive

如果消费完了 isClosedForReceive == true, 否则就是false
如果缓冲区里面还有内容,没有消费完 也是 false

    // 消费者launch {try {for (i in channel) {delay(2000L)println("我消费了一个:$i")}}finally {println("finally isClosedForSend:${channel.isClosedForSend} " +" isClosedForReceive:${channel.isClosedForReceive}")}}

BroadcastChannel

1、广播给所有消费者,多个地方可以接收到

  1. 创建
    val channel = Channel<Int>()val broadcastChannel = channel.broadcast(Channel.BUFFERED)
  1. 生产
    // 生产者launch {repeat(8) {delay(1000L)broadcastChannel.send(it + 100001) // 发送}broadcastChannel.close() // 关闭}

openSubscription

  1. 消费
    repeat(8) {// 消费者launch {val r = broadcastChannel.openSubscription()for (i in r) {println("协程$it ---- 消费者 ${i}")}}}

select

1、select: 择优选择数据,谁先返回用谁的

  1. 加载首页数据,可以作缓存
  2. 缓存有用缓存,缓存不存在去请求
  3. "慢的不会再执行"会被cancel

2、select 是一个用于多路选择的结构,可以同时等待多个挂起函数或通道的操作完成。它类似于 switch 或 if-else 的多路分支语句,但是它是用于协程的异步操作。

suspend fun selectExample() {select<Unit> {someChannel.onReceive { value ->// 处理从通道接收到的值}someDeferred.onAwait { result ->// 处理异步操作完成后的返回值}onTimeout(1000) {// 在指定时间内没有任何操作完成时执行}}
}

3、select可以用于上游,也可以用于下游

onAwait

  1. async有onAwait
data class Home(val info1: String, val info2: String)data class HomeRequestResponseResultData(val code: Int, val msg: String, val home: Home)// 请求本地加载首页数据
fun CoroutineScope.getHomeLocalData() = async (Dispatchers.IO) {delay(3000)Home("数据1...", "数据1...")
}// 请求网络服务器加载首页数据
fun CoroutineScope.getHomeRemoteData() = async (Dispatchers.IO) {delay(6000)Home("数据3...", "数据4...")
}
    launch {val localRequestAction = getHomeLocalData()val remoteRequestAction = getHomeRemoteData()val resultResponse = select<HomeRequestResponseResultData> {localRequestAction.onAwait {// 做校验 工作// ...// 省略1000行代码HomeRequestResponseResultData(200, "恭喜你,请求成功", it) // 最后一行作为返回值}remoteRequestAction.onAwait {// 做校验 工作// ...// 省略1000行代码HomeRequestResponseResultData(200, "恭喜你,请求成功", it) // 最后一行作为返回值}}println("resultResponse:$resultResponse")}

2、async需要在调用的CoroutineScope中执行

fun CoroutineScope.getHomeLocalData() = async (Dispatchers.IO) {delay(3000)Home("数据1...", "数据1...")
}
// 对CoroutineScope扩展

channel数组

  1. 哪个更快选择哪个Channel
onReceive
  1. onReceive: 接收数据后的回调
    val channels = arrayOf(Channel<String?>(), Channel<String?>())launch {delay(6000)channels[0].send("login successful")}launch {delay(8000)channels[1].send("register successful")}val receiveResult = select<String ?> {for (channel in channels) {channel.onReceive {// 做校验 工作// ...// 省略1000行代码"[$it]" // 最后一行作为返回值}}}println(receiveResult)

onJoin

launch无返回值,但想看谁执行的最快

    val job1 = launch {println("launch1 run")} // 无返回值val job2 = launch {println("launch2 run")} // 无返回值select<Unit> {job1.onJoin { println("launch1 执行完成了 很快") }job2.onJoin { println("launch2 执行完成了 很快") }}

onSend

发送数据,并且显示回调的内容(上游)

    // 准备Channel数组val channels = arrayOf(Channel<Char>(), Channel<Char>())// 协程一:Channel 的 发射源launch(Dispatchers.Default) {select<Unit> {// 并行干活,sendlaunch {channels[0].onSend('女') {println("channels[0].onSend('女') { $it }")}}// 并行干活,sendlaunch {channels[1].onSend('男') {println("channels[1].onSend('男') { $it }")}}}}// 协程二:下游 接收阶段launch { println("channel1 下游接收 ${channels[0].receive()}") }launch { println("channel2 下游接收 ${channels[1].receive()}") }输出:
channel1 下游接收 女
channels[0].onSend('女') { RendezvousChannel@34206005{EmptyQueue} }
// 1. onSend先发送消息
// 2. 下游接收到
// 3. onSend回调打印消息

await

复用Channel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/48390.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 读取pcd、ply点云文件数据

最近研究了下用pcl读取点云数据&#xff0c;又做了个C#的dll&#xff0c;方便读取&#xff0c;同样这个dll基于pcl 最新版本1.13.1版本开发。 上次做的需要先得到点云长度&#xff0c;再获取数据。这次这个定义了一个PointCloudXYZ类来存数据。将下面的dll拷贝到可执行目录下&a…

Docker详解

文章目录 Docker详解一、Docker简介什么是容器 &#xff1f;容器技术有哪些优点 &#xff1f;什么是Docker &#xff1f;Docker的特点Docker的使用场景 二、Docker的基本组成Docker 客户端 / 守护进程Docker Image 镜像Docker Container 容器Docker Registry 仓库 三、Docker 依…

嵌入式系统中如何选择RTC电池?

RTC&#xff08;Real Time Clock&#xff09;是一种用于提供系统时间的独立定时器&#xff0c;它可以在系统断电或低功耗模式下继续运行&#xff0c;只需要一个后备电池作为供电源。在嵌入式系统中&#xff0c;选择合适的RTC电池时非常关键的&#xff0c;它会影响系统时间的准确…

二、SQL注入之联合查询

文章目录 1、SQL注入原理2、SQL注入的原因3、SQL注入的危害4、SQL注入基础4.1 MySQL相关4.2 SQL注入流程&#xff1a; 5、联合注入实例基本步骤6、总结 1、SQL注入原理 SQL注入(Sql Injection&#xff09;就是通过把SQL命令插入到Web表单提交或输入域名或页面请求的查询字符串&…

【从零学习python 】56. 异常处理在程序设计中的重要性与应用

文章目录 异常的概念读取文件异常try...except语句try...else语句try...finally语句 进阶案例 异常的概念 在程序运行过程中&#xff0c;由于编码不规范或其他客观原因&#xff0c;可能会导致程序无法继续运行&#xff0c;此时就会出现异常。如果不对异常进行处理&#xff0c;…

[C++] string类常用接口的模拟实现

文章目录 1、前言2、遍历2.1 operator[ ]下标方式2.2 迭代器2.3 范围for2.4 c_str 3、容量相关3.1 size&#xff08;大小&#xff09;3.2 capacity&#xff08;容量&#xff09;3.3 empty&#xff08;判空&#xff09;3.4 clear&#xff08;清理&#xff09;3.5 reserve3.6 res…

代码随想录算法训练营day37 | LeetCode 738. 单调递增的数字 968. 监控二叉树

738. 单调递增的数字&#xff08;题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台&#xff09; 思路&#xff1a;从后往前考虑&#xff0c;题目会变得很容易&#xff0c;从前往后考虑&#xff0c;结果很难处理。 int monotoneIn…

最新AI系统ChatGPT网站程序源码/搭建教程/支持GPT4.0/Dall-E2绘画/支持MJ以图生图/H5端/自定义训练知识库

一、正文 SparkAi系统是基于国外很火的ChatGPT进行开发的Ai智能问答系统。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。 那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧&#xff01…

前端高频面试题 Day03

1. Vue computed 和 watch 区别 对于Computed: ● 它支持缓存&#xff0c;只有依赖的数据发生了变化&#xff0c;才会重新计算 不支持异步&#xff0c;当Computed中有异步操作时,无法监听数据的变化 ● computed的值会默认走缓存&#xff0c;计算属性是基于它们的响应式依赖…

SpringMVC之入门搭建框架

文章目录 前言一、SpringMVC简介1.什么是MVC2.什么是SpringMVC3.SpringMVC的特点 二、搭建框架——HelloWorld1.创建maven工程&#xff08;web项目&#xff09;2.配置web.xml3.配置springMVC.xml4.创建请求控制器 总结 前言 基础小白第一次走进SpringMVC&#xff1a;了解什么是…

自定义HttpClient工具类

自定义HttpClient工具类 简介 依赖 <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.6</version> </dependency>api 发送get请求: doGet(String url);发送po…

LinkedList

LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09;LinkedList使用 LinkedList的模拟实现&#xff08;底层是一个双向链表&#xff09; 无头双向链表&#xff1a;有两个指针&#xff1b;一个指向前一个节点的地址&#xff1b;一个指向后一个节点的地址。 节点定…

vim 配置环境变量与 JDK 编译器异常

vim 配置环境变量 使用 vim 打开系统中的配置信息&#xff08;不存在将会创建&#xff09;&#xff1a; vim ~/.bash_profile 以配置两个版本 JDK 为例&#xff08;前提是已安装 JDK&#xff09;,使用上述命令打开配置信息&#xff1a; 输入法调成英文&#xff0c;输入 i&…

Mysql Oracle 区别

1. oracle select *, id需要在星号前加别名&#xff0c;mysql则不需要 mysql语法&#xff1a; select *, id from xin_student_t;oracle语法&#xff1a; select st.*, st.id from xin_student_t st;2. oracle表定义了别名&#xff0c;在查询时可以不用别名指定字段&#xf…

OWASP Top 10(2021)漏洞学习(最新)

A01:2021-权限控制失效 从第五位上升到第一位&#xff0c;94%的应用程序都接受了某种形式的针对“失效的访问控制”的测试&#xff0c;该事件的 平均发生率为 3.81%&#xff0c;该漏洞在提供的数据集中出现漏洞的应用数量最多&#xff0c;总发生漏洞应用数量超过31.8万多 次。 …

Python爬取斗罗大陆全集

打开网址http://www.luoxu.cc/dmplay/C888H-1-265.html F12打开Fetch/XHR&#xff0c;看到m3u8&#xff0c;ts&#xff0c;一眼顶真&#xff0c;打开index.m3u8 由第一个包含第二个index.m3u8的地址&#xff0c;ctrlf在源代码中一查index&#xff0c;果然有&#xff0c;不过/…

解决:(error) ERR unknown command shutdow,with args beginning with

目录 一、遇到问题 二、出现问题的原因 三、解决办法 一、遇到问题 要解决连接redis闪退的问题&#xff0c;按照许多的方式去进行都没有成功&#xff0c;在尝试使用了以下的命名去尝试时候&#xff0c;发现了这个问题。 二、出现问题的原因 这是一个粗心大意导致的错误&am…

【unity小技巧】Unity2D TileMap+柏林噪声生成随机地图(附源码)

文章目录 前言柏林噪声素材导入Rule Tile配置生成随机地图问题扩展问题添加植被源码参考完结 前言 我的上一篇文章介绍了TileMap的使用&#xff0c;主要是为我这篇做一个铺垫&#xff0c;看过上一篇文章的人&#xff0c;应该已经很好的理解TileMap的使用了&#xff0c;这里我就…

C++,从“hello world“开始

一、"hello world" #inclue <iostream>using namespace std;int main() {cout << "hello world" << endl;return 0; } 1.1 #include&#xff1a;预处理标识 1.2 <iostream>&#xff1a;输入输出流类所在头文件 1.2.1 istream&a…

【学习日记】【FreeRTOS】延时列表的实现

前言 本文在前面文章的基础上实现了延时列表&#xff0c;取消了 TCB 中的延时参数。 本文是对野火 RTOS 教程的笔记&#xff0c;融入了笔者的理解&#xff0c;代码大部分来自野火。 一、如何更高效地查找延时到期的任务 1. 朴素方式 在本文之前&#xff0c;我们使用了一种朴…