Flink多流处理之connect拼接流

Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStreamrightStream,也可以使用不同的逻辑处理leftStreamrightStream.
如下图:
在这里插入图片描述

下面的演示代码也可以通过这个图结合来看,其实connect算子最主要的作用就是共享状态,如常用的广播状态.

  • 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;import java.util.Arrays;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 多流操作-流连接**/
public class FlinkConnect {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 添加数据源1DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));// 添加数据源2DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));// 拼接数据流ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);// 这里使用map算子作为演示SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {/*** map1作为左流**/@Overridepublic String map1(String value) throws Exception {return "字符串: " + value;}/*** map2作为右流**/@Overridepublic String map2(Double value) throws Exception {return "数字: " + (value * 100);}});// 打印结果resultStream.print();env.execute("Connect Operator");}
}
  • 结果
3> 字符串: b
1> 数字: 600.0
2> 字符串: a
3> 数字: 1100.0
2> 数字: 2220.0
2> 字符串: d
2> 数字: 9800.0
3> 数字: 10000.0
1> 字符串: c

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28657.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【golang】工作区与GOPATH

在学习go语言时&#xff0c;我们会从官网下载go语言的二进制包&#xff0c;然后解压并安装到某个目录&#xff0c;最后会配置环境变量&#xff0c;通过输入命令go version来验证是否安装成功。 配置了path环境后&#xff0c;我们还需要再配置3个环境变量&#xff0c;GOROOT、G…

完美的分布式监控系统——Prometheus(普罗米修斯)与优雅的开源可视化平台——Grafana(格鲁夫娜)

一、基本概念 1、之间的关系 prometheus与grafana之间是相辅相成的关系。作为完美的分布式监控系统的Prometheus&#xff0c;就想布加迪威龙一样示例和动力强劲。在猛的车也少不了仪表盘来观察。于是优雅的可视化平台Grafana出现了。 简而言之Grafana作为可视化的平台&#xff…

在excel调用SAP函数执行SAP数据查找或提交

1、下载插件 2、安装插件 3、执行函数 3.1 第一步 通过SAPRegister连接SAP服务器 var reg SAPRegister("10.10.14.15", "00", "mes", "AQ123456", "800") 需要改为实际的连接信息 "10.10.14.15" 为SAP服务器I…

嘉楠勘智k230开发板上手记录(三)--K230_RVV实战

按照K230_RVV实战.md操作 在k230_sdk目录下运行&#xff0c;Makefile里默认的toolchain路径是在/opt下的&#xff0c;需要拷贝过去 cp -r toolchain /opt/ make rt-smart-apps 进入目录 src/big/rt-smart 运行脚本 source smart-env.sh riscv64 配置环境变量 source smart-e…

随着野火的增加,甲烷排放也会增加

2020 年对加利福尼亚州造成严重破坏的野火使大气中充满了强效温室气体。 2020 年&#xff0c;溪火烧毁了北加州的内华达山脉。图片来源&#xff1a;Zachary Cava/Flickr&#xff0c;CC BY-NC-SA 2.0 2020 年&#xff0c;在高温和干旱的推动下&#xff0c;加州野火烧毁了超过160…

Verilog求log10和log2近似

Verilog求log10和log2近似 Verilog求10对数近似方法&#xff0c;整数部分用位置index代替&#xff0c;小数部分用查找表实现 参考&#xff1a; Verilog写一个对数计算模块Log2(x) FPGA实现对数log2和10*log10

Netty:ChannelHandler的两个生命周期监听事件方法:handlerAdded 和 handlerRemoved

说明 io.netty.channel.ChannelHandler有两个生命周期监听事件方法&#xff1a; handlerAdded(ChannelHandlerContext ctx)&#xff1a;当ChannelHandler被添加到实际的上下文、并且已经准备就绪等待处理事件的时候被调用。 handlerRemoved(ChannelHandlerContext ctx)&#…

SQL-每日一题【1179. 重新格式化部门表】

题目 部门表 Department&#xff1a; 编写一个 SQL 查询来重新格式化表&#xff0c;使得新的表中有一个部门 id 列和一些对应 每个月 的收入&#xff08;revenue&#xff09;列。 查询结果格式如下面的示例所示&#xff1a; 解题思路 1.题目要求我们重新格式化表&#xff0c;…

C++入门篇6 C++的内存管理

在学习C的内存管理之前&#xff0c;我们先来回顾一下C语言中动态内存 int main() {int* p1 (int*)malloc(sizeof(int));free(p1);// 1.malloc/calloc/realloc的区别是什么&#xff1f;int* p2 (int*)calloc(4, sizeof(int));//calloc 可以初始化空间为0int* p3 (int*)reall…

