【Kotlin】Flow简介

1 前言

        Flow 是 Kotlin 标准库中的一个新的异步流处理框架,旨在简化异步数据流的操作和处理,它提供了一种声明式的方式来处理数据流。

        Flow 中一些接口调用有些类似 Sequence(详见 → Sequence简介),协程的使用详见 → 协程。

        Flow 有以下特性和概念。

  1. 异步流(Asynchronous Streams):Flow 允许以一种非阻塞的方式处理一系列的值或事件,这使得在处理大量数据或涉及 IO 操作时能够更加高效。

  2. 冷流(Cold Flow):只有在收集器(collector)订阅(或启动)了之后才会开始发射(emit)数据。

  3. 热流(Hot Flow):在创建后就立即开始发射(emit)数据,不管是否有收集器(collector),这会导致收集器可能只接收到部分数据。

  4. 声明式 API:Flow 提供了一套简洁清晰的操作符,允许以声明式的方式对流进行处理,如 map、filter、reduce 等。

  5. 协程集成:Flow 构建在协程之上,因此可以与协程一起使用,并且可以利用协程的优势,比如轻量级、顺序性等。

  6. 取消支持:Flow 支持与协程一样的取消操作,从而释放资源和避免内存泄漏。

  7. 背压支持:Flow 提供了背压支持,可以通过各种操作符来控制数据的生产和消费速率,防止生产者速度过快导致消费者无法跟上。

        Flow 有中间操作和终端操作,如下。

  • 中间操作:每次操作返回一个新的 Flow 对象(主要操作有:flowOn、catch、buffer、conflate、filter、distinctUntilChanged、drop、take、map 等)。
  • 终端操作:每次操作返回一个值或集合,每个 Flow 只能进行一次终端操作(主要操作有:first、last、count、fold、reduce、collect、toCollection、toSet、toList 等)。

2 创建 Flow

2.1 emptyFlow

public fun <T> emptyFlow(): Flow<T> = EmptyFlow

2.2 flow

        1)源码

public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

        2)应用

var coldFlow = flow<String> {  emit("A")emit("B")
}

2.3 MutableSharedFlow

        1)源码

public fun <T> MutableSharedFlow(replay: Int = 0,extraBufferCapacity: Int = 0,onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

        2)应用

var hotFlow = MutableSharedFlow<String>()
CoroutineScope(Dispatchers.Default).launch {hotFlow.emit("A")hotFlow.emit("B")
}

2.4 flowOf

        1)源码

public fun <T> flowOf(value: T): Flow<T> = flow {emit(value)
}public fun <T> flowOf(vararg elements: T): Flow<T> = flow {for (element in elements) {emit(element)}
}

        2)应用

var flow1 = flowOf("A")
var flow2 = flowOf("A", "B", "C")

2.5 asFlow

2.5.1 () -> T

        1)源码

public fun <T> (() -> T).asFlow(): Flow<T> = flow {emit(invoke())
}

        2)应用

fun main() {var fun2 = { fun1() }.asFlow()
}fun fun1(): String {return "xxx"
}

2.5.2 Iterator

        1)源码

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {forEach { value ->emit(value)}
}

        2)应用

var array = intArrayOf(1, 2, 3)
var iterator = array.iterator()
var flow = iterator.asFlow()

2.5.3 Sequence

        1)源码

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {forEach { value ->emit(value)}
}

        2)应用

var sequence = sequenceOf(1, 2, 3)
var flow = sequence.asFlow()

2.5.4 Array

        1)源码

public fun <T> Array<T>.asFlow(): Flow<T> = flow {forEach { value ->emit(value)}
}

        2)应用

var array = arrayOf(1, 2, 3)
var flow = array.asFlow()

2.5.5 Range

        1)源码

public fun IntRange.asFlow(): Flow<Int> = flow {forEach { value ->emit(value)}
}

        2)应用

var range = 1..3
var flow = range.asFlow()

2.6 zip

        1)源码

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

        2)应用

var flow1 = flowOf(1, 3, 5)
var flow2 = flowOf("A", "B", "C")
// A-1, B-3, C-5
var flow = flow1.zip(flow2) { num, str ->"$str-$num"
}

3 Flow 冷流和热流

