Flink 双流 Join 的3种操作示例

在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:

 

  • join()
  • coGroup()
  • intervalJoin()

 

本文举例说明它们的使用方法,顺便聊聊比较特殊的 interval join 的原理。

 

准备数据

 

从 Kafka 分别接入点击流和订单流,并转化为 POJO。

 

DataStream<String> clickSourceStream = env.addSource(new FlinkKafkaConsumer011<>("ods_analytics_access_log",new SimpleStringSchema(),kafkaProps).setStartFromLatest());
DataStream<String> orderSourceStream = env.addSource(new FlinkKafkaConsumer011<>("ods_ms_order_done",new SimpleStringSchema(),kafkaProps).setStartFromLatest());DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream.map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream.map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));

 

join()

 

join() 算子提供的语义为"Window join",即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。以下示例以10秒滚动窗口,将两个流通过商品 ID 关联,取得订单流中的售价相关字段。

 

640.png

clickRecordStream.join(orderRecordStream).where(record -> record.getMerchandiseId()).equalTo(record -> record.getMerchandiseId()).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {@Overridepublic String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {return StringUtils.join(Arrays.asList(accessRecord.getMerchandiseId(),orderRecord.getPrice(),orderRecord.getCouponMoney(),orderRecord.getRebateAmount()), '\t');}}).print().setParallelism(1);

 

简单易用。

 

coGroup()

 

只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。

 

以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。

 

clickRecordStream.coGroup(orderRecordStream).where(record -> record.getMerchandiseId()).equalTo(record -> record.getMerchandiseId()).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {@Overridepublic void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {for (AnalyticsAccessLogRecord accessRecord : accessRecords) {boolean isMatched = false;for (OrderDoneLogRecord orderRecord : orderRecords) {// 右流中有对应的记录collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));isMatched = true;}if (!isMatched) {// 右流中没有对应的记录collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));}}}}).print().setParallelism(1);

 

intervalJoin()

 

join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:

 

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

 

2.jpg

interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。

 

示例代码如下。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。

 

clickRecordStream.keyBy(record -> record.getMerchandiseId()).intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId())).between(Time.seconds(-30), Time.seconds(30)).process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {@Overridepublic void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {collector.collect(StringUtils.join(Arrays.asList(accessRecord.getMerchandiseId(),orderRecord.getPrice(),orderRecord.getCouponMoney(),orderRecord.getRebateAmount()), '\t'));}}).print().setParallelism(1);

 

由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。

 

interval join 的实现原理

 

以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。

 

public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,TypeInformation<OUT> outputType) {Preconditions.checkNotNull(processJoinFunction);Preconditions.checkNotNull(outputType);final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =new IntervalJoinOperator<>(lowerBound,upperBound,lowerBoundInclusive,upperBoundInclusive,left.getType().createSerializer(left.getExecutionConfig()),right.getType().createSerializer(right.getExecutionConfig()),cleanedUdf);return left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator);
}

 

可见是先对两条流执行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 分别缓存左流和右流的数据。

 

private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;@Override
public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(LEFT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))));this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(RIGHT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))));
}

 

其中 Long 表示事件时间戳,List> 表示该时刻到来的数据记录。当左流和右流有数据到达时,会分别调用 processElement1() 和 processElement2() 方法,它们都调用了 processElement() 方法,代码如下。

 

@Override
public void processElement1(StreamRecord<T1> record) throws Exception {processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}@Override
public void processElement2(StreamRecord<T2> record) throws Exception {processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(final StreamRecord<THIS> record,final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,final long relativeLowerBound,final long relativeUpperBound,final boolean isLeft) throws Exception {final THIS ourValue = record.getValue();final long ourTimestamp = record.getTimestamp();if (ourTimestamp == Long.MIN_VALUE) {throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +"interval stream joins need to have timestamps meaningful timestamps.");}if (isLate(ourTimestamp)) {return;}addToBuffer(ourBuffer, ourValue, ourTimestamp);for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {final long timestamp  = bucket.getKey();if (timestamp < ourTimestamp + relativeLowerBound ||timestamp > ourTimestamp + relativeUpperBound) {continue;}for (BufferEntry<OTHER> entry: bucket.getValue()) {if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}}long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);}
}

 

这段代码的思路是:

 

  1. 取得当前流 StreamRecord 的时间戳,调用 isLate() 方法判断它是否是迟到数据(即时间戳小于当前水印值),如是则丢弃。
  2. 调用 addToBuffer() 方法,将时间戳和数据一起插入当前流对应的 MapState。
  3. 遍历另外一个流的 MapState,如果数据满足前述的时间区间条件,则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下,注意结果对应的时间戳是左右流时间戳里较大的那个。

 

