c#中connect函数_Flink算子使用方法及实例演示:union和connect

Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。读者可以使用Flink Scala Shell或者Intellij Idea来进行练习:

  • Flink Scala Shell:使用交互式编程环境学习和调试Flink
  • Flink 01 | 十分钟搭建第一个Flink应用和本地集群
  • Flink算子使用方法及实例演示:map、filter和flatMap
  • Flink算子使用方法及实例演示:keyBy、reduce和aggregations

很多情况下,我们需要对多个数据流进行整合处理,Flink为我们提供了多流转换算子,本文主要介绍多流转换。

50f2b14a5f4016d615fa588e617e8f57.png

union

在DataStream上使用union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。下图union对白色和深色两个数据流进行合并,生成一个数据流。

b4a6bce87f53fa8e91f1aac0d38d006f.png

union示意图

假设股票价格数据流来自不同的交易所,我们将其合并成一个数据流:

val shenzhenStockStream: DataStream[StockPrice] = ...val hongkongStockStream: DataStream[StockPrice] = ...val shanghaiStockStream: DataStream[StockPrice] = ...val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)

connect

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  1. connect只能连接两个数据流,union可以连接多个数据流。
  2. connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  3. 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上,如下图所示。控制流可以是阈值、规则、机器学习模型或其他参数。

df417d9653143cda09e363ba7d2393fe.png

对一个数据流进行控制处理

对于ConnectedStreams,我们需要重写CoMapFunction或CoFlatMapFunction。这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。在重写函数时,对于CoMapFunction,map1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunction,flatMap1处理第一个流的数据,flatMap2处理第二个流的数据。Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1或flatMap1会被调用,第二个数据流有数据到达时,map2或flatMap2会被调用。下面的代码对一个整数流和一个字符串流进行了connect操作。

val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)// CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出class MyCoMapFunction extends CoMapFunction[Int, String, String] {  override def map1(input1: Int): String = input1.toString  override def map2(input2: String): String = input2}val mapResult = connectedStream.map(new MyCoMapFunction)

我们知道,如果不对DataStream按照Key进行分组,数据是随机分配在各个TaskSlot上的,而绝大多数情况我们是要对某个Key进行分析和处理,Flink允许我们将connect和keyBy或broadcast结合起来使用。例如,我们将之前的股票价格数据流与一个媒体评价数据流结合起来,按照股票代号进行分组。

// 先将两个流connect,再进行keyByval keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream  .connect(mediaStatusStream)  .keyBy(0,0)// 先keyBy再connectval keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0).connect(mediaStatusStream.keyBy(0))

无论先keyBy还是先connect,我们都可以将含有相同Key的数据转发到下游同一个算子实例上。这种操作有点像SQL中的join操作。Flink也提供了join算子,join主要在时间窗口维度上,connect相比而言更广义一些,关于join的介绍将在后续文章中介绍。

下面的代码展示了如何将股票价格和媒体正负面评价结合起来,当媒体评价为正且股票价格大于阈值时,输出一个正面信号。完整代码在我的github上:https://github.com/luweizheng/flink-tutorials

