Flink DataSource介绍

介绍

Flink的Data Source(数据源、源算子)是Flink作业的起点,它定义了数据输入的来源。Flink可以从各种数据来源获取数据,例如文件系统、消息队列、数据库等。以下是对Flink Data Source的详细介绍:

概述:

  • Flink中的Data Source用于定义数据输入的来源。
  • 将数据源添加到Flink执行环境中,可以创建一个数据流。
  • Flink支持多种类型的数据源,包括内置数据源和自定义数据源。

内置数据源:

  • 基于集合构建:使用Flink的API(如fromCollection、fromElements等)将Java或Scala中的集合数据转化为数据流进行处理。
  • 基于文件构建:从文件系统中读取数据,支持多种文件格式,如CSV、JSON等。
  • 基于Socket构建:从Socket连接中读取数据,适用于实时流数据场景。

自定义数据源:

  • Flink允许用户通过实现SourceFunction接口或扩展RichParallelSourceFunction来自定义数据源。
  • 常见的自定义数据源包括从第三方系统连接器(如Kafka、RabbitMQ、MongoDB等)中读取数据。

添加数据源到Flink执行环境:

  • 使用StreamExecutionEnvironment.addSource(sourceFunction)方法将数据源添加到Flink执行环境中。
  • sourceFunction需要实现SourceFunction接口或扩展RichParallelSourceFunction。

数据流处理:

  • 一旦数据源被添加到Flink执行环境中,就可以创建一个数据流(DataStream)。
  • 接下来,可以使用Flink的各种算子(如map、filter、reduce等)对数据流进行转换处理。

输出结果:

  • 处理后的数据可以写入其他系统,如文件系统、数据库、消息队列等。
  • Flink支持多种输出方式,如使用DataStream的writeAsText、writeAsCsv等方法将数据写入文件,或使用Flink的连接器将数据写入Kafka、HBase等系统。

总之,Flink的Data Source是构建Flink数据流处理应用的重要组成部分。通过选择合适的数据源和输出方式,可以方便地构建高效、可靠的数据流处理应用。

样例

程序中添加数据源

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new SourceMySQL()).print();env.execute("Flink add mysql sourc");

Flink 已经提供了若干实现好了的SourceFunctions也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source,

stream sources

StreamExecutionEnvironment 加载Source

基于集合:

  1. fromCollection(Collection)
  2. fromCollection(Iterator, Class)
  3. fromElements(T …)
  4. fromParallelCollection(SplittableIterator, Class)
  5. generateSequence(from, to)
    例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(new Event(1, "ba", 4.0),new Event(2, "st", 5.0),new Event(3, "fo", 6.0),...
);

文件

  1. readTextFile(String filePath)
  2. readTextFile(String filePath, String charsetName)
  3. readFile(FileInputFormat inputFormat, String filePath)
    样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/file");

Socket

  1. socketTextStream(String hostname, int port)
  2. socketTextStream(String hostname, int port, String delimiter)
  3. socketTextStream(String hostname, int port, String delimiter, long maxRetry)
    样例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 8888) // 监听 localhost 的 8888端口过来的数据.flatMap(new Splitter()).keyBy(0).timeWindow(Time.seconds(5)).sum(1);

自定义资源

通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。
继承类

SourceFunction 非并行

SourceFunction 是一个用于定义数据源的函数接口。这个接口的实现通常负责从外部系统(如 Kafka、文件系统、数据库等)读取数据,并将这些数据作为 Flink 流处理或批处理作业的输入。

SourceFunction 通常与 Flink 的 DataStream API 一起使用,以定义和构建数据流处理任务。尽管 Flink 的内部实现和 API 可能会随着版本的更新而有所变化,但通常,一个 SourceFunction 会被实现为:

  1. 创建一个可以持续生成数据或等待外部数据到达的组件。
  2. 当有新数据到达时,将数据作为 Flink 的 DataStream 的一部分进行发射(emit)。