3.1 冷流

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launchfun main() {val coldFlow = emitFlow()CoroutineScope(Dispatchers.Default).launch {coldFlow.collect {println("CoroutineScope, $it")}}MainScope().launch(Dispatchers.IO) {coldFlow.collect {println("MainScope, $it")}}Thread.sleep(1000)
}fun emitFlow(): Flow<String> = flow {for (i in 1..2) {emit("emit-$i")delay(100)}
}

        打印如下。

CoroutineScope, emit-1
MainScope, emit-1
CoroutineScope, emit-2
MainScope, emit-2

        说明:可以看到每一个订阅者都可以收到所有消息。

3.2 热流

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launchfun main() {var hotFlow = emitFlow()CoroutineScope(Dispatchers.Default).launch {hotFlow.collect {println("CoroutineScope, $it")}}MainScope().launch(Dispatchers.IO) {hotFlow.collect {println("MainScope, $it")}}Thread.sleep(1000)
}fun emitFlow(): MutableSharedFlow<String> {var hotFlow = MutableSharedFlow<String>()CoroutineScope(Dispatchers.Default).launch {for (i in 1..2) {hotFlow.emit("emit-$i")delay(100)}}return hotFlow
}

        打印如下。

MainScope, emit-2
CoroutineScope, emit-2

        说明:可以看到每一个订阅者都只收到部分消息。

4 Flow 的中间操作

4.1 源码

// 切换线程
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
// 捕获异常
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T>
// 在数据流中使用一个缓冲区来存储数据, 当数据产生速率超过消费速率时, 数据会暂时存储在缓冲区中, 直到有足够的空间将其传递给订阅者。这可以确保数据不会丢失,但可能会占用更多的内存。
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T>
// 当数据产生速率超过消费速率时, 跳过一些数据, 只保留最新的数据。这样可以减少内存占用,但会丢失一部分数据。
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
// 过滤
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
// 去除相邻的重复元素
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
// 丢弃前 n 个元素
public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 截取前 n 个元素
public fun <T> Flow<T>.take(count: Int): Flow<T>
// 映射(T -> R)
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>

4.2 应用

fun main() {var flow = flowOf(4, 9, 1, 8, 5, 7, 7, 5, 3, 6, 2)CoroutineScope(Dispatchers.Default).launch {flow.flowOn(Dispatchers.Default).catch {println(it.message)}.buffer().filter { it in 3..7 } // 4, 5, 7, 7, 5, 3, 6.distinctUntilChanged() // 4, 5, 7, 5, 3, 6.drop(1) // 5, 7, 5, 3, 6.take(4) // 5, 7, 5, 3.map { it * it } // 25, 49, 25, 9.collect(::println)}Thread.sleep(1000)
}

5 Flow 的终端操作

5.1 first、last、count

        1)源码

// 首元素
public suspend fun <T> Flow<T>.first(): T
// 尾元素
public suspend fun <T> Flow<T>.last(): T

        2)应用

fun main() {var flow = flowOf(3, 5, 7, 6)CoroutineScope(Dispatchers.Default).launch {println(flow.first()) // 3println(flow.last()) // 6println(flow.count()) // 4}Thread.sleep(1000)
}

5.2 collect

        1)源码

public suspend fun collect(collector: FlowCollector<T>)

        2)应用

fun main() {var flow = flowOf(1, 3, 5, 7)CoroutineScope(Dispatchers.Default).launch {flow.collect(::println) // 1, 3, 5, 7}Thread.sleep(1000)
}

5.3 fold

        1)源码

// 规约运算,定义运算 o, result = (((((i o e1) o e2)) o e3) o e4) o ...
public suspend inline fun <T, R> Flow<T>.fold(initial: R,crossinline operation: suspend (acc: R, value: T) -> R
): R

        说明:fold 与 reduce 的区别是,fold 有初值,reduce 无初值。

        2)应用

fun main() {var flow = flowOf(2, 3, 5)CoroutineScope(Dispatchers.Default).launch {// 10+2+3+5=20var res1 = flow.fold(10, Integer::sum)println(res1)// 1*1-2*2=-3, (-3)*(-3)-3*3=0, 0*0-5*5=-25var res2 = flow.fold(1) { e1, e2 ->e1 * e1 - e2 * e2}println(res2)}Thread.sleep(1000)
}

5.4 reduce

        1)源码

// 规约运算,定义运算 o, result = ((((e1 o e2)) o e3) o e4) o ...
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

        说明:reduce 与 fold 的区别是,reduce 无初值,fold 有初值。

        2)应用

