Flink Flink中的分流

一、什么是分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
在这里插入图片描述

二、基于filter算子的简单实现分流

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶数流");ds2.print("奇数流");streamExecutionEnvironment.execute();}
}

执行结果

奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

三、使用测输出流

关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/167460.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面了一个4年经验的测试工程师,自动化都不会也要15k,我也是醉了····

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

表单考勤签到作业周期打卡打分评价评分小程序开源版开发

表单考勤签到作业周期打卡打分评价评分小程序开源版开发 表单打卡评分 表单签到功能&#xff1a;学生可以通过扫描二维码或输入签到码进行签到&#xff0c;方便教师进行考勤管理。 考勤功能&#xff1a;可以记录学生的出勤情况&#xff0c;并自动生成出勤率和缺勤次数等统计数…

本地缓存与分布式缓存

一、缓存的概念 在服务端编程当中&#xff0c;缓存主要是指将数据库的数据加载到内存中&#xff0c;之后对该数据的访问都在内存中完成&#xff0c;从而减少了对数据库的访问&#xff0c;解决了高并发场景中数据库容易成为性能瓶颈的问题&#xff1b;以及基于内存的访问速度高…

ruoyi-plus-vue部署

安装虚拟机 部署文档 安装docker 安装docker 安装docker-compose 可能遇到的错误 Failed to deploy ruoyi/ruoyi-server:5.1.0 Dockerfile: ruoyi-admin/Dockerfile: Cant retrieve im age ID from build stream 安装 vim 命令 yum install vim -y 修改文件 vim /etc/re…

flutter 无法从H5 WebView 访问摄像头和录音权限

AndroidManifest.xml需要在 中添加以下权限&#xff1a; <uses-permission android:name"android.permission.INTERNET"/> <uses-permission android:name"android.permission.CAMERA" /> <uses-permission android:name"android.per…

基于Springboot的冬奥会科普平台(有报告),Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的冬奥会科普平台&#xff08;有报告&#xff09;&#xff0c;Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层…

线性表之链式表

文章目录 主要内容一.单链表1.头插法建立单链表代码如下&#xff08;示例&#xff09;: 2.尾插法建立单链表代码如下&#xff08;示例&#xff09;: 3.按序号查找结点值代码如下&#xff08;示例&#xff09;: 4.按值查找表结点代码如下&#xff08;示例&#xff09;: 5.插入节…

ELK+kafka+filebeat企业内部日志分析系统

1、组件介绍 1、Elasticsearch&#xff1a; 是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎&#xff0c;基于RESTful web接口。Elasticsearch是用Java开发的&#xff0c;并作为Apache许可条款下的开放源码发布…

module ‘d2l.torch‘ has no attribute ‘train_ch3‘

解决方法&#xff1a; 方法1&#xff1a; 如果没有安装d2l&#xff0c;请安装 详细步骤见安装d2l 方法2&#xff1a; 先卸载旧的版本 pip uninstall d2l再下载新的版本&#xff0c;需要以管理员身份运行下载指令 pip install d2l0.17.5 --user完美解决&#xff01; ☺☺☺☺…

创新研报|企业如何在不确定时期突破至新高度?

报告下载地址&#xff1a; 创新研报&#xff5c;BCG 2023最创新企业研究-在不确定时期跃升新高度 创新从未如此重要&#xff0c;领先的企业创新者正在证明这一切。BCG&#xff08;于2005年首次发布年度创新报告&#xff0c;其中列出了全球创新高管最钦佩的50家企业&#xf…

2824. 统计和小于目标的下标对数目 --力扣 --JAVA

题目 给你一个下标从 0 开始长度为 n 的整数数组 nums 和一个整数 target &#xff0c;请你返回满足 0 < i < j < n 且 nums[i] nums[j] < target 的下标对 (i, j) 的数目。 解题思路 对数组进行排序&#xff0c;可以利用List自带的sort函数传递比较规则(代码中的…

【MATLAB源码-第88期】基于matlab的灰狼优化算法(GWO)的栅格路径规划,输出做短路径图和适应度曲线

操作环境&#xff1a; MATLAB 2022a 1、算法描述 灰狼优化算法&#xff08;Grey Wolf Optimizer, GWO&#xff09;是一种模仿灰狼捕食行为的优化算法。灰狼是群居动物&#xff0c;有着严格的社会等级结构。在灰狼群体中&#xff0c;通常有三个等级&#xff1a;首领&#xff…

数据结构-归并排序+计数排序

1.归并排序 基本思想&#xff1a; 归并排序是建立在归并操作上的一种有效的排序算法,该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并&#xff0c;得到完全有序的序列&#xff1b;即先使每个子序列有序&#xff0c;再使子序列段间有序。若将两个有序表合并成一个…

2023年P气瓶充装证模拟考试题库及P气瓶充装理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年P气瓶充装证模拟考试题库及P气瓶充装理论考试试题是由安全生产模拟考试一点通提供&#xff0c;P气瓶充装证模拟考试题库是根据P气瓶充装最新版教材&#xff0c;P气瓶充装大纲整理而成&#xff08;含2023年P气瓶…

pulseaudio是如何测试出音频延迟的

通常专业的音频设备生产厂商都有专业的设备来测试精确的音频链路延时。 那么没有专业设备怎么测试出音频延迟呢?如下图,我们可以看到pulseaudio可以测试出硬件音频延迟。 那么,他是怎么测试出硬件延迟的呢?他的理论依据是什么呢?接下来我带大伙一起探索一下。 /*占位…

红队攻防实战之从边界突破到漫游内网(无cs和msf)

也许有一天我们再相逢&#xff0c;睁大眼睛看清楚&#xff0c;我才是英雄。 本文首发于先知社区&#xff0c;原创作者即是本人 本篇文章目录 网络拓扑图&#xff1a; 本次红队攻防实战所需绘制的拓扑图如下&#xff1a; 边界突破 访问网站&#xff1a; http://xxx.xxx.xxx…

leetcode刷题记录——1991. 找到数组的中间位置

找到数组的中间位置 给你一个下标从 0 开始的整数数组 nums &#xff0c;请你找到 最左边 的中间位置 middleIndex &#xff08;也就是所有可能中间位置下标最小的一个&#xff09;。 中间位置 middleIndex 是满足 nums[0] nums[1] … nums[middleIndex-1] nums[middleInd…

数据传输的思考

Wi-Fi&#xff1a;Wi-Fi是一种无线网络技术&#xff0c;可以用于无线互联网接入、局域网通信和数据传输等。Wi-Fi基于IEEE 802.11标准&#xff0c;通过无线信号传输数据&#xff0c;提供高速的无线网络连接。Wi-Fi可用于连接设备与路由器或者设备之间的直接通信&#xff0c;可以…

Linux 排查必看文件

目录 1. 登录日志 1.1 /var/log/wtmp 1.2 /var/log/btmp.* 1.3 /var/log/lastlog 1.4 /var/log/faillog 1.5 /var/log/secure 1.6 /var/log/auth.log 2. 系统日志 2.1 /var/log/cron.* 2.2 /var/log/syslog 2.3 /var/log/audit/audit.*log 3. 历史命令 3.1 ~/…

Docker 中OpenResty下载与使用

1Panel安装OpenResty 查看到就说明安装成功 部署项目 在http中添加&#xff1a; server { listen 8001; //端口号 server_name localhost; location / { root /admin; //项目路径 index index.html index.htm; …