4.2、Flink任务怎样读取文件中的数据

目录

1、前言

2、readTextFile(已过时,不推荐使用)

3、readFile(已过时,不推荐使用)

4、fromSource(FileSource) 推荐使用


1、前言

思考: 读取文件时可以设置哪些规则呢?

         1. 文件的格式(txt、csv、二进制...)        

         2. 文件的分隔符(按\n 分割)

         3. 是否需要监控文件变化(一次读取、持续读取)

基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法


2、readTextFile(已过时,不推荐使用)

语法说明:

定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath));  // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter());  // 指定Flink中的数据类型    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName);     // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }

代码示例:

    public static void readTextFile() throws Exception {/** TODO 功能说明*   readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}

3、readFile(已过时,不推荐使用)

语法说明:

定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath    : 指定 文件路径watchType   : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval    : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
FileInputFormat 的实现类
FileInputFormat 的实现类

代码示例:

    public static void readFile() throws Exception {/** TODO 功能说明*    readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。*    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)*       按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:*      @inputFormat : 指定文件输入格式*      @filePath    : 指定文件路径*      @watchType   : 指定监控类型,提供了两种读取策略*            PROCESS_ONCE : 只读取一次*            PROCESS_CONTINUOUSLY :持续读取,监控新增数据*      @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:*      1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该*           文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}

4、fromSource(FileSource) 推荐使用

    public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/29627.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

COCOS项目运行的时候图片模糊的原因

1、首先。用X坐标来分析&#xff0c;如果size*Anchor Position有小数&#xff0c;如上图57*0.5667695.5。这样就会导致x模糊。如果y同样计算结果包含小数&#xff0c;那么y也会模糊。xy同时模糊的情况是最模糊的。 2、如果当前node没有问题&#xff0c;那么就要检查上级node是…

跨境电商线上店铺智能装修系统源码开发

搭建一个跨境电商线上店铺智能装修系统源码开发需要以下步骤&#xff1a; 1. 确定需求&#xff1a;首先&#xff0c;需要明确线上店铺智能装修系统的具体需求。 2. 选择开发语言和框架&#xff1a;根据需求&#xff0c;选择合适的开发语言和框架进行开发&#xff0c;可以提高…

快速上手Vue开发:新一代Vue官方脚手架(create-vue)

文章目录 一、简介二、创建一个 Vue 应用1、前提条件2、安装命令3、可选插件 一、简介 create-vue 是 Vue3 的专用脚手架&#xff0c;使用 vite 创建 Vue3 的项目&#xff0c;也可以选择安装需要的各种插件&#xff0c;使用更简单。 二、创建一个 Vue 应用 官网地址&#xff…

JVM源码剖析之System.getProperty实现

版本信息 jdk版本&#xff1a;jdk8u40 操作系统&#xff1a;Mac System.getProperty 方法大家并不陌生&#xff0c;在各大框架源码中都能见到&#xff0c;项目中也能使用到&#xff0c;那么此篇文章将带你揭开System.getProperty方法底层实现。 System.getProperty 可以拿到…

Zookeeper特性与节点数据类型详解

CAP&Base理论 CAP理论 cap理论是指对于一个分布式计算系统来说&#xff0c;不可能满足以下三点: 一致性 &#xff1a; 在分布式环境中&#xff0c;一致性是指数据在多个副本之间是否能够保持一致的 特性&#xff0c;等同于所有节点访问同一份最新的数据副本。在一致性的需…

ChatGPT在大规模数据处理和信息管理中的应用如何?

ChatGPT作为一种强大的自然语言处理模型&#xff0c;在大规模数据处理和信息管理领域有着广泛的应用潜力。它可以利用其文本生成、文本理解和问答等能力&#xff0c;为数据分析、信息提取、知识管理等任务提供智能化的解决方案。以下将详细介绍ChatGPT在大规模数据处理和信息管…

Langchain module ‘hnswlib‘ has no attribute ‘Index‘ 错误解决

Langchain module hnswlib has no attribute Index 错误解决 使用 Langchain 操作 Chroma 向量数据库时&#xff0c;报一下错误信息&#xff0c; module hnswlib has no attribute Index试着重装了不同 hnswlib 版本没有解决&#xff0c;最后解决方法是&#xff0c;不要使用 h…

Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug

近日&#xff0c;Apache DolphinScheduler 发布了 3.1.8 版本。此版本主要基于 3.1.7 版本进行了 bug 修复&#xff0c;共计修复 16 个 bug, 1 个 doc, 2 个 chore。 其中修复了以下几个较为重要的问题&#xff1a; 修复在构建 SeaTunnel 任务节点的参数时错误的判断条件修复 …

《24海南大学835软件工程考研经验贴》

1.经验之谈 首先&#xff0c;我是一个二战的考生&#xff0c;一战给我带来的经验有几点。第一&#xff0c;数学、专业课这两门越早复习越好&#xff0c;越拖到后面你就会发现来不及了&#xff0c;这学不完&#xff0c;那学不完的。第二、我认为是比较关键的一点&#xff0c;一定…

玩转graphQL

转载至酒仙桥的玩转graphQL - SecPulse.COM | 安全脉搏 前言 在测试中我发现了很多网站开始使用GraphQL技术&#xff0c;并且在测试中发现了其使用过程中存在的问题&#xff0c;那么&#xff0c;到底GraphQL是什么呢&#xff1f;了解了GraphQL后能帮助我们在渗透测试中发现哪些…

2 指针与数组:理解指针与数组的关系与转换

推荐最近在工作学习用的一款好用的智能助手AIRight 网址是www.airight.fun。 引言 在计算机科学中&#xff0c;指针与数组是两个基础且重要的概念。指针是一个用于存储变量地址的变量&#xff0c;而数组是一系列相同类型的元素的集合。虽然指针和数组看起来是两个不同的概念&…

在Mac本地搭建Kubernetes和Istio的详细教程

系列文章目录 文章目录 系列文章目录前言一、安装Docker和kind二、创建kind集群三、安装Istio四、部署示例应用五、配置Ingress Gateway六、访问示例应用总结前言 Kubernetes(简称K8s)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。而Istio是一个服…

如何使用HAL库手动配置一个可输出可输入的引脚

在使用CubeMX配置GPIO口时的时候&#xff0c;对于某一个引脚只能选择用来输出或者输入。而有时我们需要在STM32上使用一些外设&#xff0c;比如DHT11温湿度传感器&#xff0c;其中的DATA口需要既能接收信号又能发送信号&#xff0c;所以我们可以参照CubeMX自动生成的GPIO初始化…

c++11 标准模板(STL)(std::basic_fstream)(三)

定义于头文件 <fstream> template< class CharT, class Traits std::char_traits<CharT> > class basic_fstream : public std::basic_iostream<CharT, Traits> 类模板 basic_fstream 实现基于文件的流上的高层输入/输出。它将 std::basic_i…

常见分布式ID解决方案总结:数据库、算法、开源组件

常见分布式ID解决方案总结 分布式ID分布式ID方案之数据库数据库主键自增数据库号段模式Redis自增MongoDB 分布式ID方案之算法UUIDSnowflake(雪花算法) 雪花算法的使用IdWorker工具类配置分布式ID生成器 分布式ID方案之开源组件uid- generator(百度)Tinyid&#xff08;滴滴&…

【LangChain学习】基于PDF文档构建问答知识库(三)实战整合 LangChain、OpenAI、FAISS等

接下来&#xff0c;我们开始在web框架上整合 LangChain、OpenAI、FAISS等。 一、PDF库 因为项目是基于PDF文档的&#xff0c;所以需要一些操作PDF的库&#xff0c;我们这边使用的是PyPDF2 from PyPDF2 import PdfReader# 获取pdf文件内容 def get_pdf_text(pdf):text "…

视频网站如何选择国外服务器?

​ 视频网站如何选择国外服务器? 地理位置&#xff1a;选择靠近目标用户群体的国外服务器位置是至关重要的。若用户主要集中在中国以外的地区&#xff0c;因您应选择位于用户所在地附近的服务商&#xff0c;以确保视频的传输速度。 带宽和速度&#xff1a;选择带宽足够且方便升…

如何解决 Elasticsearch 查询缓慢的问题以获得更好的用户体验

作者&#xff1a;Philipp Kahr Elasticsearch Service 用户的重要注意事项&#xff1a;目前&#xff0c;本文中描述的 Kibana 设置更改仅限于 Cloud 控制台&#xff0c;如果没有我们支持团队的手动干预&#xff0c;则无法进行配置。 我们的工程团队正在努力消除对这些设置的限制…

传统图像算法 - 运动目标检测之KNN运动背景分割算法

以下代码用OpenCV实现了视频中背景消除和提取的建模&#xff0c;涉及到KNN&#xff08;K近邻算法&#xff09;&#xff0c;整体效果比较好&#xff0c;可以用来进行运动状态分析。 原理如下&#xff1a; 背景建模&#xff1a;在背景分割的开始阶段&#xff0c;建立背景模型。 …

深入探索Linux文件链接技术:ln命令的妙用

当谈及 Linux 系统中的文件管理和链接技术&#xff0c;ln 命令是一个不可或缺的工具。ln 命令用于创建硬链接和软链接&#xff0c;它在 Linux 文件系统中发挥着重要作用&#xff0c;为用户提供了更大的灵活性和组织能力。在本文中&#xff0c;我们将深入探讨 ln 命令是什么&…