Flink—— Data Source 介绍

Data Source 简介

        Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

        Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。

        Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink Data Source分类

Flink的数据源可以根据数据的来源和特性进行分类。以下是常见的Flink数据源分类:

集合数据源

        集合数据源(Collection Data Source):集合数据源指的是将本地的集合或数组作为输入数据的数据源。在Flink中,可以使用fromCollection、fromElements等方法将Java或Scala中的集合数据转化为数据流进行处理。

1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。

2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。

3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。

5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import java.util.Arrays;
import java.util.List;public class CollectionDataSourceExample {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 创建一个包含整数的集合List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);// 将集合转化为Flink的DataSetDataSet<Integer> dataset = env.fromCollection(data);// 打印数据集中的元素dataset.print();}
}

关于使用集合数据源的注意事项:

  1. 数据规模:集合数据源适用于小规模数据集。确保你的数据集在内存中能够合理存放,不至于导致内存溢出。

  2. 内存消耗:集合数据源会将所有数据存储在内存中,因此需要谨慎处理大型数据集,避免对内存资源造成过大压力。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高作业的执行效率。

  4. 调试和测试:集合数据源非常适合用于本地调试和测试,可以快速验证处理逻辑并观察输出结果。

使用集合数据源时需要注意这些方面,以确保作业能够稳定运行并获得良好的性能表现。

文件数据源

        文件数据源(File Data Source):文件数据源用于从文件系统中读取数据,可以是本地文件系统或分布式文件系统(如HDFS)。Flink提供了readTextFile、readCsvFile等方法来支持常见文件格式的数据读取。

1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class FileDataSourceExample {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 从文件创建数据集String filePath = "path/to/your/file.txt";DataSet<String> text = env.readTextFile(filePath);// 打印文件中的内容text.print();}
}

关于使用文件数据源的注意事项:

  1. 文件路径:确保提供的文件路径是正确的,可以是本地文件系统路径,也可以是HDFS路径或其他支持的文件系统路径。

  2. 文件格式:Flink支持多种文件格式,包括文本文件、CSV文件、Parquet文件等。根据实际情况选择合适的文件格式进行读取。

  3. 并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高文件读取的并行处理能力。

  4. 文件分区:对于大型文件,可以考虑文件分区和并行读取,以加速数据的加载和处理过程。

  5. 文件读取性能:尽量避免频繁的小文件读取操作,因为这会增加文件系统的负担并降低整体性能。

使用文件数据源时需要注意以上方面,以确保能够有效地读取文件数据,并且提高作业的执行效率。

Socket数据源

        Socket数据源(Socket Data Source):Socket数据源允许通过网络套接字接收数据,通常用于测试和演示目的。Flink可以使用socketTextStream方法从TCP socket接收数据流。

socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SocketDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从socket创建数据流String hostname = "localhost";int port = 9999;env.socketTextStream(hostname, port).print();// 执行作业env.execute("Socket Data Source Example");}
}

关于使用Socket数据源的注意事项:

  1. 主机和端口:确保指定的主机和端口是正确的,并且能够与数据源通信。

  2. 网络延迟:由于Socket数据源涉及网络通信,因此可能受到网络延迟的影响。需要考虑网络性能对作业整体性能的影响。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据格式:需要确保从Socket接收到的数据能够被正确解析和处理,例如按行读取文本数据等。

  5. 容错机制:在使用Socket数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Socket数据源时需要注意以上方面,以确保能够有效地接收数据并提高作业的执行效率。

自定义数据源

        自定义数据源(Custom Data Source):除了上述内置的数据源外,Flink还支持自定义数据源。用户可以实现自己的SourceFunction接口来定义特定的数据生成逻辑,例如从消息队列、数据库、传感器等实时数据源中读取数据。

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class CustomDataSource extends RichParallelSourceFunction<String> {private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {// 生成数据String data = generateData();// 发射数据ctx.collect(data);// 控制数据生成频率Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}private String generateData() {// 实现自定义的数据生成逻辑return "some data";}
}

        在这个示例中,我们创建了一个名为CustomDataSource的类,它继承自RichParallelSourceFunction并指定了数据类型为String。在run方法中,我们使用一个循环来生成数据并通过collect方法将数据发射出去。在cancel方法中,我们设置了一个标志位来控制数据源的运行状态。

关于使用自定义数据源的注意事项:

  1. 并行度设置:根据数据源的性质和数据量合理地设置并行度,以充分利用集群资源。

  2. 数据生成频率:确保数据生成的频率和速度能够适应作业的处理能力,避免数据源产生过快导致作业无法及时处理。

  3. 容错机制:在自定义数据源中,需要考虑作业的容错机制,例如在发生故障时如何正确处理和恢复。

  4. 数据格式:确保从自定义数据源产生的数据能够被正确解析和处理,符合作业的输入要求。

  5. 资源管理:需要确保自定义数据源的资源占用和生命周期管理,避免资源泄露或过度占用资源。

使用自定义数据源时需要考虑以上方面,并确保能够有效地产生数据并提高作业的执行效率。

Apache Kafka数据源

        Apache Kafka数据源(Kafka Data Source):作为流数据处理框架,Flink对Kafka提供了良好的集成支持。可以使用addSource方法结合Flink的Kafka Connector来从Kafka主题中读取数据。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-consumer-group");// 创建Kafka数据流FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);kafkaDataStream.print();// 执行作业env.execute("Kafka Data Source Example");}
}