package com.flink.tutorials.demos.stockimport java.util.Calendarimport com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunctionimport org.apache.flink.streaming.api.functions.source.RichSourceFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContextimport org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collectorimport scala.util.Randomobject StockMediaConnectedDemo {  def main(args: Array[String]) {    // 设置执行环境    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    // 每5秒生成一个Watermark    env.getConfig.setAutoWatermarkInterval(5000L)    // 股票价格数据流    val stockPriceRawStream: DataStream[StockPrice] = env      // 该数据流由StockPriceSource类随机生成      .addSource(new StockPriceSource)      // 设置 Timestamp 和 Watermark      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)    val mediaStatusStream: DataStream[Media] = env      .addSource(new MediaSource)    // 先将两个流connect,再进行keyBy    val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream      .connect(mediaStatusStream)      .keyBy(0,0)    // 先keyBy再connect    val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0)      .connect(mediaStatusStream.keyBy(0))    val alert1 = keyByConnect1.flatMap(new AlertFlatMap).print()    val alerts2 = keyByConnect2.flatMap(new AlertFlatMap).print()    // 执行程序    env.execute("connect stock price with media status")  }  /** 媒体评价    *    * symbol 股票代号    * timestamp 时间戳    * status 评价 正面/一般/负面    */  case class Media(symbol: String, timestamp: Long, status: String)  class MediaSource extends RichSourceFunction[Media]{    var isRunning: Boolean = true    val rand = new Random()    var stockId = 0    override def run(srcCtx: SourceContext[Media]): Unit = {      while (isRunning) {        // 每次从列表中随机选择一只股票        stockId = rand.nextInt(5)        var status: String = "NORMAL"        if (rand.nextGaussian() > 0.9) {          status = "POSITIVE"        } else if (rand.nextGaussian() < 0.05) {          status = "NEGATIVE"        }        val curTime = Calendar.getInstance.getTimeInMillis        srcCtx.collect(Media(stockId.toString, curTime, status))        Thread.sleep(rand.nextInt(100))      }    }    override def cancel(): Unit = {      isRunning = false    }  }  case class Alert(symbol: String, timestamp: Long, alert: String)  class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] {    var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d)    var mediaLevel: String = "NORMAL"    override def flatMap1(stock: StockPrice, collector: Collector[Alert]) : Unit = {      val stockId = stock.symbol.toInt      if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) {        collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE"))      }    }    override def flatMap2(media: Media, collector: Collector[Alert]): Unit = {      mediaLevel = media.status    }  }}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/560829.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(分治)取余运算

题目描述 输入b&#xff0c;p&#xff0c;k的值&#xff0c;求b^p mod k的值。其中b&#xff0c;p&#xff0c;k*k为长整型数。 输入 输入b&#xff0c;p&#xff0c;k的值。 输出 求b^p mod k的值。 样例输入 2 10 9 样例输出 2^10 mod 97 余数公式&#xff1a; a*…

java移动端接口测试_借助Charles来测试移动端-下篇

本篇是借助Charles来测试移动端的下半篇。(上篇任意门点我)上次说到可以借助Charles来抓移动端的网络请求&#xff0c;接下来&#xff0c;我们来看一下怎么通过Charles来模拟返回&#xff0c;还是以网页版豆瓣为例。先找到网页版豆瓣的请求通过上面这句话&#xff0c;我们知道&…

仓库货位卡标识牌_【干货】仓库布局,你想学啊,我教你啊!

下面两张仓库布局图&#xff0c;哪一个更加好&#xff1f;那再来看下这两张对比图&#xff0c;哪个布局更加好&#xff1f;为什么要关注仓库布局&#xff1f;在仓库操作中&#xff0c;什么操作最耗时&#xff1f;有一个大概的统计&#xff0c;假如说总工作量是100%的话&#xf…

(回溯Uva524)素数环

题目 输入正整数&#xff0c;把整数1&#xff0c;2&#xff0c;3&#xff0c;&#xff0c;n组成一个环&#xff0c;使得相邻两个整数之和均为素数。输出时从整数1开始逆时针排列。同一个环应恰好输出一次。n<16 样例输入 6 样例输出 1 4 3 2 5 6 1 6 5 2 3 4 分析与解…

openglshader实现虚拟场景_虚拟演播室设计原则

所谓虚拟演播室&#xff0c;就是利用计算机产生出虚拟的三维背景和道具&#xff0c;然后通过视频合成系统将演员与其进行合成&#xff0c;生成全三维、真人与虚拟布景和道具融合的效果。虚拟演播室系统只需要在一个蓝色背景下进行演播&#xff0c;不需要真正去搭建演播室布景&a…

(回溯 UVa129)困难的串

题目:分析与解答&#xff1a; 题目: 如果一个字符串包含两个相邻的重复子串&#xff0c;则称它是“容易的串”&#xff0c;其他串成为“困难的串”。例如:BB,ABCDACABCAB,ABCDABCD都是容易的&#xff0c;而D、DC、ABDAB、CBABCBA 都是困难的。 输入正整数n和L&#xff0c;…

功率谱 幅值谱_语音合成中的Mel谱和MFCC谱无区别

语音合成目前比较流行的方案是Tacotron(2) WaveNet(WaveRNN, LPCNet)等神经网络声码器。这些方案的流程大致相同&#xff0c;先由文本生成特征谱&#xff0c;再将特征谱重建为音频。在选择特征谱的时候&#xff0c;有的使用了Mel谱&#xff0c;有的使用了倒谱。本文通过梳理计…

DFS实现floodfill算法

题目&#xff1a;分析与解答 题目&#xff1a; 多组案例&#xff0c;每组案例输入一个m行n列的字符矩阵&#xff0c;统计字符‘’组成多少个连通块。如果两个字符‘’所在的格子相邻&#xff08;横、竖或对角线&#xff09;&#xff0c;则说明它们属于同一连通块。 Sample …

c++builder tadoquery存储过程_Electron桌面应用程序从创建项目、启动项目到打包程序的详细过程...

开发环境本文使用环境node12.14.1electron10.1.5electron-builder22.9.1electron-updater4.3.5&#xff1b;操作过程一、新建和启动项目1. 直接使用官网示例&#xff0c;先克隆示例项目的仓库&#xff0c;然后进入该仓库&#xff1b;# 克隆示例项目的仓库$ git clone https://g…

(BFS+hash去重)八数码问题

题目&#xff1a; 编号为1~8的8个正方形滑块被摆成3行3列&#xff08;有一个格子空留&#xff09;。每次可以把与空格相邻的滑块&#xff08;有公共边才算相邻&#xff09;移到空格中&#xff0c;而它原来的位置就称为了新的空格。给定初始局面和目标局面&#xff08;用0表示空…

变速后没有声音_问答 | 现代朗动at,启动后怠速不稳,热车后正常,是什么问题?...

今日老陈问答Q陈工你好&#xff1a;1.手动家车&#xff0c;停车怠速&#xff0c;有挡或空挡行驶都有异响&#xff0c;踩离合异响消失&#xff0c;抬离合异响恢复&#xff0c;换分离轴承未解决&#xff0c;压板轻磨损没更换&#xff0c;舱室粉末没清理 2.手动挡加自动变速箱油行…

怎么加载文件_Java虚拟机从入门到入土之JVM的类加载机制

作者&#xff1a;六脉神剑转载于&#xff1a;https://juejin.im/post/5e1aaf626fb9a0301d11ac8eJVM总体概述JVM总体上是由类装载子系统(ClassLoader)运行时数据区执行引擎内存回收类文件结构以上5个部分组成&#xff0c;每一个都是非常重要的&#xff0c;如果你要了解JVM&#…

的向上取整函数_计算机二级Excel常用函数解析

决定为大家推点干货让大家学习一下Excel的函数应用ABS绝对值 从最简单的开始第一个是ABS函数简而言之就是取绝对值作用就是MAX&MIN函数 这两个函数是好哥们也比较简单的就是从一堆数字中选出最大值和最小值如图C6格所示ROUND函数 四舍五入函数可不是回合ROUND 1算是比较…

(stl排序+检索)大理石在哪

问题&#xff1a; 现有N个大理石&#xff0c;每个大理石上写了一个非负整数、首先把各数从小到大排序&#xff1b;然后回答Q个问题。每个问题问是否有一个大理石写着某个整数x&#xff0c;如果是&#xff0c;还要回答哪个大理石上写着x。排序后的大理石从左到右编号为1~N。 (…

多个 本地仓库_【运维工具】搭建npm私有镜像仓库,天下苦于npm build久矣

​01 前 言当你的研发团队越来越大&#xff0c;或是你无法忍受node超慢的构建时你可以考虑继续读下去&#xff0c;给大家推荐一个基于Verdaccio相对较完整的解决方案。由于环境的原因&#xff0c;我们直接去 http://npmjs.org 下载就不要考虑了&#xff0c;可以将npm config se…

(STL,vector)木块问题

题目&#xff1a; 输入n&#xff0c;得到编号为0~n-1的木块&#xff0c;分别摆放在顺序排列编号为0~n-1的位置。现对这些木块进行操作&#xff0c;操作分为四种。 1、move a onto b&#xff1a;把木块a、b上方的木块放回各自的原位&#xff0c;再把a放到b上&#xff1b; 2、…

地图自定义图标_如何在H5里添加地图导航?这份教程请收藏!

智能手机的出现为我们的生活带来了翻天覆地的改变&#xff0c;比如说衣食住行都有了显著的变化。外卖让就餐更加方便、手机支付也让生活更加便利&#xff0c;地图导航功能更是让大家从此不再迷路&#xff0c;有了手机以后&#xff0c;大家都开始习惯直接用手机搜索目的地&#…

(STL,set)安迪的第一个字典

问题&#xff1a; 输入一个文本&#xff0c;找出所有不同的单词&#xff08;连续的字母序列&#xff09;&#xff0c;按字典序从小到大输出。单词不区分大小写。 样例输入 Adventures in Disneyland Two blondes were going to Disneyland when they came to a fork in the…

ios 高德获取定位_解决ios11不支持高德地图API定位功能的方法

在 iOS 11 系统上访问JS API定位业务失败怎么解决&#xff1f;苹果新发的 iOS 11 操作系统的一大特性是对 http 形式访问页面的限制变得非常严格(相比iOS 10 和 iOS 9)。高德提供的JS API功能均支持http和https两种协议访问&#xff0c;在iOS 11操作系统上推荐使用https形式访问…

(STL,map)反片语

题目 输入一些单词&#xff0c;找出所有满足如下条件的单词&#xff1a;该单词不能通过字母重排&#xff0c;得到输入文本中的另外一个单词。在判断是否满足条件时&#xff0c;不区分大小写&#xff0c;但输出保留输入中的大小写&#xff0c;按字典序进行排列&#xff08;所有…