private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);collector.setAbsoluteTimestamp(resultTimestamp);context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);userFunction.processElement(left, right, context, collector);
}

 

  1. 调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 方法中。

 

@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {long timerTimestamp = timer.getTimestamp();String namespace = timer.getNamespace();logger.trace("onEventTime @ {}", timerTimestamp);switch (namespace) {case CLEANUP_NAMESPACE_LEFT: {long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;logger.trace("Removing from left buffer @ {}", timestamp);leftBuffer.remove(timestamp);break;}case CLEANUP_NAMESPACE_RIGHT: {long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;logger.trace("Removing from right buffer @ {}", timestamp);rightBuffer.remove(timestamp);break;}default:throw new RuntimeException("Invalid namespace " + namespace);}
}

 

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/514684.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云原生趋势下的迁移与容灾思考

作者 | 孙琦 导读&#xff1a;下一个云原生颠覆的领域会不会是在传统的容灾领域呢&#xff1f;在云原生的趋势下&#xff0c;如何构建应用系统的迁移与容灾方案&#xff1f; 趋势 1. 云原生发展趋势 云原生&#xff08;Cloud Native&#xff09;是最近几年非常火爆的话题&…

深度盘点Python11个主流框架:Pandas、Django、Matplotlib、Numpy、PyTorch......

六月份TIOBE编程语言排行榜&#xff0c;位居第二名的Python与第一名C语言之间的差距正在逐渐缩小。Python如此受欢迎一方面得益于它崇尚简洁的编程哲学&#xff0c;另一方面是因为强大的第三方库生态。要说杀手级的库&#xff0c;很难排出个先后顺序&#xff0c;因为python的明…

从基础设施到云原生应用,全方位解读阿里云原生新锐开源项目

2020 年 11 月 19 日&#xff0c;由 InfoQ 主办的“2020 中国技术力量年度榜单盛典”隆重召开&#xff0c;并正式揭晓了“开源杰出贡献人物”、“开源新锐项目”和“云原生行业落地典范”等重大奖项。在此前的入围赛中&#xff0c;仅“开源新锐项目”单项&#xff0c;阿里云原生…

揭秘双11丝滑般剁手之路背后的网络监控技术

简介&#xff1a; 本篇将重点介绍Hologres在阿里巴巴网络监控部门成功替换Druid的最佳实践&#xff0c;并助力双11实时网络监控大盘毫秒级响应。 概要&#xff1a;刚刚结束的2020天猫双11中&#xff0c;MaxCompute交互式分析&#xff08;下称Hologres&#xff09;实时计算Flin…

OpenKruise:阿里巴巴 双11 全链路应用的云原生部署基座

简介&#xff1a; Kruise 是 Cruise 的谐音&#xff0c;K for Kubernetes&#xff0c;寓意 Kubernetes 上应用的航行和自动巡行&#xff0c;它满载着阿里巴巴多年在大规模应用部署、发布与管理最佳实践&#xff0c;以及阿里云 Kubernetes 服务数千客户的需求沉淀。 来源 | 阿里…

AI 如何推动双碳目标达成?施耐德电气这么说

以当前的排放总量而言&#xff0c;中国是全球碳排放第一大国。如何兼顾经济转型与能源低碳转型成为国家重要的发展战略之一&#xff0c;因此中国提出 2030 年碳达峰以及 2060 年碳中和的目标&#xff0c;并被写进《政府工作报告》中&#xff0c;成为各行各业关注的热点话题。 …

轻松玩转全链路监控

简介&#xff1a; 好的产品总是能给予用户最轻松的使用体验&#xff0c;并在实际生产中发挥出巨大的业务价值。我们不妨从现在开始&#xff0c;就将所有微服务应用通过无侵入的方式接入ARMS&#xff0c;构建一体化的全链路监控体系&#xff0c;而不是等到真正遇到生产故障的那一…

深度解读 MongoDB 最全面的增强版本 4.4 新特性

MongoDB 在今年正式发布了新的 4.4 大版本&#xff0c;这次的发布包含众多的增强 Feature&#xff0c;可以称之为是一个维护性的版本&#xff0c;而且是一个用户期待已久的维护性版本&#xff0c;MongoDB 官方也把这次发布称为「User-Driven Engineering」&#xff0c;说明新版…

四大招让无处不在的工作空间成为可能?揭秘Ivanti 的战略布局

如今二维码已成为我们生活、工作的“必需品”&#xff0c;大家往往会通过简单扫码获取内容信息或进行交易。受疫情的影响&#xff0c;人们对非接触式交易需求增多&#xff0c;二维码的应用场景更无处不在。 与此同时&#xff0c;二维码带来的安全问题也受到人们的关注&#xf…

深度| 每秒1.4亿次!再度刷新TPS记录的PolarDB如何应对双11“尖峰时刻”?

