Apache Flink DataStream transformation

Operators transform one or more DataStreams into a new DataStream. 

Operators操作转换一个或多个DataStream到一个新的DataStream 。

filter function

Scala

object DataStreamTransformationApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentfilterFunction(env)env.execute("DataStreamTransformationApp")}def filterFunction(env: StreamExecutionEnvironment): Unit = {val data=env.addSource(new CustomNonParallelSourceFunction)data.map(x=>{println("received:" + x)x}).filter(_%2 == 0).print().setParallelism(1)}}

数据源选择之前的任意一个数据源即可。

这里的map中没有做任何实质性的操作,filter中将所有的数都对2取模操作,打印结果如下:

received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8

说明map中得到的所有的数据,而在filter中进行了过滤操作。

Java

public static void filterFunction(StreamExecutionEnvironment env) {DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());data.setParallelism(1).map(new MapFunction<Long, Long>() {@Overridepublic Long map(Long value) throws Exception {System.out.println("received:"+value);return value;}}).filter(new FilterFunction<Long>() {@Overridepublic boolean filter(Long value) throws Exception {return value % 2==0;}}).print().setParallelism(1);}

需要先使用data.setParallelism(1)然后再进行map操作,否则会输出多次。因为我们用的是JavaCustomParallelSourceFunction(),而当我们使用JavaCustomNonParallelSourceFunction时,默认就是并行度1,可以不用设置。

Union Function

Scala

def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    filterFunction(env)unionFunction(env)env.execute("DataStreamTransformationApp")}def unionFunction(env: StreamExecutionEnvironment): Unit = {val data01 = env.addSource(new CustomNonParallelSourceFunction)val data02 = env.addSource(new CustomNonParallelSourceFunction)data01.union(data02).print().setParallelism(1)}

http://www.developcls.com/qa/d8c20a9e2ba34964a440f96b88730f2e.html

Union操作将两个数据集综合起来,可以一同处理,上面打印输出如下:

1
1
2
2
3
3
4
4

Java

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        filterFunction(environment);unionFunction(environment);environment.execute("JavaDataStreamTransformationApp");}public static void unionFunction(StreamExecutionEnvironment env) {DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());data1.union(data2).print().setParallelism(1);}

Split  Select  Function

Scala

split可以将一个流拆成多个流,select可以从多个流中进行选择处理的流。

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomNonParallelSourceFunction)val split = data.split(new OutputSelector[Long] {override def select(value: Long): lang.Iterable[String] = {val list = new util.ArrayList[String]()if (value % 2 == 0) {list.add("even")} else {list.add("odd")}list}})split.select("odd","even").print().setParallelism(1)}

可以根据选择的名称来处理数据。

Java

public static void splitSelectFunction(StreamExecutionEnvironment env) {DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());SplitStream<Long> split = data.split(new OutputSelector<Long>() {@Overridepublic Iterable<String> select(Long value) {List<String> output = new ArrayList<>();if (value % 2 == 0) {output.add("odd");} else {output.add("even");}return output;}});split.select("odd").print().setParallelism(1);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/6853.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用OpenCV图像处理技巧之白平衡算法(一)

1. 引言 欢迎继续来到我们的图像处理系列&#xff0c;在这里我们将探讨白平衡的关键技术。如果大家曾经拍过一张看起来暗淡、褪色或颜色不自然的照片&#xff0c;那么此时大家就需要了解到白平衡技术的重要性。在本文中&#xff0c;我们将深入探讨白平衡的概念&#xff0c;并探…

idea+springboot+jpa+maven+jquery+mysql进销存管理系统源码

ideaspringbootjpamavenjquerymysql进销存管理系统 一、系统介绍1.环境配置 二、系统展示1. 管理员登录2.首页3.采购订单4.收货入库5. 采购退货6. 商品入库7. 商品出库8. 库存查询9.商品移库10.库存盘点11.销售订单12.发货出库13.销售退货14.商品查询15. 供应商查询16.客户查询…

微信小程序将接口返回的文件流预览导出Excel文件并转发

