Flink DataStream 编程入门

流处理是 Flink 的核心,流处理的数据集用 DataStream 表示。数据流从可以从各种各样的数据源中创建(消息队列、Socket 和 文件等),经过 DataStream 的各种 transform 操作,最终输出文件或者标准输出。这个过程跟之前文章中介绍的 Flink 程序基本骨架一样。本篇介绍 DataStream 相关的入门知识。

Flink 101

为了学习 Flink 的朋友能查看到每个例子的源码,我创建了一个 GitHub 项目:github.com/duma-repo/a… 这里会存放每一篇文章比较重要的示例的源码,目前支持 Java 和 Scala,仍在不断完善中。代码下载后可以在本地运行,也可以打包放在集群上运行。同时,欢迎各位将优质的资源提交到项目中。

简单示例

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WindowWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9999).flatMap(new Splitter()).keyBy(0).timeWindow(Time.seconds(5)).sum(1);dataStream.print();env.execute("Window WordCount");}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word: sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1)); //空格分割后,每个单词转换成 (word, 1) 二元组输出}}}}复制代码

这个例子跟之间介绍 WordCount 的例子类似,这里详细介绍下涉及的 API 和含义

  • 数据源:socketTextStream 是从 socket 创建的数据流,可以使用 nc -l 9000 创建 socket 客户端发送数据
  • transform:flatMap 将输入的数据按照空格分割后,扁平化处理(flat即为扁平的意思);keyBy 会按照指定的 key 进行分组,这里就是将单词作为 key;timeWindow 指定时间窗口,这里是 5s 处理一次;sum 是聚合函数,将分组好的单词个数求和
  • 输出:print 将处理完的数据输出到标准输出流中,可以在控制台看到输出的结果。调用 execute 方法提交 Job

Data Source

经过以上的介绍,我们知道常见的数据源有 socket、消息队列和文件等。对于常见的数据源 Flink 已经定义好了读取函数,接下来一一介绍。

基于文件

  • readTextFile(path):读文本文件,默认是文件类型是 TextInputFormat,并且返回类型是 String
  • readFile(fileInputFormat, path):读文件,需要指定输入文件的格式
  • readFile(fileInputFormat, path, watchType, interval, typeInfo):以上两个方法内部都会调用这个方法,参数说明:
    • fileInputFormat - 输入文件的类型
    • path - 输入文件路径
    • watchType - 取值为 FileProcessingMode.PROCESS_CONTINUOUSLY 和 FileProcessingMode.PROCESS_ONCE
      • FileProcessingMode.PROCESS_CONTINUOUSLY - 当输入路径下有文件被修改,整个路径下内容将会被重新处理
      • FileProcessingMode.PROCESS_ONCE - 只扫描一次,便退出。因此这种模式下输入数据只读取一次
    • interval - 依赖 watchType 参数,对于 FileProcessingMode.PROCESS_CONTINUOUSLY 每隔固定时间(单位:毫秒)检测路径下是否有新数据
    • typeInfo - 返回数据的类型

需要注意,在底层 Flink 将读文件的过程分为两个子任务 —— 文件监控和数据读取(reader)。监控任务由 1 个 task 实现,而读取的任务由多个 task 实现,数量与 Job 的并行度相同。监控任务的作用是扫描输入路径(周期性或者只扫描一次,取决于 watchType),当数据可以被处理时,会将数据分割成多个分片,将分片分配给下游的 reader 。一个分片只会被一个 reader 读取,一个 reader 可以读取多个分片。

基于 Socket

  • socketTextStream:从 socket 数据流中读数据

基于 Collection

  • fromCollection(Collection):从 Java.util.Collection 类型的数据中创建输入流,collection 中的所有元素类型必须相同
  • fromCollection(Iterator, Class):从 iterator (迭代器)中创建输入流,Class 参数指定从 iterator 中的数据类型
  • fromElements(T ...):从给定的参数中创建输入流, 所有参数类型必须相同
  • fromParallelCollection(SplittableIterator, Class):从 iterator 中创建并行的输入流,Class 指定 iterator 中的数据类型
  • generateSequence(from, to):从 from 至 to 之间的数据序列创建并行的数据流

自定义

  • addSource:可以自定义输入源,通过实现 SourceFunction 接口来自定义非并行的输入流;也可以实现 ParallelSourceFunction 接口或集成 RichParallelSourceFunction 类来自定义并行输入流,当然也可以定义好的数据源,如:Kafka,addSource(new FlinkKafkaConsumer08<>(...))

DataStream 的 transform

