Flink keyby 数据倾斜问题处理

上一篇我们使用keyby后发现数据严重倾斜

https://datamining.blog.csdn.net/article/details/105316728

大概看下问题所在,大量数据在一个subtask中运行

这里我们使用两阶段keyby 解决该问题

之前的问题如下图所示

我们期望的是

但我们的需要根据key进行聚合统计,那么把相同的key放在不同的subtask如何统计?

我们看下图(只画了主要部分)

1.首先将key打散,我们加入将key转化为 key-随机数 ,保证数据散列

2.对打散后的数据进行聚合统计,这时我们会得到数据比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10)

3.将散列key还原成我们之前传入的key,这时我们的到数据是聚合统计后的结果,不是最初的原数据

4.二次keyby进行结果统计,输出到addSink

直接看实现代码

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject ProcessFunctionScalaV2 {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] = env.socketTextStream("localhost", 9999)val typeAndData: DataStream[(String, Long)] = stream.map(x => (x.split(",")(0), x.split(",")(1).toLong))val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print("第一次keyby输出")val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print("第二次keyby输出")env.execute()}case class DataJast(key :String,count:Long)//计算keyby后,每个Window中的数据总和class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] {override def createAccumulator(): DataJast = {println("初始化")DataJast(null,0)}override def add(value: (String, Long), accumulator: DataJast): DataJast = {if(accumulator.key==null){printf("第一次加载,key:%s,value:%d\n",value._1,value._2)DataJast(value._1,value._2)}else{printf("数据累加,key:%s,value:%d\n",value._1,accumulator.count+value._2)DataJast(value._1,accumulator.count + value._2)}}override def getResult(accumulator: DataJast): DataJast = {println("返回结果:"+accumulator)accumulator}override def merge(a: DataJast, b: DataJast): DataJast = {DataJast(a.key,a.count+b.count)}}/*** 实现:*    根据key分类,统计每个key进来的数据量,定期统计数量*/class MyProcessFunction extends  KeyedProcessFunction[String,DataJast,DataJast]{val delayTime : Long = 1000L * 30lazy val valueState:ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount",classOf[Long]))override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = {if(valueState.value()==0){valueState.update(value.count)printf("运行task:%s,第一次初始化数量:%s\n",getRuntimeContext.getIndexOfThisSubtask,value.count)val currentTime: Long = ctx.timerService().currentProcessingTime()//注册定时器ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}else{valueState.update(valueState.value()+value.count)printf("运行task:%s,更新统计结果:%s\n" ,getRuntimeContext.getIndexOfThisSubtask,valueState.value())}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = {//定时器执行,可加入业务操作printf("运行task:%s,触发定时器,30秒内数据一共,key:%s,value:%s\n",getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value())//定时统计完成,初始化统计数据valueState.update(0)//注册定时器val currentTime: Long = ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}}}

对key进行散列 

 val dataStream: DataStream[(String, Long)] = typeAndData.map(x => (x._1 + "-" + scala.util.Random.nextInt(100), x._2))

 设置窗口滚动时间,每隔十秒统计一次每隔key下的数据总量

 val keyByAgg: DataStream[DataJast] = dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate())keyByAgg.print("第一次keyby输出")

