Flink Flink中的合流

一、Flink中的基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

二、联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

package com.flink.DataStream.UnionStream;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkUnionStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment.socketTextStream("localhost", 1111).map(a -> Integer.parseInt(a));SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment.socketTextStream("localhost", 2222).map(a -> Integer.parseInt(a));DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));unionResult.print();streamExecutionEnvironment.execute();}
}

在这里插入图片描述
在这里插入图片描述

三、连接(Connect)

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。
在这里插入图片描述

package com.flink.DataStream.UnionStream;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class FlinkConnectStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO 定义数字流SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment.socketTextStream("localhost", 1111).map(a -> Integer.parseInt(a));SingleOutputStreamOperator<String> source2 = streamExecutionEnvironment.socketTextStream("localhost", 2222);/**TODO 连接两个流一次只能连接 2 条流两条流的数据类型可以不一致所以得到的结果不再是一个DataStream,而是一个"连接流"ConnectedStreams连接后可以调用 map、flatmap、process 来处理,但是各处理各的*/ConnectedStreams<Integer, String> connectedStreams = source1.connect(source2);SingleOutputStreamOperator<Object> map = connectedStreams.map(new CoMapFunction<Integer, String, Object>() {@Overridepublic Object map1(Integer integer) throws Exception {return "来源于数字流" + integer.toString();}@Overridepublic Object map2(String s) throws Exception {return "来源于字符流" + s;}});map.print();streamExecutionEnvironment.execute();}
}

在这里插入图片描述
上面的代码中,ConnectedStreams 有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个 CoMapFunction,表示分别对两条流中的数据执行 map 操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的 map 操作,.map2()则是针对第二条流。

四、CoProcessFunction

与 CoMapFunction 类似,如果是调用.map()就需要传入一个 CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个 CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是 processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);这里传入两个参数 keySelector1 和 keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/181391.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源免费跨平台数据同步工具-Syncthing

Syncthing是一款开源免费跨平台的文件同步工具&#xff0c;是基于P2P技术实现设备间的文件同步&#xff0c;所以它的同步是去中心化的&#xff0c;即你并不需要一个服务器&#xff0c;故不需要担心这个中心的服务器给你带来的种种限制&#xff0c;而且类似于torrent协议&#x…

卡码网语言基础课 | 16. 出现频率最高的字母

目录 一、 哈希表 二、 编写解题 2.1 统计出现次数 2.2 解答 通过本次练习&#xff0c;将学习到C中哈希表的基础知识 题目&#xff1a; 给定一个只包含小写字母的字符串&#xff0c;统计字符串中每个字母出现的频率&#xff0c;并找出出现频率最高的字母&#xff0c;如果…

比尔盖茨:GPT-5不会比GPT-4好多少,生成式AI已达到极限

比尔盖茨一句爆料&#xff0c;成为机器学习社区热议焦点&#xff1a; “GPT-5不会比GPT-4好多少。” 虽然他已不再正式参与微软的日常运营&#xff0c;但仍在担任顾问&#xff0c;并且熟悉OpenAI领导团队的想法。 消息来自德国《商报》&#xff08;Handelsblatt&#xff09;对…

搞定这三个问题 伦敦金止损就没问题

笔者多次强调&#xff0c;做伦敦金交易&#xff0c;重要的是风险控制。而止损是我们风险控制中一个很重要的概念。设定好止损&#xff0c;就是风险控制的好开始。下面我们通过三个问题&#xff0c;来解决止损的问题。 问题一&#xff0c;你的止损位在哪里&#xff1f;要做止损&…

数据结构与算法之美学习笔记:27 | 递归树:如何借助树来求解递归算法的时间复杂度?

目录 前言递归树与时间复杂度分析实战一&#xff1a;分析快速排序的时间复杂度实战二&#xff1a;分析斐波那契数列的时间复杂度实战三&#xff1a;分析全排列的时间复杂度内容小结 前言 本节课程思维导图&#xff1a; 今天&#xff0c;我们来讲这种数据结构的一种特殊应用&am…

YOLO改进系列之SKNet注意力机制

摘要 视皮层神经元的感受野大小受刺激的调节即对于不同的刺激&#xff0c;卷积核的大小应该不同&#xff0c;但在构建CNN时一般在同一层只采用一种卷积核&#xff0c;很少考虑因采用不同卷积核。于是SKNet被提出&#xff0c;在SKNet中&#xff0c;不同大小的感受视野&#xff…

深度学习框架配置

目录 1. 配置cuda环境 1.1. 安装cuda和cudnn 1.1.1. 显卡驱动配置 1.1.2. 下载安装cuda 1.1.3. 下载cudnn&#xff0c;将解压后文件复制到cuda目录下 1.2. 验证是否安装成功 2. 配置conda环境 2.1. 安装anaconda 2.2. conda换源 2.3. 创建conda环境 2.4. pip换源 3.…