在 Flink 的内部,SourceFunction 可能会有多种不同的实现,具体取决于其要处理的数据源类型。例如,对于 Kafka 这样的消息队列,Flink 提供了专门的 Kafka 连接器和相应的 SourceFunction 实现,用于从 Kafka 主题中读取数据。

在实现自定义的 SourceFunction 时,你需要考虑以下几个方面:

  • 数据源的连接和断开连接逻辑。
  • 数据的读取和解析逻辑。
  • 如何在 Flink 运行时环境中优雅地处理可能的错误和失败。
  • 如何将读取的数据转换为 Flink 可以理解的格式(如 Tuple、POJO 或其他自定义类型)。

ParallelSourceFunction 并行

ParallelSourceFunction 是一个接口,用于定义并行数据源的行为。这个接口允许你创建自定义的数据源,这些数据源能够并行地读取数据并传递给 Flink 的数据处理管道。

ParallelSourceFunction 继承自 SourceFunction,但增加了并行处理的能力。当 Flink 任务需要并行处理多个数据流时,你可以通过实现 ParallelSourceFunction 来创建并行数据源。

Flink 还提供了一个 RichParallelSourceFunction 抽象类,它是 ParallelSourceFunction 的子类,并提供了更多的生命周期方法和上下文信息。使用 RichParallelSourceFunction 可以让你更容易地管理你的并行数据源,因为它提供了诸如 open()、close() 和 cancel() 等方法,这些方法可以在数据源的生命周期中的不同阶段被调用。