2020年是云原生数据库PolarDB全面支撑天猫双十一的第二年&#xff0c;天猫交易、买家、卖家以及物流等系统在双十一期间基于PolarDB为亿万客户提供了顺滑的体验。同时&#xff0c;PolarDB还刷新了去年由自己创造的数据库处理峰值&#xff08;TPS&#xff09;纪录&#xff0c;今…

Hologres是如何完美支撑双11智能客服实时数仓的?

简介&#xff1a; 本文重点介绍Hologres如何帮助阿里巴巴客户体验部&#xff08;CCO&#xff09;&#xff0c;构建集实时化、自助化、系统化于一体的用户体验实时数仓&#xff0c;完美助力双11场景&#xff0c;支持上千服务大屏&#xff0c;削峰30%&#xff0c;节约成本近30%。…

云原生与AI时代的存储该是什么样?新华三发布全NVMe智能闪存与智慧中枢数据平台

编辑 | 宋慧 出品 | CSDN云计算 7月8日&#xff0c;紫光股份旗下新华三集团以“智以致用速达未来”为主题&#xff0c;召开“2021新华三存储新品发布会”&#xff0c;重磅推出云智原生的新一代端到端NVMe闪存存储H3C/HPE Alletra、分布式融合存储H3C UniStor X10000&#xff0…

java客户端程序用什么自动化测试_五大Java自动化测试框架

51CTO官微技术资讯/行业精华/产品心得多年来&#xff0c;Java一直是服务器端应用开发的首选编程语言。随着时间的推移和自动化测试的兴起&#xff0c;业界出现了许多基于Java&#xff0c;并根据不同的业务逻辑而发展起来的开源框架。在此&#xff0c;我向大家介绍并比较五种用到…

微服务框架Go-Micro集成Nacos实战之服务注册与发现

简介&#xff1a; 本文主要介绍如何使用 Golang 生态中的微服务框架 Go-Micro(v2) 集成 Nacos 进行服务注册与发现。(Go-Micro 目前已经是 v3 版本&#xff0c;但由于某些原因项目已经更名为 nitro 具体原因大家可以去 github 中查看) 相关背景知识 Go-Micro Go Micro 是一个…

「深度学习知识体系总结(2021版)」开放下载了!

随着世界技术的迭代与发展&#xff0c;人工智能和机器学习正在超自动化领域&#xff0c;扮演着越来越重要的角色。2020年的冠状病毒疫情突发&#xff0c;整个世界都在防疫的道路上披荆斩棘。人工智能发挥了重大作用&#xff0c;智能测温、智能消毒、智能建设都能看到AI的影子。…

2020双11,阿里巴巴集团数万数据库系统全面上云揭秘

作者&#xff1a;阿里云高级技术专家 改天阿里云高级产品专家 胜通 2020年天猫双十一成交额突破4982亿&#xff0c;在双十一走过12个年头之际&#xff0c;我们的订单创建峰值达到58.3万笔/秒&#xff0c;再次刷新全球在线交易系统的记录。历年双十一都是对技术人的一次大考&…

以 Kubernetes 为代表的容器技术,已成为云计算的新界面

简介&#xff1a; 可以说&#xff0c;以 Kubernetes 为代表的容器技术正成为云计算新界面。容器提供了应用分发和交付标准&#xff0c;将应用与底层运行环境进行解耦。Kubernetes 作为资源调度和编排的标准&#xff0c;屏蔽底层架构差异性&#xff0c;帮助应用平滑运行在不同基…

2020双11养猫技术大揭秘

简介&#xff1a; 你养猫了没&#xff1f; 作者 | 淘系-珑晴 在电商领域&#xff0c;互动是一个重要的用户增长方案&#xff0c;在提升用户黏性、活跃以及拉新上都发挥着重要的作用。今年双11&#xff0c;淘系互动团队推出了“超级星秀猫”&#xff0c;我们不盖楼、不开车&…

GitHub 遭抵制!AI 代码生成神器竟成“抄袭工具”?

整理 | 郑丽媛出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;上周&#xff0c;微软、GitHub、OpenAI 三方联手推出的 AI 代码生成神器 GitHub Copilot 一经官宣便引起巨大关注&#xff1a;试问哪个开发者不想要这么一位“虚拟程序员”来解放自己的双手&#xff1f…

阿里云性能测试工具PTS介绍

简介&#xff1a; 性能测试 PTS&#xff08;Performance Testing Service&#xff09;是具备强大的分布式压测能力的 SaaS 压测平台&#xff0c;可模拟海量用户的真实业务场景&#xff0c;全方位验证业务站点的性能、容量和稳定性。 阿里云 阿里云智能GTS-平台技术部-SRE团队 1…