DataStreamAPI实践原理——快速上手

引入

通过编程模型,我们知道Flink的编程模型提供了多层级的抽象,越上层的API,其描述性和可阅读性越强,越下层API,其灵活度高、表达力越强,多数时候上层API能做到的事情,下层API也能做到,反过来未必,不过这些API的底层模型是一致的,可以混合使用。

Flink架构可以处理批和流,Flink 批处理数据需要使用到Flink中的DataSet API,此API主要是支持
Flink针对批数据进行操作,本质上Flink处理批数据也是看成一种特殊的流处理(有界流),所以没有必要分成批和流两套API,从Flink1.12版本往后,Dataset API 已经标记为Legacy(已过时),已被官方软弃用,官方建议使用Table API 或者SQL 来处理批数据,我们也可以使用带有Batch执行模式的DataStream API来处理批数据(DataSet和DataStream API做到了合并),而在后续Flink版本中DataSet API 也被删除。

DataStream API的学习对于理解Flink数据处理流程非常重要,下面我们先从核心API层开始学习,通过基于DataStream API 的编程实践,去学习Flink编程方式,处理数据流程以及转换处理。

现在我们先通过数据处理最经典的WordCount案例,来快速上手Flink的DataStream API开发。

代码编写流程

我们知道Flink编程模型主要有数据源、转换操作和数据输出三个部分,而实际开发编程的时候,则会多两个部分:

  1. 初始化上下文环境(Environment)
    Environment是编写Flink程序的基础,不同层级API编程中创建的Environment环境不同,如:Dataset 编程中需要创建ExecutionEnvironment,DataStream编程中需要创建
    StreamExecutionEnvironment,在Table和SQL API中需要创建TableExecutionEnvironment,使用不同语言编程导入的包也不同,在获取到对应的Environment后我们还可以进行外参数的配置,例如:并行度、容错机制设置等。
  2. 数据源(DataSource)<可以有多个>
    DataSource部分主要定义了数据接入功能,主要是将外部数据接入到Flink系统中并转换成DataStream对象供后续的转换使用。
  3. 转换操作(Transformation)
    Transformation部分有各种各样的算子操作可以对DataStream流进行转换操作,最终将转换结果数据通过DataSink写出到外部存储介质中,例如:文件、数据库、Kafka消息系统等。
  4. 数据输出(DataSink)
    经过一系列Transformation转换操作后,最后一定要调用Sink操作,才会形成一个完整的DataFlow拓扑。只有调用了Sink操作,才会产生最终的计算结果,这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。
  5. 程序触发(env.execute())
    在DataStream编程中编写完成DataSink代码后并不意味着程序结束,由于Flink是基于事件驱动处理的,有一条数据时就会进行处理,所以最后一定要使用Environment.execute()来触发程序执行。

Flink数据类型

在 Apache Flink 中,为了能够在分布式计算过程中对数据的类型进行管理和判断,引入了 TypeInformation 类来对数据类型进行描述。TypeInformation 是 Flink 类型系统的基石,它允许 Flink 在编译时推断数据类型,从而为数据的序列化、反序列化、内存管理等操作提供必要的类型信息。以下是 Flink 中常见的数据类型及其对应的 TypeInformation 类型:

1. 基本数据类型

Flink 通过 BasicTypeInfo 支持 Java 的基本数据类型(如 int、double、boolean 等)以及它们的包装类(如 Integer、Double、Boolean 等),还支持 String 类型。

2. 数组类型

对于数组类型,Flink 提供了 BasicArrayTypeInfo,支持如 int[]、String[] 等数组数据类型。

3. Tuple 类型

Tuple 是 Flink 中一种常用的数据类型,用于表示固定长度的字段集合。Flink 提供了 TupleTypeInfo 来支持 Tuple 类型的数据。

4. POJO 类型

POJO(Plain Old Java Object)类型是 Flink 中非常重要的数据类型,它允许使用普通的 Java 类来表示数据对象。为了使 Flink 能够正确识别和处理 POJO 类型,需要满足以下条件:

  • POJO 类必须是公共类(public)且不能是内部类。
  • POJO 类必须包含一个默认的无参构造函数。
  • POJO 类的所有字段必须是公共的,或者提供公共的 getter 和 setter 方法。

当满足上述条件时,Flink 会自动识别 POJO 类型,并通过 PojoTypeInfo 来描述该类型。

5. Scala Case Class 类型

