Flink Flink中的分流

一、什么是分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
在这里插入图片描述

二、基于filter算子的简单实现分流

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶数流");ds2.print("奇数流");streamExecutionEnvironment.execute();}
}

执行结果

奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

三、使用测输出流

关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/167460.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面了一个4年经验的测试工程师,自动化都不会也要15k,我也是醉了····

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

表单考勤签到作业周期打卡打分评价评分小程序开源版开发

表单考勤签到作业周期打卡打分评价评分小程序开源版开发 表单打卡评分 表单签到功能&#xff1a;学生可以通过扫描二维码或输入签到码进行签到&#xff0c;方便教师进行考勤管理。 考勤功能&#xff1a;可以记录学生的出勤情况&#xff0c;并自动生成出勤率和缺勤次数等统计数…

ruoyi-plus-vue部署

安装虚拟机 部署文档 安装docker 安装docker 安装docker-compose 可能遇到的错误 Failed to deploy ruoyi/ruoyi-server:5.1.0 Dockerfile: ruoyi-admin/Dockerfile: Cant retrieve im age ID from build stream 安装 vim 命令 yum install vim -y 修改文件 vim /etc/re…

基于Springboot的冬奥会科普平台(有报告),Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的冬奥会科普平台&#xff08;有报告&#xff09;&#xff0c;Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层…

ELK+kafka+filebeat企业内部日志分析系统

1、组件介绍 1、Elasticsearch&#xff1a; 是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎&#xff0c;基于RESTful web接口。Elasticsearch是用Java开发的&#xff0c;并作为Apache许可条款下的开放源码发布…

创新研报|企业如何在不确定时期突破至新高度?

报告下载地址&#xff1a; 创新研报&#xff5c;BCG 2023最创新企业研究-在不确定时期跃升新高度 创新从未如此重要&#xff0c;领先的企业创新者正在证明这一切。BCG&#xff08;于2005年首次发布年度创新报告&#xff0c;其中列出了全球创新高管最钦佩的50家企业&#xf…

【MATLAB源码-第88期】基于matlab的灰狼优化算法(GWO)的栅格路径规划,输出做短路径图和适应度曲线

操作环境&#xff1a; MATLAB 2022a 1、算法描述 灰狼优化算法&#xff08;Grey Wolf Optimizer, GWO&#xff09;是一种模仿灰狼捕食行为的优化算法。灰狼是群居动物&#xff0c;有着严格的社会等级结构。在灰狼群体中&#xff0c;通常有三个等级&#xff1a;首领&#xff…

数据结构-归并排序+计数排序

1.归并排序 基本思想&#xff1a; 归并排序是建立在归并操作上的一种有效的排序算法,该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并&#xff0c;得到完全有序的序列&#xff1b;即先使每个子序列有序&#xff0c;再使子序列段间有序。若将两个有序表合并成一个…

2023年P气瓶充装证模拟考试题库及P气瓶充装理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年P气瓶充装证模拟考试题库及P气瓶充装理论考试试题是由安全生产模拟考试一点通提供&#xff0c;P气瓶充装证模拟考试题库是根据P气瓶充装最新版教材&#xff0c;P气瓶充装大纲整理而成&#xff08;含2023年P气瓶…

pulseaudio是如何测试出音频延迟的

通常专业的音频设备生产厂商都有专业的设备来测试精确的音频链路延时。 那么没有专业设备怎么测试出音频延迟呢?如下图,我们可以看到pulseaudio可以测试出硬件音频延迟。 那么,他是怎么测试出硬件延迟的呢?他的理论依据是什么呢?接下来我带大伙一起探索一下。 /*占位…

红队攻防实战之从边界突破到漫游内网(无cs和msf)

也许有一天我们再相逢&#xff0c;睁大眼睛看清楚&#xff0c;我才是英雄。 本文首发于先知社区&#xff0c;原创作者即是本人 本篇文章目录 网络拓扑图&#xff1a; 本次红队攻防实战所需绘制的拓扑图如下&#xff1a; 边界突破 访问网站&#xff1a; http://xxx.xxx.xxx…

Linux 排查必看文件

目录 1. 登录日志 1.1 /var/log/wtmp 1.2 /var/log/btmp.* 1.3 /var/log/lastlog 1.4 /var/log/faillog 1.5 /var/log/secure 1.6 /var/log/auth.log 2. 系统日志 2.1 /var/log/cron.* 2.2 /var/log/syslog 2.3 /var/log/audit/audit.*log 3. 历史命令 3.1 ~/…

Docker 中OpenResty下载与使用

1Panel安装OpenResty 查看到就说明安装成功 部署项目 在http中添加&#xff1a; server { listen 8001; //端口号 server_name localhost; location / { root /admin; //项目路径 index index.html index.htm; …

Java二级医院区域HIS信息管理系统源码(SaaS服务)

一个好的HIS系统&#xff0c;要具有开放性&#xff0c;便于扩展升级&#xff0c;增加新的功能模块&#xff0c;支撑好医院的业务的拓展&#xff0c;而且可以反过来给医院赋能&#xff0c;最终向更多的患者提供更好的服务。 系统采用前后端分离架构&#xff0c;前端由Angular、J…

P1028 [NOIP2001 普及组] 数的计算

时刻记住一句话&#xff1a;写递归&#xff0c;1画图&#xff0c;2大脑放空&#xff01;&#xff01;&#xff01; 意思是&#xff0c;自己写递归题目&#xff0c;先用样例给的数据画图&#xff0c;然后想一个超级简单的思路&#xff0c;直接套上去就可以了。 上题干&#xff…

牛客 HJ106 字符逆序 golang实现

牛客题目算法连接 题目 golang 实现 package mainimport ("fmt""bufio""os" )func main() {str, _ : bufio.NewReader(os.Stdin).ReadString(\n)if len(str) 0 {return } else {newstr:""strLen:len(str)-1for i:strLen;i>0;i-…

生产环境出现问题,测试人如何做工作复盘?

很多时候我们能把大部分的Bug或一些部署等问题在业务上线之前就解决了&#xff0c;但由于某些因素&#xff0c;线上问题还是时而出现&#xff0c;影响业务生产甚至是公司效益。 避免线上问题的发生以及线上问题及时处理是测试人员的一项重要职责&#xff0c;如何快速地处理&am…

XG916Ⅱ轮式装载机后驱动桥设计机械设计CAD

wx供重浩&#xff1a;创享日记 对话框发送&#xff1a;装载机 获取完整论文报告工程源文件 本次设计内容为XG916Ⅱ装载机后驱动桥设计&#xff0c;大致上分为主传动的设计&#xff0c;差速器的设计&#xff0c;半轴的设计&#xff0c;最终传动的设计四大部分。其中主传动锥齿轮…

【多线程】Thread类的使用

目录 1.概述 2.Thread的常见构造方法 3.Thread的几个常见属性 4.启动一个线程-start() 5.中断一个线程 5.1通过共享的标记来进行沟通 5.2 调用 interrupt() 方法来通知 6.等待一个进程 7.获取当前线程引用 8.线程的状态 8.1所有状态 8.2线程状态和转移的意义 1.概述 …

Relabel与Metic Relabel

Prometheus支持多种方式的自动发现目标&#xff08;targets&#xff09;&#xff0c;以下是一些常见的自动发现方式&#xff1a; 静态配置&#xff1a;您可以在Prometheus配置文件中直接列出要监测的目标。这种方式适用于目标相对稳定的情况下&#xff0c;例如固定的服务器或设…