Flink keyby 数据倾斜问题处理

上一篇我们使用keyby后发现数据严重倾斜

https://datamining.blog.csdn.net/article/details/105316728

大概看下问题所在,大量数据在一个subtask中运行

这里我们使用两阶段keyby 解决该问题

之前的问题如下图所示

我们期望的是

但我们的需要根据key进行聚合统计,那么把相同的key放在不同的subtask如何统计?

我们看下图(只画了主要部分)

1.首先将key打散,我们加入将key转化为 key-随机数 ,保证数据散列

2.对打散后的数据进行聚合统计,这时我们会得到数据比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10)

3.将散列key还原成我们之前传入的key,这时我们的到数据是聚合统计后的结果,不是最初的原数据

4.二次keyby进行结果统计,输出到addSink

直接看实现代码

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject ProcessFunctionScalaV2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] = env.socketTextStream("localhost", 9999)val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong))val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print("第一次keyby输出")val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print("第二次keyby输出")env.execute()}case class DataJast(key :String,count:Long)//计算keyby后,每个Window中的数据总和class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] {override def createAccumulator(): DataJast = {println("初始化")DataJast(null,0)}override def add(value: (String, Long), accumulator: DataJast): DataJast = {if(accumulator.key==null){printf("第一次加载,key:%s,value:%d\n",value._1,value._2)DataJast(value._1,value._2)}else{printf("数据累加,key:%s,value:%d\n",value._1,accumulator.count+value._2)DataJast(value._1,accumulator.count + value._2)}}override def getResult(accumulator: DataJast): DataJast = {println("返回结果:"+accumulator)accumulator}override def merge(a: DataJast, b: DataJast): DataJast = {DataJast(a.key,a.count+b.count)}}/*** 实现:*    根据key分类,统计每个key进来的数据量,定期统计数量*/class MyProcessFunction extends  KeyedProcessFunction[String,DataJast,DataJast]{val delayTime : Long = 1000L * 30lazy val valueState:ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount",classOf[Long]))override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = {if(valueState.value()==0){valueState.update(value.count)printf("运行task:%s,第一次初始化数量:%s\n",getRuntimeContext.getIndexOfThisSubtask,value.count)val currentTime: Long = ctx.timerService().currentProcessingTime()//注册定时器ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}else{valueState.update(valueState.value()+value.count)printf("运行task:%s,更新统计结果:%s\n" ,getRuntimeContext.getIndexOfThisSubtask,valueState.value())}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = {//定时器执行,可加入业务操作printf("运行task:%s,触发定时器,30秒内数据一共,key:%s,value:%s\n",getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value())//定时统计完成,初始化统计数据valueState.update(0)//注册定时器val currentTime: Long = ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}}}

对key进行散列 

 val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))

 设置窗口滚动时间,每隔十秒统计一次每隔key下的数据总量

 val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate())keyByAgg.print("第一次keyby输出")

还原key,并进行二次keyby,对数据总量进行累加

  val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())

 

我们看下优化后的状态

先看下第一map,直接从端口拿数据,这不涉及keyby,所以这个没影响

再看下第一次keyby后的结果,因为我们散列后,flink根据哈希进行分配,所以数据不是百分之百平均,但是很明显基本上已经均衡了,不会出现这里1一条,那里1条这种状况

再看下第二次keyby,这里会发现我们ID的2的subtask有820条数据,其他的没有数据;这里是正常现象,因为我们是对第一次聚合后的数据进行keyby统计,所以这里的数据大小会非常小,比如我们原始数据一条数据有1M大小,1000条数据就1个G,业务往往还有其他操作,我们再第一次keyby 散列时处理其他逻辑(比如ETL等等操作),最终将统计结果输出给第二次keyby,很可能1个G的数据,最终只有1kb,这比我们将1个G的数据放在一个subtask中处理好很多。

上面我们自定义了MyProcessFunction方法,设置每30秒执行一次,实际业务场景,我们可能会设置一小时执行一次。

至此我们既保证了数据定时统计,也保证了数据不倾斜问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509757.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java中正则表达式

