FlinkAPI_Environment_输入源_算子转化流程

Flink Environment

  1. getExecutionEnvironment()

    根据当前平台, 获取对应的执行环境, 若未设置并行度, 使用 flink-conf.yaml 中的并行度配置, 默认 1.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. createLocalEnviroment()

    创建本地环境, 并行度默认为 CPU 核数, 也可在构造函数中传参设置
    LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    
  3. createRemoteEnviroment()

    创建远程环境, 将 jar 提交到远程环境执行
    StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 7777, "/home/WordCount.jar");
    

Flink 输入源

  1. 使用集合数据作为输入源
    env.fromCollection(new ArrayList<>());
    env.fromElements(1, 2, 3);
    
  2. 使用文件作为输入源
    env.readTextFile("/home/test.txt");
    
  3. 使用消息队列作为输入源
    如下, 使用 Kafka 作为输入源
    引入连接器依赖:
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version>
    </dependency>env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
    
  4. 用户自定义输入源(实现 SourceFunction 接口)
    主要用于测试, 定义假数据.

具体实操代码如下:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import java.util.Arrays;
import java.util.Properties;
import java.util.Random;/*** @author regotto*/
public class SourceTest {private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();private static void readFromCollectionAndElement() {/*从集合中读取, SensorReading 自定义实体(String id, Long timestamp, Double temperature)*/DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(new SensorReading("1", 1111L, 35.1),new SensorReading("2", 2222L, 32.1),new SensorReading("3", 3333L, 33.1),new SensorReading("4", 12345L, 36.1)));DataStreamSource<Integer> elements = env.fromElements(1, 2, 3, 4, 5);dataStream.print("data");elements.print("int");}private static void readFromText() {DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");dataStream.print();}private static void readFromKafka() {Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9999");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));dataStream.print();}/*** 用户自定义输入源*/private static void readFromUserDefine() {// 实现 SourceFunction 接口, run 方法中定义数据并使用 collect 资源输出DataStream<SensorReading> dataStream = env.addSource(new SourceFunction<SensorReading>() {private volatile boolean running = true;public void run(SourceContext<SensorReading> ctx) throws Exception {Random random = new Random();while (running) {for (int i = 0; i < 10; i++) {ctx.collect(new SensorReading(i + "", System.currentTimeMillis(), random.nextGaussian()));}}}public void cancel() {running = false;}});dataStream.print();}public static void main(String[] args) throws Exception {readFromUserDefine();env.execute();}
}

Transform

映射转换算子

  1. map: 将数据一一映射
  2. flatMap: 将数据打散后进行映射
  3. filter: 对数据进行过滤

聚合转换算子

  1. keyBy: 聚合操作, 将一个流 hash 运算拆分为不相交的分区, 每个分区包含相同key
    滚动聚合: sum, min, max, minBy(), maxBy();
  2. reduce: 聚合操作, 合并当前元素与上次聚合的结果, 返回流包含所有聚合的结果

多流转换算子

  1. split 和 select: 根据某些特征将 DataStream 拆分为 2 个或 多个 DataStream
    split: 将 DataStream 打上标签.
    select: 将打上标签的 DataStream 进行一个拆分.
  2. Connect 和 CoMap: 2个 DataStream 包装为 1 个 DataStream
    connect: 包装后内部流依旧保持各自的状态, 流与流之间相互独立
    coMap/coFlatMap: 对 connect 操作后的流, 进行 map/flatMa 合并操作
  3. union: 将 2 个以上相同类型的 DataStream 合并为同一个流

