[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples - Complete Edition

Flink Series Articles

I. The Flink Column

The Flink column introduces one topic at a time, illustrated with concrete examples.

  • 1. Flink deployment series
    Covers the basics of deploying and configuring Flink.

  • 2. Flink fundamentals series
    Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

  • 3. Flink Table API and SQL basics series
    Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.

  • 4. Flink Table API and SQL advanced and application series
    Application-oriented Table API and SQL content, closer to real production scenarios and with somewhat higher development difficulty.

  • 5. Flink monitoring series
    Relates to day-to-day operations and monitoring work.

II. The Flink Examples Column

The Flink examples column supplements the Flink column: it generally does not introduce concepts, but provides concrete, ready-to-use examples one by one. It is not split into subdirectories; the title of each link indicates what it covers.

For the entry point to all articles of both columns, click: Flink Series Article Index


Table of Contents

  • Flink Series Articles
  • I. Maven dependencies and the User bean
    • 1. Maven dependencies
    • 2. User bean
  • II. Window join
    • 1. Tumbling Window Join - TumblingEventTimeWindows
    • 2. Sliding Window Join - SlidingEventTimeWindows
    • 3. Session Window Join - EventTimeSessionWindows
    • 4. TumblingEventTimeWindows example
      • 1) Data structures and beans
      • 2) Defining the goods and order data sources
      • 3) Window join, implementation approach one
      • 4) Window join, implementation approach two
      • 5) Run results
  • III. Interval join
    • 1. Example
      • 1) Data structures and beans
      • 2) Defining the goods and order data sources
      • 3) Interval join, implementation approach one
      • 4) Interval join, implementation approach two
      • 5) Run results
  • IV. Rebalance example
    • 1. Implementation
    • 2. Verification
  • V. Physical partitioning
    • 1. Custom partitioning
    • 2. Random partitioning
    • 3. Rescaling
    • 4. Broadcast
    • 5. Concrete example 1
      • 1) Test file data
      • 2) Implementation code
      • 3) Verification
    • 6. Concrete example 2


This article covers Flink's commonly used operators window join and interval join, data-skew handling, and partitioning, with detailed examples.
For more, see my Flink column for up-to-date and systematic coverage.
Apart from the Maven dependencies, this article has no other prerequisites.

This topic is split across four articles:
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples (1) - window join
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples (2) - interval join
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples (3) - data skew handling and partitioning examples
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples - Complete Edition

I. Maven dependencies and the User bean

1. Maven dependencies

All examples below use the following Maven dependencies unless otherwise noted.

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <!-- Note: since Flink 1.15 the pure-Java artifacts are published without a Scala
         suffix, and flink-table-planner-blink was merged back into flink-table-planner;
         the artifactIds below are adjusted accordingly for flink.version 1.17.0. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
</dependencies>

2. User bean

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private int id;
    private String name;
    private String pwd;
    private String email;
    private int age;
    private double balance;
}

II. Window join

A window join joins the elements of two streams that share a common key and lie in the same window. The windows can be defined by a window assigner, and elements from both streams are used to compute the window result.

The paired elements are passed to a user-defined JoinFunction or FlatJoinFunction, where the user can emit results that meet the join criteria.

The general usage pattern can be summarized as follows:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);

Some notes on semantics:

  • The pairwise combination of elements of the two streams behaves like an inner join: an element of one stream is not emitted unless it has a matching element from the other stream to be joined with.
  • Elements that do get joined have their timestamp set to the largest timestamp that still lies in the respective window; for example, a window with boundaries [5, 10) gives the joined elements 9 as their timestamp (see the sketch below).
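
To make the second point concrete, here is a minimal sketch, not part of the original example: `joinedStream` is an assumed name for the String output of a window join like the one above, and the joined record's timestamp is observed in a downstream ProcessFunction.

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Minimal sketch (joinedStream is a hypothetical join output): inspect the
// timestamp assigned to joined records. For a join window with boundaries
// [5, 10), ctx.timestamp() prints 9, i.e. the window's maximum timestamp.
joinedStream.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(value + " @ " + ctx.timestamp());
    }
});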

1. Tumbling Window Join - TumblingEventTimeWindows

When performing a tumbling window join, all elements that share a common key and a common tumbling window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, elements of one stream that have no counterpart in the other stream within their tumbling window are not emitted!
[Figure: tumbling window join with 2 ms windows over the orange and green streams]
As the figure shows, we define a tumbling window of size 2 milliseconds, producing windows with boundaries [0,1], [2,3], .... The figure shows the pairwise combinations within each window that are passed to the JoinFunction. Note that the tumbling window [6,7] emits nothing, because the green stream has no elements to pair with the orange elements ⑥ and ⑦.

  • Example code
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

2. Sliding Window Join - SlidingEventTimeWindows

When performing a sliding window join, all elements that share a common key and lie in the same sliding window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction. Within the current sliding window, elements of one stream that have no counterpart in the other stream are not emitted!
Note that elements joined in one sliding window are not necessarily joined in other sliding windows.
[Figure: sliding window join with 2 ms windows sliding by 1 ms over the orange and green streams]
In this example we define sliding windows of length two milliseconds with a slide of one millisecond, producing window instances over the intervals [-1, 0], [0,1], [1,2], [2,3], .... Below the x-axis are the joined elements passed to the JoinFunction for each sliding window. You can see, for example, that the orange ② joins the green ③ in window [2,3], but joins nothing in window [1,2].

  • 示例代码
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

3. Session Window Join - EventTimeSessionWindows

When performing a session window join, all elements that share a common key and, when combined, satisfy the session condition are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction. Again this performs an inner join, so if a session window contains elements from only one of the streams, no output is emitted!
[Figure: session window join with a 1 ms gap over the orange and green streams]
Here we define a session window join with a gap of at least one millisecond. There are three sessions in total; in the first two, the joined elements from both streams are passed to the JoinFunction. In the third session the green stream has no elements, so ⑧ and ⑨ are not joined!

  • 示例代码
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

4. TumblingEventTimeWindows example

This example simulates the generation of order data, joins each order with its goods information, and computes the order amount.
There are two implementations, which differ only in how the WatermarkStrategy is supplied: one uses an inline (anonymous) strategy, the other implements the WatermarkStrategy interface in a named class.

1) Data structures and beans

  • Goods class
package org.datastreamapi.operator.window.bean;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import com.alibaba.fastjson.JSON;

import lombok.Data;

/**
 * @author alanchan
 */
// Goods class (goods id, goods name, goods price)
@Data
public class Goods {
    private String goodsId;
    private String goodsName;
    private BigDecimal goodsPrice;

    public static List<Goods> GOODSLIST;
    public static Random r;

    static {
        r = new Random();
        GOODSLIST = new ArrayList<>();
        GOODSLIST.add(new Goods("1", "iphone11", new BigDecimal(6000)));
        GOODSLIST.add(new Goods("2", "iphone12", new BigDecimal(7000)));
        GOODSLIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
        GOODSLIST.add(new Goods("4", "iphone13", new BigDecimal(8000)));
        GOODSLIST.add(new Goods("5", "iphone14", new BigDecimal(9000)));
        GOODSLIST.add(new Goods("6", "iphone15", new BigDecimal(10000)));
    }

    public static Goods randomGoods() {
        int rIndex = r.nextInt(GOODSLIST.size());
        return GOODSLIST.get(rIndex);
    }

    public Goods() {
    }

    public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
        this.goodsId = goodsId;
        this.goodsName = goodsName;
        this.goodsPrice = goodsPrice;
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}
  • Order class
package org.datastreamapi.operator.window.bean;

import com.alibaba.fastjson.JSON;

import lombok.Data;

/**
 * @author alanchan
 */
// Order detail class (order item id, goods id, quantity)
@Data
public class Order {
    private String itemId;
    private String goodsId;
    private Integer count;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}
  • Goods-order join result class
package org.datastreamapi.operator.window.bean;

import java.math.BigDecimal;

import com.alibaba.fastjson.JSON;

import lombok.Data;

/**
 * @author alanchan
 */
// Goods class (goods id, goods name, goods price)
// Order detail class (order item id, goods id, quantity)
// Join result (goods id, goods name, quantity, price * quantity)
@Data
public class OrderItem {
    private String goodsId;
    private String goodsName;
    private BigDecimal count;
    private BigDecimal total;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

2) Defining the goods and order data sources

  • Goods data source
package org.datastreamapi.operator.window.source;

import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.datastreamapi.operator.window.bean.Goods;

/**
 * @author alanchan
 */
public class GoodsSource extends RichSourceFunction<Goods> {
    private Boolean isCancel;

    @Override
    public void open(Configuration parameters) throws Exception {
        isCancel = false;
    }

    @Override
    public void run(SourceContext<Goods> sourceContext) throws Exception {
        while (!isCancel) {
            Goods.GOODSLIST.stream().forEach(goods -> sourceContext.collect(goods));
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        isCancel = true;
    }
}
  • Order data source
package org.datastreamapi.operator.window.source;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;

/**
 * @author alanchan
 */
public class OrderSource extends RichSourceFunction<Order> {
    private Boolean isCancel;
    private Random r;

    @Override
    public void open(Configuration parameters) throws Exception {
        isCancel = false;
        r = new Random();
    }

    @Override
    public void run(SourceContext<Order> sourceContext) throws Exception {
        while (!isCancel) {
            Goods goods = Goods.randomGoods();
            Order order = new Order();
            order.setGoodsId(goods.getGoodsId());
            order.setCount(r.nextInt(10) + 1);
            order.setItemId(UUID.randomUUID().toString());
            sourceContext.collect(order);
            // simulate an order that contains more than one goods item
            order.setGoodsId("10");
            sourceContext.collect(order);
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        isCancel = true;
    }
}

3) Window join, implementation approach one

package org.datastreamapi.operator.window;

import java.math.BigDecimal;
import java.time.Duration;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;

/**
 * @author alanchan
 */
public class TestWindowJoinDemo {

    public static void main(String[] args) throws Exception {
        // 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 1.source
        // goods stream
        DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
        // order stream
        DataStreamSource<Order> orderDS = env.addSource(new OrderSource());

        // assign watermarks (the system time is used directly as the event time)
        // approach one
        SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // maxOutOfOrderness: the maximum allowed lateness / out-of-orderness
                        .withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));
        SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Goods>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));

        // 2.transformation
        // Goods (goods id, goods name, goods price)
        // Order detail (order item id, goods id, quantity)
        // Join result (goods id, goods name, quantity, price * quantity)
        // (the generic usage pattern is shown in section II above)
        DataStream<OrderItem> resultDS = goodsDSWithWatermark.join(orderDSWithWatermark)
                .where(goods -> goods.getGoodsId())
                .equalTo(orderItem -> orderItem.getGoodsId())
//              .where(Goods::getGoodsId)
//              .equalTo(Order::getGoodsId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // <IN1, IN2, OUT>
                .apply(new JoinFunction<Goods, Order, OrderItem>() {
                    @Override
                    public OrderItem join(Goods first, Order second) throws Exception {
                        OrderItem orderItem = new OrderItem();
                        orderItem.setGoodsId(first.getGoodsId());
                        orderItem.setGoodsName(first.getGoodsName());
                        orderItem.setCount(new BigDecimal(second.getCount()));
                        orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
                        return orderItem;
                    }
                });

        // 3.sink
        resultDS.print();

        // 4.execute
        env.execute();
    }
}

4) Window join, implementation approach two

  • GoodsWatermark
package org.datastreamapi.operator.window.watermark;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.datastreamapi.operator.window.bean.Goods;

/**
 * @author alanchan
 * A watermark strategy built on the system time
 */
public class GoodsWatermark implements WatermarkStrategy<Goods> {

    @Override
    public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (element, recordTimestamp) -> System.currentTimeMillis();
    }

    @Override
    public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Goods>() {
            @Override
            public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
        };
    }
}
  • OrderWatermark
package org.datastreamapi.operator.window.watermark;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.datastreamapi.operator.window.bean.Order;

/**
 * @author alanchan
 * A watermark strategy built on the system time
 */
public class OrderWatermark implements WatermarkStrategy<Order> {

    @Override
    public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (element, recordTimestamp) -> System.currentTimeMillis();
    }

    @Override
    public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Order>() {
            @Override
            public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
        };
    }
}
  • Window join implementation
package org.datastreamapi.operator.window;

import java.math.BigDecimal;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;
import org.datastreamapi.operator.window.watermark.GoodsWatermark;
import org.datastreamapi.operator.window.watermark.OrderWatermark;

/**
 * @author alanchan
 */
public class TestWindowJoinDemo {

    public static void main(String[] args) throws Exception {
        // 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 1.source
        // goods stream
        DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
        // order stream
        DataStreamSource<Order> orderDS = env.addSource(new OrderSource());

        // assign watermarks (the system time is used directly as the event time)
        // approach two
        SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
        SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(new OrderWatermark());

        // 2.transformation
        // Goods (goods id, goods name, goods price)
        // Order detail (order item id, goods id, quantity)
        // Join result (goods id, goods name, quantity, price * quantity)
        // (the generic usage pattern is shown in section II above)
        DataStream<OrderItem> resultDS = goodsDSWithWatermark.join(orderDSWithWatermark)
                .where(goods -> goods.getGoodsId())
                .equalTo(orderItem -> orderItem.getGoodsId())
//              .where(Goods::getGoodsId)
//              .equalTo(Order::getGoodsId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // <IN1, IN2, OUT>
                .apply(new JoinFunction<Goods, Order, OrderItem>() {
                    @Override
                    public OrderItem join(Goods first, Order second) throws Exception {
                        OrderItem orderItem = new OrderItem();
                        orderItem.setGoodsId(first.getGoodsId());
                        orderItem.setGoodsName(first.getGoodsName());
                        orderItem.setCount(new BigDecimal(second.getCount()));
                        orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
                        return orderItem;
                    }
                });

        // 3.sink
        resultDS.print();

        // 4.execute
        env.execute();
    }
}

5) Run results

Both window join implementations produce similar output; since the data is generated randomly, the results below are for reference only.

7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":7,"goodsId":"1","goodsName":"iphone11","total":42000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
5> {"count":10,"goodsId":"3","goodsName":"MacBookPro","total":150000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
7> {"count":9,"goodsId":"1","goodsName":"iphone11","total":54000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
5> {"count":8,"goodsId":"3","goodsName":"MacBookPro","total":120000}
7> {"count":7,"goodsId":"1","goodsName":"iphone11","total":42000}
5> {"count":10,"goodsId":"3","goodsName":"MacBookPro","total":150000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
5> {"count":8,"goodsId":"3","goodsName":"MacBookPro","total":120000}
7> {"count":9,"goodsId":"1","goodsName":"iphone11","total":54000}
5> {"count":10,"goodsId":"3","goodsName":"MacBookPro","total":150000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
7> {"count":7,"goodsId":"1","goodsName":"iphone11","total":42000}
5> {"count":8,"goodsId":"3","goodsName":"MacBookPro","total":120000}
7> {"count":9,"goodsId":"1","goodsName":"iphone11","total":54000}

III. Interval join

The interval join joins elements of two streams (let's call them A and B) that share a common key and where the timestamp of an element in B lies in a time interval relative to the timestamp of an element in A.

More formally, the condition is b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound], i.e. a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound,

where a and b are elements of A and B that share the same key. The lower and upper bound can each be negative or positive, as long as the lower bound is always less than or equal to the upper bound. The interval join currently only performs inner joins.

When a pair of elements is passed to the ProcessJoinFunction, the pair is assigned the larger of the two elements' timestamps (accessible via ProcessJoinFunction.Context).
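
As a hedged sketch of what that access looks like (the Integer stream types are placeholders, mirroring the generic example further below), the left, right, and joined timestamps can be read from the Context inside processElement:

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;

// Minimal sketch: reading timestamps inside a ProcessJoinFunction.
new ProcessJoinFunction<Integer, Integer, String>() {
    @Override
    public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
        long leftTs = ctx.getLeftTimestamp();   // timestamp of the left element
        long rightTs = ctx.getRightTimestamp(); // timestamp of the right element
        long joinedTs = ctx.getTimestamp();     // the larger of the two
        out.collect(left + "," + right + " @ " + joinedTs);
    }
};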

As of Flink 1.17, the interval join only supports event time.

[Figure: interval join of the orange and green streams with a lower bound of -2 ms and an upper bound of +1 ms]
In the example above, the orange and green streams are joined with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. By default both bounds are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can be applied to exclude them.

The condition depicted by the triangles in the figure can also be written more formally as:

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
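
For illustration, here is a minimal sketch of excluding the bounds; the stream types and key selectors are placeholders, as in the example that follows:

// Minimal sketch: both bounds are inclusive by default; either can be excluded.
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .lowerBoundExclusive() // now orangeElem.ts - 2 < greenElem.ts
    .upperBoundExclusive() // now greenElem.ts < orangeElem.ts + 1
    .process(<ProcessJoinFunction>);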

  • Example code

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

1. Example

This example uses the same generated data as the window join example above; only the join semantics differ, i.e., the difference between window join and interval join. Everything else is the same.
The example simulates the generation of order data, joins each order with its goods information, and computes the order amount.
There are two implementations, which differ only in how the WatermarkStrategy is supplied: one uses an inline (anonymous) strategy, the other implements the WatermarkStrategy interface in a named class.

1) Data structures and beans

The Goods, Order, and OrderItem beans are the same as those defined in section II.4.1) above and are not repeated here.

2) Defining the goods and order data sources

The GoodsSource and OrderSource classes are the same as those defined in section II.4.2) above and are not repeated here.

3) Interval join, implementation approach one

package org.datastreamapi.operator.window;

import java.math.BigDecimal;
import java.time.Duration;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;

/**
 * @author alanchan
 */
public class TestIntervalJoinDemo {

    public static void main(String[] args) throws Exception {
        // 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 1.source
        // goods stream
        DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
        // order stream
        DataStreamSource<Order> orderDS = env.addSource(new OrderSource());

        // assign watermarks (the system time is used directly as the event time)
        // approach one
        SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // maxOutOfOrderness: the maximum allowed lateness / out-of-orderness
                        .withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));
        SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Goods>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));

        // 2.transformation
        // Goods (goods id, goods name, goods price)
        // Order detail (order item id, goods id, quantity)
        // Join result (goods id, goods name, quantity, price * quantity)
        // (the generic usage pattern is shown in section III above)
        DataStream<OrderItem> resultDS = goodsDSWithWatermark.keyBy(goods -> goods.getGoodsId())
                // join conditions:
                // 1. the goods ids must match
                // 2. goods timestamp - 2s <= order timestamp <= goods timestamp + 1s
                .intervalJoin(orderDSWithWatermark.keyBy(orderItem -> orderItem.getGoodsId()))
                .between(Time.seconds(-2), Time.seconds(1))
                .process(new ProcessJoinFunction<Goods, Order, OrderItem>() {
                    @Override
                    public void processElement(Goods first, Order second, Context ctx, Collector<OrderItem> out) throws Exception {
                        OrderItem orderItem = new OrderItem();
                        orderItem.setGoodsId(first.getGoodsId());
                        orderItem.setGoodsName(first.getGoodsName());
                        orderItem.setCount(new BigDecimal(second.getCount()));
                        orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
                        out.collect(orderItem);
                    }
                });

        // 3.sink
        resultDS.print();

        // 4.execute
        env.execute();
    }
}

4) Interval join, implementation approach two

The GoodsWatermark and OrderWatermark watermark strategies are the same as those defined in section II.4.4) above and are not repeated here.
  • Interval join implementation
package org.datastreamapi.operator.window;

import java.math.BigDecimal;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;
import org.datastreamapi.operator.window.watermark.GoodsWatermark;
import org.datastreamapi.operator.window.watermark.OrderWatermark;

/**
 * @author alanchan
 */
public class TestIntervalJoinDemo {

    public static void main(String[] args) throws Exception {
        // 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 1.source
        // goods stream
        DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
        // order stream
        DataStreamSource<Order> orderDS = env.addSource(new OrderSource());

        // assign watermarks (the system time is used directly as the event time)
        // approach two
        SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
        SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(new OrderWatermark());

        // 2.transformation
        // Goods (goods id, goods name, goods price)
        // Order detail (order item id, goods id, quantity)
        // Join result (goods id, goods name, quantity, price * quantity)
        DataStream<OrderItem> resultDS = goodsDSWithWatermark.keyBy(goods -> goods.getGoodsId())
                // join conditions:
                // 1. the goods ids must match
                // 2. goods timestamp - 2s <= order timestamp <= goods timestamp + 1s
                .intervalJoin(orderDSWithWatermark.keyBy(orderItem -> orderItem.getGoodsId()))
                .between(Time.seconds(-2), Time.seconds(1))
                .process(new ProcessJoinFunction<Goods, Order, OrderItem>() {
                    @Override
                    public void processElement(Goods first, Order second, Context ctx, Collector<OrderItem> out) throws Exception {
                        OrderItem orderItem = new OrderItem();
                        orderItem.setGoodsId(first.getGoodsId());
                        orderItem.setGoodsName(first.getGoodsName());
                        orderItem.setCount(new BigDecimal(second.getCount()));
                        orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
                        out.collect(orderItem);
                    }
                });

        // 3.sink
        resultDS.print();

        // 4.execute
        env.execute();
    }
}

5) Run results

Both interval join implementations produce similar output; since the data is generated randomly, the results below are for reference only.

5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
1> {"count":6,"goodsId":"4","goodsName":"iphone13","total":48000}
5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
1> {"count":6,"goodsId":"4","goodsName":"iphone13","total":48000}
7> {"count":6,"goodsId":"1","goodsName":"iphone11","total":36000}
7> {"count":6,"goodsId":"1","goodsName":"iphone11","total":36000}
5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
1> {"count":3,"goodsId":"4","goodsName":"iphone13","total":24000}
5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
1> {"count":3,"goodsId":"4","goodsName":"iphone13","total":24000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":8,"goodsId":"1","goodsName":"iphone11","total":48000}
4> {"count":10,"goodsId":"2","goodsName":"iphone12","total":70000}
7> {"count":8,"goodsId":"1","goodsName":"iphone11","total":48000}

IV. Rebalance example

Rebalance is mainly used to deal with data skew. Skew does not necessarily occur at any given moment, so the effect may not be very visible during verification.

1. Implementation

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 * For data skew, a good remedy is rebalance (internally it uses round robin to spread the data evenly)
 */
public class TestRebalanceDemo {

    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<Long> longDS = env.fromSequence(0, 10000);

        // the following filter effectively redistributes the data and may leave the partitions skewed
        DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long num) throws Exception {
                return num > 10;
            }
        });

        // transformation
        // without rebalance, the partitions may be skewed
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = filterDS
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); // subtask id / partition index
                        return new Tuple2<>(subTaskId, 1);
                    }
                    // group by subtask id / partition index and count the elements per subtask/partition
                }).keyBy(t -> t.f0).sum(1);

        // with rebalance, the skew is evened out
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); // subtask id / partition index
                        return new Tuple2<>(subTaskId, 1);
                    }
                }).keyBy(t -> t.f0).sum(1);

        // sink
        result1.print("result1");
        result2.print("result2");

        // execute
        env.execute();
    }
}

2. Verification

Judging from the results below, the difference is not very obvious, since the sequence generated by fromSequence is already fairly evenly distributed across the partitions.

result1:3> (6,625)
result1:11> (1,625)
result1:2> (8,625)
result1:12> (0,625)
result1:7> (9,625)
result1:15> (3,615)
result1:1> (4,625)
result1:4> (14,625)
result1:7> (12,625)
result1:15> (7,625)
result1:1> (13,625)
result1:16> (2,625)
result1:13> (11,625)
result1:9> (10,625)
result1:16> (5,625)
result1:9> (15,625)
result2:3> (6,625)
result2:2> (8,626)
result2:9> (10,623)
result2:9> (15,624)
result2:15> (3,623)
result2:15> (7,624)
result2:11> (1,624)
result2:4> (14,625)
result2:16> (2,623)
result2:16> (5,625)
result2:13> (11,626)
result2:1> (4,623)
result2:1> (13,625)
result2:12> (0,624)
result2:7> (9,626)
result2:7> (12,624)

V. Physical partitioning

Flink also provides the following operations to give users finer-grained control over partitioning after a transformation, if needed.
[Figure: overview of Flink's physical partitioning operations]

1. Custom partitioning

DataStream → DataStream
Uses a user-defined Partitioner to select the target task for each element.

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
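
As a minimal sketch of what such a Partitioner can look like (the class name and routing rule are made up for illustration):

import org.apache.flink.api.common.functions.Partitioner;

// Hypothetical Partitioner: pin one key to subtask 0, hash the rest.
public class SomeKeyPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        if ("someKey".equals(key)) {
            return 0; // route this key to the first subtask
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions; // mask keeps the hash non-negative
    }
}

Complete, runnable custom-partitioning examples follow in sections V.5 and V.6 below.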

2. Random partitioning

DataStream → DataStream
Partitions elements randomly according to a uniform distribution.

dataStream.shuffle();

3. Rescaling

DataStream → DataStream
Distributes elements, round-robin, to a subset of downstream operator instances. This is useful if you want a pipeline that, for example, fans out from each parallel instance of a source to a subset of several mappers to spread the load, but without the full repartitioning that rebalance() would incur. Depending on other configuration values such as the number of TaskManager slots, this requires only local data transfers rather than transfers over the network.

Which downstream operator instances the upstream operator sends elements to depends on the parallelism of both the upstream and the downstream operator. For example, if the upstream operator has parallelism 2 and the downstream operator has parallelism 6, then one upstream instance distributes its elements to three downstream instances while the other upstream instance distributes to the other three. Conversely, if the downstream operator has parallelism 2 and the upstream operator has parallelism 6, then three upstream instances distribute to one downstream instance while the other three upstream instances distribute to the other downstream instance.

When the parallelisms are not multiples of each other, one or more downstream operators will receive a differing number of inputs from the upstream operators.

See the following figure for a visualization of the connection pattern in the example above:
[Figure: rescale connection pattern between upstream and downstream operator instances]

dataStream.rescale();
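
To make the 2-to-6 example above concrete, here is a minimal sketch; the parallelism values are assumed for illustration. Each of the two source subtasks feeds a fixed group of three mappers via local round-robin instead of a full network shuffle.

import org.apache.flink.api.common.typeinfo.Types;

// Minimal sketch: 2 source subtasks -> rescale -> 6 map subtasks.
env.fromSequence(0, 1000).setParallelism(2)
   .rescale()
   .map(x -> x * 2).returns(Types.LONG).setParallelism(6)
   .print();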

4. Broadcast

DataStream → DataStream
Broadcasts elements to every partition.

dataStream.broadcast();

5. Concrete example 1

1) Test file data

i am alanchan
i like flink
and you ?

2) Implementation code

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestPartitionDemo {

    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> linesDS = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).setMaxParallelism(4);

        // transformation
        DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();      // send everything to the first task
        DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();   // broadcast
        DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();     // one-to-one when upstream and downstream parallelism match
        DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();     // random uniform distribution
        DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();   // rebalance
        DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();     // local rebalance
        DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new AlanPartitioner(), t -> t.f0); // custom partitioning

        // sink
//      result1.print("result1");
//      result2.print("result2");
//      result3.print("result3");
//      result4.print("result4");
//      result5.print("result5");
//      result6.print("result6");
        result7.print("result7");

        // execute
        env.execute();
    }

    private static class AlanPartitioner implements Partitioner<String> {
        // partitioning logic
        @Override
        public int partition(String key, int numPartitions) {
            int part = 0;
            switch (key) {
            case "i":
                part = 1;
                break;
            case "and":
                part = 2;
                break;
            default:
                part = 0;
                break;
            }
            return part;
        }
    }
}

3) Verification

Verifying this example is somewhat tedious; the output below is from an actual run of the program.

# 1. global: everything is sent to the first task
result1:1> (i,1)
result1:1> (am,1)
result1:1> (alanchan,1)
result1:1> (i,1)
result1:1> (like,1)
result1:1> (flink,1)
result1:1> (and,1)
result1:1> (you,1)
result1:1> (?,1)

# 2. broadcast: the output is long and is omitted here

# 3. forward: one-to-one when upstream and downstream parallelism match
result3:16> (i,1)
result3:9> (and,1)
result3:4> (i,1)
result3:16> (am,1)
result3:4> (like,1)
result3:16> (alanchan,1)
result3:9> (you,1)
result3:9> (?,1)
result3:4> (flink,1)

# 4. shuffle: random uniform distribution
result4:7> (alanchan,1)
result4:7> (flink,1)
result4:7> (?,1)
result4:14> (i,1)
result4:14> (i,1)
result4:14> (and,1)
result4:16> (am,1)
result4:16> (like,1)
result4:16> (you,1)

# 5. rebalance: demonstrated in the example above
result5:6> (and,1)
result5:4> (flink,1)
result5:8> (?,1)
result5:2> (i,1)
result5:3> (like,1)
result5:9> (i,1)
result5:7> (you,1)
result5:10> (am,1)
result5:11> (alanchan,1)

# 6. rescale: local rebalance; with this little data the effect is not obvious
result6:1> (i,1)
result6:1> (like,1)
result6:1> (flink,1)
result6:6> (and,1)
result6:6> (you,1)
result6:6> (?,1)
result6:13> (i,1)
result6:13> (am,1)
result6:13> (alanchan,1)

# 7. custom partitioning: the data is split on "i" and "and" into three partitions in total; every "i" goes to the second partition, "and" to the third, and everything else to the first
result7:2> (i,1)
result7:2> (i,1)
result7:3> (and,1)
result7:1> (like,1)
result7:1> (flink,1)
result7:1> (am,1)
result7:1> (alanchan,1)
result7:1> (you,1)
result7:1> (?,1)

6. Concrete example 2

import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;

/**
 * @author alanchan
 */
public class TestPartitionDemo2 {

    // build the User data source
    public static DataStreamSource<User> source(StreamExecutionEnvironment env) {
        DataStreamSource<User> source = env.fromCollection(Arrays.asList(
                new User(1, "alan1", "1", "1@1.com", 12, 1000),
                new User(2, "alan2", "2", "2@2.com", 19, 200),
                new User(3, "alan1", "3", "3@3.com", 28, 1500),
                new User(5, "alan1", "5", "5@5.com", 15, 500),
                new User(4, "alan2", "4", "4@4.com", 30, 400)));
        return source;
    }

    // custom partitioning example
    public static void mapPartitionFunction6(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        DataStream<User> userTemp = source.map(user -> {
            User user2 = user;
            user2.setAge(user.getAge() + 5);
            return user2;
        }).returns(User.class);

//      public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
//          return setConnectionType(new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
//      }
        DataStream<User> sink = userTemp.partitionCustom(new Partitioner<Integer>() {
            public int partition(Integer key, int numPartitions) {
                System.out.println("numPartitions: " + numPartitions);
                if (key < 20)
                    numPartitions = 0;
                else if (key >= 20 && key < 30)
                    numPartitions = 1;
                else if (key >= 0)
                    numPartitions = 2;
                System.out.println("chosen partition: " + numPartitions);
                return numPartitions;
            }
        }, new KeySelector<User, Integer>() {
            @Override
            public Integer getKey(User value) throws Exception {
                return value.getAge();
            }
        });

        sink.map((MapFunction<User, User>) user -> {
            System.out.println("current thread ID: " + Thread.currentThread().getId() + ", user: " + user.toString());
            return user;
        }).returns(User.class);

//      System.out.println("parallelism: " + sink.getParallelism());
        // output: three partitions, split by age
//      current thread ID: 138, user: User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=1500.0)
//      current thread ID: 136, user: User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=1000.0)
//      current thread ID: 138, user: User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=400.0)
//      current thread ID: 140, user: User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=200.0)
//      current thread ID: 140, user: User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=500.0)
        sink.print();
    }

    // custom partitioning with lambdas
    public static void mapPartitionFunction7(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        DataStream<User> userTemp = source.map(user -> {
            User user2 = user;
            user2.setAge(user.getAge() + 5);
            return user2;
        }).returns(User.class);

        DataStream<User> sink = userTemp.partitionCustom((key, numPartitions) -> {
            if (key < 20)
                numPartitions = 0;
            else if (key >= 20 && key < 30)
                numPartitions = 1;
            else if (key >= 0)
                numPartitions = 2;
            return numPartitions;
        }, user -> user.getAge());
        sink.print();
    }

    // partition by odd/even user id, with id=1 in its own partition
    public static void mapPartitionFunction8(StreamExecutionEnvironment env) throws Exception {
        DataStreamSource<User> source = source(env);
        DataStream<User> sink = source.partitionCustom(new CusPartitioner(), user -> user.getId());
        // sample output of the partitioning:
//      1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//      current thread ID: 90, user: User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//      current thread ID: 89, user: User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)
//      2> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)
//      current thread ID: 88, user: User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//      current thread ID: 89, user: User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
//      1> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)
//      3> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//      current thread ID: 88, user: User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)
//      2> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
        sink.map((MapFunction<User, User>) user -> {
            System.out.println("current thread ID: " + Thread.currentThread().getId() + ", user: " + user.toString());
            return user;
        }).returns(User.class);
        sink.print();
    }

    public static class CusPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer key, int numPartitions) {
            if (key == 1)
                numPartitions = 2;
            else if (key % 2 == 0) {
                numPartitions = 0;
            } else {
                numPartitions = 1;
            }
            return numPartitions;
        }
    }

    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        // transformation
        mapPartitionFunction8(env);
        // sink
        // execute
        env.execute();
    }
}

That concludes this article on Flink's commonly used operators window join and interval join, data-skew handling, and partitioning, with detailed examples.
For more, see my Flink column for up-to-date and systematic coverage.

This topic is split across four articles:
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples (1) - window join
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples (2) - interval join
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples (3) - data skew handling and partitioning examples
[Flink Extras] 2. Flink's 23 Operators: window join, interval join, Data Skew and Partitioning, with Detailed Examples - Complete Edition