在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后设置Kafka的连接配置,包括bootstrap servers和consumer group id等。接下来,我们创建了一个FlinkKafkaConsumer对象,指定了要消费的topic以及数据的序列化方式,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Kafka数据源的注意事项:

  1. Kafka配置:确保指定的Kafka配置正确,并能够与Kafka集群进行通信。

  2. 序列化方式:根据实际情况选择合适的数据序列化方式,例如SimpleStringSchema、JSON、Avro等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Kafka数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

使用Kafka数据源时需要注意以上方面,以确保能够有效地消费Kafka中的数据并提高作业的执行效率。

Apache Pulsar数据源

        Apache Pulsar数据源(Pulsar Data Source):类似于Kafka,Flink也集成了对Pulsar的支持,可以直接从Pulsar主题中读取数据。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException;public class PulsarDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String serviceUrl = "pulsar://localhost:6650";String topic = "my-topic";FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(serviceUrl,topic,Schema.STRING);DataStream<String> pulsarDataStream = env.addSource(pulsarSource);pulsarDataStream.print();env.execute("Pulsar Data Source Example");}
}

        在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后指定了Pulsar的连接信息和要消费的topic。接下来,我们创建了一个FlinkPulsarSource对象,并指定了Pulsar的serviceUrl、topic以及数据的Schema,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。

关于使用Pulsar数据源的注意事项:

  1. Pulsar连接配置:确保指定的Pulsar连接信息正确,并能够与Pulsar集群进行通信。

  2. Schema设置:根据实际情况选择合适的数据Schema,例如STRING、JSON、AVRO等。

  3. 并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。

  4. 数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。

  5. 容错机制:在使用Pulsar数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。

        使用Pulsar数据源时需要注意以上方面,以确保能够有效地消费Pulsar中的数据并提高作业的执行效率。

        这些不同类型的数据源为Flink应用程序提供了灵活的数据接入方式,使得Flink可以轻松地处理不同来源和格式的数据。根据具体的业务需求和场景特点,可以选择合适的数据源类型来构建流处理和批处理应用程序。

更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/137845.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue的状态管理有哪些?

在Vue中&#xff0c;有多种方式可以进行状态管理&#xff0c;以下是一些常见的Vue状态管理解决方案&#xff1a; 1&#xff1a;Vuex&#xff1a; Vuex是Vue官方提供的状态管理库&#xff0c;用于管理Vue应用程序中的状态。Vuex使用一个单一的全局状态树&#xff08;state tre…

logback

logback集成 springboot 集成了logback日志系统&#xff0c;默认读取logbak配置文件&#xff0c;配置文件的名称默认&#xff1a;logback-spring.xml&#xff0c;如果想自定义配置文件的名称&#xff0c;需要在application.yml配置文件中作如下配置来指定logback的配置文件 l…

python能用来做什么

Python是一种流行的编程语言&#xff0c;由Guido van Rossum创建&#xff0c;并于1991年发布。 它用于&#xff1a; Web开发&#xff08;服务器端&#xff09;&#xff1b; 软件开发&#xff0c;数学计算&#xff0c;系统脚本编写。 Python能做什么&#xff1f; Python可以…

LogBack的日志报错解决办法 org.xml.sax.SAXNotRecognizedException

报错信息如下&#xff1a; 18:00:57,395 |-ERROR in ch.qos.logback.core.joran.event.SaxEventRecorder48aaecc3 - Error during parser creation or parser configuration org.xml.sax.SAXNotRecognizedException: unrecognized feature http://xml.org/sax/features/extern…

Mybatis-plus 内部提供的 ServiceImpl<M extends BaseMapper<T>, T> 学习总结

作用 当集成Mybatis-Plus 后&#xff0c;我们的大部分数据库操作都可以通过 XxxxxMapper &#xff0c;同时 Mybatis-plus 在Mapper 提供基本操作方法的同时&#xff0c;也提供类基础的 serviceImpl 来帮助我们完成一些常见的基本操作。 使用 一般情况下&#xff0c;我们首先…

k8s之配置资源管理

一&#xff0c;secret Secret 是用来保存密码、token、密钥等敏感数据的 k8s 资源&#xff0c;这类数据虽然也可以存放在 Pod 或者镜像中&#xff0c;但是放在 Secret 中是为了更方便的控制如何使用数据&#xff0c;并减少暴露的风险。 有三种类型&#xff1a; 1&#xff0c;k…

多平台商品采集——API接口:支持淘宝、天猫、1688、拼多多等多个电商平台的爆款、销量、整店商品采集和淘客功能

item_get-获得淘宝商品详情 item_get_app-获得淘宝app商品详情原数据 item_get_pro-获得淘宝商品详情高级版 item_search-按关键字搜索淘宝商品 item_search_img-按图搜索淘宝商品&#xff08;拍立淘&#xff09; item_search_shop-获得店铺的所有商品 API请求地址 公共…