还原key,并进行二次keyby,对数据总量进行累加

  val result: DataStream[DataJast] = keyByAgg.map(data => {val newKey: String = data.key.substring(0, data.key.indexOf("-"))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())

 

我们看下优化后的状态

先看下第一map,直接从端口拿数据,这不涉及keyby,所以这个没影响

再看下第一次keyby后的结果,因为我们散列后,flink根据哈希进行分配,所以数据不是百分之百平均,但是很明显基本上已经均衡了,不会出现这里1一条,那里1条这种状况

再看下第二次keyby,这里会发现我们ID的2的subtask有820条数据,其他的没有数据;这里是正常现象,因为我们是对第一次聚合后的数据进行keyby统计,所以这里的数据大小会非常小,比如我们原始数据一条数据有1M大小,1000条数据就1个G,业务往往还有其他操作,我们再第一次keyby 散列时处理其他逻辑(比如ETL等等操作),最终将统计结果输出给第二次keyby,很可能1个G的数据,最终只有1kb,这比我们将1个G的数据放在一个subtask中处理好很多。

上面我们自定义了MyProcessFunction方法,设置每30秒执行一次,实际业务场景,我们可能会设置一小时执行一次。

至此我们既保证了数据定时统计,也保证了数据不倾斜问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509757.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux中iptables对防火墙的操作

Iptables教程 1. iptables防火墙简介 Iptables也叫netfilter是Linux下自带的一款免费且优秀的基于包过滤的防火墙工具,它的功能十分强大,使用非常灵活,可以对流入、流出、流经服务器的数据包进行精细的控制。iptables是Linux2.4及2.6内核中…

Web Components入门不完全指北

目前流行的各类前端框架,不管是react, angular还是vue,都有一个共同点,那就是支持组件化开发,但事实上随着浏览器的发展,现在浏览器也原生支持组件式开发,本文将通过介绍Web Components 的三个主要概念&…

Flink 1.9 CDH 6.3 集成

目录 1.下载准备文件 2.felink csa jar包准备 3.将 Flink Parcel放入httpd目录下 4.配置CDH Flink Parcel 5.安装Flink 1.下载准备文件 https://archive.cloudera.com/csa/1.0.0.0/csd/FLINK-1.9.0-csa1.0.0.0-cdh6.3.0.jarhttps://archive.cloudera.com/csa/1.0.0.0/parc…

ssh免密登陆机制示意图

ssh免密登陆机制示意图

CDH 6.x 安装 Phoenix 服务

最近有个新项目启动,版本升级到6.3,发现CDH6.2 版本已经支持Phoenix parcel安装 一、准备文件 下载 https://archive.cloudera.com/phoenix/6.2.0/csd/PHOENIX-1.0.jar 下载parcel #目录 https://archive.cloudera.com/phoenix/6.2.0/parcels/ #根据…

域名服务的工作流程

域名服务的工作流程

Kafka 消费者组 Rebalance 详解

Rebalance作用 Rebalance 本质上是一种协议,主要作用是为了保证消费者组(Consumer Group)下的所有消费者(Consumer)消费的主体分区达成均衡。 比如:我们有10个分区,当我们有一个消费者时&…

Kafka JMX 监控 之 jmxtrans + influxdb + grafana

目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩展 安装 jmxtrans 可能遇到的异常 验证jmxtrans是否成功运行 安装 Grafana 安装 influxDB 与 Grafa…

实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

一.基本概念 我们通俗一点讲: Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如…

UML序列图

UML学习(三)-----序列图 UML的模型中可分为两种,动态模型和静态模型。用例图、类图和对象图都是UML中的静态结构模型。而在UML系统动态模型的其中一种就是交互视图,它描述了执行系统功能的各个角色之间相互传递消息的顺序关系。序…

OpenTSDB 开发指南之 查询数据

前面博主写了一篇文章去介绍opentsdb的http接口的使用方法,但是某一些接口的使用还是比较复杂,这篇文章会通过example来详细讲述opentsdb的一些特性。 本文的举的例子有这些: 基本的写入和查询数据的注释和说明子查询查询中的filters使用查询数据的rat…

libcurl使用方法

原文地址:http://curl.haxx.se/libcurl/c/libcurl-tutorial.html 译者:JGood(http://blog.csdn.net/JGood ) 译者注:这是一篇介绍如何使用libcurl的入门教程。文档不是逐字逐句按原文翻译,而是根据笔者对libcurl的理解&#xff0c…

OpenTSDB 开发指南之 Grafana 展示OpenTSDB监控数据

目录 准备数据 在Grafana创建OpenTSDB连接 创建一个仪表盘 统计 准备数据 将数据插入OpenTSDB {"metric":"jast.data","value":1023,"timestamp":1588742563,"tags":{"type":"jast-graph-data"}}…

CDH 版本 Kafka 外网设置

登陆CDH页面,进入Kafka配置页面 搜索 advertised 修改advertised.host.name,这里我们有三台Broker,我们把每台的外网ip填写到对应的机器上 advertised.port不填写 我们kafka的端口设置的是9099 将外网端口9099开放,允许外网访问 (这里不做介绍

OpenTSDB 安装

下载目录 https://github.com/OpenTSDB/opentsdb/releases https://github.com/OpenTSDB/opentsdb/releases/download/v2.4.0/opentsdb-2.4.0.noarch.rpm 安装 GnuPlot yum install gnuplot -y 直接安装OpenTSDB会报错 [rootecs-t-001-0001 openTSDB]# rpm -ivh opentsdb-2.…

HBase原理 – snapshot 快照

目录 snapshot(快照)基础原理 snapshot能实现什么功能? hbase snapshot用法大全 hbase snapshot分布式架构-两阶段提交 snapshot核心实现 clone_snapshot如何实现呢? 其他需要注意的 参考文献 更多信息可参考《…

linux如何自动化部署脚本实现免密登录并访问资源

任务把weijie主机jdk文件安装到weijie1中。 首先再各台主机中安装必要的命令: expect、wget、httpd、ssh 执行命令 如:expect提示命令不存在,则分别安装命令 yum install expect yum install wget yum install httpd yum install ssh 开…

时序数据库技术体系 – InfluxDB TSM存储引擎之数据读取

任何一个数据库系统内核关注的重点无非:数据在内存中如何存储、在文件中如何存储、索引结构如何存储、数据写入流程以及数据读取流程。关于InfluxDB存储内核,笔者在之前的文章中已经比较全面的介绍了数据的文件存储格式、倒排索引存储实现以及数据写入流…

java多线程之生产者和消费者问题

线程通信:不同的线程执行不同的任务,如果这些任务有某种关系,线程之间必须能够通信,协调完成工作. 经典的生产者和消费者案例(Producer/Consumer):分析案例:1):生产者和消费者应该操作共享的资源(实现方式来做).2):使用一个或多个线程来表示生产者(Producer).3):使用一个或多个…