【工作记录】spider-flow使用插件连接并操作mongodb数据库

前言 前面说过&#xff0c;spider-flow有着非常优秀的插件机制&#xff0c;可以通过插件实现功能的扩展。前面有小伙伴问到mongodb的集成使用&#xff0c;本文就来梳理下spider-flow中使用mongodb插件的过程&#xff0c;其实非常简单。 PS: spider-flow的作者已经实现了一些常…

飞翔的小鸟小游戏

主类 package APP;import 框架.GameFrame;public class GameApp {public static void main(String[] args) {//游戏的入口new GameFrame();} }场景实物 package 框架;import 图导.Constant; import 图导.GameUtil;import java.awt.*; import java.awt.image.BufferedImage; …

C语言——数字金字塔

实现函数输出n行数字金字塔 #define _CRT_SECURE_NO_WARNINGS 1#include <stdio.h>void pyramid(int n) {int i,j,k;for (i1; i<n; i){//输出左边空格&#xff0c;空格数为n-i for (j1; j<n-i; j){printf(" "); } //每一行左边空格输完后输出数字&#…

STM32g70开启定时器死机原因

在做低功耗产品时&#xff0c;检查发现由于之前开启了BOOTLOADER升级程序&#xff0c;修改了中断向量FALSH起始地址&#xff0c;只在KEIL TARGET IROM1中修改了&#xff0c; 而忘记在程序文件system_stm32f10x.c里修改中断向量表flash起始地址 system_stm32f10x.c里&#xff0…

8款前端特效动画及源码分享

3D立体数字时钟滚动特效 基于Splitting制作的一款3D立体数字时钟滚动特效&#xff0c;创意感满满&#xff0c;可以下载使用。 预览获取 核心代码 <div class"clock"><span class"cog hours tens" data-splitting>0123456789</span>&l…

智能优化算法应用:基于鸡群算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鸡群算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鸡群算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鸡群算法4.实验参数设定5.算法结果6.参考文献7.MATLAB…

EZDML基本介绍

一、表结构设计器(EZDML) 这是一个数据库建表的小软件&#xff0c;可快速的进行数据库表结构设计&#xff0c;建立数据模型。类似大家常用的数据库建模工具如PowerDesigner、ERWIN、ER-Studio和Rational-Rose等的超级精简版。 官方下载地址&#xff1a;http://www.ezdml.com/d…

一文教你uni-app开发小程序直播功能,轻松打造专属直播间!

1、微信后台申请插件开通 微信后台 登录微信后台 点击设置中的第三方设置 —> 添加插件 --> 点击小程序直播组件&#xff08;获取AppID&#xff09; 2、微信后台开通直播功能 点击进入直播后台系统 这里就是我们创建的直播功能区域 3、代码中接入直播插件AppID 支持在…

思维导图软件MindNode 5 mac使用场景

MindNode 5 for Mac是一款思维导图软件产品&#xff0c;为用户在灵感启发、思绪整理、记忆协助、项目规划、授课讲演等诸多场景下提升学习和工作效率。通过导图社区和云文件无缝链接用户设备&#xff0c;方便用户随时随地收集灵感和展示文档。 MindNode 5 for Mac应用场景 助力…

【axios】TypeScript实战,结合源码,从0到1教你封装一个axios - 基础封装篇

目录 前言版本环境变量配置引入的类型1、AxiosIntance: axios实例类型2、InternalAxiosRequestConfig: 高版本下AxiosRequestConfig的拓展类型3、AxiosRequestConfig: 请求体配置参数类型4、AxiosError: 错误对象类型5、AxiosResponse: 完整原始响应体类型 目标效果开始封装骨架…

【古月居《ros入门21讲》学习笔记】15_ROS中的坐标系管理系统

目录 说明&#xff1a; 1. 机器人中的坐标变换 tf功能包能干什么&#xff1f; tf坐标变换如何实现 2. 小海龟跟随实验 安装 ros-melodic-turtle-tf 实验命令 运行效果 说明&#xff1a; 1. 本系列学习笔记基于B站&#xff1a;古月居《ROS入门21讲》课程&#xff0c;且使…

KT1404C语音芯片做的板子连接usb到电脑出来空的盘符 怎么处理?

一、问题简介 KT1404C画的板子&#xff0c;连接usb到电脑&#xff0c;出来空的盘符&#xff0c;可以确定KT404C没问题放别的板子OK&#xff0c;就是这个板子不正常&#xff0c;并且芯片5脚的电压输出是3.5v &#xff0c;正常的板子是3.3v&#xff0c;什么问题呢&#xff1f; 问…

数据结构之二叉树与堆以及力扣刷题函数扩展

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 数据结构初阶 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力 目录 1.前言 2.树 2.1概念 2.2树的相关概念 3.…