c#中connect函数_Flink算子使用方法及实例演示:union和connect

Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。读者可以使用Flink Scala Shell或者Intellij Idea来进行练习:

  • Flink Scala Shell:使用交互式编程环境学习和调试Flink
  • Flink 01 | 十分钟搭建第一个Flink应用和本地集群
  • Flink算子使用方法及实例演示:map、filter和flatMap
  • Flink算子使用方法及实例演示:keyBy、reduce和aggregations

很多情况下,我们需要对多个数据流进行整合处理,Flink为我们提供了多流转换算子,本文主要介绍多流转换。

50f2b14a5f4016d615fa588e617e8f57.png

union

在DataStream上使用union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。下图union对白色和深色两个数据流进行合并,生成一个数据流。

b4a6bce87f53fa8e91f1aac0d38d006f.png

union示意图

假设股票价格数据流来自不同的交易所,我们将其合并成一个数据流:

val shenzhenStockStream: DataStream[StockPrice] = ...val hongkongStockStream: DataStream[StockPrice] = ...val shanghaiStockStream: DataStream[StockPrice] = ...val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)

connect

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  1. connect只能连接两个数据流,union可以连接多个数据流。
  2. connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  3. 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上,如下图所示。控制流可以是阈值、规则、机器学习模型或其他参数。

df417d9653143cda09e363ba7d2393fe.png

对一个数据流进行控制处理

对于ConnectedStreams,我们需要重写CoMapFunction或CoFlatMapFunction。这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。在重写函数时,对于CoMapFunction,map1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunction,flatMap1处理第一个流的数据,flatMap2处理第二个流的数据。Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1或flatMap1会被调用,第二个数据流有数据到达时,map2或flatMap2会被调用。下面的代码对一个整数流和一个字符串流进行了connect操作。

val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)// CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出class MyCoMapFunction extends CoMapFunction[Int, String, String] {  override def map1(input1: Int): String = input1.toString  override def map2(input2: String): String = input2}val mapResult = connectedStream.map(new MyCoMapFunction)

我们知道,如果不对DataStream按照Key进行分组,数据是随机分配在各个TaskSlot上的,而绝大多数情况我们是要对某个Key进行分析和处理,Flink允许我们将connect和keyBy或broadcast结合起来使用。例如,我们将之前的股票价格数据流与一个媒体评价数据流结合起来,按照股票代号进行分组。

// 先将两个流connect,再进行keyByval keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream  .connect(mediaStatusStream)  .keyBy(0,0)// 先keyBy再connectval keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0).connect(mediaStatusStream.keyBy(0))

无论先keyBy还是先connect,我们都可以将含有相同Key的数据转发到下游同一个算子实例上。这种操作有点像SQL中的join操作。Flink也提供了join算子,join主要在时间窗口维度上,connect相比而言更广义一些,关于join的介绍将在后续文章中介绍。

下面的代码展示了如何将股票价格和媒体正负面评价结合起来,当媒体评价为正且股票价格大于阈值时,输出一个正面信号。完整代码在我的github上:https://github.com/luweizheng/flink-tutorials