具体实操代码如下:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;/*** @author regotto*/
public class TransformTest {private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();public static void main(String[] args) throws Exception {DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");env.setParallelism(1);// map, 映射操作, 将数据映射封装为 SensorReadingDataStream<SensorReading> map = dataStream.map(value -> {String[] fields = value.split(",");return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));});map.print("map");// flatMap, 将原来的数据打散然后映射dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {for (String s : value.split(",")) {out.collect(s);}}}).print("flatMap");// filter, 过滤器dataStream.filter((FilterFunction<String>) value -> value.startsWith("1")).print("filter");// map 进行滚动聚合求当前温度最大值, keyBy 可以用指定位置, 属性, 自定义 keySelectorKeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);keyedStream.max("temperature").print("max temperature");// reduce 聚合, 求最大温度下的最大时间戳记录keyedStream.reduce(new ReduceFunction<SensorReading>() {@Overridepublic SensorReading reduce(SensorReading curData, SensorReading newData) throws Exception {return new SensorReading(curData.getId(), newData.getTimestamp(), Math.max(curData.getTemperature(), newData.getTemperature()));}}).print("最大温度下的最新时间");// split&select 根据温度把数据分为高温, 低温SplitStream<SensorReading> splitStream = keyedStream.split(new OutputSelector<SensorReading>() {@Overridepublic Iterable<String> select(SensorReading value) {return value.getTemperature() > 36 ? Collections.singletonList("high") : Collections.singletonList("low");}});DataStream<SensorReading> high = splitStream.select("high");DataStream<SensorReading> low = splitStream.select("low");DataStream<SensorReading> all = splitStream.select("high", "low");high.print("高温流");low.print("低温流");all.print("all");// connect&coMap, 将高温处理为二元组, 与低温进行合并, 输出状态信息ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream =high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {@Overridepublic Tuple2<String, Double> map(SensorReading value) throws Exception {return new Tuple2<>(value.getId(), value.getTemperature());}}).connect(low);connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {@Overridepublic Object map1(Tuple2<String, Double> value) throws Exception {return new Tuple3<>(value.f0, value.f1, "高温报警");}@Overridepublic Object map2(SensorReading value) throws Exception {return new Tuple2<>(value.getId(), "温度正常");}}).print("connect&coMap");// 使用 union 合并 hig, lowhigh.union(low, all).print("union");env.execute();}
}

算子运算转化图:
在这里插入图片描述

RichMapFunction

对于 MapFunction的增强, 可以获取 RuntimeContext, 一个运行上下文代表一个分区, 每个分区创建销毁都执行 open, close 操作, 对资源预处理, 资源销毁进行操作, 继承 RichMapFunction重写 open, close 实现资源预处理与回收操作. 使操作更为灵活, 其余 RichXXX操作同理.

遇到的问题

写函数的时候, 把匿名内部类简写为 lambda 表达式, 导致泛型擦除的问题, 出现报错:
The generic type parameters of ‘Collector’ are missing. In many cases lambda methods don’t provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the ‘org.apache.flink.api.common.functions.FlatMapFunction’ interface. Otherwise the type has to be specified explicitly using type information.

总结

数据在运算, 转化过程, 一定要搞清楚, 输入是啥, 输出是啥.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/468524.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第2章 Linux内核模块

宏内核和微内核继续前面第一章的知识&#xff0c;虽然有点啰嗦&#xff0c;既然啰嗦了就继续啰嗦下去吧&#xff0c;也是给第一章的内容增加解释。我们知道内核如果按种类来划分的话&#xff0c;可以分为宏内核和微内核&#xff0c;微内核是一个比较先进的内核&#xff0c;我不…

的注册表怎么才能删干净_油烟净化器怎么清洗才能清理干净呢?

油烟机的净化器的主要功能是过滤厨房里的油烟&#xff0c;因为它总是处理油烟&#xff0c;所以清洗净化器很麻烦&#xff0c;那么如何清洗呢&#xff1f;经常清洗油烟净化器是很有必要的&#xff0c;但清洗起来很麻烦&#xff0c;清洗起来也不容易。今天&#xff0c;我想告诉大…

ado 字符串变量

这次变量主要针对 Mfc 的 Cstring 类型的变量&#xff08;前面VC 链接Access 数据库 插入变量到表&#xff09; 思路; 1 把cstring 类型 转为 string 2 string 转 char 数组 3 sprintf 写入数组 string 转 char 数组函数[cpp]view plaincopyprint?char* zhuanhuan(std::strin…

周立功先生和他的AWorks团队招聘

我之前写的一篇文章&#xff0c;介绍了周立功先生&#xff0c;我记得那篇文章的阅读量非常多&#xff0c;也让我迎来一段小高潮&#xff0c;随着时间的推移&#xff0c;慢慢的增加了我对周立功先生的了解&#xff0c;我们很多人&#xff0c;像我吧&#xff0c;工作的时候&#…

mongodb python 大于_Python中使用MongoDB详解

作者&#xff1a;Zarten知乎专栏&#xff1a;Python爬虫深入详解知乎ID&#xff1a; Zarten简介&#xff1a; 互联网一线工作者&#xff0c;尊重原创并欢迎评论留言指出不足之处&#xff0c;也希望多些关注和点赞是给作者最好的鼓励 &#xff01;介绍MongoDB是一种面向文档型的…

这不是商业互吹,是学习的宝藏

学习如逆水行舟&#xff0c;不进则退&#xff1b;只有坚持不断的学习,才能保持进步。今天给大家精心挑选的这几个优质的公众号&#xff0c;在行业深耕已久&#xff0c;相信大家一定会有所收获&#xff0c;感兴趣的可以关注一下。互联网架构师 号主985计算机硕士毕业&#xff…

关于这些那些

关于篮球先说下&#xff0c;我刚才已经写完文章了&#xff0c;但是因为没有保存&#xff0c;浏览器想着周末早点回去休息就闪退了&#xff0c;把写好的文章给闪退没有了&#xff0c;这个真是拿起自己的坑砸死了自己&#xff0c;那种赶脚只有自己能够明白&#xff0c;真的是太难…

mysqldump 定时备份数据(全量)

MYSQL 数据库备份有很多种(cp、tar、lvm2、mysqldump、xtarbackup)等等&#xff0c;具体使用哪一个还要看你的数据规模。下面给出一个表 #摘自《学会用各种姿态备份Mysql数据库》 备份方法备份速度恢复速度便捷性功能一般用于cp快快一般、灵活性低很弱少量数据备份mysqldump慢慢…

第3章 Linux内核调试手段之内核打印

开始前面说的话在我写代码的生涯里&#xff0c;我看到过很多大神炫耀自己的调试手段&#xff0c;也看到很多大神写过非常厉害的代码&#xff0c;我认为&#xff0c;相比于写代码&#xff0c;调试更加重要&#xff0c;而那些能在写代码的时候就加入了自己的调试信息的&#xff0…

电源管理 解析_智能电源控制箱

智能电源控制箱?智能电源控制箱又被称之为&#xff1a;智能监控箱、智能设备箱、智能运维箱&#xff0c;智能电源控制箱的作用主要就是为视频监控打造良好的运行环境&#xff0c;保障视频监控系统稳定的运行。说到视频监控&#xff0c;大家都知道视频监控的故障率是比较高的&a…

centos7 开机后进去了命令行_Linux系统管理:开机启动流程(二)

CentOS71.BIOS(开机自检)2.MBR ( Master Boot Record 主引导记录)3.GRUB2 Bootloader&#xff08;引导菜单&#xff09;4.Kernel&#xff08;内核引导&#xff09;5.Systemd &#xff08;不再使用init&#xff0c;改成了systemd&#xff09;6.Runlevel-Target &#xff08;运行…

一点小思考

我记得12年的时候&#xff0c;我就申请了微信公众号&#xff0c;那时候我的号主是TCL&#xff0c;是公司的同事用我的微信号申请公司的主体号&#xff0c;那时候我也有一点想法自己做个公众号写点文章&#xff0c;但是一直没有下决心&#xff0c;后来离职了&#xff0c;原来用我…

第3章 Linux内核调试手段之二

gdb 和 addr2line 调试内核模块内核模块插入内核链表的时候&#xff0c;会调用 init 里面的程序&#xff0c;我们上面给的那个例程的程序因为是经过多年风吹雨打的&#xff0c;但是如果你是一个萌新的码农&#xff0c;你能保证自己写的内核模块没有问题吗&#xff1f;所以就需要…

儿童手表怎么删除联系人_华为儿童手表4X体验:与你一起守护孩子的成长,带娃不再辛苦...

带娃是一件很辛苦的事情&#xff0c;从身体到精神的辛苦&#xff0c;相信各位家长都懂。对于照看正在成长期的孩子&#xff0c;更是让很多家长亲身感受"成长的烦恼"。孩子活泼好动的天性让很多家长都不放心&#xff0c;同时大部分家长又没有能力随时在身边守护&#…

Jmeter分布式测试过程中遇到的问题及摘抄前辈问题汇总

遇到的常见问题&#xff1a; 1、在Controller端上控制某台机器Run&#xff0c;提示"Bad call to remote host"。 解决方法&#xff1a;检查被控制机器上的jmeter-server有没有启动&#xff0c;或者JMeter.properties中remote_hosts的配置错误。2、Agent机器启动Jmete…

介绍一个我创业的朋友

大家好&#xff0c;今天给大家介绍一位跟我一样正在创业路上的朋友&#xff0c;不知道大家对我之前的文章还有没有印象&#xff0c;最近我在做一件特别有意思的事情&#xff0c;这件有意思的事情一直催促着我起床上班&#xff0c;每天都充满能量和动力&#xff0c;又累又充实的…

微信小程序装修解决方案ppt_装修公司微信小程序都有哪些功能?

传统装修行业存在收费不透明、消费者装修服务过程体验差等问题&#xff0c;传统装修已无法满足消费者的实际需求&#xff0c;面临无客量、无签约的困境。然而&#xff0c;随着移动互联网的发展&#xff0c;许多装饰企业利用微信小程序来帮助其疏导和推广。一个装修公司小程序能…

要用什么态度去面对生活?

最近&#xff0c;张扣扣的新闻铺天盖地&#xff0c;因为我非常喜欢逛知乎&#xff0c;刚好张扣扣的新闻这几天上了知乎热搜&#xff0c;所以我就关注上了&#xff0c;说实话&#xff0c;有点痛心&#xff0c;外人看起来很爽&#xff0c;忍辱负重好多年&#xff0c;终于把自己的…

第3章 Linux内核调试手段之三

之前的内核调试&#xff0c;我觉得应该再加上下面的东西&#xff0c;只有好好把下面的几个问题研究透了&#xff0c;你可能才是一个真正的内核高手&#xff0c;或者说&#xff0c;你还不是一个高手&#xff0c;就是一个内核的普通工程师&#xff0c;这个是我和Z总聊天说的&…

中希尔排序例题代码_【数据结构与算法】这或许是东半球分析十大排序算法最好的一篇文章...

码农有道 历史文章目录(请戳我)关于码农有道(请戳我)前言本文全长 14237 字&#xff0c;配有 70 张图片和动画&#xff0c;和你一起一步步看懂排序算法的运行过程。预计阅读时间 47 分钟&#xff0c;强烈建议先收藏然后通过电脑端进行阅读。No.1 冒泡排序冒泡排序无疑是最为出名…