之前已经介绍了一些 transfrom 函数,如:map、flatMap 和 filter 等。同时还有窗口函数:window、timeWindow 等,聚合函数:sum、reduce 等。更多的 transform 函数以及使用将会单独写一篇文章介绍。

Data Sink

Data Sink 便是数据的输出。同 Data Source 类似, Flink 也内置了一些输出函数,如下:

  • writeAsText(path) / TextOutputFormat:将数据作为 String 类型输出到指定文件
  • writeAsCsv(...) / CsvOutputFormat:将 Tuple 类型输出到 ',' 分隔的 csv 类型的文件。行和列的分隔符可以通过参数配置,默认的为 '\n' 和 ','
  • print() / printToErr():将数据打印到标准输出流或者标准错误流,可以指定打印的前缀。
  • writeUsingOutputFormat() / FileOutputFormat:输出到 OutputFormat 类型指定的文件,支持对象到字节的转换。
  • writeToSocket:根据 SerializationSchema 将数据输出到 socket
  • addSink:自定义输出函数,如:自定义将数据输出到 Kafka

小结

本篇文章主要介绍了 Flink Streaming 编程的基本骨架。详细介绍了 Streaming 内置的 Data Source 和 DataSink 。下篇将继续介绍 Flink Streaming 编程涉及的基本概念。

代码地址: github.com/duma-repo/a…

欢迎关注公众号「渡码」


转载于:https://juejin.im/post/5d09814651882528fd530789

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387544.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

腾讯手游如何提早揭露游戏外挂风险?

目前腾讯SR手游安全测试限期开放免费专家预约&#xff01;点击链接&#xff1a;手游安全测试立即预约&#xff01; 作者&#xff1a;sheldon&#xff0c;腾讯高级安全工程师 商业转载请联系腾讯WeTest获得授权&#xff0c;非商业转载请注明出处。 文中动图无法显示&#xff0c…

基于ARM Cortex-M0+ 的Bootloader 参考

源&#xff1a; 基于ARM Cortex-M0内核的bootloader程序升级原理及代码解析转载于:https://www.cnblogs.com/LittleTiger/p/10312784.html

小猿圈web前端之网站性能优化方案

现在前端不仅要能做出一个网站页面&#xff0c;还要把这个页面做的炫酷&#xff0c;那需要很大程度的优化&#xff0c;那么怎么优化才更好呢&#xff1f;小猿圈总结了一下自己优化的方案&#xff0c;感兴趣的朋友可以看一下。一般网站优化都是优化后台&#xff0c;如接口的响应…

下面介绍一个开源的OCR引擎Tesseract2。值得庆幸的是虽然是开源的但是它的识别率较高,并不比其他引擎差劲。网上介绍Tessnet2也是当时时间排名第三的识别引擎,只是后来慢慢不维护了,目前是G

下面介绍一个开源的OCR引擎Tesseract2。值得庆幸的是虽然是开源的但是它的识别率较高&#xff0c;并不比其他引擎差劲。网上介绍Tessnet2也是当时时间排名第三的识别引擎&#xff0c;只是后来慢慢不维护了&#xff0c;目前是Google在维护&#xff0c;大家都知道Google 在搞电子…

js 更改json的 key

let t data.map(item > {return{fee: item[费用],companyName1: item.companyName,remark1: item.remark,beginTime1: item.beginTime,endTime1: item.endTime}})console.log(t) 源地址&#xff1a;https://www.cnblogs.com/Marydon20170307/p/8676611.html转载于:https:/…

1.4版本上线(第八次会议)

在小组成员连夜赶工的奋斗下&#xff0c;终于在昨天深夜成功实现了UI界面功能 至此&#xff0c;我们的系统终于真正可实用而不是局限在命令行进行互动了 由于python嵌入数据库功能实现难度较大&#xff0c;迫于时间的局限性&#xff0c;我们选择了用json文件与txt文件进行替代&…

分UV教程

第一步 首先&#xff0c;打开一个练习场景“空中预警机1.max”&#xff08;这事小弟平时的练习做的不好献丑了&#xff09;。&#xff08;图01&#xff09; 图01 第二步 这里我们拿机翼来举例子&#xff0c;隐藏除机翼意外的其他模型。&#xff08;图02&#xff09; 图02 第三步…

k8s系列--- dashboard认证及分级授权

http://blog.itpub.net/28916011/viewspace-2215214/ 因版本不一样&#xff0c;略有改动 Dashboard官方地址&#xff1a; https://github.com/kubernetes/dashboard dashbord是作为一个pod来运行&#xff0c;需要serviceaccount账号来登录。 先给dashboad创建一个专用的认证信息…

JAVA项目开发

