Flink之DataStream API开发Flink程序过程与Flink常见数据类型

开发Flink程序过程与Flink常见数据类型

  • DataStream API
    • Flink三层API
    • DataStream API概述
  • 开发Flink程序过程
    • 添加依赖
    • 创建执行环境
    • 执行模式
    • 创建Data Source
    • 应用转换算子
    • 创建Data Sink
    • 触发程序执行
    • 示例
  • Flink常见数据类型
    • 基本数据类型
    • 字符串类型
    • 时间和日期类型
    • 数组类型
    • 元组类型
    • 列表类型
    • 映射类型
    • POJO类型
    • Row类型
    • 可序列化类型
    • 类型提示

DataStream API

Flink三层API

在这里插入图片描述

SQL & TableAPI

SQL & TableAPI同时适用于批处理和流处理,意味着可以对有界数据流和无界数据流以相同的语义进行查询,并产生相同的结果。除了基本查询外,它还支持自定义的标量函数,聚合函数以及表值函数,可以满足多样化的查询需求。

DataStream & DataSetAPI

DataStream & DataSetAPI是Flink数据处理的核心API,支持使用Java语言或Scala语言进行调用,提供了数据读取,数据转换和数据输出等一系列常用操作的封装。

StatefulStreamProcessing

StatefulStreamProcessing是最低级别的抽象,它通过ProcessFunction函数内嵌到DataStreamAPI中。ProcessFunction是Flink提供的最底层API,具有最大的灵活性,允许开发者对于时间和状态进行细粒度的控制。

DataStream API概述

Flink的DataStream API是Flink中最主要的API之一,它用于处理无限流数据。DataStreamAPI支持高级的流处理操作,例如窗口计算、状态管理、流分区等,并且在处理大规模数据时表现出色。

由于Flink DataSet和DataStream API的高度相似,并且DataStream API提供流批一体处理的能力,官方也推荐直接使用DataStream API,因此学习DataStream API如何使用即可。

流(STREAMING)执行模式适用于需要连续增量处理,而且预计无限期保持在线的无边界作业。

批(BATCH)执行模式适用于有一个已知的固定输入,而且不会连续运行的有边界作业。

开发Flink程序过程

确定需求:明确想要解决的问题或实现的功能。导入依赖:在项目中导入Apache Flink相关的依赖,可以使用Maven、Gradle或其他构建工具来管理依赖关系。创建StreamExecutionEnvironment:使用StreamExecutionEnvironment.getExecutionEnvironment()创建Flink的执行环境对象,它用于配置和执行流处理作业。读取数据:从适合的数据源(例如文件、Kafka、Socket等)读取数据,可以使用readTextFile()、addSource()等方法来读取数据并转换为DataStream。转换操作:对读取到的数据进行处理和转换操作,可以使用诸如map、flatmap、filter等方法来进行各种转换和处理。窗口操作(可选):如果需要对数据进行窗口操作(例如滚动窗口、滑动窗口等),可以使用Flink提供的窗口操作方法。结果处理:将转换后的数据写入文件、数据库、消息队列或其他输出源,或者使用print()、collect()等方法将数据打印到控制台。设置作业配置和调优(可选):根据需求和性能要求,可以设置作业的并行度、时间特性、状态后端、容错机制、资源配置等。执行作业:通过调用env.execute()方法来执行流处理作业。作业将提交到Flink集群或本地运行。监控和调试(可选):可以通过Flink的监控界面查看作业的状态和指标,并使用日志和调试工具追踪和解决问题。

添加依赖

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency>

创建执行环境

Flink程序可以在各种上下文环境中运行:

可以在本地 JVM 中执行程序可以提交到远程集群上运行

创建执行环境是使用StreamExecutionEnvironment类,调用这个类的静态方法来创建执行环境。

在这里插入图片描述

获取到程序执行环境后,还可以对执行环境进行灵活的设置。

可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

1.本地执行环境

使用createLocalEnvironment()方法创建一个本地执行环境