把接口url替换就可以用了 exportExcel () {wx.request({url: importMyApply, //这个地方是你获取二进制流的接口地址method: POST,responseType: "arraybuffer", //特别注意的是此处是请求文件流必须加上的属性&#xff0c;不然你导出到手机上的时候打不开&#xff…

8.4Java EE——基于注解的AOP实现

Spring AOP的注解 元素 描述 @Aspect 配置切面 @Pointcut 配置切点 @Before 配置前置通知 @After 配置后置通知 @Around 配置环绕方式 @AfterReturning 配置返回通知 @AfterThrowing 配置异常通知 下面通过一个案例演示基于注解的AOP的实现,案例具体实现步骤如下。 1、创建A…

Jetson Orin Nano 平台适配IMX585 camera驱动调试记录

1. 前言 Jetson Orin Nano Devkit适配imx585 camera 使用argus_camera捕获流,图片是黑色的 用示波器来测量mipi信号,信号正常 Jetpack版本: sensor参数: dts配置: mode0 { /* */ mclk_khz = “24000”; num_lanes = “4”; tegra_sinterface = “serial_c”; ph…

“VCMessage”任务意外失败

从网上查到很多都是说设置这个位置&#xff0c;但是我的已经是对的&#xff0c;还是出现 “VCMessage”任务意外失败这个错误。 又查到一个人说解决方法是更正OutputPath或从父级继承&#xff1a;右键单击项目,然后转到"属性">"链接器">"常规&q…

mac brew安装 node 踩坑日记- n切换node不生效

最近用了一个旧电脑开发&#xff0c;发现里面node管理混乱&#xff0c;有nvm、n和homebrew&#xff0c;导致切换node 切换不了&#xff0c;开发也有莫名其妙的错误。所以我打算重新装一下node&#xff0c;使用n做为管理工具。 1. 删除nvm cd ~ rm -rf .nvm2. 删除n sudo rm -…

GAMS---典型优化模型和算法介绍、GAMS安装和介绍、GAMS程序编写、GAMS程序调试、实际应用算例演示与经验分享

优化分析是很多领域中都要面临的一个重要问题&#xff0c;求解优化问题的一般做法是&#xff1a;建立模型、编写算法、求解计算。常见的问题类型有线性规划、非线性规划、混合整数规划、混合整数非线性规划、二次规划等&#xff0c;优化算法包括人工智能算法和内点法等数学类优…

什么是RESTful API

什么是RESTful API RESTful API是利用HTTP请求访问或使用数据的应用程序接口&#xff08;API&#xff09;的体系结构样式。这些数据可用于GET&#xff0c;PUT&#xff0c;POST和DELETE数据类型&#xff0c;这些数据类型指的是与资源相关的操作读取、更新、创建和删除。 网站的…

Vue移动端项目--瑞幸咖啡重构优化

来了客官&#xff0c;好久不见&#xff01; 从年初开始&#xff0c;就有个想法&#xff0c;想着把之前做过的项目重新整理一下。毕竟今时不同往日&#xff0c;从现在的角度去看曾经做过的项目&#xff0c;倒是觉得有很多稚嫩的地方。毕竟无论做什么都是熟能生巧&#xff0c;由浅…

字符串 (2)--- 前缀函数与 KMP 算法

/* https://www.luogu.com.cn/problem/UVA455 最小周期&#xff1a; n - pi[n -1] */ #include <iostream> #include <string> #include <vector> using namespace std; vector<int> prefix_fun(string s) { int len s.length(); /…

基于java在线点餐系统设计与实现

基于java在线点餐系统设计与实现 随着科学技术与经济的快速发展&#xff0c;网络信息技术也有了显著的提升与进步&#xff0c;当今的社会是一个集数字化&#xff0c;网络化&#xff0c;信息化的&#xff0c;并且是以网络为核心的现代化社会。伴随信息互联网的高速成长&#xf…

深度学习——批标准化Batch Normalization

什么是批标准化&#xff1f; 批标准化&#xff08;Batch Normalization&#xff09;是深度学习中常用的一种技术&#xff0c;旨在加速神经网络的训练过程并提高模型的收敛速度。 批标准化通过在神经网络的每一层中对输入数据进行标准化来实现。具体而言&#xff0c;对于每个输…

Linux基本指令操作

登陆指令&#xff08;云服务器版&#xff09; 当我们获取公网IP地址后&#xff0c;我们就可以打开xshell。 此时会有这样的界面&#xff0c;我们若是想的登陆&#xff0c;则需要输入以下的指令 ssh 用户名公网IP地址 然后会跳出以下的窗口 接着输入密码——密码便是先前定好…

微服务安全简介

​由于其可扩展性、灵活性和敏捷性&#xff0c;微服务架构已经变得越来越受欢迎。然而&#xff0c;随着这种架构的分布和复杂性增加&#xff0c;确保强大的安全措施变得至关重要。微服务的安全性超越了传统的方法&#xff0c;需要采用全面的策略来保护免受不断演变的威胁和漏洞…

CentOS 7 x86_64 制作openssh 9.3p2 rpm包修复安全漏洞 —— 筑梦之路

最近openssh 暴露出一个安全漏洞CVE-2023-38408&#xff0c;以下是相关资讯&#xff1a; 2023年7月19日&#xff0c;OpenSSH发布紧急安全补丁&#xff0c;以解决OpenSSH ssh-agent转发中存在安全漏洞远程执行CVE-2023-38408。漏洞由Qualys威胁研究单位(TRU)发现。 OpenSSH 是Se…

Promise 讲解,js知识,es6

文章目录 一、Promise的三种状态1. 初始态pending2. 成功态fulfilled&#xff0c;调用resolve方法3. 失败态rejected&#xff0c;调用reject方法 二、Promise的方法then方法catch方法 三、async和awaitasync 函数await 表达式 四、代码举例帮助理解1、Promise的值通过then方法获…

网络设备身份鉴别使用TACACS+和RADIUS

TACACS&#xff08;Terminal Access Controller Access Control System Plus&#xff09;和RADIUS&#xff08;Remote Authentication Dial-In User Service&#xff09;是两种常用的网络认证协议&#xff0c;用于管理网络设备的用户身份验证和访问控制。 TACACS是一种基于TCP/…

在vsCode 中执行Electron 项目时,出现中文乱码问题

问题&#xff1a;vscode 中执行Electron 项目时&#xff0c;控制台出现乱码 解决方法&#xff1a; 在 terminal 修改编码格式&#xff1a;65001代表UTF-8&#xff0c;936代表GBK

IC设计从业者必备的宝藏网站!

对于IC设计从业者而言&#xff0c;获取准确的学习资源&#xff0c;行业资讯直观重要&#xff0c;今日我们推荐ic行业专业的宝藏网站&#xff0c;希望对从业者有所帮助。 01-找开源项目的网站 GitHub除了Git代码仓库托管及基本的 Web管理界面以外&#xff0c;还提供了订阅、讨论…