16年java软件开发经验&#xff0c;全职项目开发&#xff0c;项目可签合同、开普票和专票。 主要承接项目&#xff1a; 1、网站开发项目 自主开发千帆CMS动态发布系统&#xff0c;基于java/springboot2/jpa/easyui开发&#xff0c;简单易用&#xff0c;后台与前端分离&#xff0…

3dmax基本操作

1、基本操作平移视图&#xff08;你所说的移动&#xff09;&#xff1a;CTRLP&#xff0c;或者用&#xff0c;滚轮。按住鼠标滚轮不放拖动&#xff0c;就行了。旋转&#xff1a; ALT滚轮。按住ALT键不放&#xff0c;利用滚轮的移动&#xff08;滚轮也要按着不放&#xff09…

padding影响整个div的实际宽度

padding影响整个div的实际宽度 1.不让padding影响整个div的实际宽度 所以要设置css属性&#xff1a; box-sizing:box-sizingposted on 2019-01-25 16:58 玉貔貅 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnblogs.com/yupixiu/p/10320564.html

unity3d 任务头上的血条

人物的名称与血条的绘制方法很简单&#xff0c;但是我们需要解决的问题是如何在3D世界中寻找合适的坐标。因为3D世界中的人物是会移动的&#xff0c;它是在3D世界中移动&#xff0c;并不是在2D平面中移动&#xff0c;但是我们需要将3D的人物坐标换算成2D平面中的坐标&#xff0…

如何在C#中使用Win32和其他库之三

具有内嵌字符数组的结构 某些函数接受具有内嵌字符数组的结构。例如&#xff0c;GetTimeZoneInformation() 函数接受指向以下结构的指针&#xff1a; typedef struct _TIME_ZONE_INFORMATION { LONG Bias; WCHAR StandardName[ 32 ]; SYSTEMTIME Standa…

unity3d 预制体

首先要说明一下什么是预制体&#xff1f; 在Unity3D里面我们叫它Prefab&#xff1b;我们也可以这样理解&#xff1a;当制作好了游戏组件&#xff08;场景中的任意一个gameobject &#xff09;,我们希望将它制作成一个组件模版&#xff0c;用于批量的套用工作&#xff0c;例如说…

Python小数据池,代码块

今日内容一些小的干货 一. id is 二. 代码块三. 小数据池四. 总结python小数据池&#xff0c;代码块的最详细、深入剖析 一. id is 二. 代码块三. 小数据池四. 总结一&#xff0c;id&#xff0c;is&#xff0c; 在Python中&#xff0c;id是什么&#xff1f;id是内存地址…

【Wax】使用Wax (framework方式,XCode 4.6)

前情提示&#xff1a;【Wax】使用Wax &#xff08;非framework方式&#xff0c;XCode 4.6&#xff09; 这次&#xff0c;将以framework的方式来使用Wax 那么&#xff0c;让我们开始吧&#xff01;&#xff01;&#xff01; 准备工作&#xff1a; 下载wax.framework&#xff1a;…

unity3d 简单动画

1&#xff0c;动画系统配置 创建游戏对象并添加Animation组件&#xff0c;然后将动画文件拖入组件。 进入动画文件的Debug属性面板 选中Legacy属性 选中游戏对象&#xff0c;打开Animation编辑窗口 添加动画变化属性 需改关键帧的属性值 配置完成后运行即可得到动画效果 2&…

人月神话阅读笔记(二)

今天对人月神话的正文部分进行了阅读&#xff0c;从人月神话这一部分中了解到缺乏合理的时间进度控制是造成滞后的主要原因&#xff0c;比其他任何事情影响的和还大&#xff0c;书中也对造成这种这种普遍灾难的原因进行了并进行了详细列举。 首先&#xff0c;我们对估算技术缺乏…

3dmax导出到unity3d下分割动画

1、在3dmax 导出时候&#xff0c;要导出FBX文件&#xff0c;同时包含动画&#xff0c;骨骼&#xff0c;皮肤等内容 2、把FBX文件导入到Unity3d后会默认有一个超长的大动画&#xff0c;就是一个整体的动画&#xff0c;如图Take001&#xff0c;这个时候要分割哪部分是跑&#xf…

华硕首款平板电脑周五开售

新浪科技讯北京时间3月21日晚间消息&#xff0c;华硕周一宣布&#xff0c;将于本周开售首款平板电脑EeePadTransformer。本周五&#xff0c;台湾地区用户将可以率先预定这款平板电脑&#xff0c;随后还将在全球其他国家和地区推出,悠语yoryu化妆品玻尿酸水润弹力面膜120ml补水保…