可以在调用时传入一个参数,指定默认的并行度,默认并行度是电脑CPU核心数

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

2.集群执行环境
使用createRemoteEnvironment("node01", 8888,"/root/demo.jar")方法创建一个集群执行环境

需要在调用时指定JobManager的主机名和端口号,以及在集群中运行的Jar包

/*** JobManager 主机名* JobManager 进程端口号* 提交给JobManager的JAR包*/
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("node01", 8888,"/root/demo.jar");

3.自适应执行环境

使用getExecutionEnvironment()方法根据当前运行的上下文直接得到正确的执行环境

如果程序独立运行,则返回一个本地执行环境。如果创建了jar包,然后在命令行调用它并提交到集群执行,则返回集群的执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4.本地执行环境+Web UI
使用createLocalEnvironmentWithWebUI(conf)方法创建一个本地执行环境,同时启动Web监控UI。

需要创建一个配置文件,设置相关参数,如设置Web UI端口,默认使用端口8081

Configuration conf = new Configuration();
conf.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

执行模式

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

流执行模式Streaming

流执行模式是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式Batch

批执行模式是专门用于批处理的执行模式

自动模式AutoMatic

在自动模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

配置批执行模式

执行模式可以通过 execute.runtime-mode 设置来配置。有三种可选的值:

STREAMING: 经典 DataStream 执行模式(默认)BATCH:DataStream API 上进行批量式执行AUTOMATIC: 让系统根据数据源的边界性来决定

1.通过命令行配置

提交作业时,增加execution.runtime-mode参数,指定值为BATCH

bin/flink run -Dexecution.runtime-mode=BATCH

2.通过代码配置

// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 基于执行环境调用setRuntimeMode方法,传入BATCH模式。不建议,推荐通过命令行传递参数
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

创建Data Source

创建执行环境后,可以使用其提供的一些方法,通过这些方法可以创建Data Source

例如:从文件中读取数据:可以直接逐行读取数据,像读CSV文件一样,或使用任何第三方提供的source

String filePath = "data/test.text";final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.readTextFile(filePath);

应用转换算子

这将生成一个DataStream,然后可以在上面应用转换算子transformation来创建新的派生DataStream。可以调用DataStream上具有转换功能的方法来应用转换。

例如: 应用一个map的转换算子,它将通过把原始集合中的每一个字符串转换为一个整数来创建一个新的DataStream。

DataStream<String> text = ...;DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) {return Integer.parseInt(value);}
});

创建Data Sink

一旦有了包含最终结果的DataStream,就可以通过创建sink把它写到外部系统。

// 简单skin:将DataStream以文本格式写入path指定的文件
parsed.writeAsText("data/out");// 控制台打印
parsed.print();

触发程序执行

需要调用StreamExecutionEnvironment 的execute()、executeAsync()方法来触发程序执行

execute()方法将等待作业完成,然后返回一个JobExecutionResult,其中包含执行时间和累加器结果。

JobExecutionResult result = env.execute();

如果不想等待作业完成,使用executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。

JobClient jobClient = env.executeAsync();
JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

示例

public static void main(String[] args) throws Exception {// 获取运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从文件中读取数据DataStreamSource<String> text = env.readTextFile("data/test.text");// 应用转换算子DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) {int number = Integer.parseInt(value);System.out.println("number = " + number);return number;}});// 简单skin:将DataStream以文本格式写入path指定的文件parsed.writeAsText("data/out");// 控制台打印parsed.print();// 触发执行env.execute();}

Flink常见数据类型

原始数据类型:例如布尔值、整数(byte、short、int、long)、浮点数(float、double)和字符(char)字符串类型:表示为 Java 类型 String 或 scala 类型 String时间和日期类型:包括 Timestamp 和 Date,以及 Interval 类型,用于表示时间间隔数组类型:数组是同一类型的元素的有序集合元组类型:元组是不同类型的元素的有序集合列表类型:列表是具有相同元素类型的有序元素集合映射类型:映射是键值对的无序集合,键和值可以是任何类型POJO类型:POJO 是普通的 Java 对象,它们包含字段或属性,可以通过名称或 getter 和 setter 方法进行访问Row类型:Row 是一个有序的、命名的字段集合。与POJO类型类似,但没有setter 和getter方法可序列化类型:即实现 java.io.Serializable 接口的类型

基本数据类型

Flink支持Java中的所有基本数据类型,例如布尔值、整数(byte、short、int、long)、浮点数(float、double)和字符(char)。

在Flink中定义一个int类型的流

DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);

