大数据-玩转数据-Flink-Transform

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();}
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();elements.filter(value -> value % 2 != 0).print();}
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {@Overridepublic void flatMap(Integer integer, Collector<Integer> collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();}
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1).print();env.execute("execute");}
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute("execute");}
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunction<Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {return new Tuple2<>(values1.f0,values1.f1+values2.f1);}}).print();env.execute();}
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 3 == 0) {//测流数据ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);}if (value % 3 == 1) {//测流数据ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));output1.print();env.execute();}
}

四、合流算子

4.1、connect(连接)

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;public class connect_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);DataStreamSource<String> data2 = env.fromElements("a", "b", "c");ConnectedStreams<Integer, String> data3 = data1.connect(data2);data3.getFirstInput().print("data1");data3.getSecondInput().print("data2");env.execute();}
}

4.2、union(合并)

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class union_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> data2 = env.fromElements(555,666);DataStreamSource<Integer> data3 = env.fromElements(999);DataStream<Integer> data = data1.union(data2).union(data3);data.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28204.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity 3D中使用tilemap创建关卡地图,瓦片间隙有漏缝

我们使用一张图片来作为Sprite图集&#xff0c;创建地形图&#xff1a; 运行后&#xff0c;会发现&#xff0c;瓦片之间似乎总是有间距。 检查了图片发现&#xff0c;并不是图片边界存在间隙。 最后发现问题是出在图片资源中的线性过滤属性值&#xff1a; 在设计界面就能够看…

ETHERCAT转CANOPEN协议网关canopen分类

1.产品功能 JM-ECT-COP是自主研发的一款 ETHERCAT 从站功能的通讯网关。该产品主要功能是将 ETHERCAT 网络和 CANOPEN 网络连接起来。 本网关连接到 ETHERCAT 总线中做为从站使用&#xff0c;连接到 CANOPEN 总线中做为主站使用。 3.技术参数 ETHERCAT 技术参数 网关做为 E…

【设计模式——学习笔记】23种设计模式——中介者模式Observer(原理讲解+应用场景介绍+案例介绍+Java代码实现)

文章目录 案例引入案例一普通实现中介者模式 案例二 介绍基础介绍登场角色尚硅谷 《图解设计模式》 案例实现案例一&#xff1a;智能家庭类图实现 案例二&#xff1a;登录页面逻辑实现说明类图实现 总结文章说明 案例引入 案例一 普通实现 在租房过程中&#xff0c;客户可能…

PHP最简单自定义自己的框架定义常量自动生成目录(三)

1、框架入口增加模块定义&#xff0c;实现多模块功能 index.php 定义模块 <?php //定义当前请求模块 define("MODULE",index); require "./core/KJ.php"; 创建后台模块admin.php <?php define("MODULE",admin); require "./cor…

每日一题——最小花费爬楼梯

题目 给定一个整数数组 cost &#xff0c;其中 cost[i] 是从楼梯第i 个台阶向上爬需要支付的费用&#xff0c;下标从0开始。一旦你支付此费用&#xff0c;即可选择向上爬一个或者两个台阶。 你可以选择从下标为 0 或下标为 1 的台阶开始爬楼梯。 请你计算并返回达到楼梯顶部的…

计算机网络(8) --- IP与IP协议

计算机网络&#xff08;7&#xff09; --- UDP协议和TCP协议_哈里沃克的博客-CSDN博客UDP协议和TCP协议https://blog.csdn.net/m0_63488627/article/details/132125374?spm1001.2014.3001.5501 目录 1.IP与IP协议 IP作用 协议​编辑 2.网段划分 DHCP划分 CIDR划分 特殊…

【果树农药喷洒机器人】Part5:基于深度相机与分割掩膜的果树冠层体积探测方法

文章目录 一、引言二、树冠体积测量对比方法2.1冠层体积人工测量法2.2冠层体积拟合测量法 三、基于深度相机与分割掩膜探测树冠体积方法3.1像素值与深度值的转换3.2树冠体积视觉探测法3.3实验分析 总结 一、引言 果树靶标探测是实现农药精准喷施的关键环节&#xff0c;本章以果…

Syncfusion Essential Edit for WPF Crack

Syncfusion Essential Edit for WPF Crack 在任何WPF应用程序中启用语法高亮显示。 Syncfusion Essential Edit for WPF是一款具有所有基本功能的编辑器&#xff0c;如文本编辑、剪切、复制和粘贴。它允许用户从各种文件格式打开文件并将其保存为各种文件格式。Syncfusion Esse…

