Flink Streaming Connector
Flink
是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。Connector
的作用就相当于一个连接器,连接Flink
计算引擎跟外界存储系统。Flink
里有以下几种方式,当然也不限于这几种方式可以跟外界进行数据交换:
【1】Flink
里面预定义了一些source
和sink
;
【2】Flink
内部也提供了一些Boundled connectors
;
【3】可以使用第三方Apache Bahir
项目中提供的连接器;
【4】是通过异步IO
方式;
预定义的 source 和 sink
Flink
里预定义了一部分source
和sink
。在这里分了几类。
基于文件的 source 和 sink
如果要从文本文件中读取数据,可以直接使用:
env.readTextFile(path)
就可以以文本的形式读取该文件中的内容。当然也可以使用:根据指定的fileInputFormat
格式读取文件中的内容。
env.readFile(fileInputFormat, path)
如果数据在Flink
内进行了一系列的计算,想把结果写出到文件里,也可以直接使用内部预定义的一些sink
,比如将结果已文本或csv
格式写出到文件中,可以使用DataStream
的writeAsText(path)
和DataSet
的writeAsCsv(path)
。
基于 Socket 的 Source 和 Sink
提供 Socket
的host name
及port
,可以直接用StreamExecutionEnvironment
预定的接口socketTextStream
创建基于Socket
的source
,从该 socket
中以文本的形式读取数据。当然如果想把结果写出到另外一个Socket
,也可以直接调用DataStream writeToSocket
。
//从 socket 中读取数据流
env.socketTextStream("localhost",777);
//输出至 socket
resultDataStream.writeToSocket("hadoop1",6666,new SimpleStringSchema())
基于内存 Collections、Iterators 的 Source
可以直接基于内存中的集合或者迭代器,调用StreamExecutionEnvironment fromCollection
、fromElements
构建相应的source
。结果数据也可以直接print
、printToError
的方式写出到标准输出或标准错误。详细也可以参考Flink
源码中提供的一些相对应的Examples
来查看异常预定义 source
和sink
的使用方法,例如WordCount
、SocketWindowWordCount
。
//从Java.util.Collection集合中读取数据作为数据源
ArrayList<String> list = new ArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();//从Java.util.Collection集合中读取数据作为数据源env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
Bundled Connectors
Flink
里已经提供了一些绑定的Connector
,例如kafka source
和sink
,Es sink
等。读写kafka
、es
、rabbitMQ
时可以直接使用相应 connector
的api
即可。
虽然该部分是Flink
项目源代码里的一部分,但是真正意义上不算作Flink
引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job
时候需要注意,job
代码jar
包中一定要将相应的connetor
相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
Apache Bahir 中的连接器
Apache Bahir
最初是从Apache Spark
中独立出来项目提供,以提供不限于Spark
相关的扩展 / 插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器streaming connectors
和SQL
数据源扩展分析平台的覆盖面。如有需要写到flume
、redis
的需求的话,可以使用该项目提供的connector
。
Async I/O
流计算中经常需要与外部存储系统交互,比如需要关联MySQL
中的某个表。一般来说,如果用同步I/O
的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步I/O
可以并发处理多个请求,提高吞吐,减少延迟。Async
的原理可参考官方文档