fun main() {var flow = flowOf(1, 3, 5)CoroutineScope(Dispatchers.Default).launch {var sum = flow.reduce(Integer::sum)println(sum) // 9// 1*1-3*3=-8, (-8)*(-8)-5*5=39var res = flow.reduce { e1, e2 ->e1 * e1 - e2 * e2}println(res) // 39}Thread.sleep(1000)
}

5.5 集合转换

        1)源码

public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

        2)应用

fun main() {var flow = flowOf(1, 3, 5)CoroutineScope(Dispatchers.Default).launch {var set1 = flow.toCollection(mutableSetOf()) // [1, 3, 5]var list1 = flow.toCollection(mutableListOf()) // [1, 3, 5]var set2 = flow.toSet() // [1, 3, 5]var list2 = flow.toList() // [1, 3, 5]}Thread.sleep(1000)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/4187.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flutter - 折叠面板

demo 地址: https://github.com/iotjin/jh_flutter_demo 代码不定时更新&#xff0c;请前往github查看最新代码 flutter 自定义折叠组件 支持三种类型和两种展示效果可自定义title和被折叠的内容 效果图 示例 import package:flutter/material.dart; import /jh_common/widge…

K8s: 从集群外部访问Service

从集群外部访问 Service 1 &#xff09;概述 在前面我们一直实践的是在集群内部访问 Service&#xff0c;之前有2种方法 方法1&#xff1a;在一个node节点上&#xff0c;通过对创建的的时候&#xff0c;对port进行一个环境变量的注册来保证Service能够正确对 不同的pod 访问到…

基于SNAP使用SAR数据做变化检测change detection

基于SNAP使用SAR数据做变化检测change detection 1. 前言2. 步骤2.1 添加数据2.2 辐射定标--散斑过滤--地形矫正2.3 裁剪2.4 变化检测 3.查看变化检测结果 1. 前言 在SNAP中&#xff0c;change detection主要通过Stack工具来完成。 SAR数据&#xff1a;Radarsat-2 SLC 其他数据…

基于FastGPT搭建知识库问答系统

什么是 FastGPT &#xff1f; FastGPT 是一个基于 LLM 大语言模型的知识库问答系统&#xff0c;提供开箱即用的数据处理、模型调用等能力。同时可以通过 Flow 可视化进行工作流编排&#xff0c;从而实现复杂的问答场景&#xff01; FastGPT 允许用户构建本地知识库&#xff0c;…

conda创建新环境

&#xff11;、创建虚拟环境&#xff1a; Anaconda创建环境&#xff1a;比如&#xff0c;创建pyhon&#xff1d;3.6的版本环境取名叫 name conda create -n name python3.6 2、删除虚拟环境操作&#xff1a;&#xff08;谨慎操作&#xff09; conda remove -n name --all &am…

MQTT数据传输Payload的常见格式介绍

使用MQTT client过程中看到常见的数据格式&#xff1a; 下面是介绍 Plaintext&#xff08;纯文本&#xff09; 介绍&#xff1a;纯文本编码是最基本的编码形式&#xff0c;它使用标准的ASCII或Unicode字符来表示数据。这种编码格式是人类可读的&#xff0c;因为它直接表示文本信…

(成品论文22页)24深圳杯数学建模A题1-4问完整代码+参考论文重磅更新!!!!

论文如下&#xff1a; 基于三球定位的多个火箭残骸的准确定位 针对问题一&#xff1a;为了进行单个残骸的精确定位&#xff0c;确定单个火箭残骸发生音爆 时的精确位置和时间&#xff0c;本文基于三球定位模型&#xff0c;考虑到解的存在性和唯一性&#xff0c; 选取了四个监测…

洛谷 B3969 [GESP202403 五级] B-smooth 数 题解

思路 我们只要求出每个数的最大质因数&#xff0c;再一个个判断是否满足要求即可。 如何找到每个数的最大质因数呢&#xff1f;其实&#xff0c;我们可以在埃氏筛法的基础上进行改进&#xff0c;从而达到算出最大质因数的目的。 让我们先来了解一下埃氏筛法&#xff0c;知道…

第七篇、animateDiff使用

1、文生图 2、提示词游历 在不同帧设置不同的提示词&#xff0c;有公共提示词和游历提示词&#xff0c;上面是公共的&#xff0c;下面是游历

C#中如何定义带参数的EventHandler?

简述 事件调用的所有方法都需要两个参数&#xff1a;object sender&#xff0c;EventArgs e。该事件使用这两个参数调用方法&#xff0c;因此我们不能直接添加自定义参数。 比如下面这段代码&#xff0c;我们想在 MessageBox 中显示字符串 s &#xff0c;这必然是不成。 priv…

VS Code开发STM32F4xx jlink接口swd模式

VS Code开发STM32F4xx jlink接口swd模式(测试OK) 下面的代码(已验证),只作为参考,不同情况的更改参照文章末尾链接 c_cpp_properties.json代码 (其中include路径和宏定义可以参照makefile添加) : {"configurations": [{"name"…

数据库系统工程师之数据结构

在数据库系统工程师的考试中&#xff0c;数据结构是一个重要的考点。数据结构主要关注数据元素之间的相互关系以及它们的组织和存储方式。以下是数据库系统工程师考试中数据结构的主要考点&#xff1a; 1.线性结构&#xff1a;线性结构是数据元素之间存在一对一关系的数据结构。…

上市公司-人工智能的采纳测算程度数据集(2003-2021年)

01、数据简介 人工智能&#xff08;Artificial Intelligence&#xff0c;AI&#xff09;是一个研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的新技术科学。它是计算机科学的一个分支&#xff0c;旨在生产出一种能以人类智能相似的方式做出反应的智能机…

汽车新智能图谱里:理解腾讯的AI TO B路径

将自身的C2B产品和产业理解充分AI化&#xff0c;在自身内部场景率先验证跑通后&#xff0c;进而释放给产业伙伴&#xff0c;对应到具体的需求痛点&#xff0c;一起打磨对应的行业AI模型。 这也恰是腾讯“实用”标签背后的AI产业路径。 作者|皮爷 出品|产业家 成本、性价…

直流有刷电机入门

文章目录 123455.25.3 1 2 电刷 材质是 石墨 3 130马达 就几毛钱 几块钱这学的就是减速电机P MAX一定 pf*v 降低速度 扭矩就会大 4 还有空载电流 过大负载 时 有堵转电流 &#xff08;可分析电流 来看电机工作状态&#xff09;RPM 转每分钟 5 5.2 这的线圈 是简化后的转子绕组…

一个数据人眼中的《上游思维》

最近读了《上游思维》这本书&#xff0c;很受启发&#xff0c;我想从一个数据人的角度来聊一聊我对这本书的读后感。上游思维本质上是帮助我们解决问题&#xff0c;我发现在解决问题相关的每个阶段&#xff1a;发现问题、找到解决问题的方法、解决问题的过程中、评估问题以及预…

电磁仿真--基本操作-CST-(4)

目录 1. 简介 2. 建模过程 2.1 基本的仿真配置 2.2 构建两个圆环体和旋转轴 2.3 切分圆环体 2.4 衔接内外环 2.5 保留衔接部分 2.6 绘制内螺旋 2.7 绘制外螺旋 2.8 查看完整体 2.9 绘制引脚 2.10 设置端口 2.11 仿真结果 3. 使用Digilent AD2进行测试 3.1 进行…

Stable Diffusion入门指南

SD 保姆教程&#xff0c;从原理功能到案例输出展示&#xff0c;最后简述 ControlNet 的使用技巧。 Stable Diffusion 的基本介绍 Stable Diffusion是一种基于扩散过程的图像生成模型&#xff0c;可以生成高质量、高分辨率的图像。它通过模拟扩散过程&#xff0c;将噪声图像逐…

大厂面试题:两道来自京东的关于MyBatis执行器的面试题

大家好&#xff0c;我是王有志。 今天给大家带来两道来自于京东关于的 MyBatis 面试题&#xff1a; MyBatis 提供了哪些执行器&#xff08;Executor&#xff09;&#xff1f;它们有什么区别&#xff1f;Mybatis 中如何指定 Executor 的类型&#xff1f; MyBatis 提供了哪些执…

自然语言处理的发展历程

1.自然语言处理发展的7个阶段 序号阶段时间贡献代表人物1起源期1913-1956思考使用图灵算法计量模型来描述自然语言&#xff0c;描述词语及词语之间的关系。这一阶段停留在理论层面做探索图灵、马尔可夫、香农2基于规则的形式语言理论期1957-1970形式语言理论的提出&#xff0c…