Flow 是一种异步数据流的表示,可以用于处理异步数据流的操作。
AbstractFlow(@ExperimentalCoroutinesApi)
协程库中的一个抽象类,用于创建具有状态的 Flow 实现。它跟踪了用于上下文保留的所有属性,并在任何属性被违反时抛出 IllegalStateException。
fun main() = runBlocking {val text = StringBuffer()val values = listOf(1, 2, 3, 4, 5)val countingListFlow = CountingListFlow(values)countingListFlow.collect{ value ->// 处理收集到的每个值println("Collected value: $value")text.append("-- $value")}println("---- text${text.toString()} ")println(countingListFlow.toDiagnosticString())delay(6000)
}class CountingListFlow(private val values: List<Int>) : AbstractFlow<Int>() {private val collectedCounter = AtomicInteger(0)override suspend fun collectSafely(collector: FlowCollector<Int>) {collectedCounter.incrementAndGet() // 增加收集计数println("collectSafely${collectedCounter.get()} ")values.forEach { // 发射所有的值collector.emit(it)}}fun toDiagnosticString(): String = "Flow with values $values was collected ${collectedCounter.get()} times"
}collectSafely1
Collected value: 1
Collected value: 2
Collected value: 3
Collected value: 4
Collected value: 5
---- text-- 1-- 2-- 3-- 4-- 5
Flow with values [1, 2, 3, 4, 5] was collected 1 times
asFlow
kotlinx.coroutines.flow 包中的一个扩展函数,用于将不同的数据源转换为 Flow。比如:
- 函数:对应普通函数和挂起函数都是一样的操作
fun main() {val flow1: Flow<String> = singleValueFunction.asFlow()runBlocking {flow1.collect { value ->println(value) // 输出: Hello, World!}}val flow2: Flow<String> = ::delayedValue.asFlow()runBlocking {flow2.collect { value ->println(value) // 输出: Delayed Hello!}} }val singleValueFunction: () -> String = { "Hello, World!" }suspend fun delayedValue(): String {delay(1000) // 模拟异步操作return "Delayed Hello!" }Hello, World! Delayed Hello!
- Iterable、Iterator、Sequence 或 Array
val list = listOf(1, 2, 3, 4, 5) val flowFromList: Flow<Int> = list.asFlow()val iterator = listOf(1, 2, 3, 4, 5).iterator() val flowFromIterator: Flow<Int> = iterator.asFlow()val sequence = sequenceOf(1, 2, 3, 4, 5) val flowFromSequence: Flow<Int> = sequence.asFlow()val array = arrayOf(1, 2, 3, 4, 5) val flowFromArray: Flow<Int> = array.asFlow()
- Range
val intRange = 1..5 val flowFromIntRange: Flow<Int> = intRange.asFlow()val longRange = 1L..5L val flowFromLongRange: Flow<Long> = longRange.asFlow()
asSharedFlow
将可变的共享流(MutableSharedFlow)转换为只读的共享流(SharedFlow)。在某些情况下将可变的流限制为只读的形式,以便在代码中传递并确保只读取流的内容。
fun main() = runBlocking {// 创建一个可变的共享流val mutableSharedFlow = MutableSharedFlow<Int>()// 将可变的共享流转换为只读的共享流val sharedFlow: SharedFlow<Int> = mutableSharedFlow.asSharedFlow()// 启动一个协程,向可变的共享流发送数据launch {repeat(5) {delay(1000)mutableSharedFlow.emit(it)}}// 启动另一个协程,收集只读的共享流的数据launch {sharedFlow.collect {println("Received: $it")}}delay(6000) // 等待足够的时间以触发所有数据的发送
}Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
asStateFlow
将可变的状态流(MutableStateFlow)转换为只读的状态流(StateFlow)
class SampleViewModel {private val _mutableStateFlow = MutableStateFlow("Initial State")// 将可变状态流转换为只读状态流val stateFlow: StateFlow<String> = _mutableStateFlow.asStateFlow()fun updateState(newState: String) {// 可以通过 MutableStateFlow 写入新的状态_mutableStateFlow.value = newState}
}fun main() = runBlocking {val viewModel = SampleViewModel()// 通过只读状态流收集数据launch {viewModel.stateFlow.collect { state ->println("Current State: $state")}}delay(1000)// 模拟更新状态viewModel.updateState("New State")// 等待状态更新被收集delay(1000)
}Current State: Initial State
Current State: New State
buffer
在 Flow 的操作过程中引入一个缓冲区,以提高并发性能。
fun main() = runBlocking {flowOf("A", "B", "C").onEach { println("1$it") }.collect { println("2$it") }//协程Q//Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--flowOf("A", "B", "C").onEach { println("1$it") }.buffer() // <--------------- buffer between onEach and collect.collect { println("2$it") }//它将使用两个协程来执行代码。调用此代码的协程Q将执行collect,buffer之前的代码将在一个单独的新协程P中与Q同时执行://P : -->-- [1A] -- [1B] -- [1C] ---------->-- // flowOf(...).onEach { ... }//// |// | channel // buffer()// V////Q : -->---------- [2A] -- [2B] -- [2C] -->-- // collect
}1A
2A
1B
2B
1C
2C1A
1B
1C
2A
2B
2C
callbackFlow
将基于回调的异步 API 转换为协程的 Flow,使得异步操作可以以类似同步的方式被处理。这跟我前篇提到的suspendCancellableCoroutine:(单次回调) 类似却有些不一样他是处理(多次回调的)
// 假设有一个基于回调的 API 接口
interface CallbackBasedApi {fun register(callback: Callback)fun unregister(callback: Callback)
}// 回调接口
interface Callback {fun onNextValue(value: Int)fun onApiError(cause: Throwable)fun onCompleted()
}// 使用 callbackFlow 将回调 API 转换为 Flow
fun flowFrom(api: CallbackBasedApi): Flow<Int> = callbackFlow {// 创建回调对象val callback = object : Callback {override fun onNextValue(value: Int) {// 发送值到 FlowtrySendBlocking(value).onFailure { throwable ->// Downstream 已取消或失败,可以在这里进行日志记录}}override fun onApiError(cause: Throwable) {// 发生 API 错误时取消 Flowcancel(CancellationException("API Error", cause))}override fun onCompleted() {// 当回调完成时关闭 Flowchannel.close()}}// 注册回调api.register(callback)// 等待 Flow 完成或被取消awaitClose { api.unregister(callback) }
}var callBack:Callback? = null// 在协程中使用 Flow
fun main() {val api = object : CallbackBasedApi {override fun register(callback: Callback) {callBack = callbackprintln("注册回调")}override fun unregister(callback: Callback) {callBack = nullprintln("取消注册回调")}}// 启动协程收集 Flow 中的数据// 这里只是简单的打印每个值,实际使用时可能需要更新 UI 或进行其他处理val job = GlobalScope.launch {flowFrom(api).collect { value ->println("Received value: $value")}}GlobalScope.launch {delay(1000)//模拟一些操作触发回调callBack?.onNextValue(1)callBack?.onNextValue(2)delay(300)callBack?.onNextValue(3)
// callBack?.onApiError(Throwable("111"))delay(300)callBack?.onNextValue(4)callBack?.onCompleted()}// 等待协程完成runBlocking {job.join()}}注册回调
Received value: 1
Received value: 2
Received value: 3
Received value: 4
取消注册回调
cancellable
操作符提供了一种快捷方式,相当于使用 .onEach { currentCoroutineContext().ensureActive() } 来检查协程的取消状态。具体而言,ensureActive: 是一个内置函数,用于检查当前协程是否已被取消,如果是,则抛出相应的取消异常。
fun createRandomNumberFlow(): Flow<Int> = flow {repeat(10) {delay(1000) // 模拟异步操作emit(Random.nextInt(1, 100))}
}fun main() {val job = runBlocking {val cancellableFlow = createRandomNumberFlow().cancellable()val job = launch {try {cancellableFlow.collect { value ->println("Received: $value")}} catch (e: CancellationException) {println("Flow was cancelled due to: ${e.cause}")}}delay(3000) // 运行3秒后取消 Flowjob.cancel("Timeout exceeded")job.join()}println("Main coroutine has completed")
}Received: 15
Received: 13
Flow was cancelled due to: null
Main coroutine has completed
catch
一个用于捕获流(Flow)中异常的操作符。它可以用于捕获在流的产生、变换或收集过程中发生的异常,并执行特定的操作来处理这些异常
fun main() {runBlocking {// 创建一个流,模拟可能抛出异常的操作val flowWithException = flow {emit(1)emit(2)throw IOException("Simulated IOException")emit(3) // 这个不会被执行}// 使用 catch 操作符来捕获异常并处理flowWithException.catch { e ->if (e is IOException) {// 在这里处理 IOExceptionprintln("Caught IOException: $e")} else {// 重新抛出其他异常throw e}}.onEach { value ->// 这里的代码不受异常的影响println("Received value: $value")}.catch { e ->// 这个 catch 操作符用于处理第一个 catch 未处理的异常println("Unhandled exception: $e")}.collect {// 这里的代码不受异常的影响println("Collected value: $it")}}
}Received value: 1
Collected value: 1
Received value: 2
Collected value: 2
Caught IOException: java.io.IOException: Simulated IOException
channelFlow
一个构建 Flow 的函数,它允许您以异步和并发的方式生成 Flow 的元素
fun main() {runBlocking {val other = flowOf( 11, 23, 45, 76)// 使用 channelFlow 创建冷流val flow: Flow<Int> = channelFlow {// ProducerScope 提供了 SendChannel 用于发送元素for (i in 1..3) {send(i)}other.collect {send(it)}}// 启动协程收集流中的元素val job = launch {flow.collect { value -> println("Received: $value") }}// 等待协程完成job.join()}
}Received: 1
Received: 2
Received: 3
Received: 11
Received: 23
Received: 45
Received: 76
collect
用于收集 Flow 中所有元素的终端操作符。它的作用是触发 Flow 的收集操作,但它本身不处理任何收集到的元素。collect()是collect {}的简写
fun main() {runBlocking {// 创建一个简单的 Flowval simpleFlow: Flow<Int> = flow {for (i in 1..5) {emit(i)// 模拟异步操作delay(1000)}}// 使用 onEach 处理每个元素simpleFlow.onEach { value -> println("Received: $value") }.catch { e -> println("Exception: $e") }.collect() // 触发收集操作}
}Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
collectIndexed
使用提供的动作收集给定的流,该动作接受元素的索引(从零开始)和元素本身,比collect {}多一个索引
fun main() {runBlocking {// 创建一个简单的 Flowval simpleFlow: Flow<Int> = flow {for (i in 11..16) {emit(i)// 模拟异步操作delay(1000)}}// 使用 onEach 处理每个元素simpleFlow.onEach { value -> println("Received: $value") }.catch { e -> println("Exception: $e") }.collectIndexed { index, value ->println("Received: index--$index value--$value")}}
}Received: 11
Received: index--0 value--11
Received: 12
Received: index--1 value--12
Received: 13
Received: index--2 value--13
Received: 14
Received: index--3 value--14
Received: 15
Received: index--4 value--15
Received: 16
Received: index--5 value--16
collectLatest
提供一个挂起的操作来处理这个流的每个值。与 collect 不同的是,当原始流发出新值时,会取消上一个值的操作块
val flow = flow {emit(1)delay(50)emit(2)}flow.onEach { value -> println("Emitting $value") }.flowOn(Dispatchers.Default) // 设置流(flow)运行在后台线程.collectLatest { value ->println("Collecting $value")delay(100) // 模拟耗时操作println("$value collected")}Emitting 1
Collecting 1
Emitting 2
Collecting 2
2 collected
combine
用于将多个 Flow 的最新值进行合并,然后通过指定的转换函数生成新的 Flow。这允许您在处理异步数据流时,根据不同的数据源生成新的数据流。
fun main() {runBlocking {val flow = flowOf(1, 2).onEach { delay(1000) }val flow2 = flowOf("a", "b", "c").onEach { delay(1500) }// 1000触发(flow:1 flow2:null)// 1500触发(flow:1 flow2:a)// 2000触发(flow:2 flow2:a)// 3000触发(flow:2 flow2:b)// 4500触发(flow:2 flow2:c)combine(flow, flow2) { i, s -> "$i$s" }.collect {println(it) // Will print "1a 2a 2b 2c"}
// 同效果代码
// flow.combine(flow2) { i, s -> "$i$s" }.collect {
// println(it) // Will print "1a 2a 2b 2c"
// }}
}
combineTransform
带Transform的组合函数其实生命哪个示例就是combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }的简写
fun requestFlow(): Flow<String> = flow {emit("Requesting data")delay(1000)emit("Data received")
}fun searchEngineFlow(): Flow<String> = flow {emit("Searching for results")delay(1500)emit("Results found")
}fun download(request: String, searchEngine: String): String {return "$request - $searchEngine"
}fun main() = runBlocking {val flow1 = requestFlow()val flow2 = searchEngineFlow()flow1.combineTransform(flow2) { request, searchEngine ->emit("Downloading in progress")val result = download(request, searchEngine)emit(result)}.collect {println(it)}
}Requesting data - Searching for results
Downloading in progress
Data received - Searching for results
Downloading in progress
Data received - Results found
conflate
通过使用 conflated channel 来合并流的发射,同时在一个单独的协程中运行收集器。其效果是,发射器不会因为慢速的收集器而被挂起,而收集器总是获取到最近发射的值。buffer 操作符的一种快捷方式,等同于使用 buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)
consumeAsFlow
ReceiveChannel 转换为一个热流,并在第一次对此流进行收集时消耗该通道。结果的流只能被收集一次,如果尝试多次收集它,将抛出 IllegalStateException。
fun main() = runBlocking {// 创建一个通道val channel = Channel<Int>()// 向通道发送数据launch {for (i in 1..5) {channel.send(i)}channel.close()}// 使用 consumeAsFlow 将通道转换为流val flow = channel.consumeAsFlow()// 收集流的数据flow.collect {println(it)}
}1
2
3
4
5
count
计算 Flow 中元素的数量。有两个重载的函数:
- count():返回 Flow 中元素的总数量
- count(predicate: suspend (T) -> Boolean):返回 Flow 中满足给定条件的元素的数量
fun main() = runBlocking {val numbersFlow: Flow<Int> = listOf(1, 2, 3, 4, 5).asFlow()// 计算 Flow 中元素的总数量val totalCount = numbersFlow.count()println("Total Count: $totalCount")// 计算 Flow 中满足条件的元素数量val evenCount = numbersFlow.count { it % 2 == 0 }println("Even Count: $evenCount")
}Total Count: 5
Even Count: 2
debounce
过滤掉在给定时间间隔内后面紧跟着的新值,而只发射最新的值。这在处理用户输入或其他频繁变化的数据时非常有用,以避免不必要的处理和减轻系统负担。
fun main() = runBlocking {//通过固定时间间隔进行 debounceflow {emit(1)delay(90)emit(2)delay(90)emit(3)delay(1010)emit(4)delay(1010)emit(5)}.debounce(1000).collect { println(it) }//通过动态的时间间隔进行 debounceprintln("-----------------")flow {emit(1)delay(90)emit(2)delay(90)emit(3)delay(1010)emit(4)delay(1010)emit(5)}.debounce {if (it == 1) {0L} else {1000L}}.collect { println(it) }
}3
4
5
-----------------
1
3
4
5
distinctUntilChanged
过滤掉流中连续重复的元素,确保只有当新元素与前一个元素不相同时才会传递。当然你可以自定义过滤条件
fun main() = runBlocking {val numbers = arrayOf(1,2,2,3,4,5,5,5,6,7,8).asFlow()// 示例1: 基本版本numbers.distinctUntilChanged().onEach { println("Received: $it") }.collect()// 示例2: 自定义比较函数版本 仅当两个偶数才过滤val customComparator: (old: Int, new: Int) -> Boolean = { old, new -> (old % 2 == 0) && (new % 2 == 0) }numbers.distinctUntilChanged(areEquivalent = customComparator).onEach { println("Received (Custom): $it") }.collect()
}Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received (Custom): 1
Received (Custom): 2
Received (Custom): 3
Received (Custom): 4
Received (Custom): 5
Received (Custom): 5
Received (Custom): 5
Received (Custom): 6
Received (Custom): 7
Received (Custom): 8
distinctUntilChangedBy
过滤掉流中连续重复的元素与distinctUntilChanged()相比,它可以自定义条件字段。
fun main():Unit = runBlocking {val data = listOf(Person(1, "Alice"),Person(2, "Bob"),Person(1, "Charlie1"),Person(1, "Charlie2"),Person(1, "Charlie3"),Person(3, "David")).asFlow()data.distinctUntilChangedBy {it.id}.onEach {println("Person id:${it.id} str:${it.str}")}.collect()
}Person id:1 str:Alice
Person id:2 str:Bob
Person id:1 str:Charlie1
Person id:3 str:David
drop
忽略的元素
fun main():Unit = runBlocking {//drop函数忽略前面的3个元素flowOf(0,1,2,3,4,5,6,7,8,9).drop(3).collect{println("Received element:${it}")}
}Received element:3
Received element:4
Received element:5
Received element:6
Received element:7
Received element:8
Received element:9
dropWhile
检查元素,一旦找到第一个不满足条件的元素,它就会返回一个新的Flow,其中包含从该元素开始的所有后续元素。
fun main():Unit = runBlocking {flowOf(0,1,2,3,4,5,6,7,8,9).dropWhile {it == 0 || it == 1 || it == 2 || it == 3 || it == 5}.collect{println("Received element:${it}")}
}Received element:4
Received element:5
Received element:6
Received element:7
Received element:8
Received element:9
emitAll
传递元素。它有两个重载版本
- suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>)
从给定的通道中发射所有元素到流收集器(FlowCollector)中
相当于 channel.consumeEach { value -> emit(value) } - suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
从给定的流中收集所有的值并将它们发射到流收集器(FlowCollector)中
相当于 flow.collect { value -> emit(value) }
fun main():Unit {// 通过流发射数据val myFlow = flow {// 在这里生成流数据for (i in 1..5) {emit(i)delay(1000) // 模拟异步操作}}// 通过通道发射数据val myChannel = callbackFlow {// 在这里生成通道数据for (i in 6..11) {send(i)delay(1000) // 模拟异步操作}awaitClose { /* 可选的清理操作 */ }}// 使用 emitAll 将流中的数据传递给另一个流val combinedFlow = flow {emitAll(myFlow)emitAll(myChannel)}//使用协程来观察数据runBlocking {combinedFlow.collect { value ->// 处理从流中接收到的值println("Received: $value")}}
}Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Received: 10
Received: 11
emptyFlow
表示一个没有元素的数据流,这在一些场景中可能很有用,例如在初始化时创建一个空的数据流,或者在某些条件不满足时返回一个空的数据流。
filter
筛选出符合给定条件的元素,返回一个新的Flow
fun main():Unit {runBlocking {// 创建一个简单的Flow,包含1到10的数字val originalFlow = (1..10).asFlow()// 使用filter操作符,过滤出偶数val filteredFlow = originalFlow.filter { it % 2 == 0 }// 收集并打印过滤后的结果filteredFlow.collect { println(it) }}
}2
4
6
8
10
filterIsInstance
过滤 Flow 中的元素,仅保留指定类型的元素
fun main() = runBlocking {val mixedFlow: Flow<Any> = flowOf(1, "Hello", 3.14, "World", 42)val stringFlow: Flow<String> = mixedFlow.filterIsInstance()stringFlow.collect { value ->println(value)}
}Hello
World
filterNot
过滤掉满足给定条件的元素,返回一个新的 Flow,包含原始 Flow 中不符合条件的元素
fun main() = runBlocking {// 创建一个包含数字的 Flowval numbersFlow: Flow<Int> = (1..10).asFlow()// 使用 filterNot 过滤掉偶数val oddNumbersFlow: Flow<Int> = numbersFlow.filterNot { it % 2 == 0 }// 收集并打印结果oddNumbersFlow.collect { println(it) }
}1
3
5
7
9
filterNotNull
过滤掉 null 值,得到一个新的 Flow
class MyViewModel : ViewModel() {private val _nullableStrings = MutableStateFlow<String?>(null)// 提供一个只包含非空字符串的 Flowval nonNullStrings = _nullableStrings.asStateFlow().filterNotNull()fun updateString(newString: String?) {viewModelScope.launch {_nullableStrings.value = newString}}
}
first
返回流中的第一个元素或满足条件的第一个元素,并取消流的收集。如果流为空,则抛出 NoSuchElementException 异常
fun main() = runBlocking {runBlocking {// 使用first获取第一个元素val flow1 = flowOf(1, 2, 3, 4, 5)val firstElement1 = flow1.first()println("First element in flow1: $firstElement1")// 使用first获取匹配谓词的第一个元素val flow2 = flowOf(1, 2, 3, 4, 5)val firstElement2 = flow2.first { it > 2 }println("First element in flow2 greater than 2: $firstElement2")// 处理流为空的情况val emptyFlow = emptyFlow<Int>()try {val result = emptyFlow.first()println("Result from emptyFlow: $result")} catch (e: NoSuchElementException) {println("Error: ${e.message}")}}
}First element in flow1: 1
First element in flow2 greater than 2: 3
Error: Expected at least one element
firstOrNull
同上,区别为空 返回 null
flatMapConcat
将原始的 Flow 中的元素进行转换,转换的过程中产生的每个元素都是一个 Flow,并将这些新的 Flow 连接在一起
fun main() = runBlocking {// 模拟一个包含整数的 Flowval originalFlow: Flow<Int> = (1..3).asFlow()// 定义一个转换函数,将每个整数转换为一个 Flow 包含两倍和三倍suspend fun transform(value: Int): Flow<Int> = flow {emit(value * 2)emit(value * 3)}// 使用 flatMapConcat 进行转换val resultFlow: Flow<Int> = originalFlow.flatMapConcat { transform(it) }.flowOn(Dispatchers.Default) // 切换到后台线程以执行转换操作// 收集结果并打印resultFlow.collect { println(it) }
}2
3
4
6
6
9
flatMapLatest
处理Flow中的元素,并通过指定的 transform 函数将每个元素映射为一个新的Flow。这个新的Flow将会被订阅,而当原始Flow发射新的元素时,之前订阅的流将被取消。这个操作符通常在需要处理最新元素的情况下非常有用。
fun main() = runBlocking {val originalFlow = flow {emit("a")delay(100)emit("b")}val transformedFlow = originalFlow.flatMapLatest { value ->flow {emit(value)delay(200)//模拟复杂处理emit(value + "_last")}}transformedFlow.collect { println(it) }
}a
b
b_last
flatMapMerge
将每个元素转换为另一个 Flow,并将这些新的 Flow 合并和展平。这个操作符可以限制并发地收集流的数量,以提高性能
fun main() = runBlocking {// 模拟包含 URL 的流val urlFlow: Flow<String> = flow {emit("https://example.com/image1.jpg")emit("https://example.com/image2.jpg")emit("https://example.com/image3.jpg")}// 使用 flatMapMerge 并发下载图像urlFlow.flatMapMerge { url ->flow {// 模拟下载操作,这里使用 delay 模拟下载耗时delay(1000)emit("${System.currentTimeMillis()} Downloaded image from $url")}}.collect { result ->println(result)}
}1705657442283 Downloaded image from https://example.com/image1.jpg
1705657442290 Downloaded image from https://example.com/image2.jpg
1705657442290 Downloaded image from https://example.com/image3.jpg
flattenConcat
将一个包含其他 Flow 的 Flow 打平(flatten),使得所有内部 Flow 的元素按顺序依次发射,而不会交叉发射。这意味着它会按照顺序收集和发射内部 Flow,而不会在它们之间进行交错。
fun main() = runBlocking {val flowOfFlows: Flow<Flow<Int>> = flow {emit(flow {emit(1)delay(100)emit(2)})emit(flow {delay(50)emit(3)})emit(flow {delay(200)emit(4)})}val flattenedFlow: Flow<Int> = flowOfFlows.flattenConcat()flattenedFlow.collect { result ->println("${System.currentTimeMillis()} --- $result")}
}1705717218466 --- 1
1705717218604 --- 2
1705717218665 --- 3
1705717218867 --- 4
flattenMerge
将包含 Flow 的 Flow 扁平化为一个单一的 Flow,并可以限制并发收集的内部 Flow 的数量。这对于在处理并发数据流时非常有用。
fun main() = runBlocking {// 创建一个包含 Flow 的 Flowval outerFlow: Flow<Flow<Int>> = flow {emit(flowOf(1, 2, 3).onEach { delay(100) })emit(flowOf(4, 5, 6).onEach { delay(100) })emit(flowOf(7, 8, 9).onEach { delay(100) })}// 使用 flattenMerge 将其扁平化val flattenedFlow: Flow<Int> = outerFlow.flattenMerge()// 收集并打印结果flattenedFlow.collect { value ->println(" ${System.currentTimeMillis()} value $value")}
}1705719006998 value 11705719006999 value 41705719006999 value 71705719007099 value 21705719007099 value 51705719007099 value 81705719007210 value 31705719007210 value 61705719007210 value 9
flow
支持异步数据流的处理。它允许您以声明性的方式定义、转换和收集数据流
fun main():Unit = runBlocking {// 收集并打印结果fibonacci().take(6).collect { value ->println(" ${System.currentTimeMillis()} value $value")}//emit错误示范flow {emit(1) // OkwithContext(Dispatchers.IO) {emit(2) // emit应该严格在块的调度器中发生,以便保留流上下文。这会报错 IllegalStateException}}.collect{println(" ${System.currentTimeMillis()} value $it")}
}1705722094088 value 01705722094088 value 11705722094089 value 11705722094089 value 21705722094089 value 31705722094091 value 51705722094100 value 1
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:Flow was collected in [BlockingCoroutine{Active}@3dca89ed, BlockingEventLoop@6b5931f4],but emission happened in [DispatchedCoroutine{Active}@2aa604a1, Dispatchers.IO].Please refer to 'flow' documentation or use 'flowOn' insteadat kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:85)at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:106)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:83)at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66)at com.yang.myapplication.MainActivityKt$main$1$2$1.invokeSuspend(MainActivity.kt:107)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)Process finished with exit code 1
flowOf
创建一个 Flow
val flow: Flow<Int> = flowOf(1, 2, 3)val flow: Flow<Int> = flowOf(1)
flowOn
改变Flow的执行上下文。这对于在 Android 开发中处理异步任务非常有用,尤其是在 UI 线程和后台线程之间切换的情况下。
fun main():Unit = runBlocking {fetchData().flowOn(Dispatchers.IO) // 切换到 IO 线程执行数据库查询.collect { data -> // 切回主线程更新 UIupdateUI(data)}
}private fun fetchData(): Flow<String> = flow {// 模拟后台线程的数据库查询val data = fetchDataFromDatabase()emit(data)
}private fun fetchDataFromDatabase(): String {// 模拟数据库查询return "Data from database"
}private fun updateUI(data: String) {// 在主线程更新 UIprintln(data)
}Data from database
fold
累积Flow的元素。可以参考reduce没有初始累加器的值
fun main():Unit = runBlocking {val flow = flowOf("一","二", "三", "四", "五")val result = flow.fold("累加器的初始值") { acc, value ->//当前的累加器值 新的流元素// 模拟一些异步操作delay(1000)println("Accumulating: $acc + $value")acc + value//返回一个新的累加器值}// 打印最终结果println("Final result: $result")
}Accumulating: 累加器的初始值 + 一
Accumulating: 累加器的初始值一 + 二
Accumulating: 累加器的初始值一二 + 三
Accumulating: 累加器的初始值一二三 + 四
Accumulating: 累加器的初始值一二三四 + 五
Final result: 累加器的初始值一二三四五
getAndUpdate
MutableStateFlow 扩展函数之一,用于原子性地更新其值并返回先前的值。
fun main():Unit = runBlocking {val numberState = MutableStateFlow(0)launch {numberState.collect { newValue ->// 在这里处理新的值println("New value: $newValue")}}// 使用 getAndUpdate 更新 MutableStateFlow 的值val previousValue = numberState.getAndUpdate { current ->// 在这里执行逻辑,返回新的值current + 1}// 打印先前的值println("Previous value: $previousValue")
}Previous value: 0
New value: 1
last
等待直到Flow中的最后一个可用元素,然后将其返回。如果Flow为空,它会抛出 NoSuchElementException 异常
fun main():Unit = runBlocking {val flow = flow {for (i in 1..5) {delay(1000) // 模拟异步操作emit(i)}}try {val lastElement = flow.last()println("Last element: $lastElement")} catch (e: NoSuchElementException) {println("Flow was empty.")}
}Last element: 5
lastOrNull
同上,不一样的是如果Flow为空它会返回null
launchIn
用于在给定的协程作用域中启动对Flow的收集。它是 scope.launch { flow.collect() } 的简写。通常,launchIn 与 onEach、onCompletion 和 catch 等操作符一起使用,用于处理Flow中的所有发射值,以及处理上游Flow或处理过程中可能发生的异常。
private val uiScope = CoroutineScope(Dispatchers.Default)fun main():Unit = runBlocking {val flow = getSampleFlow()flow.onEach { value -> updateUi(value) }.onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }.catch { cause -> handleException(cause) }.launchIn(uiScope)delay(5000)uiScope.cancel()delay(10000)}private fun getSampleFlow(): Flow<String> = flow {emit("A")delay(1000)emit("B")delay(1000)emit("C")throw RuntimeException("Sample exception")
}private fun updateUi(value: String) {println("Collected value: $value")
}private fun handleException(cause: Throwable) {println("Exception: $cause")
}Collected value: A
Collected value: B
Collected value: C
Collected value: Failed
Exception: java.lang.RuntimeException: Sample exception
map
对 Flow 中的每个元素进行转换
fun main():Unit = runBlocking {val originalFlow: Flow<Int> = flow {for (i in 1..5) {emit(i)delay(1000) // 模拟异步操作}}// 使用 map 进行转换val transformedFlow: Flow<String> = originalFlow.map { value ->"Transformed: $value"}transformedFlow.collect { transformedValue ->println(transformedValue)}
}
mapLatest
转换原始流中的元素。该函数的特点是在原始流发出新值时,会取消先前值的转换操作。
fun main():Unit = runBlocking {val originalFlow: Flow<String> = flow {emit("a")delay(100)emit("b")}val transformedFlow: Flow<String> = originalFlow.mapLatest { value ->println("Started computing $value")delay(200)"Computed $value"}transformedFlow.collect { value ->println(value)}
}
merge
将多个 Flow 合并成一个新的 Flow。合并的 Flow 不保留元素的顺序,而是并发地将元素收集到新的 Flow 中,没有同时收集 Flow 的数量限制
fun main():Unit = runBlocking {val flow1 = flowOf(1, 2, 3).onEach { delay(150) }val flow2 = flowOf(4, 5, 6).onEach { delay(100) }// 使用 Iterable<Flow<T>>.merge()val mergedFlow1 = listOf(flow1, flow2).merge()mergedFlow1.collect { println(it) }println("---->----->---->")// 使用 vararg mergeval mergedFlow2 = merge(flow1, flow2)mergedFlow2.collect { println(it) }
}
MutableSharedFlow
SharedFlow的可变版本,参考后面的SharedFlow
MutableStateFlow
继承自 StateFlow 和 MutableSharedFlow。MutableStateFlow 通过提供一个可变的 value 属性来允许修改其当前状态,参考后面的StateFlow
onCompletion
用于在Flow完成或被取消后执行指定的操作。
fun main():Unit = runBlocking {val myFlow: Flow<Int> = flow {for (i in 1..3) {// 发射元素emit(i)}}myFlow.onCompletion { cause ->// cause 参数是一个 Throwable,表示取消或失败的原因if (cause == null) {println("Flow completed successfully")} else {println("Flow completed with an exception: $cause")}}.collect { value ->// 处理 Flow 发射的元素println("Received: $value")}
}
onEach
用于在流的每个元素发射到下游之前执行指定的操作。这个函数通常用于执行一些副作用,而不改变流的元素。
fun main():Unit = runBlocking {// 创建一个简单的流val simpleFlow: Flow<Int> = flow {for (i in 1..5) {emit(i)}}// 使用 onEach 打印每个元素,并在每个元素发射之前执行一些操作val modifiedFlow = simpleFlow.onEach { value ->// 在每个元素发射之前执行的操作println("Before emission: $value")}// 收集修改后的流modifiedFlow.collect { value ->// 打印每个元素println("Collected: $value")}
}Before emission: 1
Collected: 1
Before emission: 2
Collected: 2
Before emission: 3
Collected: 3
Before emission: 4
Collected: 4
Before emission: 5
Collected: 5
onEmpty
Flow 完成但未发射任何元素时执行指定的操作
fun main():Unit = runBlocking {//emptyFlow<Int>() 创建了一个不发射任何元素的空 Flow 执行onEmpty里方法emptyFlow<Int>().onEmpty {emit(1)emit(2)}.collect { println(it) }
}Before emission: 1
Collected: 1
Before emission: 2
Collected: 2
Before emission: 3
Collected: 3
Before emission: 4
Collected: 4
Before emission: 5
Collected: 5
onStart
在开始收集流之前执行一个操作。这个操作可以是挂起的,因此您可以在协程中执行异步的初始化工作。
fun main():Unit = runBlocking {flowOf("a", "b", "c").onStart {emit("Begin")println("Initialization in progress...")// 可以进行一些异步的初始化工作delay(1000)println("Initialization complete.")}.collect { println(it) }
}Begin
Initialization in progress...
Initialization complete.
a
b
c
onSubscription
在共享流(SharedFlow 后续有提到)开始被收集(订阅注册后)时调用指定的操作。
class SharedFlowExample {private val sharedFlow: MutableSharedFlow<String> = MutableSharedFlow()fun startFlow() {// 启动协程来收集共享流runBlocking {launch {sharedFlow.onSubscription {// 在订阅开始时执行的操作println("Subscription started")// 可以在此处发射额外的元素emit("Extra element 1")emit("Extra element 2")}.collect { value ->// 收集流中的元素println("Collected: $value")}}}}suspend fun emitValue(value: String) {// 发射新的值到共享流中sharedFlow.emit(value)}
}fun main():Unit = runBlocking {val example = SharedFlowExample()launch {// 启动共享流的订阅example.startFlow()}launch {delay(1000)// 发射新的值到共享流中example.emitValue("Value 1")example.emitValue("Value 2")}
}Subscription started
Collected: Extra element 1
Collected: Extra element 2
Collected: Value 1
Collected: Value 2
produceIn
在协程中创建生产者协程(produce coroutine),它会启动一个新的协程来收集给定的流,并返回一个 ReceiveChannel,通过该通道可以接收来自流的元素。
private val scope = CoroutineScope(Dispatchers.Default)fun main():Unit = runBlocking {// 创建一个简单的流val simpleFlow = flow {for (i in 1..5) {delay(1000) // 模拟异步操作emit(i)}}// 在生命周期内启动协程来收集流val channel = simpleFlow.produceIn(scope)// 启动一个协程来消费通道中的数据scope.launch {for (value in channel) {// 处理接收到的数据println("Received: $value")}}delay(6000)
}Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
receiveAsFlow
将一个 ReceiveChannel 转换为热流(hot flow)。热流表示每当该流被收集时,都会从通道中以(fan-out)的方式接收元素,即每个收集器(collector)都会收到一个元素。
private val channel = Channel<Int>()fun main():Unit = runBlocking {// 使用 receiveAsFlow 将通道转换为热流val flow = channel.receiveAsFlow()launch {// 在流上设置收集器,每当流被收集时,输出元素flow.collect { value ->println("Received: $value")}}for (i in 1..5) {delay(1000) // 模拟一些工作channel.send(i) // 将数据发送到通道}// 关闭通道(通知流收集器)channel.close()
}Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
reduce
从 Flow 的元素中累积一个值与fold相比没有初始累加器的值。
fun main() {runBlocking {val sum: Int = createFlow().reduce { accumulator, value ->accumulator + value //返回}println("Sum of elements: $sum")}
}fun createFlow(): Flow<Int> = flow {// Emit some integersemit(1)emit(2)emit(3)emit(4)
}Sum of elements: 10
retry
上游 Flow 发生特定异常时进行重试。可填重试次数,默认值为 Long.MAX_VALUE,即无限次重试,谨慎使用
fun main() {runBlocking {// 模拟一个可能会失败的流val flowWithError: Flow<Int> = flow {emit(1)emit(2)throw RuntimeException("Simulated error") // 第三次发射时引发异常emit(3) // 不会执行到这里}// 使用 retry 进行重试flowWithError.retry(3) { e ->// 重试条件:发生异常且异常类型为 RuntimeException(e is RuntimeException).also { if (it) delay(1000) }}.collect { value ->// 在重试之后,成功从流中收集到的值println("Collected: $value")}}
}Collected: 1
Collected: 2Collected: 1
Collected: 2Collected: 1
Collected: 2Collected: 1
Collected: 2Exception in thread "main" java.lang.RuntimeException: Simulated error
retryWhen
上游流中发生异常时进行重试,通过返回Boolean确定是否重试。
fun main() {runBlocking {val retryLimit = 3// 使用 retryWhen 进行重试,如果是 IOException 则一直重试,否则最多重试 retryLimit 次val resultFlow = flow {emit(1)emit(2)throw IOException("Simulated IO Exception")emit(3) // This won't be reached due to the exception}.retryWhen { cause, attempt ->if (cause is IOException && attempt < retryLimit) {println("Retrying... (Attempt $attempt)")delay(1000) // 延迟 1 秒后重试true // 返回 true 表示继续重试} else {println("Retry limit reached. Not retrying.")false // 返回 false 表示不再重试}}// 收集并打印结果resultFlow.collect { value ->println("Received value: $value")}}
}Received value: 1
Received value: 2
Retrying... (Attempt 0)
Received value: 1
Received value: 2
Retrying... (Attempt 1)
Received value: 1
Received value: 2
Retrying... (Attempt 2)
Received value: 1
Received value: 2
Retry limit reached. Not retrying.
Exception in thread "main" java.io.IOException: Simulated IO Exception
runningFold
在累积的过程中,每次都发射中间结果,包括初始值。参考flod:但是fold 最终只发射最终结果,而不是在每个中间步骤都发射结果
fun main() = runBlocking {flowOf(1, 2, 3).runningFold(emptyList<Int>()) { acc, value -> acc + value }.collect{println(it)}
}[]
[1]
[1, 2]
[1, 2, 3]
runningReduce
同上 参考Reduce 同样runningReduce每个中间步骤都发射结果
fun main() = runBlocking {val result = flowOf(1, 2, 3, 4).runningReduce { acc, value -> acc + value }.toList()println(result) // 输出
}[1, 3, 6, 10]
sample
在给定的采样周期内,仅发射原始流中最新的值。
private val dataFlow = MutableStateFlow(0)fun main() = runBlocking {launch {// 模拟从网络或数据库获取数据repeat(10) {delay(110)dataFlow.value = it}}dataFlow.sample(200).collect {// 在这里处理采样后的数据println(it)}
}0
2
4
5
7
9
scan
功能和runningFold一样
fun main() = runBlocking {val numbersFlow = flowOf(1, 2, 3, 4, 5)val sumFlow = numbersFlow.scan(0) { acc, value -> acc + value }// 当累积完成后,可以得到最终的结果println("Final sum: ${sumFlow.toList()}")
}Final sum: [0, 1, 3, 6, 10, 15]
SharedFlow
一种热数据流,可以在所有收集器之间以广播方式共享发射的值。与普通的 Flow 不同,SharedFlow 的活动实例独立于收集器的存在,因此它是“热”的。
data class Event(val message: String)class EventBus {private val _events = MutableSharedFlow<Event>(1) // 私有的可变 SharedFlowval events: SharedFlow<Event> = _events.asSharedFlow() // 公开的只读 SharedFlowsuspend fun produceEvent(event: Event) {_events.emit(event) // 挂起直到所有订阅者都收到事件}
}fun main() = runBlocking {val eventBus = EventBus()// 启动第一个订阅者launch {eventBus.events.collect { event ->println("Subscriber 1 received: ${event.message}")}}// 启动第二个订阅者launch {eventBus.events.collect { event ->println("Subscriber 2 received: ${event.message}")}}//延迟一下确保被订阅//因为MutableSharedFlow<Event>(1)-设置了向新订阅者重播的值数(不能为负数,默认为零)为1 所以不用延迟。
// delay(1000)// 生产事件eventBus.produceEvent(Event("Hello from EventBus!"))
}Subscriber 1 received: Hello from EventBus!
Subscriber 2 received: Hello from EventBus!
shareIn
将冷流(flow)转换为上述热共享流(sharedflow)的操作符
fun main() = runBlocking {val backendMessages: Flow<String> = flow {// 模拟从后端获取数据repeat(5) {emit("Message $it")delay(1000)}}// 使用 shareIn 将冷流转换为热共享流val messages: Flow<String> = backendMessages.shareIn(this, SharingStarted.Eagerly, 1) // 使用 Eager 策略}
StateFlow
一个状态容器可观察数据流,可向其收集器发出当前状态更新和新状态更新。
class CounterModel {private val _counter = MutableStateFlow(0) // 私有可变状态流val counter: StateFlow<Int> get() = _counter // 公开为只读状态流fun increment() {_counter.value += 1 // 增加计数器的值}
}fun main():Unit = runBlocking {val counterModel = CounterModel()launch {counterModel.counter.collect { value ->// 在这里处理状态变化println("Counter value changed: $value")}}repeat(5){delay(500)counterModel.increment()}
}Counter value changed: 0
Counter value changed: 1
Counter value changed: 2
Counter value changed: 3
Counter value changed: 4
Counter value changed: 5
stateIn
将冷流(Flow)转换为上述状态流(StateFlow)
take
创建一个新的 Flow,其中包含原始 Flow 中的前 count 个元素。当消耗了 count 个元素后,原始的 Flow 就会被取消。如果 count 不是正数,则会抛出 IllegalArgumentException
fun main():Unit = runBlocking {val numberFlow = flow {for (i in 1..10) {delay(100) // 模拟异步操作emit(i)}}// 从流中获取前 5 个元素val result = numberFlow.take(5)launch {// 收集并打印结果result.collect { value ->println("result $value")}}
}result 1
result 2
result 3
result 4
result 5
takeWhile
返回一个包含满足给定断言的前几个元素的新 Flow 对象。请注意,生成的 Flow 不包含使断言返回 false 的那个元素
fun main():Unit = runBlocking {val numbersFlow: Flow<Int> = flow {for (i in intArrayOf(1,2,3,4,5,6,1,2,3,4)) {delay(1000) // Simulating some asynchronous operationemit(i)}}// 使用 takeWhile 筛选满足条件的前几个元素val filteredFlow: Flow<Int> = numbersFlow.takeWhile { it < 5 }// 收集并打印结果filteredFlow.collect { println(it) }
}1
2
3
4
toCollection
将 Flow 中的元素收集到一个目标可变集合(MutableCollection)中。
fun main() = runBlocking {// 创建一个 Flow,模拟一些异步操作val myFlow: Flow<Int> = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 创建一个可变列表作为目标集合val resultList = mutableListOf<Int>()// 使用 toCollection 将 Flow 中的元素收集到列表中myFlow.toCollection(resultList)// 打印结果println(resultList)
}[1, 2, 3]
toList
将 Flow 中的元素收集到一个 List
fun main() = runBlocking {// 创建一个 Flow,模拟一些异步操作val myFlow: Flow<Int> = flow {emit(1)delay(1000)emit(2)delay(1000)emit(3)}// 创建一个可变列表作为目标集合val resultList1 = mutableListOf<Int>()// 使用 toList 将 Flow 中的元素收集到列表中myFlow.toList(resultList1)val resultList2 = myFlow.toList()// 打印结果println(resultList1)println(resultList2)
}[1, 2, 3]
[1, 2, 3]
toSet
将 Flow 中的元素收集到一个 Set 用法同上
transform
对 Flow 中的每个元素进行灵活的转换、过滤或多次发射。
fun main() = runBlocking {// 创建一个简单的 Flow,包含 1 到 5 的整数val sourceFlow: Flow<Int> = flowOf(1, 2, 3, 4, 5)// 使用 transform 操作符进行转换,只发射偶数值,并发射两次val transformedFlow: Flow<Int> = sourceFlow.transform { value ->if (value % 2 == 0) {// 发射偶数值两次emit(value)emit(value)}// 奇数值将被跳过}// 收集并打印转换后的 Flow 中的元素transformedFlow.collect { value ->println(value)}
}2
2
4
4
transformLatest
transform 函数不同的是,transformLatest 在原始流发射新值时会取消之前的 transform 块
fun main() = runBlocking {// 创建一个简单的 Flow,包含 1 到 5 的整数val sourceFlow: Flow<Int> = flow {repeat(5) {delay(300)emit(it)}}val transformedFlow: Flow<String> = sourceFlow.transformLatest { value ->// 模拟操作emit("value_$value")delay(500)emit("Action result for: $value")}// 收集并打印转换后的 Flow 中的元素transformedFlow.collect { value ->println(value)}
}value_0
value_1
value_2
value_3
value_4
Action result for: 4
transformWhile
在给定流的每个值上应用一个转换函数,只要该函数返回true,就会一直继续应用。这个函数的接收者是FlowCollector<R>,因此它是一个灵活的函数,可以在发射元素时进行转换、跳过或多次发射。
data class DownloadProgress(val percentage: Int, val isDone: Boolean)fun Flow<DownloadProgress>.completeWhenDone(): Flow<DownloadProgress> =transformWhile { progress ->emit(progress) // 总是发射进度!progress.isDone // 只要下载未完成,就继续}suspend fun simulateDownload(): Flow<DownloadProgress> = flow {for (percentage in 0..100 step 10) {emit(DownloadProgress(percentage, false))delay(500) // 模拟下载延迟}emit(DownloadProgress(100, true)) // 下载完成
}fun main() = runBlocking {val downloadFlow = simulateDownload()downloadFlow.completeWhenDone().collect { progress ->// 处理下载进度println("Download Progress: ${progress.percentage}%")}
}Download Progress: 0%
Download Progress: 10%
Download Progress: 20%
Download Progress: 30%
Download Progress: 40%
Download Progress: 50%
Download Progress: 60%
Download Progress: 70%
Download Progress: 80%
Download Progress: 90%
Download Progress: 100%
Download Progress: 100%
update
MutableStateFlow 类的一个扩展函数,用于原子地使用指定的函数更新MutableStateFlow.value。在多线程或并发环境下,确保更新是原子性的。
fun main():Unit = runBlocking {val userFlow = MutableStateFlow(User("John", 25))launch {// 收集并打印更新后的值userFlow.collect { updatedUser ->println(updatedUser)}}delay(100)// 使用 update 函数更新 MutableStateFlowuserFlow.update { currentUser ->User(currentUser.name, currentUser.age + 1)}
}User(name=John, age=25)
User(name=John, age=26)
updateAndGet
原子地更新 MutableStateFlow 的值,并返回新的值。它通常用于在多线程或并发环境中,确保对状态的更新是原子的。跟updateAndGet 相比它返回新的值,更适用于 Kotlin 协程库中的 MutableStateFlow 类型,用于管理可变的状态。
fun main():Unit = runBlocking {val mutableStateFlow = MutableStateFlow(0)// 更新并获取新的值val newValue = mutableStateFlow.updateAndGet { currentValue ->currentValue + 1}println("New value: $newValue")println("Current value: ${mutableStateFlow.value}")
}New value: 1
Current value: 1
withIndex
将每个元素包装成 IndexedValue 对象,该对象包含值和它的索引(从零开始)。这对于需要知道元素在流中的位置的场景非常有用
fun main() = runBlocking {// 创建一个包含整数的 Flowval flow: Flow<Int> = (1..5).asFlow()// 使用 withIndex 处理 Flow 中的元素及其索引val indexedFlow: Flow<IndexedValue<Int>> = flow.withIndex()// 收集并打印每个元素及其索引indexedFlow.collect { indexedValue ->println("Index: ${indexedValue.index}, Value: ${indexedValue.value}")}
}Index: 0, Value: 1
Index: 1, Value: 2
Index: 2, Value: 3
Index: 3, Value: 4
Index: 4, Value: 5
zip
将两个 Flow 进行合并,然后使用提供的 transform 函数应用于每一对值。合并后的 Flow 将在两个输入的任何一个完成时完成,并且在剩余的 Flow 上调用 cancel。
fun main() = runBlocking {val flow = flowOf(1, 2, 3).onEach { delay(10) }val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }flow.zip(flow2) { i, s -> i.toString() + s }.collect {println(it) // 将打印 "1a 2b 3c"}
}1a
2b
3c