字符串类型

字符串类型在Flink中也很常见,可以使用Java或Scala中的String类型表示。

DataStream<String> stream = env.fromElements("hello", "world");

时间和日期类型

时间和日期类型包括DATE、TIME、TIMESTAMP类型,用于表示时间间隔。

DataStream<Tuple2<String, Timestamp>> stream = env.fromElements(Tuple2.of("event-1", new Timestamp(System.currentTimeMillis())),Tuple2.of("event-2", new Timestamp(System.currentTimeMillis() - 1000))
);

数组类型

数组是同一类型的元素的有序集合。包括基本数据类型数组(PRIMITIVE_ARRAY)和复杂数据类型数组(OBJECT_ARRAY。其中,基本数据类型数组可以是任意基本数据类型的数组,而复杂数据类型数组则可以是结构体或者嵌套的数组。

DataStream<int[]> stream = env.fromElements(new int[]{1, 2, 3}, new int[]{4, 5, 6});

元组类型

元组是复合类型,包含固定数量的各种类型的字段。Java API提供了从Tuple1到Tuple25,不支持空字段

元组是不同类型的元素的有序集合。也就是说元组的每个字段都可以是任意Flink 类型,包括更多元组,从而产生嵌套元组

DataStream<Tuple3<String, Integer, Double>> stream = env.fromElements(Tuple3.of("a", 1, 1.1),Tuple3.of("b", 2, 2.2)
);

列表类型

列表是具有相同元素类型的有序元素集合。

DataStream<List<String>> stream = env.fromElements(Arrays.asList("hello", "world"), Arrays.asList("foo", "bar"));

映射类型

映射是键值对的无序集合,键和值可以是任何类型。

Map<String, Integer> map1 = new HashMap<>();
map1.put("a", 1);
map1.put("b", 2);Map<String, Integer> map2 = new HashMap<>();
map2.put("c", 3);
map2.put("d", 4);DataStream<Map<String, Integer>> stream = env.fromElements(map1, map2);

POJO类型

POJO是普通的Java对象,它们包含字段或属性,可以通过名称或getter和setter方法进行访问

Flink对POJO 类型的要求如下:

类是公有public的
有一个无参的构造方法
所有属性都是公有public的,要么必须可通过 getter 和 setter 函数访问
所有属性的类型都是可以序列化的
public class Person {public String name;public int age;public String getName() { return name; }public void setName(String name) { this.name = name; }public int getAge() { return age; }public void setAge(int age) { this.age = age; }
}DataStream<Person> stream = env.fromElements(new Person("Alice", 25),new Person("Bob", 30)
);

Row类型

Row是一个有序的、命名的字段集合。与 POJO类型类似,但没有setter 和 getter 方法。可以认为是具有任意个字段的元组,并支持空字段。

可序列化类型

即实现 java.io.Serializable 接口的类型。

public class MySerializableClass implements Serializable {private int value;public MySerializableClass(int value)

类型提示

Flink的类型提示Type Hints机制,它可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。也就是说可以帮助Flink更好地理解数据集中元素的类型,从而提高程序的性能。

使用TypeHint或Types类来指定数据集元素的类型

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> input = env.fromCollection(Arrays.asList("a b", "b c", "c d"));SingleOutputStreamOperator<Tuple2<String, Integer>> sum = input.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}})// 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据.returns(new TypeHint<Tuple2<String, Integer>>() {})
//                .returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(value -> value.f0).sum(1);sum.print();env.execute();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/103747.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.1 向量与线性组合