下面是一个简单的示例,演示了如何使用 RichParallelSourceFunction 创建一个并行数据源,该数据源生成递增的数字:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;  
public class IncrementingNumberSource extends RichParallelSourceFunction<Long> {  private volatile boolean running = true;  private long count = 0L;  @Override  public void run(SourceContext<Long> ctx) throws Exception {  while (running) {  synchronized (ctx.getCheckpointLock()) {  ctx.collect(count++);  // 这里可以添加一些休眠或其他逻辑来控制数据的生成速度  Thread.sleep(100);  }  }  }  @Override  public void cancel() {  running = false;  }  
}

在这个示例中,IncrementingNumberSource 类继承了 RichParallelSourceFunction,并覆盖了 run() 和 cancel() 方法。在 run() 方法中,我们创建了一个无限循环来生成递增的数字,并使用 ctx.collect() 方法将每个数字发送到 Flink 的数据处理管道中。在 cancel() 方法中,我们设置了一个标志来停止 run() 方法中的循环,以便在需要时可以优雅地关闭数据源。

自定义资源DEMO

/*** Desc: 自定义 source mysql 数据*/
public class SourceMySQL extends RichSourceFunction<Map<String, Object>> {PreparedStatement ps;private Connection connection;/*** open 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接。*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = MySQLUtil.getConnection("com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456");String sql = "select * from ST;";ps = this.connection.prepareStatement(sql);}/*** 关闭连接和释放资源的动作*/@Overridepublic void close() throws Exception {super.close();if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** DataStream 从run方法用来获取数据*/@Overridepublic void run(SourceContext<Map<String, Object>> ctx) throws Exception {ResultSet resultSet = ps.executeQuery();while (resultSet.next()) {Map<String, Object> rs = new HashMap<>();rs.put("id", resultSet.getInt("id"));rs.put("name", resultSet.getString("name").trim());ctx.collect(rs);}}@Overridepublic void cancel() {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/8388.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YTM32的片内flash应用答疑 - 释疑efm_sts[accerr]寄存器位

YTM32的片内flash应用答疑 - 释疑efm_sts[accerr]寄存器位 文章目录 YTM32的片内flash应用答疑 - 释疑efm_sts[accerr]寄存器位IntroductionConceptConclusion Introduction 之前有客户在基于ytm32b1le05微控制器做ota方案&#xff0c;其中在擦写片内flash模块时&#xff0c;需…

SpringCloudAlibaba:4.2云原生网关higress的基本使用

概述 简介 Higress是基于阿里内部的Envoy Gateway实践沉淀、以开源Istio Envoy为核心构建的下一代云原生网关&#xff0c; 实现了流量网关 微服务网关 安全网关三合一的高集成能力&#xff0c;深度集成Dubbo、Nacos、Sentinel等微服务技术栈 定位 在虚拟化时期的微服务架构…

138.随机链表的复制

/*** Definition for a Node.* struct Node {* int val;* struct Node *next;* struct Node *random;* };*/ typedef struct Node Node; struct Node* copyRandomList(struct Node* head) {Node* curhead;//拷贝节点插入到原节点后面while(cur){Node* copy(Node*)m…

基于参数化建模的3D产品组态实现

我们最近为荷兰设计师家具制造商 KILO 发布了基于网络的 3D 配置器的第一个生产版本。我们使用了 Salsita 3D 配置器&#xff0c;这是一个内部 SDK&#xff0c;使新的 3D 配置器的实施变得轻而易举。虽然它给我们带来了巨大帮助&#xff0c;但我们仍然面临一些有趣的挑战。 NSD…

C语言【文件操作】(1)

文章目录 1.为什么使用文件2.文件是什么&#xff1f;2.1程序文件2.2数据文件 3.二进制文件和文本文件4.文件的打开和关闭4.1流和标准流流标准流 4.2文件指针4.3文件的打开和关闭 结语 1.为什么使用文件 很简单 长久的存储数据 如果没有文件&#xff0c;我们写程序所产生的数据…

企业内部适用的五大知识库工具测评推荐

随着企业规模的不断扩大和业务复杂性的增加&#xff0c;要想更高效地进行企业管理就不得不使用知识库管理工具。本文将对五款企业内部适用的知识库工具进行测评推荐&#xff0c;帮助企业选择出更适合自己的知识库管理工具。 一、Helplook AI知识库 Helplook AI知识库是一款搭建…

【LeetCode刷题记录】124. 二叉树中的最大路径和

124 二叉树中的最大路径和 二叉树中的 路径 被定义为一条节点序列&#xff0c;序列中每对相邻节点之间都存在一条边。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的…

【Java开发的我出书啦,各位同仁快过来围观】!!!

文章目录 &#x1f50a;博主介绍&#x1f964;本文内容出书的目的出书的过程书籍的内容 &#x1f4e5;博主的话 &#x1f50a;博主介绍 文章目录 &#x1f50a;博主介绍&#x1f964;本文内容出书的目的出书的过程书籍的内容 &#x1f4e5;博主的话 &#x1f33e;阅读前&#x…

「网络流 24 题」太空飞行计划 【最大权值闭合图】

「网络流 24 题」太空飞行计划 题意 有 n n n 个实验 和 m m m 个器械&#xff0c;每个实验都需要若干个指定的器械才能进行 实验 i i i 的盈利为 p i p_i pi​&#xff0c; 器械 j j j 的花销为 c j c_j cj​ 找出纯利润最大的实验计划 思路 这是非常典型的最大权值…

如何定时关闭程序

首先&#xff0c;需要用到的这个工具&#xff1a; 度娘网盘 提取码&#xff1a;qwu2 蓝奏云 提取码&#xff1a;2r1z 前面的流程步骤参考这条攻略&#xff1a; 快捷自由定时重启、注销、关机 只不过最后的地方&#xff0c;选择 关闭程序 &#xff0c;再填写程序名称即可 补…

全栈开发之路——前端篇(5)组件间通讯和接口等知识补充

全栈开发一条龙——前端篇 第一篇&#xff1a;框架确定、ide设置与项目创建 第二篇&#xff1a;介绍项目文件意义、组件结构与导入以及setup的引入。 第三篇&#xff1a;setup语法&#xff0c;设置响应式数据。 第四篇&#xff1a;数据绑定、计算属性和watch监视 辅助文档&…

Linux中的软连接和硬链接

一、软和硬链接连接 在Linux系统中&#xff0c;软连接&#xff08;符号链接&#xff09;和硬链接是文件系统中两种不同类型的链接&#xff0c;它们用于创建对文件的引用。下面详细解释这两种链接的特点和区别&#xff1a;、 软连接&#xff08;符号链接&#xff09; 定义&…

西红柿叶病害检测(yolov8模型,从图像、视频和摄像头三种路径识别检测,包含登陆页面、注册页面和检测页面)

1.基于最新的YOLOv8训练的西红柿病害检测模型&#xff0c;和基于PyQt5制作的可视西红柿病害系统&#xff0c;包含登陆页面、注册页面和检测页面&#xff0c;该系统可自动检Bacterial Spot, Early_Blight, Healthy, Late_blight, Leaf Mold, Target_Spot, black spot&#xff0c…

电商风口的最后一班快车?有些人甚至正在All in视频号!

我是王路飞。 当抖音、快手、淘宝上的商家还在内卷的时候&#xff0c;有些人却转移了阵地&#xff0c;搭上了电商风口的“最后一般列车”&#xff0c;甚至正在All in 视频号。 内容来源于【醒醒团队-电商王路飞】 随着“微视”想要三分天下野心的失败&#xff08;与抖音、快手…

SD-WAN:企业低成本与高性能组网的理想选择

在竞争日益激烈的市场中&#xff0c;企业要立足就需要一种既能控制成本又能保证卓越性能的网络解决方案。尽管传统组网方式稳定可靠&#xff0c;但其高昂的硬件投入和升级成本让许多企业望而却步&#xff0c;而SD-WAN为企业提供了一个理想的解决方案。 成本效益&#xff1a;精打…

[Kubernetes] KubeKey 部署 K8s v1.28.8

文章目录 1.K8s 部署方式2.操作系统基础配置3.安装部署 K8s4.验证 K8s 集群5.部署测试资源 1.K8s 部署方式 kubeadm: kubekey, sealos, kubespray二进制: kubeaszrancher 2.操作系统基础配置 主机名内网IP外网IPmaster192.168.66.2139.198.9.7node1192.168.66.3139.198.40.17…

独家专访辉羲智能章健勇:数据闭环定义芯片,帮车厂造中国版FSD

‍采访、编辑 |德新 撰文 |苗岭 辉羲智能&#xff0c;智能驾驶芯片行业最新的进入者。 这家公司成立于2022年&#xff0c;今年辉羲即将发布它的首款高阶智驾芯片。而另外两家智驾计算平台的头部公司地平线和黑芝麻已经在前不久分别向港交所提交了IPO申请。 国内的自动驾驶行…

【问题记录】Windows命令行中执行.exe文件路径有空格的解决方法

Windows命令行中执行.exe文件路径有空格的解决方法 一&#xff0c;问题现象&#xff1a;二&#xff0c;问题原因三&#xff0c;解决方法 一&#xff0c;问题现象&#xff1a; 在Windows命令行中执行路径中带空格的.exe文件&#xff1a; 会报错&#xff1a; 二&#xff0c;问…

数据库系统理论——关系数据库

文章目录 一、关系&#xff08;数据结构&#xff09;1、概述2、名词解释3、关系模式、关系数据库、关系数据库模式4、基本关系的性质 二、关系操作&#xff08;数据操作&#xff09;三、关系的完整性1、实体完整性2 、参照完整性3、用户自定义的完整性 四、关系代数五、习题 前…

Qwen大模型实践之初体验

Qwen大模型实践之初体验 测试机器, 使用InternStudio提供的开发机&#xff0c;配置如下&#xff1a; 部分资源详细信息&#xff1a; # CPUIntel(R) Xeon(R) Platinum 8369B CPU 2.90GHz# GPU(base) rootintern-studio-50014188:~# studio-smi Running studio-smi by vgpu-smiW…