package com.test;import java.util.Scanner;public class M1001{public static void main(String[] args) {String input null;Scanner sc new Scanner(System.in);while(sc.hasNext()){inputsc.nextLine();String regex "^1[3|4|5|7|8][0-9]{9}$";//为表达式模板…

读书-时间(反思)

每一天的时间都非常多,下班以后的时间,周末闲暇的时间,但是时间这么多,自己往往不知道该拿它来干什么,每天下完班以后,回到家,打开电脑,打开手机,首先看一看QQ&#xff0…

Kafka 优化参数 unclean.leader.election.enable

Kafka 某个节点挂掉,导致整个服务异常,为了保证服务容灾,可对下面几个参数进行调整 unclean.leader.election.enabletrue min.insync.replicas1 offsets.topic.replication.factor3 这三个配置什么意思呢? 依次来看一下: unclean…

linux中iptables对防火墙的操作

Iptables教程 1. iptables防火墙简介 Iptables也叫netfilter是Linux下自带的一款免费且优秀的基于包过滤的防火墙工具,它的功能十分强大,使用非常灵活,可以对流入、流出、流经服务器的数据包进行精细的控制。iptables是Linux2.4及2.6内核中…

读书--好书

说到好书,我觉得对于每一个人来说,都有自己人生中的一本好书,她的内容能够引起我们的共鸣,认识到自己的不足之处,并且能够指引我们如何去改变它,去战胜它,从而使自己获取进步,得到升…

Web Components入门不完全指北

目前流行的各类前端框架,不管是react, angular还是vue,都有一个共同点,那就是支持组件化开发,但事实上随着浏览器的发展,现在浏览器也原生支持组件式开发,本文将通过介绍Web Components 的三个主要概念&…

Flink 1.9 CDH 6.3 集成

目录 1.下载准备文件 2.felink csa jar包准备 3.将 Flink Parcel放入httpd目录下 4.配置CDH Flink Parcel 5.安装Flink 1.下载准备文件 https://archive.cloudera.com/csa/1.0.0.0/csd/FLINK-1.9.0-csa1.0.0.0-cdh6.3.0.jarhttps://archive.cloudera.com/csa/1.0.0.0/parc…

如何花两年时间面试一个人

http://blog.sina.com.cn/s/blog_4caedc7a0102dycr.html?tj1

ssh免密登陆机制示意图

ssh免密登陆机制示意图

CDH 6.x 安装 Phoenix 服务

最近有个新项目启动,版本升级到6.3,发现CDH6.2 版本已经支持Phoenix parcel安装 一、准备文件 下载 https://archive.cloudera.com/phoenix/6.2.0/csd/PHOENIX-1.0.jar 下载parcel #目录 https://archive.cloudera.com/phoenix/6.2.0/parcels/ #根据…

Kafka 集群数据备份 MirrorMaker 详解

什么是 MirrorMaker? MirrorMaker是Kafka附带的一个用于在Kafka集群之间制作镜像数据的工具。该工具主要动作就是从源集群中消费并生产到目标群集。 一个集群可以启动多个MirrorMaker配置到多个集群 运行 MirrorMaker方法 kafka-mirror-maker.sh --consumer.conf…

面试的态度

关于面试,如果到了面试现场,可能环境并不是自己所希望的那样,但是也不能消极对待,既然自己付出了时间来到这里, 公司对方也安排了时间给予你面试,不管是本着对自己的负责,还是对公司面试人员的…

域名服务的工作流程

域名服务的工作流程

Kafka 消费者组 Rebalance 详解

Rebalance作用 Rebalance 本质上是一种协议,主要作用是为了保证消费者组(Consumer Group)下的所有消费者(Consumer)消费的主体分区达成均衡。 比如:我们有10个分区,当我们有一个消费者时&…

C/C++程序员必须熟悉的开源库

作为一个经验丰富的Linux C/C程序员, 肯定亲手写过各种功能的代码, 比如封装过数据库访问的类, 封装过网络通信的类,封装过日志操作的类, 封装过文件访问的类, 封装过UI界面库等, 也在实际的项目…

CDH kafka JMX 启动

服务正常启动 telnet 127.0.0.1 9393 就可以,直接 telnet ip 9393 就不通 我们查看CDH broker_java_opts 配置项 原内容为 -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 …

java基础之多线程笔记

多线程的优势: 多线程作为一种多任务,并发的工作方式,当然有其存在的优势。 1.线程之间不能共享内存,而线程之间共享内存(堆内存),则很简单。 2.系统创建进程(需要为该进程重新分…

提高C++程序运行效率的10个简单方法

本文以C/C程序为例讲述了程序运行效率的10个简单方法,分享给大家供大家参考之用。具体分析如下: 对于每一个程序员来说,程序的运行效率都是一个值得重视,并为之付出努力的问题。但是程序性能的优化也是一门复杂的学问,…

Kafka JMX 监控 之 jmxtrans + influxdb + grafana

目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩展 安装 jmxtrans 可能遇到的异常 验证jmxtrans是否成功运行 安装 Grafana 安装 influxDB 与 Grafa…