package com.flink.tutorials.demos.stockimport java.util.Calendarimport com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunctionimport org.apache.flink.streaming.api.functions.source.RichSourceFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContextimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorimport scala.util.Randomobject StockMediaConnectedDemo {  def main(args: Array[String]) {    // 设置执行环境    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    // 每5秒生成一个Watermark    env.getConfig.setAutoWatermarkInterval(5000L)    // 股票价格数据流    val stockPriceRawStream: DataStream[StockPrice] = env      // 该数据流由StockPriceSource类随机生成      .addSource(new StockPriceSource)      // 设置 Timestamp 和 Watermark      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)    val mediaStatusStream: DataStream[Media] = env      .addSource(new MediaSource)    // 先将两个流connect,再进行keyBy    val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream      .connect(mediaStatusStream)      .keyBy(0,0)    // 先keyBy再connect    val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0)      .connect(mediaStatusStream.keyBy(0))    val alert1 = keyByConnect1.flatMap(new AlertFlatMap).print()    val alerts2 = keyByConnect2.flatMap(new AlertFlatMap).print()    // 执行程序    env.execute("connect stock price with media status")  }  /** 媒体评价    *    * symbol 股票代号    * timestamp 时间戳    * status 评价 正面/一般/负面    */  case class Media(symbol: String, timestamp: Long, status: String)  class MediaSource extends RichSourceFunction[Media]{    var isRunning: Boolean = true    val rand = new Random()    var stockId = 0    override def run(srcCtx: SourceContext[Media]): Unit = {      while (isRunning) {        // 每次从列表中随机选择一只股票        stockId = rand.nextInt(5)        var status: String = "NORMAL"        if (rand.nextGaussian() > 0.9) {          status = "POSITIVE"        } else if (rand.nextGaussian() < 0.05) {          status = "NEGATIVE"        }        val curTime = Calendar.getInstance.getTimeInMillis        srcCtx.collect(Media(stockId.toString, curTime, status))        Thread.sleep(rand.nextInt(100))      }    }    override def cancel(): Unit = {      isRunning = false    }  }  case class Alert(symbol: String, timestamp: Long, alert: String)  class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] {    var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d)    var mediaLevel: String = "NORMAL"    override def flatMap1(stock: StockPrice, collector: Collector[Alert]) : Unit = {      val stockId = stock.symbol.toInt      if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) {        collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE"))      }    }    override def flatMap2(media: Media, collector: Collector[Alert]): Unit = {      mediaLevel = media.status    }  }}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/560829.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java移动端接口测试_借助Charles来测试移动端-下篇

本篇是借助Charles来测试移动端的下半篇。(上篇任意门点我)上次说到可以借助Charles来抓移动端的网络请求&#xff0c;接下来&#xff0c;我们来看一下怎么通过Charles来模拟返回&#xff0c;还是以网页版豆瓣为例。先找到网页版豆瓣的请求通过上面这句话&#xff0c;我们知道&…

仓库货位卡标识牌_【干货】仓库布局,你想学啊,我教你啊!

下面两张仓库布局图&#xff0c;哪一个更加好&#xff1f;那再来看下这两张对比图&#xff0c;哪个布局更加好&#xff1f;为什么要关注仓库布局&#xff1f;在仓库操作中&#xff0c;什么操作最耗时&#xff1f;有一个大概的统计&#xff0c;假如说总工作量是100%的话&#xf…

openglshader实现虚拟场景_虚拟演播室设计原则

所谓虚拟演播室&#xff0c;就是利用计算机产生出虚拟的三维背景和道具&#xff0c;然后通过视频合成系统将演员与其进行合成&#xff0c;生成全三维、真人与虚拟布景和道具融合的效果。虚拟演播室系统只需要在一个蓝色背景下进行演播&#xff0c;不需要真正去搭建演播室布景&a…

功率谱 幅值谱_语音合成中的Mel谱和MFCC谱无区别

语音合成目前比较流行的方案是Tacotron(2) WaveNet(WaveRNN, LPCNet)等神经网络声码器。这些方案的流程大致相同&#xff0c;先由文本生成特征谱&#xff0c;再将特征谱重建为音频。在选择特征谱的时候&#xff0c;有的使用了Mel谱&#xff0c;有的使用了倒谱。本文通过梳理计…

c++builder tadoquery存储过程_Electron桌面应用程序从创建项目、启动项目到打包程序的详细过程...

开发环境本文使用环境node12.14.1electron10.1.5electron-builder22.9.1electron-updater4.3.5&#xff1b;操作过程一、新建和启动项目1. 直接使用官网示例&#xff0c;先克隆示例项目的仓库&#xff0c;然后进入该仓库&#xff1b;# 克隆示例项目的仓库$ git clone https://g…

(BFS+hash去重)八数码问题

题目&#xff1a; 编号为1~8的8个正方形滑块被摆成3行3列&#xff08;有一个格子空留&#xff09;。每次可以把与空格相邻的滑块&#xff08;有公共边才算相邻&#xff09;移到空格中&#xff0c;而它原来的位置就称为了新的空格。给定初始局面和目标局面&#xff08;用0表示空…

变速后没有声音_问答 | 现代朗动at,启动后怠速不稳,热车后正常,是什么问题?...

今日老陈问答Q陈工你好&#xff1a;1.手动家车&#xff0c;停车怠速&#xff0c;有挡或空挡行驶都有异响&#xff0c;踩离合异响消失&#xff0c;抬离合异响恢复&#xff0c;换分离轴承未解决&#xff0c;压板轻磨损没更换&#xff0c;舱室粉末没清理 2.手动挡加自动变速箱油行…

怎么加载文件_Java虚拟机从入门到入土之JVM的类加载机制

作者&#xff1a;六脉神剑转载于&#xff1a;https://juejin.im/post/5e1aaf626fb9a0301d11ac8eJVM总体概述JVM总体上是由类装载子系统(ClassLoader)运行时数据区执行引擎内存回收类文件结构以上5个部分组成&#xff0c;每一个都是非常重要的&#xff0c;如果你要了解JVM&#…

的向上取整函数_计算机二级Excel常用函数解析

决定为大家推点干货让大家学习一下Excel的函数应用ABS绝对值 从最简单的开始第一个是ABS函数简而言之就是取绝对值作用就是MAX&MIN函数 这两个函数是好哥们也比较简单的就是从一堆数字中选出最大值和最小值如图C6格所示ROUND函数 四舍五入函数可不是回合ROUND 1算是比较…

多个 本地仓库_【运维工具】搭建npm私有镜像仓库,天下苦于npm build久矣

​01 前 言当你的研发团队越来越大&#xff0c;或是你无法忍受node超慢的构建时你可以考虑继续读下去&#xff0c;给大家推荐一个基于Verdaccio相对较完整的解决方案。由于环境的原因&#xff0c;我们直接去 http://npmjs.org 下载就不要考虑了&#xff0c;可以将npm config se…

地图自定义图标_如何在H5里添加地图导航?这份教程请收藏!

智能手机的出现为我们的生活带来了翻天覆地的改变&#xff0c;比如说衣食住行都有了显著的变化。外卖让就餐更加方便、手机支付也让生活更加便利&#xff0c;地图导航功能更是让大家从此不再迷路&#xff0c;有了手机以后&#xff0c;大家都开始习惯直接用手机搜索目的地&#…

c语言年月日问题思路总结 闰年非闰年每个月份的天数 解决今天是妹子出生的第多少天的问题

1.闰年非闰年每个月份的天数&#xff1a; int year[2][13]{0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31,0, 31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 };经观察发现&#xff1a; a。2月闰年有29天&#xff0c;非闰年28天 b。1、3、5、7、8、10、12月份&#xf…

aop判断方法是否执行成功_判断图中是否有环的三种方法

0、什么是环&#xff1f;在图论中&#xff0c;环&#xff08;英语&#xff1a;cycle&#xff09;是一条只有第一个和最后一个顶点重复的非空路径。在有向图中&#xff0c;一个结点经过两种路线到达另一个结点&#xff0c;未必形成环。1、拓扑排序1.1、无向图使用拓扑排序可以判…

sap运维要做哪些工作_患上腰椎间盘突出,适合做哪些工作?不适合做哪些工作?...

腰椎间盘突出的患者&#xff0c;大多数是年轻人。年轻人生活和工作压力比较大&#xff0c;大多数人都不可能因为腰椎病完全停止工作&#xff0c;事实上也不用完全停止工作&#xff0c;我们更多地应该虑如何平衡养病和工作之间的关系&#xff0c;那我们今天就来和大家讲讲&#…

bat执行exe程序_dos命令start教程,并行运行exe程序或者启动bat批处理cmd脚本

大家好&#xff0c;我是老盖&#xff0c;首先感谢观看本文&#xff0c;本篇文章做的有视频&#xff0c;视频讲述的比较详细&#xff0c;也可以看我发布的视频。今天我们学习DOS命令start这个命令&#xff0c;它可以启动一个EXE程序&#xff0c;也可以启动一个BAT批处理脚本&…

数据库备份mysql_MySQL数据库备份与恢复方法

常有新手问我该怎么备份数据库&#xff0c;下面介绍3种备份数据库的方法&#xff1a;(1)备份数据库文件MySQL中的每一个数据库和数据表分别对应文件系统中的目录和其下的文件。在Linux下数据库文件的存放目录一般为/var/lib/mysql。在Windows下这个目录视MySQL的安装路径而定&a…

(stack栈)rails

题目&#xff1a; 某城市有一个火车站&#xff0c;铁轨铺设如图所示&#xff0c;有n节车厢从A方向驶入车站&#xff0c;按进站顺序编号为1至n。你的任务是判断是否能让它们按照某种特定的顺序进入B方向的铁轨并驶出车站。为了重组车厢&#xff0c;你可以借助中转站C。这是一个…

docker 查看镜像_Docker 核心概念、安装、端口映射及常用操作命令,详细到令人发指!...

来自小洋人最HAPPY投稿一、Docker简介Docker是开源应用容器引擎&#xff0c;轻量级容器技术。基于Go语言&#xff0c;并遵循Apache2.0协议开源Docker可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后发布到任何流行的Linux系统上&#xff0c…

(完全二叉树编号)小球下落

题目 有一棵二叉树&#xff0c;最大深度为D&#xff0c;且所有的叶子深度都相同。所有结点从上到下从左到右编号为1&#xff0c;2&#xff0c;3&#xff0c;…&#xff0c;2eD-1。在结点1处放一个小球&#xff0c;它会往下落。每个结点上都有一个开关&#xff0c;初始全部关闭…

python range 步长为负数_【Python面试】 说说Python中xrange和range的区别?

公众号新增加了一个栏目&#xff0c;就是每天给大家解答一道Python常见的面试题&#xff0c;反正每天不贪多&#xff0c;一天一题&#xff0c;正好合适&#xff0c;只希望这个面试栏目&#xff0c;给那些正在准备面试的同学&#xff0c;提供一点点帮助&#xff01;小猿会从最基…