Flink中KeyBy、分区、分组的正确理解

1.Flink中的KeyBy

在Flink中,KeyBy作为我们常用的一个聚合类型算子,它可以按照相同的Key对数据进行重新分区,分区之后分配到对应的子任务当中去。
源码解析
keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为 KeyedStream(键控流),KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照 key 的一个逻辑分区。
所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。
在这里插入图片描述
KeyBy是如何实现分区的呢

Flink中的KeyBy底层其实就是通过Hash实现的,通过对Key的值进行Hash,再做一次murmurHash,取模运算。
再通过Job的并行度,就能获取每个Key应该分配到那个子任务中了。

在这里插入图片描述

2.分组和分区在Flink中的区别

分区:分区(Partitioning)是将数据流划分为多个子集,这些子集可以在不同的任务实例上进行处理,以实现数据的并行处理。
数据具体去往哪个分区,是通过指定的 key 值先进行一次 hash 再进行一次 murmurHash,通过上述计算得到的值再与并行度进行相应的计算得到。
分组:分组(Grouping)是将具有相同键值的数据元素归类到一起,以便进行后续操作(如聚合、窗口计算等)。
key值相同的数据将进入同一个分组中。
注意:数据如果具有相同的key将一定去往同一个分组和分区,但是同一分区中的数据不一定属于同一组。

3.代码示例