网络安全——

文章目录 网络安全TCP/IP与网络安全网络安全构成要素加密技术基础 网络安全 TCP/IP与网络安全 起初&#xff0c;TCP/IP只用于一个相对封闭的环境&#xff0c;之后才发展为并无太多限制、可以从远程访问更多资源的形式。因此&#xff0c;“安全”这个概念并没有引起人们太多的…

JAVA集合学习和源码分析

一、结构 List和Set继承了Collection接口&#xff0c;Collection继承了Iterable Object类是所有类的根类&#xff0c;包括集合类&#xff0c;集合类中的元素通常是对象&#xff0c;继承了Object类中的一些基本方法&#xff0c;例如toString()、equals()、hashCode()。 Collect…

linux rsyslog三种远程转发配置方式

搭建完成rsyslog日志服务器后,它的传输方式有三种,分别为:udp、tcp及relp(Reliable Event Logging Protocol) rsyslog三种远程转发配置 传输方式说明udp传输基于udp协议进行日志传输,可靠性较低,性能损耗最少;但在网络比较差或者接收服务器压力比较高的情况下,可能存在丢…

Lib文件和netlist的关系,DDC文件和netlist的区别

今天来说一说两个基础的概念&#xff1a; 1&#xff1a;综合用的Lib文件和netlist网表的关系 在数字IC设计中&#xff0c;Lib和网表都是非常重要的文件&#xff0c;但它们的作用和用途有很大的区别。 Lib文件&#xff0c;也称为库文件&#xff0c;主要包含单元级的信息&…

线性代数-Python-04:线性系统+高斯消元的实现

文章目录 1 线性系统2 高斯-jordon消元法的实现2.1 Matrix2.2 Vector2.3 线性系统 3 行最简形式4 线性方程组的结构5 线性方程组-通用高斯消元的实现5.1 global5.2 Vector-引入is_zero5.3 LinearSystem5.4 main 1 线性系统 2 高斯-jordon消元法的实现 2.1 Matrix from .Vecto…

前端设计模式之【迭代器模式】

文章目录 前言介绍实现接口优缺点应用场景后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;前端设计模式 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出现错误&a…

centerOS下docker 搭建IotDB集群

一、准备3台机器&#xff0c;IP地址依次为IP1&#xff0c;IP2&#xff0c;IP3&#xff0c;找一个目录下建立文件夹如下&#xff1a; ./data/confignode ./logs/confignode ./data/datanode ./logs/datanode二、在当前目录下建立docker-compose.yml文件&#xff0c;3台都要 1、…

比较PID控制和神经网络控制在机器人臂上的应用

机器人臂是自动化领域中常见的机器人形式&#xff0c;其精确控制对于实现复杂任务具有重要意义。在机器人臂的控制中&#xff0c;PID控制和神经网络控制是两种常用的控制方法。本文将比较PID控制和神经网络控制在机器人臂控制方面的应用&#xff0c;包括控制原理、优缺点以及在…

Angular 由一个bug说起之一:List / Grid的性能问题

在angular中&#xff0c;MatTable构建简单&#xff0c;使用范围广。但某些时候会出现卡顿 卡顿情景&#xff1a; 1&#xff1a;一次性请求太多的数据 2&#xff1a;一次性渲染太多数据&#xff0c;这会花费CPU很多时间 3&#xff1a;行内嵌套复杂的元素 4&#xff1a;使用过多的…

【Docker】Docker 网络

引言 Docker是一个开源的应用容器引擎&#xff0c;它允许开发者将应用及其依赖打包到一个可移植的容器中&#xff0c;然后发布到任何流行的Linux机器或Windows机器上&#xff0c;也可以实现虚拟化。Docker的主要优势之一是其网络功能&#xff0c;而网络功能的核心就是网络驱动…

HTTP协议详解-下(Tomcat)

如何构造 HTTP 请求 对于 GET 请求 地址栏直接输入点击收藏夹html 里的 link script img a…form 标签 通过 form 标签构造GET请求 <body><!-- 表单标签, 允许用户和服务器之间交互数据 --><!-- 提交的数据报以键值对的结果来组织 --><form action&quo…

jQuery 网页属性操作

jQuery提供了一些方法&#xff0c;例如 attr() 、 html() 、 text() 和 val() &#xff0c;它们充当了HTML文档中内容的获取器和设置器。 jQuery – 获取内容 jQuery提供了 html() 和 text() 方法来提取匹配的HTML元素的内容。以下是这两种方法的语法&#xff1a; $(selector…

18 Linux 阻塞和非阻塞 IO

一、阻塞和非阻塞 IO 1. 阻塞和非阻塞简介 这里的 IO 指 Input/Output&#xff08;输入/输出&#xff09;&#xff0c;是应用程序对驱动设备的输入/输出操作。当应用程序对设备驱动进行操作的时候&#xff0c;如果不能获取到设备资源&#xff0c;那么阻塞式 IO 就会将对应应用…