java.lang.IllegalStateException: 不允许修改锁定的参数映射

问题描述&#xff1a; 这个问题是在我写javaweb项目&#xff0c;做敏感词过滤时出现的 需求是&#xff1a;如果是由 getParameter(String s) 得到的数据&#xff0c;可以直接修改value值&#xff0c;将含有敏感词的部分替换为 *** request.getParameterMap() 方法返回一个包…

1749. 任意子数组和的绝对值的最大值

诸神缄默不语-个人CSDN博文目录 力扣刷题笔记 文章目录 1. 暴力搜索2. 动态规划3. 前缀和 1. 暴力搜索 直接用2个指针从索引0开始找到最后一个索引&#xff0c;时间复杂度大概是 O ( n 2 ) O(n^2) O(n2)吧&#xff0c;总之这么搞不行&#xff0c;以下是我用Python写的一些典型…

设计模式之六:命令模式(封装调用)

命令模式可以将请求的对象和执行请求的对象解耦&#xff08;实际上是通过命令对象进行沟通&#xff0c;即解耦&#xff09;。&#xff08;个人感觉&#xff0c;这章讲的很一般&#xff09; 按个人理解来讲&#xff1a; 假如需要一个遥控器&#xff0c;遥控器有一个插口可以插上…

【JAVA】有关时间的操作在编程中如何实现?

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言Date 类Date 类方法Data的缺陷实例获取当前日期时间日期比较java中设置date数据的显示格式 前言 在许多应用程序中&#xff0c;日期和时间的处理是必不可少的。Java提供了一…

【80天学习完《深入理解计算机系统》】第二天 2.2 整数的表示【有符号数,无符号数,符号数的扩展,有无符号数的转变】

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…

【Matlab】Elman神经网络遗传算法(Elman-GA)函数极值寻优——非线性函数求极值

往期博客&#x1f449; 【Matlab】BP神经网络遗传算法(BP-GA)函数极值寻优——非线性函数求极值 【Matlab】GRNN神经网络遗传算法(GRNN-GA)函数极值寻优——非线性函数求极值 【Matlab】RBF神经网络遗传算法(RBF-GA)函数极值寻优——非线性函数求极值 本篇博客将主要介绍Elman神…

【数据结构】“单链表”的练习题

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

Unity之ShaderGraph 节点介绍 数学节点

数学 高级Absolute&#xff08;绝对值&#xff09;Exponential&#xff08;幂&#xff09;Length&#xff08;长度&#xff09;Log&#xff08;对数&#xff09;Modulo&#xff08;余数&#xff09;Negate&#xff08;相反数&#xff09;Normalize&#xff08;标准化矢量&…

Django Rest_Framework(二)

文章目录 1. http请求响应1.1. 请求与响应1.1.1 Request1.1.1.1 常用属性1&#xff09;.data2&#xff09;.query_params3&#xff09;request._request 基本使用 1.1.2 Response1.1.2.1 构造方式1.1.2.2 response对象的属性1&#xff09;.data2&#xff09;.status_code3&…

技术应用:Docker安全性的最佳实验|聊聊工程化Docker

&#x1f525; 技术相关&#xff1a;《技术应用》 ⛺️ I Love you, like a fire! 文章目录 首先&#xff0c;使用Docker Hub控制访问其次&#xff0c;保护密钥写在最后 不可否认&#xff0c;能生存在互联网上的软件都是相互关联的&#xff0c;当我们开发一款应用程序时&#x…

Prometheus技术文档--基本安装-docker安装并挂载数据卷-《十分钟搭建》

一、查看可安装的版本 docker search prom/prometheus 二、拉取镜像 docker pull prom/prometheus 三、查看镜像 docker images 四、书写配置文件-以及创建挂载目录 宿主机挂载目录位置&#xff1a; 以及准备对应的挂载目录&#xff1a; /usr/local/docker/promethues/se…

字节C++后端面试总结

字节的面经,技术栈是 C++ 后端。 计算机网络 UDP和TCP区别 先说了概念一个是面向连接的基于字节流的可靠连接,一个是不需要连接的基于数据报的不可靠传输 然后说了几个小点,比如首部长度、应用场景、服务对象什么的。 补充: 还有一个很重要的点:UDP 的实时性比 TCP 好…