1、API说明
非并行数据源:
def fromElements[T: TypeInformation](data: T*): DataStream[T]
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T]
并行数据源:
def fromParallelCollection[T: TypeInformation](data: SplittableIterator[T]): DataStream[T]
使用场景:
常用于调试代码
2、这是一个完整的入门案例
开发语言:Java 1.8
Flink版本:Flink 1.17.0
package com.baidu.datastream.source;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;import java.util.Arrays;
import java.util.List;// --------------------------------------------------------------------------------------------
// TODO 从集合中读取数据
// --------------------------------------------------------------------------------------------/** TODO 通过`读取Java集合中数据`来创建 DataStreamSource** 方法1:fromCollection* Collection、Iterator -> DataStreamSource* 方法2:fromElements* OUT... data -> DataStreamSource* 方法3:fromParallelCollection* SplittableIterator -> DataStreamSource* 重要提示:* fromCollection、fromElements 创建的是非并行source算子(并行度只能为1)* fromParallelCollection 创建的是并行算子(并行度>=1)* */public class ReadCollection {public static void main(String[] args) throws Exception {fromCollection();//fromElements();//fromParallelCollection();}public static void fromCollection() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取Java集合数据List<String> list = Arrays.asList("刘备", "张飞", "关羽", "赵云", "马超", "黄忠");env.fromCollection(list).print();// 3.触发程序执行env.execute();}public static void fromElements() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取给定的对象序列env.fromElements("刘备", "张飞", "关羽", "赵云", "马超", "黄忠").print();// 3.触发程序执行env.execute();}public static void fromParallelCollection() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 2.读取给定的对象序列NumberSequenceIterator numberSequenceIterator = new NumberSequenceIterator(1, 10);env.fromParallelCollection(numberSequenceIterator, Long.class).print();/** 注意: fromParallelCollection生成的source为并行算子* 集合中的数据会被平均分配到并行子任务中去* */// 3.触发程序执行env.execute();}
}