对于使用 Scala 编写的 Flink 应用,Flink 提供了 CaseClassTypeInfo 来支持 Scala 的 Case Class 类型。Case Class 是 Scala 中一种特殊的类,通常用于表示不可变的数据对象,非常适合在 Flink 中作为数据类型使用。

在使用Java API开发Flink应用时,通常情况下Flink都能正常进行数据类型推断进而选择合适的serializers以及comparators,但是在定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,Flink就获取不到对应的类型信息,这就需要借助类型提示(Type Hints)来告诉系统函数中传入的参数类型信息和输出类型,进而对数据类型进行推断处理。

在使用Scala API 开发Flink应用时,Scala API通过使用Manifest和类标签在编译器运行时获取类型信息,即使在函数定义中使用了泛型,也不会像Java API出现类型擦除问题,但是在使用到Flink已经通过TypeInformation定义的数据类型时,TypeInformation类不会自动创建,需要使用隐式参数的方式引入:import org.apache.flink.api.scala._,否则在运行代码过程中会出现“could not find implicit value for evidence parameter of type TypeInformation”的错误。

Flink 序列化机制

在两个进程进行远程通信时,它们需要将各种类型的数据以二进制序列的形式在网络上传输,数据发送方需要将对象转换为字节序列,进行序列化,而接收方则将字节序列恢复为各种对象,进行反序列化。对象的序列化有两个主要用途:

  • 一是将对象的字节序列永久保存到硬盘上,通常存放在文件中;
  • 二是在网络上传输对象的字节序列。序列化的好处包括减少数据在内存和硬盘中的占用空间,减少网络传输开销,精确推算内存使用情况,降低垃圾回收的频率。

序列化和反序列化是分布式计算框架中的关键环节,尤其是在节点之间需要进行数据传输时。Flink 的序列化机制负责将数据对象转换为字节序列以便在网络上传输或在磁盘上存储,并能够在需要时将字节序列恢复为原始对象。Flink 提供了多种序列化器,以满足不同类型的数据序列化需求。高效的序列化和反序列化对于分布式计算框架至关重要,原因如下:

  • 减少数据传输开销:通过将对象转换为紧凑的字节序列,可以减少网络传输的数据量,提高数据传输效率。

  • 降低内存占用:序列化后的数据通常占用更少的内存空间,有助于提高内存利用率,尤其是在处理大规模数据集时。

  • 支持数据持久化:序列化后的数据可以方便地写入磁盘进行持久化存储,便于后续的数据恢复和分析。

Flink序列化机制负责在节点之间传输数据时对数据对象进行序列化和反序列化,确保数据的正确性和一致性。Flink提供了多种序列化器,包括Kryo、Avro和Java序列化器等,大多数情况下,用户不用担心flink的序列化框架,Flink会通过TypeInfomation在数据处理之前推断数据类型,进而使用对应的序列化器,例如:针对标准类型(int,double,long,string)直接由Flink自带的序列化器处理,其他类型默认会交给Kryo处理。

但是对于Kryo仍然无法处理的类型,可以采取以下两种解决方案:

1. 强制使用Avro替代Kryo序列化

//设置flink序列化方式为avro
env.getConfig().enableForceAvro();

2. 自定义注册Kryo序列化

//注册kryo 自定义序列化器
env.getConfig().registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

单词统计案例

下面我们通过一个单词统计的案例,快速上手应用Flink,进行流处理。

引入依赖

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.16.0</flink.version><slf4j.version>1.7.31</slf4j.version><log4j.version>2.17.1</log4j.version><scala.version>2.12.10</scala.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- Flink批和流开发依赖包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Scala包 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><!-- slf4j&log4j 日志相关包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>${log4j.version}</version></dependency></dependencies>

代码实现

