4.2、Flink任务怎样读取文件中的数据

目录

1、前言

2、readTextFile(已过时,不推荐使用)

3、readFile(已过时,不推荐使用)

4、fromSource(FileSource) 推荐使用


1、前言

思考: 读取文件时可以设置哪些规则呢?

         1. 文件的格式(txt、csv、二进制...)        

         2. 文件的分隔符(按\n 分割)

         3. 是否需要监控文件变化(一次读取、持续读取)

基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法


2、readTextFile(已过时,不推荐使用)

语法说明:

定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath));  // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter());  // 指定Flink中的数据类型    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName);     // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }

代码示例:

    public static void readTextFile() throws Exception {/** TODO 功能说明*   readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}

3、readFile(已过时,不推荐使用)

语法说明:

定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath    : 指定 文件路径watchType   : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval    : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
FileInputFormat 的实现类
FileInputFormat 的实现类

代码示例:

    public static void readFile() throws Exception {/** TODO 功能说明*    readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。*    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)*       按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:*      @inputFormat : 指定文件输入格式*      @filePath    : 指定文件路径*      @watchType   : 指定监控类型,提供了两种读取策略*            PROCESS_ONCE : 只读取一次*            PROCESS_CONTINUOUSLY :持续读取,监控新增数据*      @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:*      1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该*           文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}

4、fromSource(FileSource) 推荐使用

    public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/29627.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

COCOS项目运行的时候图片模糊的原因

1、首先。用X坐标来分析&#xff0c;如果size*Anchor Position有小数&#xff0c;如上图57*0.5667695.5。这样就会导致x模糊。如果y同样计算结果包含小数&#xff0c;那么y也会模糊。xy同时模糊的情况是最模糊的。 2、如果当前node没有问题&#xff0c;那么就要检查上级node是…

跨境电商线上店铺智能装修系统源码开发

搭建一个跨境电商线上店铺智能装修系统源码开发需要以下步骤&#xff1a; 1. 确定需求&#xff1a;首先&#xff0c;需要明确线上店铺智能装修系统的具体需求。 2. 选择开发语言和框架&#xff1a;根据需求&#xff0c;选择合适的开发语言和框架进行开发&#xff0c;可以提高…

快速上手Vue开发:新一代Vue官方脚手架(create-vue)

文章目录 一、简介二、创建一个 Vue 应用1、前提条件2、安装命令3、可选插件 一、简介 create-vue 是 Vue3 的专用脚手架&#xff0c;使用 vite 创建 Vue3 的项目&#xff0c;也可以选择安装需要的各种插件&#xff0c;使用更简单。 二、创建一个 Vue 应用 官网地址&#xff…

Zookeeper特性与节点数据类型详解

CAP&Base理论 CAP理论 cap理论是指对于一个分布式计算系统来说&#xff0c;不可能满足以下三点: 一致性 &#xff1a; 在分布式环境中&#xff0c;一致性是指数据在多个副本之间是否能够保持一致的 特性&#xff0c;等同于所有节点访问同一份最新的数据副本。在一致性的需…

Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug

近日&#xff0c;Apache DolphinScheduler 发布了 3.1.8 版本。此版本主要基于 3.1.7 版本进行了 bug 修复&#xff0c;共计修复 16 个 bug, 1 个 doc, 2 个 chore。 其中修复了以下几个较为重要的问题&#xff1a; 修复在构建 SeaTunnel 任务节点的参数时错误的判断条件修复 …

《24海南大学835软件工程考研经验贴》

1.经验之谈 首先&#xff0c;我是一个二战的考生&#xff0c;一战给我带来的经验有几点。第一&#xff0c;数学、专业课这两门越早复习越好&#xff0c;越拖到后面你就会发现来不及了&#xff0c;这学不完&#xff0c;那学不完的。第二、我认为是比较关键的一点&#xff0c;一定…

玩转graphQL

转载至酒仙桥的玩转graphQL - SecPulse.COM | 安全脉搏 前言 在测试中我发现了很多网站开始使用GraphQL技术&#xff0c;并且在测试中发现了其使用过程中存在的问题&#xff0c;那么&#xff0c;到底GraphQL是什么呢&#xff1f;了解了GraphQL后能帮助我们在渗透测试中发现哪些…

c++11 标准模板(STL)(std::basic_fstream)(三)

定义于头文件 <fstream> template< class CharT, class Traits std::char_traits<CharT> > class basic_fstream : public std::basic_iostream<CharT, Traits> 类模板 basic_fstream 实现基于文件的流上的高层输入/输出。它将 std::basic_i…

常见分布式ID解决方案总结:数据库、算法、开源组件

常见分布式ID解决方案总结 分布式ID分布式ID方案之数据库数据库主键自增数据库号段模式Redis自增MongoDB 分布式ID方案之算法UUIDSnowflake(雪花算法) 雪花算法的使用IdWorker工具类配置分布式ID生成器 分布式ID方案之开源组件uid- generator(百度)Tinyid&#xff08;滴滴&…

【LangChain学习】基于PDF文档构建问答知识库(三)实战整合 LangChain、OpenAI、FAISS等

接下来&#xff0c;我们开始在web框架上整合 LangChain、OpenAI、FAISS等。 一、PDF库 因为项目是基于PDF文档的&#xff0c;所以需要一些操作PDF的库&#xff0c;我们这边使用的是PyPDF2 from PyPDF2 import PdfReader# 获取pdf文件内容 def get_pdf_text(pdf):text "…

视频网站如何选择国外服务器?

​ 视频网站如何选择国外服务器? 地理位置&#xff1a;选择靠近目标用户群体的国外服务器位置是至关重要的。若用户主要集中在中国以外的地区&#xff0c;因您应选择位于用户所在地附近的服务商&#xff0c;以确保视频的传输速度。 带宽和速度&#xff1a;选择带宽足够且方便升…

如何解决 Elasticsearch 查询缓慢的问题以获得更好的用户体验

作者&#xff1a;Philipp Kahr Elasticsearch Service 用户的重要注意事项&#xff1a;目前&#xff0c;本文中描述的 Kibana 设置更改仅限于 Cloud 控制台&#xff0c;如果没有我们支持团队的手动干预&#xff0c;则无法进行配置。 我们的工程团队正在努力消除对这些设置的限制…

传统图像算法 - 运动目标检测之KNN运动背景分割算法

以下代码用OpenCV实现了视频中背景消除和提取的建模&#xff0c;涉及到KNN&#xff08;K近邻算法&#xff09;&#xff0c;整体效果比较好&#xff0c;可以用来进行运动状态分析。 原理如下&#xff1a; 背景建模&#xff1a;在背景分割的开始阶段&#xff0c;建立背景模型。 …

1999-2021年全国各地级市专利申请与获得情况、绿色专利申请与获得情况面板数据

1999-2021年全国各地级市专利申请与获得情况、绿色专利申请与获得情况面板数据 1、时间&#xff1a;2000-2021年 2、来源&#xff1a;国家知识产权局 3、范围&#xff1a;地级市&#xff08;具体每年地级市数量参看下文图片&#xff09; 4、指标&#xff1a;申请专利数&…

Jenkins 中 shell 脚本执行失败却不自行退出

Jenkins 中 执行 shell 脚本时&#xff0c;有时候 shell 执行失败了&#xff0c;或者判断结果是错误的&#xff0c;但是 Jenkins 执行完成后确提示成功 success 。 此时&#xff0c;可以通过条件判断来解决这个问题&#xff0c;让 Jenkins 强制退出并提示执行失败 failed 。 …

【MySQL系列】表约束的学习

「前言」文章内容大致是MySQL的表的约束。 「归属专栏」MySQL 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、MySQL表的约束1.1 空属性1.2 默认值&#xff08;default&#xff09;1.3 列描述&#xff08;comment&#xff09;1.4 zerofill1.5 主键&#xff08;primary ke…

stm32_断点调试无法进入串口接收中断

先说结果&#xff0c;可能是stm32调试功能/keil软件/调试器&#xff08;试过STLINK和JLINK两种&#xff09;的问题&#xff0c;不是代码&#xff1b; 1、入坑 配置完串口后&#xff0c;可以发送数据到串口助手&#xff0c;但不能接收数据并做处理&#xff0c;所以第一步&…

【阻止IE强制跳转到Edge浏览器】

由于微软开始限制用户使用Internet Explorer浏览网站&#xff0c;IE浏览器打开一些网页时会自动跳转到新版Edge浏览器&#xff0c;那应该怎么禁止跳转呢&#xff1f; 1、点击电脑左下角的“搜索框”或者按一下windows键。 2、输入“internet”&#xff0c;点击【Internet选项…

LabVIEW开发分段反射器测试台

LabVIEW开发分段反射器测试台 随着对太空的观察需求越来越远&#xff0c;而不是当前技术&#xff08;如哈勃望远镜&#xff09;所能达到的&#xff0c;有必要增加太空望远镜主镜的尺寸。但是&#xff0c;增加主镜像的大小时存在几个问题。随着反射镜尺寸的增加&#xff0c;制造…

vue3官网文档学习、复习笔记(快速上手)

目录 2.Attribute 绑定&#xff08;v-bind&#xff09; 3.事件监听&#xff08;v-on&#xff09; 4.表单绑定&#xff08;v-model&#xff09; 5.条件渲染&#xff08;v-if&#xff09; 6.列表渲染&#xff08;v-for&#xff09; all.value all.value.filter&#xff08;…