package com.flink.DataStream.Aggregation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkKeyByDemo {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1streamExecutionEnvironment.setParallelism(1);//设置执行模式为批处理streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);//TODO source 从集合中创建数据源DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.fromElements("hello word", "hello flink");//TODO 方式一 匿名实现类SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator1 = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {String[] s1 = s.split(" ");for (String word : s1) {collector.collect(word);}}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {Tuple2<String, Integer> aa = Tuple2.of(s, 1);return aa;}})/*** keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为 KeyedStream(键控流)* KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照 key 的一个逻辑分区* 所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。* *//*** 分组和分区在Flink 中具有不同的含义和作用:* 分区:分区(Partitioning)是将数据流划分为多个子集,这些子集可以在不同的任务实例上进行处理,以实现数据的并行处理。*      数据具体去往哪个分区,是通过指定的 key 值先进行一次 hash 再进行一次 murmurHash,通过上述计算得到的值再与并行度进行相应的计算得到。* 分组:分组(Grouping)是将具有相同键值的数据元素归类到一起,以便进行后续操作 (如聚合、窗口计算等)。*      key 值相同的数据将进入同一个分组中。* 注意:数据如果具有相同的key将一定去往同一个分组和分区,但是同一分区中的数据不一定属于同一组。* */.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}}).sum(1);//TODO 方式二 Lamda表达式实现SingleOutputStreamOperator<Tuple2<String, Integer>> outputStreamOperator2 = dataStreamSource.flatMap((String s, Collector<String> collector) -> {String[] s1 = s.split(" ");for (String word : s1) {collector.collect(word);}}).returns(Types.STRING).map((String word) -> {return Tuple2.of(word, 1);})//Java中lamda表达式存在类型擦除.returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> s) -> {return s.f0;}).sum(1);//TODO sinkoutputStreamOperator1.print("方式一");outputStreamOperator2.print("方式二");//TODO 执行streamExecutionEnvironment.execute("Flink KeyBy Demo");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/106075.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

同源策略和跨域问题

1.跨域问题产生的原因 浏览器的同源策略影响&#xff0c;同源策略是一种安全机制&#xff0c;它限制了一个网页中的脚本只能访问同源的资源。 跨源网络访问的三种方式&#xff1a;跨域写操作&#xff0c;跨域资源嵌入&#xff0c;跨域读操作 2.跨域问题案例 ip和域名不一致…

理解位运算,左移、右移、与、或、非

位运算是对二进制数的操作&#xff0c;包括左移、右移、&&#xff08;与&#xff09;、|&#xff08;或&#xff09;、^(异或)等操作。 左移 左移就是将当前的二进制数&#xff0c;整体向左移动N个单位&#xff0c;例如整数32的二级制表达为100000&#xff0c;将这个二进制…

微信小程序底部tabBar不显示图标

现场还原 在设置微信小程序底部tabBar导航图标时&#xff0c;无论如何操作均无法显示在界面上 解决思路 问题1 图标类型 一开始以为不支持png类型&#xff0c;但查看官方API仅提示ICON尺寸大小 打开其他项目可以正常展示&#xff0c;排除图标类型问题 iconPath string 否 …

【算法学习】-【滑动窗口】-【找到字符串中所有字母异位词】

LeetCode原题链接&#xff1a;438. 找到字符串中所有字母异位词 下面是题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找到 s 中所有 p 的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字符串&…

纯前端js中使用sheetjs导出excel,并且合并标题

先定义变量----用的是Vue2 &#xff0c;以下在vue的data&#xff1a;{}中定义--------------//空格占位符 headerTopTitle: [患者信息, , , , , , , , , 入出院信息, , , , , , , 病案首页中的出院主要诊断, ,出院其他诊断&#xff08;病案首页中原始信息&#xff09;, , , , ,…

一些常见的必须会的谭浩强基本代码大全也是常考的应试是没问题的

//1. 1£¡+2£¡+3£¡+...20! /* #include <stdio.h> int main() {int i;long sum=0,k=1;for(i=1;i<=20;i++){k*=i;sum+=k;}printf("%d",sum); } *///方法2 /* #include <stdio.h> int main() {int i,j;long sum=0,k;for(i…

UE4和C++ 开发-常用的宏(二)UPROPERTY(类似于Unity中C#的特性[SerializeField])

UPROPERTY的作用类似于Unity中C#的特性[SerializeField]或者Godot中的export。目的就是通过反射把属性暴露在蓝图或实例的细节面板。 属性说明符&#xff08;Property Specifiers&#xff09;

unity2022版本 实现手机虚拟操作杆

简介 在许多移动游戏中&#xff0c;虚拟操纵杆是一个重要的用户界面元素&#xff0c;用于控制角色或物体的移动。本文将介绍如何在Unity中实现虚拟操纵杆&#xff0c;提供了一段用于移动控制的代码。我们将讨论不同类型的虚拟操纵杆&#xff0c;如固定和跟随&#xff0c;以及如…

源码解析FlinkKafkaConsumer支持周期性水位线发送

背景 当flink消费kafka的消息时&#xff0c;我们经常会用到FlinkKafkaConsumer进行水位线的发送&#xff0c;本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 FlinkKafkaConsumer水位线发送 1.首先从Fetcher类开始&#xff0c…

进阶JAVA篇- DateTimeFormatter 类与 Period 类、Duration类的常用API(八)

目录 1.0 DateTimeFormatter 类的说明 1.1 如何创建格式化器的对象呢&#xff1f; 1.2 DateTimeFormatter 类中的 format&#xff08;LocalDateTime ldt&#xff09; 实例方法 2.0 Period 类的说明 2.1 Period 类中的 between(localDate1,localDate2) 静态方法来创建对象。 3.…

京东优惠券怎么找?

京东优惠券怎么找&#xff1f; 1、手机安装「草柴」后&#xff0c;打开京东挑选要购买的商品&#xff1b; 2、挑选好京东商品后&#xff0c;点击右上角的「分享」&#xff0c;并点击「复制链接」&#xff1b; 3、将复制的京东商品链接&#xff0c;粘贴到草柴输入框&#xff0c…

antd pro form 数组套数组 form数组动态赋值 shouldUpdate 使用

antd form中数组套数组 form数组动态变化 动态赋值 需求如上&#xff0c;同时添加多个产品&#xff0c;同时每个产品可以增加多台设备&#xff0c;根据设备增加相应编号&#xff0c;所以存在数组套数组&#xff0c;根据数组值动态变化 使用的知识点 form.list form中的数组…

十六、代码校验(5)

本章概要 基准测试 微基准测试JMH 的引入 基准测试 我们应该忘掉微小的效率提升&#xff0c;说的就是这些 97% 的时间做的事&#xff1a;过早的优化是万恶之源。—— Donald Knuth 如果你发现自己正在过早优化的滑坡上&#xff0c;你可能浪费了几个月的时间(如果你雄心勃勃的…

Vitis导入自制IP导致无法构建Platform

怎么还有这种问题&#xff08; 解决Vitis导入自制IP导致无法构建Platform – TaterLi 个人博客 Vitis报错&#xff1a;fatal error: xxx.h: No such file or directory._ly2lj的博客-CSDN博客 在指定位置黏入以上代码即可&#xff1a; INCLUDEFILES$(wildcard *.h) LIBSOUR…

基于springboot养老院管理系统开题报告

一、项目简介 本项目是一款基于Spring Boot的养老院管理系统&#xff0c;旨在为养老院提供一个全方位的解决方案&#xff0c;包括老人信息管理、医疗服务管理、饮食管理、活动管理等模块。通过系统的数字化管理&#xff0c;将老人的生活照料和医学护理更加科学化、规范化&…

【AI视野·今日Robot 机器人论文速览 第五十四期】Fri, 13 Oct 2023

AI视野今日CS.Robotics 机器人学论文速览 Fri, 13 Oct 2023 Totally 45 papers &#x1f449;上期速览✈更多精彩请移步主页 Interesting: &#x1f4da;AI与机器人安全, 从攻击界面、伦理法律和人机交互层面进行了论述。(from 密西西比大学) &#x1f4da;机器人与图机器学…

Centos Docker部署Redis集群三主三从

一、安装Docker yum install docker-engine 二、编辑节点配置文件 创建文件夹 cd /home # 节点一&#xff1a;6370端口 mkdir -p redis-cluster/redis-6370/conf mkdir -p redis-cluster/redis-6370/data # 节点二&#xff1a;6371端口 mkdir -p redis-cluster/redis-6371/co…

华为云云耀云服务器L实例评测|企业项目最佳实践之建议与总结(十二)

华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践系列&#xff1a; 华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践之云服务器介绍(一) 华为云云耀云服务器L实例评测&#xff5c;企业项目最佳实践之华为云介绍(二) 华为云云耀云服务器L实例评测&#xff5…

Java基础面试-接口和抽象类的区别

抽象类 详细描述 类和类之间具有共同特征&#xff0c;将这些共同特征抽取出来&#xff0c;形成的就是抽象类。如果一个类没有足够的信息来描述一个具体的对象&#xff0c;这个类就是抽象类。因为类本身就是不存在的&#xff0c;所以抽象类无法创建对象&#xff0c;也就是无法…

2021年12月 Python(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python编程&#xff08;1~6级&#xff09;全部真题・点这里 C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 第1题 执行以下程序 a[33,55,22,77] a.sort() for i in a:print(i)运行…