/*** WordCount 类实现了一个简单的 Flink 流式处理程序,用于统计输入文本文件中每个单词的出现次数。*/
public class WordCount {/*** 程序的主入口方法,负责创建 Flink 流式处理环境,读取输入文件,进行单词计数,并输出结果。* * @param args 命令行参数,在本程序中未使用。* @throws Exception 当执行 Flink 任务时可能抛出异常。*/public static void main(String[] args) throws Exception {// 1. 创建流式处理环境,用于配置和执行 Flink 流式计算任务StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 从指定的文本文件中读取数据,返回一个 DataStreamSource 对象,其中每个元素是文件中的一行文本DataStreamSource<String> lines = env.readTextFile("./data/words.txt");// 3. 对读取的每行文本进行处理,将其切分为单词,并转换为 <单词, 1> 的键值对形式// flatMap 方法用于将每行文本拆分为多个单词,并为每个单词生成一个键值对// returns 方法用于指定 flatMap 操作返回的数据类型SingleOutputStreamOperator<Tuple2<String, Long>> kvWordsDS =lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {// 将每行文本按空格分割成单词数组String[] words = line.split(" ");// 遍历单词数组,为每个单词生成一个 <单词, 1> 的键值对,并收集到 Collector 中for (String word : words) {collector.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 对键值对数据进行分组统计,按照单词(键)进行分组,对值(出现次数)进行求和// keyBy 方法用于按照指定的键对数据进行分组// sum 方法用于对分组后的数据的指定字段进行求和操作// print 方法用于将统计结果输出到控制台kvWordsDS.keyBy(tp -> tp.f0).sum(1).print();// 5. 在流式计算中,需要调用 execute 方法来触发任务的执行// 该方法会阻塞当前线程,直到任务执行完成或被中断env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/77510.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPF 图片文本按钮 自定义按钮

效果 上面图片,下面文本 样式 <!-- 图片文本按钮样式 --> <Style x:Key="ImageTextButtonStyle" TargetType="Button"><Setter Property="Background" Value="Transparent"/><Setter Property="BorderTh…

驱动开发硬核特训 · Day 22(上篇): 电源管理体系完整梳理:I2C、Regulator、PMIC与Power-Domain框架

&#x1f4d8; 一、电源子系统总览 在现代Linux内核中&#xff0c;电源管理不仅是系统稳定性的保障&#xff0c;也是实现高效能与低功耗运行的核心机制。 系统中涉及电源管理的关键子系统包括&#xff1a; I2C子系统&#xff1a;硬件通信基础Regulator子系统&#xff1a;电源…

设计模式全解析:23种经典设计模式及其应用

创建型模式 1. 单例模式&#xff08;Singleton Pattern&#xff09; 核心思想&#xff1a;确保一个类只有一个实例&#xff0c;并提供一个全局访问点。适用场景&#xff1a;需要共享资源的场景&#xff0c;如配置管理、日志记录等。 public class Singleton {// 静态变量保存…

力扣热题100题解(c++)—矩阵

73.矩阵置零 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 int m matrix.size(); // 行数int n matrix[0].size(); // 列数bool firstRowZero false; // 标记第一行是否包含 0bool f…

本地部署DeepSeek-R1(Dify升级最新版本、新增插件功能、过滤推理思考过程)

下载最新版本Dify Dify1.0版本之前不支持插件功能&#xff0c;先升级DIfy 下载最新版本&#xff0c;目前1.0.1 Git地址&#xff1a;https://github.com/langgenius/dify/releases/tag/1.0.1 我这里下载到老版本同一个目录并解压 拷贝老数据 需先停用老版本Dify PS D:\D…

PostSwigger Web 安全学习:CSRF漏洞3

CSRF 漏洞学习网站&#xff1a;What is CSRF (Cross-site request forgery)? Tutorial & Examples | Web Security Academy CSRF Token 基本原理 CSRF Token 是服务端生成的唯一、随机且不可预测的字符串&#xff0c;用于验证客户端合法校验。 作用&#xff1a;防止攻击…

用 Nodemon 解决 npm run serve 频繁重启服务

Nodemon 是一个基于 Node.js 构建的开发工具&#xff0c;专为帮助开发者自动监控项目文件的更改而设计。每当文件发生变更时&#xff0c;Nodemon 会自动重启 Node.js 服务器&#xff0c;无需手动停止并重启。这对于提升开发速度、减少人工操作非常有帮助&#xff0c;尤其适用于…

django admin 中更新表数据 之后再将数据返回管理界面

在Django中&#xff0c;更新数据库中的数据并将其重新显示在Django Admin界面上通常涉及到几个步骤。这里我将详细说明如何在Django Admin中更新表数据&#xff0c;并确保更新后的数据能够立即在管理界面上显示。 定义模型 首先&#xff0c;确保你的模型&#xff08;Model&…

真.从“零”搞 VSCode+STM32CubeMx+C <1>构建

目录 前言 准备工作 创建STM32CubeMx项目 VSCode导入项目&配置 构建错误调试 后记 前言 去年10月开始接触单片机&#xff0c;一直在用树莓派的Pico&#xff0c;之前一直用Micropython&#xff0c;玩的不亦乐乎&#xff0c;试错阶段优势明显&#xff0c;很快就能鼓捣一…

C语言学习之结构体

在C语言中&#xff0c;我们已经学了好几种类型的数据。比如整型int、char、short等&#xff0c;浮点型double、float等。但是这些都是基本数据类型&#xff0c;而这些数据类型应用在实际编程里显然是不够用的。比如我们没有办法用一旦数据类型来定义一个”人“的属性。因此这里…

架构-计算机系统基础

计算机系统基础 一、计算机系统组成 &#xff08;一&#xff09;计算机系统层次结构 硬件组成 主机&#xff1a;包含CPU&#xff08;运算器控制器&#xff09;、主存储器&#xff08;内存&#xff09;。外设&#xff1a;输入设备、输出设备、辅助存储器&#xff08;外存&…

【计算机网络性能优化】从基础理论到实战调优

目录 前言技术背景与价值当前技术痛点解决方案概述目标读者说明 一、技术原理剖析核心概念图解核心作用讲解关键技术模块说明技术选型对比 二、实战演示环境配置要求核心代码实现案例1&#xff1a;iPerf3带宽测试案例2&#xff1a;TCP窗口优化案例3&#xff1a;QoS流量整形 运行…

Python 自动化办公:Excel 数据处理的“秘密武器”

引言 在日常的 IT 办公场景里&#xff0c;Excel 是数据处理与分析的 “常胜将军”。无论是财务人员整理账目、销售团队统计业绩&#xff0c;还是运营人员分析用户数据&#xff0c;Excel 都发挥着关键作用。但面对海量数据&#xff0c;手动操作 Excel 不仅效率低下&#xff0c;还…

缓存集群技术深度解析:从原理到实战

缓存集群技术深度解析&#xff1a;从原理到实战 一、缓存集群核心定位与架构选型 1. 集群模式核心价值 缓存集群通过数据分片、高可用保障、水平扩展解决单节点瓶颈&#xff0c;核心能力包括&#xff1a; 数据分片&#xff1a;将数据分散到多个节点&#xff0c;突破单节点内…

CSDN编辑文章时如何自动生成目录

如何自动生成目录 前置条件1. 插入目录标识符2. 编写标题层级 前置条件 需要使用markdown编辑&#xff0c;并且只有按照markdown语法编写不同的标题级别&#xff0c;才能使用这个方法自动生成对应的目录。 1. 插入目录标识符 在文章的顶部添加以下代码&#xff1a; [TOC](文…

产品经理对于电商接口的梳理||电商接口文档梳理与接入

接口梳理7个注意点总结 ①注意要测试环境和生产环境。生产上线时候要提醒研发换到生产环境调用。 ②注意必输字段和选输字段&#xff0c;要传入字段的含义和校验。枚举值不清楚含义的要询问对方含义&#xff0c;比如说单据类型字段枚举值是B2C发货单&#xff0c;BBC发货单&am…

更快的图像局部修改与可控生成:Flex.2-preview

Flex.2-preview 文本生成图像扩散模型介绍 一、模型简介 Flex.2-preview 是一种 开源的 80 亿参数文本生成图像扩散模型&#xff0c;具备通用控制和修复支持功能&#xff0c;是 Flex.1alpha 的下一代版本。该模型由社区开发并为社区服务&#xff0c;采用 Apache 2.0 许可证&a…

【Castle-X机器人】一、模块安装与调试:机器人底盘

持续更新。。。。。。。。。。。。。。。 【ROS机器人】模块安装 一、Castle-X机器人底盘1.1 结构概述1.2 驱动执行结构1.3 环境传感器1.4 电气系统1.5 Castle-x机器人底盘测试激光雷达传感器测试及数据可视化超声波传感器实时数据获取防跌落传感器测试陀螺仪测试键盘控制测试…

条件、列表渲染.

#### v-for 1. 渲染列表 vue <template> <ul v-for"(item,index) in list" > <li>{{ item }}</li> </ul> </template> <script setup> import { ref } from vue; let list ref([苹果, 香蕉, 橙子]) </script>…

node20的安装和vue的入门准备

一、node20的安装 直接下载路径&#xff1a;https://nodejs.org/download/release/v20.11.0/node-v20.11.0-x64.msi 安装&#xff0c;双击msi文件 点击同意协议 更改下载路径 什么也不用选&#xff0c;点击next进行下一步 什么也不用选&#xff0c;点击next进行下一步 点击安…