Kotlin 协程基础十 —— 协作、互斥锁与共享变量

Kotlin 协程基础系列:

Kotlin 协程基础一 —— 总体知识概述

Kotlin 协程基础二 —— 结构化并发(一)

Kotlin 协程基础三 —— 结构化并发(二)

Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext

Kotlin 协程基础五 —— Channel

Kotlin 协程基础六 —— Flow

Kotlin 协程基础七 —— Flow 操作符(一)

Kotlin 协程基础八 —— Flow 操作符(二)

Kotlin 协程基础九 —— SharedFlow 与 StateFlow

Kotlin 协程基础十 —— 协作、互斥锁与共享变量

本节将介绍在协程间如果有先后执行、互相等待的需求时,应该怎样去处理这种等待和协作的工作。更会与 Java 的线程的协作工作对比,从而说明,在线程中通常不太简单的协作操作,在协程中很容易实现。

1、协程间的协作与等待

从运行角度来看,协程天生就是并行的,不论是对同等级的协程、父子协程还是毫无关系的协程。假如我们需要让协程互相等待,希望在协程的执行过程中可以停住,等待别的协程执行完毕,可以使用 Job 的 join() 或 Deferred 的 await()。

线程对于这种互相等待的需求可以通过 Thread 的 join(),还有 Future 和 CompletableFuture 以及 CountDownLatch。

CountDownLatch 适用于一个线程等待多个线程:

fun main() = runBlocking<Unit> {// countdown 译为倒计时,latch 是门闩、插销,组合起来就是用于倒计时的插销val countDownLatch = CountDownLatch(2)thread {// await() 会在 CountDownLatch 内的 count 减到 0 时结束等待countDownLatch.await()println("Count in CountDownLatch is 0 now,I'm free!")}thread {sleep(1000)// countDown() 会调用原子操作让 CountDownLatch 内的 count 减 1countDownLatch.countDown()println("Invoke countDown,count: ${countDownLatch.count}")}thread {sleep(2000)countDownLatch.countDown()println("Invoke countDown,count: ${countDownLatch.count}")}
}

运行结果:

Invoke countDown,count: 1
Count in CountDownLatch is 0 now,I'm free!
Invoke countDown,count: 0

修改 CountDownLatch 构造方法的 count 参数就可以修改要等待的线程数量,对于这种一个等待多个的业务需求,在协程中也可以用 join() 来做:

fun main() = runBlocking<Unit> {// 两个前置任务val preJob1 = launch {delay(1000)}val preJob2 = launch {delay(2000)}// 此协程需要等待两个协程执行之后再运行自己的内容launch {preJob1.join()preJob2.join()// 等待完前置任务,再做自己的事...}
}

实际上线程里也可以这么做,只不过因为线程本身的结构化管理比较麻烦,所以在正式的项目里很少真正的这么写。但因为协程可以结构化取消,因此它的 join() 比线程的 join() 更实用,在正式项目里的应用也较多。

其实,用 Channel 也能实现类似 CountDownLatch 那种,不指定具体等待哪些协程,只等待固定的次数的效果:

private fun channelSample() = runBlocking<Unit> {// 指定 Channel 的容量为 2val channel = Channel<Unit>(2)// 由于要等待两次发送数据才能继续执行后续代码,因此要 repeat(2) 接收launch {repeat(2) {channel.receive()}}launch {delay(1000)channel.send(Unit)}launch {delay(2000)channel.send(Unit)}
}

通过两个简单的例子可以发现,线程中有些复杂、比较底层、不太容易使用的协作和等待 API,在协程中的对应/等价 API 难度要大大降低。

2、select():先到先得

select() 会在内部开启多线竞赛,谁最快就用谁。

onJoin() 是仅限于在 select 代码块中才能调用的函数,它是一个监听注册,会监听 Job 的结束,不论 Job 是正常结束还是被取消,在其结束时都会回调执行 onJoin() 大括号的内容,并且大括号的返回值会作为 select() 的返回值:

fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val job1 = scope.launch {delay(1000)println("job1 done")}val job2 = scope.launch {delay(2000)println("job2 done")}val job = scope.launch {val result = select {// select 只执行最先结束的 onJoin 回调job1.onJoin {1}job2.onJoin {2}}println("result: $result")}joinAll(job, job1, job2)
}

运行结果:

job1 done
result: 1
job2 done

结果能看出,select() 只执行了最先结束的 job1 的 onJoin,没有执行 job2 的。

与 Job 的 onJoin() 功能类似的还有:Deferred 的 onAwait()、Channel 的 onSend()、onReceive() 以及 onReceiveCatching()。此外还有一个特殊的函数 onTimeout(),如果 select() 内所有的监听回调都没有在 onTimeout() 设置的超时时间内完成,那么就由 onTimeout() 作为 select() 的返回值:

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val job1 = scope.launch {delay(1000)println("job1 done")}val deferred = scope.async {delay(2000)println("deferred done")}val channel = Channel<String>()val job = scope.launch {val result = select {// select 只执行最先结束的 onJoin 回调job1.onJoin {1}deferred.onAwait {2}channel.onSend("haha") {}/*channel.onReceive {}channel.onReceiveCatching {}*/onTimeout(500) {"Timeout!"}}println("result: $result")}joinAll(job, job1)
}

运行结果:

result: Timeout!
job1 done

3、互斥锁和共享变量

在遇到一个不太好理解的知识点时,我们还是先说线程,再引入到协程中。

线程中有一个术语叫竞态条件,或者说竞争条件,英文是 race condition。这个词的含义比较广,在 Java 和 Kotlin 这种高级编程语言中,它指的是多个线程访问共享资源时,由于缺乏并发控制,导致资源的访问顺序不受控,进而导致出现错误的结果的条件。

在 Kotlin 中,仍然可以使用我们在 Java 中熟知的 synchronized 和 Lock 这两种锁机制来保证共享资源的线程安全,也提供了新的选项,下面我们来说一说。

3.1 @Synchronized

Kotlin 中没有 synchronized 关键字,代替它的是 @Synchronized 注解。对于方法而言,使用 @Synchronized 注解的作用与 Java 中使用 synchronized 关键字修饰方法的作用是一样的。被 @Synchronized 注解标记的方法不能同时被多个线程(注意,不是协程)执行。

而 Java 中 synchronized 代码块在 Kotlin 中被 synchronized 函数代替了:

fun main() = runBlocking<Unit> {var number = 0val lock = Any()val thread1 = thread {repeat(100_000) {synchronized(lock) {number++}}}val thread2 = thread {repeat(100_000) {synchronized(lock) {number--}}}thread1.join()thread2.join()println("result: $number") // 输出 0
}

同样的代码结构也可以用在协程中:

fun main() = runBlocking<Unit> {var number = 0val lock = Any()val scope = CoroutineScope(EmptyCoroutineContext)val job1 = scope.launch {repeat(100_000) {synchronized(lock) {number++}}}val job2 = scope.launch {repeat(100_000) {synchronized(lock) {number--}}}job1.join()job2.join()println("result: $number")
}

synchronized() 仍然掐住的是线程,确切的说是掐住了执行 synchronized() 所在的协程的线程。虽然这样做有点浪费,因为不止掐住了协程,连运行该协程代码的线程都被掐住了,但确实实现了共享资源的线程安全,而且 synchronized() 本来也是针对线程的,只不过从协程的角度看,如果可以只掐住协程,不影响运行该协程代码的线程就更好了。

这个区别就好像 delay() 与 sleep() 一样。协程的 delay() 只会挂起当前的协程,但是不会影响其所在的线程;而 sleep() 是让整个线程休眠。因此在协程中,为了不影响整个线程,我们通常都是使用 delay() 仅作用于当前协程,而不会使用 sleep() 为了让协程挂起而影响到整个线程的运行。下一节要讲的 Mutex 就可以解决这个问题。

Lock 的用法也大致相同,这里不多赘述。

3.2 Mutex

Mutex 是计算机领域的专属词汇,全称是 mutual exclusion,即互斥。Kotlin 提供的 Mutex 是基于协程的、挂起式的,不同于前面两个是基于线程的、阻塞式的。Mutex 是协程自己的实现,它不卡线程,性能更好,使用也很方便:

fun main() = runBlocking<Unit> {var number = 0val mutex = Mutex()val scope = CoroutineScope(EmptyCoroutineContext)val job1 = scope.launch {repeat(100_000) {try {mutex.lock()number++} finally {mutex.unlock()}}}val job2 = scope.launch {repeat(100_000) {mutex.withLock {number--}}}job1.join()job2.join()println("result: $number")
}

job1 内使用的是常规用法,在操作共享变量前用 lock() 加锁,在 finally 代码块中解锁。job2 内使用的是简便写法,withLock() 将代码块内的代码放入 try 中执行,在 finally 中用 unlock() 解锁:

@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {contract {callsInPlace(action, InvocationKind.EXACTLY_ONCE)}lock(owner)return try {action()} finally {unlock(owner)}
}

Mutex 的优势是性能,但由于它是基于协程的,因此只能在协程中使用。所以,如果只在协程中使用共享资源,那么就用 Mutex,如果需要在线程中使用,就要用上一节说的 synchronized 与 Lock。

3.3 Semaphore

Java 还有一个 Semaphore,信号量,一个可以被多个线程持有的锁。你可以在它的构造方法中指定它最多可以被几个线程持有,如果有多余指定数量的线程去获取 Semaphore 就会陷入等待。获取锁用 acquire(),释放锁用 release()。

由于共享变量是只要有两个线程同时访问就会导致出错了,因此允许多个线程持有的 Semaphore 并不能用于解决竞态条件的问题,它是用来做性能控制的。你可以用它来实现类似线程池的功能,只不过你实现出来的是自己定制的对象池:同一时间最多只有多少个对象同时在做事,满了之后如果再来新对象就得等着,直到有新的坑让出来,这些新对象才能开始做事。

Kotlin 提供了一个 Semaphore 的协程版本,就叫 Semaphore,定位与 Java 的 Semaphore 相同,只不过是协程版本。

3.4 其他 API

在传统的线程系统里,还有一组典型的 API:wait()、notify()、notifyAll()。它们三个属于更底层的 API,在线程系统里,它既能实现互斥锁,也能实现线程之间相互等待的功能。但事实上,这些年已经基本没人再用这组函数了。因为 synchronized 关键字与 Lock 的推出,已经基本上完全替代了它们,而且它们用起来也很麻烦,所以现在没人用。正因如此,协程没有推出与它们类似的 API。

AtomicInteger 与 CopyOnWriteArrayList 等等也可以在协程中使用。虽然它们是针对线程的,但是卡住线程的同时一定把协程也卡住了。所以在协程里也可以无风险地使用。

此外,volatile 与 transient 也可以在协程中使用,只不过不再是关键字,而是注解。

4、ThreadLocal

ThreadLocal 是线程的局部变量,即该变量在每个线程都是独立的,从不同的线程中访问该变量,这些线程对变量的值的读写都是相互独立的,对每个线程都有独立的副本。

ThreadLocal 是用来干嘛的?它的定位就像它的名字一样,就是针对线程的局部变量。Java 变量按照作用域由小到大可以划分为局部变量(方法内)、成员变量(类内)、静态变量(全局),ThreadLocal 是一种介于局部变量和静态变量之间的一种变量,范围比方法大,比静态全局小,只在当前线程范围内有效。

ThreadLocal 是对 Java 线程一个很关键的能力补充。前面提过,协程相对线程的一大优势就是线程不具备结构化管理的能力,而协程结构化管理的能力相当强大。线程不具备结构化管理的能力,但我们开发时是有结构化管理的需求的,这时就要用 ThreadLocal。有了 ThreadLocal 之后,在同一个线程里执行的多个方法之间就可以共享变量了,且该共享变量只针对当前线程有效,跨线程时还是独立的。因此 ThreadLocal 通常会作为静态变量存在。

ThreadLocal 在协程中的等价物是什么?有什么东西是跨方法的、针对协程的局部变量吗?CoroutineContext 就是协程里的 ThreadLocal。

本来,由于协程是具备结构化管理能力的,你完全不需要在协程内使用 ThreadLocal。但是开发过程中,免不了与 Java 代码进行协作,如果想在协程代码里访问老代码里的 ThreadLocal 对象,是不能像如下这样直接使用的:

val kotlinLocalString = ThreadLocal<String>()
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val job = scope.launch {kotlinLocalString.set("Test")delay(1000)println(kotlinLocalString.get())}job.join()
}

kotlinLocalString 的 get() 拿到的值一定是 set() 设置的值吗?不一定!因为虽然协程没变,但是执行协程代码的线程有可能改变了,delay() 的时候线程被让出,可能会去执行其他协程的代码。等 delay() 结束继续执行下面代码的时候,有可能就不是在刚才的线程中执行了。因为协程只能保证在执行挂起函数之后依然运行在刚才的 ContinuationInterceptor 所管理的某一个线程池上,不能保证同一个线程。

因此 ThreadLocal 不能在协程中直接使用,因为它的效果在协程中变得不可靠了。怎么办?用 asContextElement() 把 ThreadLocal 转换成 CoroutineContext:

val kotlinLocalString = ThreadLocal<String>()
fun main() = runBlocking<Unit> {val scope = CoroutineScope(EmptyCoroutineContext)val job = scope.launch {val stringContext = kotlinLocalString.asContextElement("Test")withContext(stringContext) {delay(1000)println(kotlinLocalString.get())}}job.join()
}

asContextElement() 是 ThreadLocal 的扩展函数,它会把参数里的值封装到返回值的 ThreadLocalElement 中。再将结果填到 withContext() 的参数中,包住获取 ThreadLocal 值的代码,这时候里面的 ThreadLocal 就是对协程兼容的了。不管里面怎么切协程,只要没出协程,它的值都会被保持住。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68268.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker启动达梦 rman恢复

目录标题 1. 主库备份2. Docker启动备库3. 备库修改属组4. 开始恢复5. 连接数据库配置归档 & Open6. 检查数据 关于达梦数据库&#xff08;DMDBMS&#xff09;的主库备份、Docker启动备库、恢复备份以及配置归档和打开数据库的详细步骤。 1. 主库备份 # 使用达梦数据库备…

WPS excel使用宏编辑器合并 Sheet工作表

使用excel自带的工具合并Sheet表&#xff0c;我们会发现需要开通WPS会员才能使用合并功能&#xff1b; 那么WPS excel如何使用宏编辑器进行合并 Sheet表呢&#xff1f; 1、首先我们要看excel后缀是 .xlsx 还是 .xls &#xff1b;如果是.xlsx 那么 我们需要修改为 .xls 注…

【Python项目】个人密码本文档系统

【Python项目】个人密码本文档系统 技术简介&#xff1a;采用Python技术、Django、MYSQL数据库等实现。 系统简介&#xff1a;系统主要的功能有&#xff08;1&#xff09;新建密码本&#xff1a;用户可以创建新的密码本来记录自己的账户与密码&#xff1b; &#xff08;2&#…

《OpenCV》——模版匹配

文章目录 OpenCV——模版匹配简介模版匹配使用场景OpenCV 中模板匹配的函数参数 OpenCV——模版匹配实例导入所需库读取图片并处理图片对模版图片进行处理进行模版匹配显示模版匹配的结果注意事项 OpenCV——模版匹配简介 OpenCV 是一个非常强大的计算机视觉库&#xff0c;其中…

doc、pdf转markdown

国外的一个网站可以&#xff1a; Convert A File Word, PDF, JPG Online 这个网站免费的&#xff0c;算是非常厚道了&#xff0c;但是大文件上传多了之后会扛不住 国内的一个网站也不错&#xff1a; TextIn-AI智能文档处理-图像处理技术-大模型加速器-在线免费体验 https://…

金融项目实战 06|Python实现接口自动化——日志、实名认证和开户接口

目录 一、日志封装及应用&#xff08;理解&#xff09; 二、认证开户接口脚本编写 1、代码编写 1️⃣api目录 2️⃣script目录 2、BeautifulSoup库 1️⃣简介及例子 2️⃣提取html数据工具封装 3、认证开户参数化 一、日志封装及应用&#xff08;理解&#xff09; &…

浅谈云计算15 | 存储可靠性技术(RAID)

存储可靠性技术 一、存储可靠性需求1.1 数据完整性1.2 数据可用性1.3 故障容错性 二、传统RAID技术剖析2.1 RAID 02.2 RAID 12.3 RAID 52.4 RAID 62.5 RAID 10 三、RAID 2.0技术3.1 RAID 2.0技术原理3.1.1 两层虚拟化管理模式3.1.2 数据分布与重构 3.2 RAID 2.0技术优势3.2.1 自…

Spring官网构建Springboot工程

注意&#xff1a;基于Idea的 Spring Initializr 快速构建 SpringBoot 工程时需要联网。 1.进入SpringBoot官网 Spring | Home 点击QUICKSTART 点击start.spring.io进入spring initializr 2.选择依赖 3.生成工程 下载好后解压用IDEAD导入即可。

计算机的错误计算(二百一十二)

摘要 利用两个大模型计算 实验表明&#xff0c;两个大模型均进行了中肯的分析。另外&#xff0c;其中一个大模型给出了 Python代码&#xff0c;运行后&#xff0c;结果中有7位错误数字&#xff1b;而一个大模型进行加减运算时出错。 例1. 计算 下面是与一个大模型的对话…

Vue+Echarts+百度地图 实现 路径规划

实现功能: 通过选择 相关调拨&#xff0c;系统自动规划 路径&#xff0c;并且以地图的形式呈现最佳路径 技术难点: 1. vue 结合使用 echarts 2.echarts 在 vue嵌入百度地图&#xff0c;并且做出路径 曲线 最终结果:

【算法】图解两个链表相交的一系列问题

问&#xff1a; 给定两个可能有环也可能无环的单链表&#xff0c;头节点head1和head2。请实现一个函数&#xff0c;如果两个链表相交&#xff0c;请返回相交的第一个节点&#xff1b;如果不相交&#xff0c;返回null。如果两个链表长度之和为N&#xff0c;时间复杂度请达到O(N…

Go-Zero整合Goose实现MySQL数据库版本管理

推荐阅读 【系列好文】go-zero从入门到精通&#xff08;看了就会&#xff09; 教程地址&#xff1a;https://blog.csdn.net/u011019141/article/details/139619172 Go-Zero整合Goose实现MySQL数据库版本管理的教程 在开发中&#xff0c;数据库迁移和版本管理是必不可少的工作。…

JAVA:Spring Boot 集成 JWT 实现身份验证的技术指南

1、简述 在现代Web开发中&#xff0c;安全性尤为重要。为了确保用户的身份&#xff0c;JSON Web Token&#xff08;JWT&#xff09;作为一种轻量级且无状态的身份验证方案&#xff0c;广泛应用于微服务和分布式系统中。本篇博客将讲解如何在Spring Boot 中集成JWT实现身份验证…

说一说mongodb组合索引的匹配规则

一、背景 有一张1000多万条记录的大表&#xff0c;需要做归档至历史表&#xff0c;出现了大量慢查询。 查询条件是 "classroomId": {$in: ["xxx", "xxx", ..... "xxx","xxx", "xxx" ] }耗时近5秒&#xff0c;且…

更新java

下载 Java 下载 |神谕 (oracle.com)

CSS3的aria-hidden学习

前言 aria-hidden 属性可用于隐藏非交互内容&#xff0c;使其在无障碍 API 中不可见。即当aria-hidden"true" 添加到一个元素会将该元素及其所有子元素从无障碍树中移除&#xff0c;这可以通过隐藏来改善辅助技术用户的体验&#xff1a; 纯装饰性内容&#xff0c;如…

【Java设计模式-5】装饰模式:给咖啡加点“佐料”

今天咱们要探索一下Java世界里的装饰模式&#xff08;Decorator Pattern&#xff09;。为了让这个过程更加生动易懂&#xff0c;咱们就以大家都熟悉的咖啡饮品来举例吧&#xff0c;想象一下&#xff0c;你就是那个咖啡大师&#xff0c;要给顾客调制出各种独特口味的咖啡哦&…

C++(5)

1.运算符重载 头文件 #ifndef MYSTRING_H #define MYSTRING_H#include <iostream> #include <cstring>using namespace std;class myString { private:char *str;//C风格字符串int size0; public:std::string s_str;//转换构造函数myString(const std::string &a…

K8S--配置存活、就绪和启动探针

目录 1 本人基础环境2 目的3 存活、就绪和启动探针介绍3.1 存活探针3.2 就绪探针3.3 启动探针 4 探针使用场景4.1 存活探针4.2 就绪探针4.3 启动探针 5 配置存活、就绪和启动探针5.1 定义存活探针5.2 定义一个存活态 HTTP 请求接口5.3 定义 TCP 的就绪探针、存活探测5.4 定义 g…

【HTML+CSS+JS+VUE】web前端教程-36-JavaScript简介

JavaScript介绍 JavaScript是一种轻量级的脚本语言&#xff0c;所谓脚本语言&#xff0c;指的是它不具备开发操作系统的能力&#xff0c;而是用来编写控制其他大型应用程序的“脚本” JavaScript是一种嵌入式语言&#xff0c;它本身提供的核心语法不算很多 为什么学习JavaScri…