一、向量的基础知识 两个独立的数字 v 1 v_1 v1​ 和 v 2 v_2 v2​&#xff0c;将它们配对可以产生一个二维向量 v \boldsymbol{v} v&#xff1a; 列向量 v v [ v 1 v 2 ] v 1 v 的第一个分量 v 2 v 的第二个分量 \textbf{列向量}\,\boldsymbol v\kern 10pt\boldsymbol …

GPIO子系统(三)

1&#xff0c;简述 GPIO 资源是相对来说较为简单&#xff0c;而且比较通用&#xff08;比如 LED 灯&#xff09;&#xff0c;而 Linux 的 GPIO 驱动属于 Linux Driver 中较为容易上手的部分&#xff0c;但是简单归简单&#xff0c;在 Linux 系统中&#xff0c;要使用 GPIO 资源…

高级网络调试技巧:使用Charles Proxy捕获和修改HTTP/HTTPS请求

今天我将与大家分享一种强大的网络调试技巧&#xff0c;那就是使用Charles Proxy来捕获和修改HTTP/HTTPS请求。如果您是一位开发人员或者网络调试爱好者&#xff0c;那么这个工具肯定对您有着很大的帮助。接下来&#xff0c;让我们一起来学习如何使用Charles Proxy进行高级网络…

区块链加密虚拟货币交易平台安全解决方案

区块链机密货币交易锁遭入侵&#xff0c;安全存在隐患。使用泰雷兹Protect server HSM加密机&#xff0c;多方位保护您的数据&#xff0c;并通过集中化管理&#xff0c;安全的存储密钥。 引文部分&#xff1a; 损失7000万美元!黑客入侵香港区块链加密货币交易所 2023年9月&…

计算机毕业设计选什么题目好?springboot 健身房管理系统

✍✍计算机编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java实战 |…

github创建个人网页登录后404无法显示的问题

1.首先必须要有内容&#xff0c;默认是会找index.html文件&#xff0c;找不到该文件会找readme.md文件&#xff0c;也就是说最简单的方法是&#xff0c;创建了与用户名同名的repository后username.github.io后&#xff0c;添加一个readme.md文件&#xff0c;得在readme里打点字…

十四、【图章工具组】

文章目录 仿制图章图案图章 仿制图章 纺织图和章工具跟我们之前所用到的修补工具类似,需要我们先按住Alt键选住一块区域&#xff0c;然后调整它的硬度在用我们选择的区域去覆盖&#xff0c;需要注意的是&#xff0c;我们去做的时候尽量一笔覆盖我们想要遮住的区域: 图案图章…

Blender:对模型着色

Blender&#xff1a;使用立方体制作动漫头像-CSDN博客 上一步已经做了一个头像模型&#xff0c;我做的太丑了&#xff0c;就以这个外星人头像为例 首先切换到着色器编辑器 依次搜索&#xff1a;纹理坐标、映射、分离xyz和颜色渐变 这里的功能也是非常丰富和强大&#xff0c…

RFID拓展的相关问答

基于&#xff1a; Research Reading: Smart Parking Applications Using RFID Technology-CSDN博客这篇文章总结了无线射频识别&#xff08;RFID&#xff09;技术在自动化中的应用及其在停车场管理系统中的解决方案。文章提到&#xff0c;RFID技术在自动化中可以降低交易成本&…

Macos音乐制作:Ableton Live 11 Suite for Mac中文版

Ableton Live 11是一款数字音频工作站软件&#xff0c;用于音乐制作、录音、混音和现场演出。它由Ableton公司开发&#xff0c;是一款极其流行的音乐制作软件之一。 以下是Ableton Live 11的一些主要特点和功能&#xff1a; Comping功能&#xff1a;Live 11增加了Comping功能…

Python接口自动化-requests模块之post请求