SpringCloud实用篇1——eureka注册中心 Ribbon负载均衡原理 nacos注册中心

目录 1 微服务1.1 微服务的演变1.2 微服务1.3 SpringCloud1.4 小结 2 服务拆分及远程调用2.1 服务拆分2.2 服务拆分案例2.3 实现远程调用2.4 提供者与消费者 3 Eureka注册中心3.1 Eureka的结构和作用3.2 搭建eureka-server3.3 服务注册3.4 服务发现 4 Ribbon负载均衡4.1 负载均…

安全杂记 - Linux文本三剑客之awk

目录 1.什么是AWK2.正则表达式3.语法4.内置变量示例printf命令5.复现awk经典实例(1).插入几个新字段(2).格式化空白(3).筛选IPv4地址(4).筛选给定时间范围内的日志 1.什么是AWK awk、grep、sed是linux操作文本的三大利器&#xff0c;合称文本三剑客。三者的功能都是处理文本&a…

基于DETR (DEtection TRansformer)开发构建MSTAR雷达影像目标检测系统

关于DETR相关的实践在之前的文章中很详细地介绍过&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《DETR (DEtection TRansformer)基于自建数据集开发构建目标检测模型超详细教程》 《书接上文——DETR评估可视化》 基于MSTAR雷达影像数据开发构建目标检测系统&am…

CentOS虚拟机更改屏幕锁屏时间

&#xff08;1&#xff09;点击“应用程序”&#xff0c;再点击“系统工具”&#xff0c;再点击“设置” &#xff08;2&#xff09; &#xff08;3&#xff09;在“设置”中点击“Privacy”&#xff0c;点击“锁屏”

免费实用的日记应用:Day One for Mac中文版

Day One for Mac是一款运行在Mac平台上的日记软件&#xff0c;你可以使用Day One for mac通过快速菜单栏条目、提醒系统和鼓舞人心的信息来编写更多内容&#xff0c;day one mac版还支持Dropbox同步功能&#xff0c;想要day one mac中文免费版的朋友赶紧来试试吧&#xff01; …

hive 字段注释乱码

hive 字段注释乱码: 在mysql中运行&#xff1a; alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;OK

【2.1】Java微服务: Nacos的使用

目录 Nacos介绍 Nacos安装 下载和安装 修改端口 启动 服务注册与发现 导入Nacos管理依赖 导入服务依赖 配置Nacos的服务地址 启动服务&#xff0c;查看已注册的服务 服务分级存储模型 分级存储模型介绍 具体结构 配置实例集群 同集群优先的负载均衡策略 服务权重配置…

Vue 整合 Element UI 、路由嵌套、参数传递、重定向、404和路由钩子(五)

一、整合 Element UI 1.1 工程初始化 使用管理员的模式进入 cmd 的命令行模式&#xff0c;创建一个名为 hello-vue 的工程&#xff0c;命令为&#xff1a; # 1、目录切换 cd F:\idea_home\vue# 2、项目的初始化&#xff0c;记得一路的 no vue init webpack hello-vue 1.2 安装…

探索未来:直播实时美颜SDK在增强现实(AR)直播中的前景

在AR直播中&#xff0c;观众可以与虚拟元素实时互动&#xff0c;为用户带来更加丰富、沉浸式的体验。那么&#xff0c;直播美颜SDK在AR中有哪些应用呢&#xff1f;下文小编将于大家一同探讨美颜SDK与AR有哪些关联。 一、AR直播与直播实时美颜SDK的结合 增强现实技术在直播中…

【MFC】08.MFC消息,自定义消息,常用控件(MFC菜单创建大总结),工具栏,状态栏-笔记

本专栏上几篇文章讲解了MFC几大机制&#xff0c;今天带领大家学习MFC自定义消息以及常用控件&#xff0c;最常用的控件请查看本专栏第一二篇文章&#xff0c;今天这篇文章介绍工具栏&#xff0c;菜单和状态栏&#xff0c;以及菜单创建大总结。 文章目录 MFC消息分类&#xff1…

【Sa-Token】9、Sa-Token实现在线用户管理功能

尽管框架将大部分操作提供了简易的封装&#xff0c;但在一些特殊场景下&#xff0c;我们仍需要绕过框架&#xff0c;直达数据底层进行一些操作。 1、官方文档 会话查询 https://sa-token.cc/doc.html#/up/search-sessionSa-Token提供以下API助你直接操作会话列表&#xff1a…