一、源码解析 def post(url, dataNone, jsonNone, **kwargs):r"""Sends a POST request.:param url: URL for the new :class:Request object.:param data: (optional) Dictionary, list of tuples, bytes, or file-likeobject to send in the body of the :cl…

软件项目管理实践指南:有效规划、执行和控制

软件项目管理是使软件产品、应用程序和系统成功交付的重要规程。它有助于确保软件在预算内按时开发&#xff0c;同时满足客户的质量和功能需求。 软件项目管理是管理软件项目生命周期的一种有组织的方法&#xff0c;包括计划、开发、发布、维护和支持。它是在满足客户需求的同时…

RocketMQ为什么要保证订阅关系一致

这篇文章&#xff0c;笔者想聊聊 RocketMQ 最佳实践之一&#xff1a;保证订阅关系一致。 订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例所订阅的 Topic 、Tag 必须完全一致。 如果订阅关系不一致&#xff0c;消息消费的逻辑就会混乱&#xff0c;甚至导致消息丢…

EMNLP 2023 录用论文公布,速看NLP各领域最新SOTA方案

EMNLP 2023 近日公布了录用论文。 开始前以防有同学不了解这个会议&#xff0c;先简单介绍介绍&#xff1a;EMNLP 是NLP 四大顶会之一&#xff0c;ACL大家应该都很熟吧&#xff0c;EMNLP就是由 ACL 下属的SIGDAT小组主办的NLP领域顶级国际会议&#xff0c;一年举办一次。相较于…

C++11新特性(lambda,可变参数模板,包装器,bind)

lambda表达式是什么&#xff1f;包装器又是什么&#xff1f;有什么作用&#xff1f;莫急&#xff0c;此篇文章将详细带你探讨它们的作用。很多同学在学习时害怕这些东西&#xff0c;其实都是方便使用的工具&#xff0c;很多情况下我们学这些新的东西觉得麻烦&#xff0c;累赘&a…

uni-app开发微信小程序的报错[渲染层错误]排查及解决

一、报错信息 [渲染层错误] Framework nner error (expect FLOW INITIALCREATION end but get FLOW CREATE-NODE) 二、原因分析及解决方案 第一种 原因&#xff1a;基础库版本的原因导致的。 解决&#xff1a; 1.修改调试基础库版本 2.详情—>本地设置—>调试基础库…

扎根嵌入式行业需要什么学历文凭?

在嵌入式行业&#xff0c;学历并不是唯一关键。我本人拥有电子工程学士学位&#xff0c;但嵌入式行业更看重实际技能和经验。视频后方有免费的嵌入式学习资料&#xff0c;入门和进阶内容都涵盖其中。嵌入式行业一般接纳各种学历&#xff0c;从专科到本科到研究生&#xff0c;甚…

CentOS 安装MySQL 详细教程

参考:https://www.cnblogs.com/dotnetcrazy/p/10871352.html 参考:https://www.cnblogs.com/qiujz/p/13390811.html 参考:https://blog.csdn.net/darry1990/article/details/130419433 一、安装 一、进入安装目录 将账户切换到root账户下&#xff0c;进入local目录下 cd /usr…

通过商品ID获取淘宝天猫商品评论数据,淘宝商品评论接口,淘宝商品评论api接口

淘宝商品评论内容数据接口可以通过以下步骤获取&#xff1a; 登录淘宝开放平台&#xff0c;进入API管理控制台。在API管理控制台中创建一个应用&#xff0c;获取到应用的App Key和Secret Key。构造请求URL&#xff0c;请求URL由App Key和Secret Key拼接而成&#xff0c;请求UR…

VUE3页面截取部署后的二级目录地址

用vue3开发了一个项目&#xff0c;只能部署在根目录&#xff0c;不能加二级目录&#xff0c;后来网上找了解决方案&#xff0c;在vite.config.ts中增加base: ./,配置解决问题&#xff0c;参考下图&#xff1a; 但部署后要获取部署的二级目录地址切遇到问题&#xff0c;后来想了…