日均处理万亿数据!Flink在快手的应用实践与技术演进之路

董亭亭,快手大数据架构实时计算引擎团队负责人。目前负责 Flink 引擎在快手内的研发、应用以及周边子系统建设。2013 年毕业于大连理工大学,曾就职于奇虎 360、58 集团。主要研究领域包括:分布式计算、调度系统、分布式存储等系统。

本次的分享包括以下三个部分:

  1. 介绍 Flink 在快手的应用场景以及目前规模;
  2. 介绍 Flink 在落地过程的技术演进过程;
  3. 讨论 Flink 在快手的未来计划。

一.Flink 在快手应用场景与规模

1. Flink 在快手应用场景

快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时 ETL、实时分析、Interval Join 以及实时训练,最后的结果存到 Druid、ES 或者 HBase 里面,后面接入一些数据应用产品;同时这一份 Kafka 数据实时 Dump 一份到 Hadoop 集群,然后接入离线计算。

Flink 在快手应用的类别主要分为三大类:

  • 80% 统计监控:实时统计,包括各项数据的指标,监控项报警,用于辅助业务进行实时分析和监控;
  • 15% 数据处理:对数据的清洗、拆分、Join 等逻辑处理,例如大 Topic 的数据拆分、清洗;
  • 5% 数据处理:实时业务处理,针对特定业务逻辑的实时处理,例如实时调度。

Flink 在快手应用的典型场景包括:

  • 快手是分享短视频跟直播的平台,快手短视频、直播的质量监控是通过 Flink 进行实时统计,比如直播观众端、主播端的播放量、卡顿率、开播失败率等跟直播质量相关的多种监控指标;
  • 用户增长分析,实时统计各投放渠道拉新情况,根据效果实时调整各渠道的投放量;
  • 实时数据处理,广告展现流、点击流实时 Join,客户端日志的拆分等;
  • 直播 CDN 调度,实时监控各 CDN 厂商质量,通过 Flink 实时训练调整各个CDN厂商流量配比。

2.Flink 集群规模

快手目前集群规模有 1500 台左右,作业数量大约是 500 左右,日处理条目数总共有 1.7 万亿,峰值处理条目数大约是 3.7 千万。集群部署都是 On Yarn 模式,分为离线集群和实时集群两类集群,其中离线集群混合部署,机器通过标签进行物理隔离,实时集群是 Flink 专用集群,针对隔离性、稳定性要求极高的业务部署。

二.快手 Flink 技术演进

快手 Flink 技术演进主要分为三部分:

  1. 基于特定场景优化,包括 Interval Join 场景优化;
  2. 稳定性改进,包括数据源控速,JobManager 稳定性,作业频繁失败;
  3. 平台建设。

1.场景优化

1.1 Interval Join 应用场景

Interval Join 在快手的一个应用场景是广告展现点击流实时 Join 场景:打开快手 App 可能会收到广告服务推荐的广告视频,用户有时会点击展现的广告视频。这样在后端形成两份数据流,一份是广告展现日志,一份是客户端点击日志。这两份数据需进行实时 Join,将 Join 结果作为样本数据用于模型训练,训练出的模型会被推送到线上的广告服务。该场景下展现以后 20 分钟的点击被认为是有效点击,实时 Join 逻辑则是点击数据 Join 过去 20 分钟展现。其中,展现流的数据量相对比较大,20 分钟数据在 1 TB 以上。最初实时 Join 过程是业务自己实现,通过 Redis 缓存广告展现日志,Kafka 延迟消费客户端点击日志实现 Join 逻辑,该方式缺点是实时性不高,并且随着业务增长需要堆积更多机器,运维成本非常高。基于 Flink 使用 Interval Join 完美契合此场景,并且实时性高,能够实时输出 Join 后的结果数据,对业务来说维护成本非常低,只需要维护一个 Flink 作业即可。

1.2 Interval Join 场景优化

1.2.1 Interval Join 原理:

Flink 实现 Interval join 的原理:两条流数据缓存在内部 State 中,任意一数据到达,获取对面流相应时间范围数据,执行 joinFunction 进行 Join。随着时间的推进,State 中两条流相应时间范围的数据会被清理。

在前面提到的广告应用场景 Join 过去 20 分钟数据,假设两个流的数据完全有序到达,Stream A 作为展现流缓存过去 20 分钟数据,Stream B 作为点击流每来一条数据到对面 Join 过去 20 分钟数据即可。

Flink 实现 Interval Join:

KeyedStreamA.intervalJoin(KeyedStreamB).between(Time.minutes(0),Time.minutes(20)).process(joinFunction)

1.2.2 状态存储策略选择

关于状态存储策略选择,生产环境状态存储 Backend 有两种方式:

  1. FsStateBackend:State 存储在内存,Checkpoint 时持久化到 HDFS;
  2. RocksDBStateBackend:State 存储在 RocksDB 实例,可增量 Checkpoint,适合超大 State。在广告场景下展现流 20 分钟数据有 1 TB 以上,从节省内存等方面综合考虑,快手最终选择的是 RocksDBStateBackend。

在 Interval join 场景下,RocksDB 状态存储方式是将两个流的数据存在两个 Column Family 里,RowKey 根据 keyGroupId+joinKey+ts 方式组织。

1.2.3 RocksDB 访问性能问题

Flink 作业上线遇到的第一个问题是 RocksDB 访问性能问题,表现为:

  • 作业在运行一段时间之后出现反压,吞吐下降。
  • 通过 Jstack 发现程序逻辑频繁处于 RocksDB get 请求处。
  • 通过 Top 发现存在单线程 CPU 持续被打满。

进一步对问题分析,发现:该场景下,Flink 内部基于 RocksDB State 状态存储时,获取某个 Join key 值某段范围的数据,是通过前缀扫描的方式获取某个 Join key 前缀的 entries 集合,然后再判断哪些数据在相应的时间范围内。前缀扫描的方式会导致扫描大量的无效数据,扫描的数据大多缓存在 PageCache 中,在 Decode 数据判断数据是否为 Delete 时,消耗大量 CPU。

以上图场景为例,蓝色部分为目标数据,红色部分为上下边界之外的数据,前缀扫描时会过多扫描红色部分无用数据,在对该大量无效数据做处理时,将单线程 CPU 消耗尽。

1.2.4 针对 RocksDB 访问性能优化

快手在 Interval join 该场景下对 RocksDB 的访问方式做了以下优化:

  • 在 Interval join 场景下,是可以精确的确定需访问的数据边界范围。所以用全 Key 范围扫描代替前缀扫描,精确拼出查询上下边界 Full Key 即 keyGroupId+joinKey+ts[lower,upper]。
  • 范围查询 RocksDB ,可以更加精确 Seek 到上下边界,避免无效数据扫描和校验。

优化后的效果:P99 查询时延性能提升 10 倍,即 nextKey 获取 RocksDB 一条数据, P99 时延由 1000 毫秒到 100 毫秒以内。 作业吞吐反压问题进而得到解决。

1.2.5 RocksDB 磁盘压力问题

Flink 作业上线遇到的第二个问题是随着业务的增长, RocksDB 所在磁盘压力即将达到上限,高峰时磁盘 util 达到 90%,写吞吐在 150 MB/s。详细分析发现,该问题是由以下几个原因叠加导致:

  • Flink 机器选型为计算型,大内存、单块 HDD 盘,在集群规模不是很大的情况下,单个机器会有 4-5 个该作业 Container,同时使用一块 HDD 盘。
  • RocksDB 后台会频繁进行 Compaction 有写放大情况,同时 Checkpoint 也在写磁盘。

针对 RocksDB 磁盘压力,快手内部做了以下优化:

  • 针对 RocksDB 参数进行调优,目的是减少 Compaction IO 量。优化后 IO 总量有一半左右的下降。
  • 为更加方便的调整 RocksDB 参数,在 Flink 框架层新增 Large State RocksDB 配置套餐。同时支持 RocksDBStateBackend 自定义配置各种 RocksDB 参数。
  • 未来计划,考虑将 State 用共享存储的方式存储,进一步做到减少 IO 总量,并且快速Checkpoint 和恢复。

2.稳定性改进

首先介绍下视频质量监控调度应用背景,有多个 Kafka Topic 存储短视频、直播相关质量日志,包括短视频上传/下载、直播观众端日志,主播端上报日志等。Flink Job 读取相应 Topic 数据实时统计各类指标,包括播放量、卡顿率、黑屏率以及开播失败率等。指标数据会存到 Druid 提供后续相应的报警监控以及多维度的指标分析。同时还有一条流是进行直播 CDN 调度,也是通过 Flink Job 实时训练、调整各 CDN 厂商的流量配比。以上 Kafka Topic 数据会同时落一份到 Hadoop 集群,用于离线补偿数据。实时计算跟离线补数据的过程共用同一份 Flink 代码,针对不同的数据源,分别读取 Kafka 数据或 HDFS 数据。

2.1 数据源控速

视频应用场景下遇到的问题是:作业 DAG 比较复杂,同时从多个 Topic 读取数据。一旦作业异常,作业失败从较早状态恢复,需要读取部分历史数据。此时,不同 Source 并发读取数据速度不可控,会导致 Window 类算子 State 堆积、作业性能变差,最终导致作业恢复失败。 另外,离线补数据,从不同 HDFS 文件读数据同样会遇到读取数据不可控问题。在此之前,实时场景下临时解决办法是重置 GroupID 丢弃历史数据,使得从最新位置开始消费。

针对该问题我们希望从源头控制多个 Source 并发读取速度,所以设计了从 Source 源控速的策略。

Source 控速策略

Source 控速策略是 :

  • SourceTask 共享速度状态提供给 JobManager。
  • JobManager 引入 SourceCoordinator,该 Coordinator 拥有全局速度视角,制定相应的策略,并将限速策略下发给 SourceTask。
  • SourceTask 根据 JobManager 下发的速度调节信息执行相应控速逻辑。
  • 一个小细节是 DAG 图有子图的话, 不同子图 Source 源之间互相不影响。

Source 控速策略详细细节

SourceTask 共享状态

  • SourceTask 定期汇报状态给 JobManager,默认 10 s 间隔。
  • 汇报内容为。

协调中心 SourceCoordinator

  • 限速阈值:最快并发 Watermark - 最慢并发 Watermark > ∆t(默认 5 分钟)。只要在达到限速阈值情况下,才进行限速策略制定。
  • 全局预测:各并发 targetWatermark=base+speed*time;Coordinator 先进行全局预测,预测各并发接下来时间间隔能运行到的 Watermark 位置。
  • 全局决策:targetWatermark = 预测最慢 Watermark+∆t/2;Coordinator 根据全局预测结果,取预测最慢并发的 Watermark 值再浮动一个范围作为下个周期全局限速决策的目标值。
  • 限速信息下发:。将全局决策的信息下发给所有的 Source task,限速信息包括下一个目标的时间和目标的 Watermark 位置。

以上图为例,A 时刻,4 个并发分别到达如图所示位置,为 A+interval 的时刻做预测,图中蓝色虚线为预测各并发能够到达的位置,选择最慢的并发的 Watermark 位置,浮动范围值为 Watermark + ∆t/2 的时间,图中鲜红色虚线部分为限速的目标 Watermark,以此作为全局决策发给下游 Task。

SourceTask 限速控制

  • SourceTask 获取到限速信息后,进行限速控制。
  • 以 KafkaSource 为例,KafkaFetcher 获取数据时,根据限速信息 Check 当前进度,确定是否需要限速等待。

该方案中,还有一些其他考虑,例如:

  • 时间属性:只针对 EventTime 情况下进行限速执行。
  • 开关控制:支持作业开关控制是否开启 Source 限速策略。
  • DAG 子图 Source 源之间互相不影响。
  • 是否会影响 CheckPoint Barrier 下发。
  • 数据源发送速度不恒定,Watermark 突变情况。

Source 控速结果

拿线上作业,使用 Kafka 从最早位置(2 days ago)开始消费。如上图,不限速情况下State 持续增大,最终作业挂掉。使用限速策略后,最开始 State 有缓慢上升,但是 State 大小可控,最终能平稳追上最新数据,并 State 持续在 40 G 左右。

2.2 JobManager 稳定性

关于 JobManager 稳定性,遇到了两类 Case,表现均为:JobManager 在大并发作业场景 WebUI 卡顿明显,作业调度会超时。进一步分析了两种场景下的问题原因。

场景一,JobManager 内存压力大问题。JobManager 需要控制删除已完成的 Checkpoint 在 HDFS 上的路径。在 NameNode 压力大时,Completed CheckPoint 路径删除慢,导致CheckPoint Path 在内存中堆积。 原来删除某一次 Checkpoint 路径策略为:每删除目录下一个文件,需 List 该目录判断是否为空,如为空将目录删除。在大的 Checkpoint 路径下, List 目录操作为代价较大的操作。针对该逻辑进行优化,删除文件时直接调用 HDFS delete(path,false) 操作,语义保持一致,并且开销小。

场景二,该 Case 发生在 Yarn Cgroup 功能上线之后,JobManager G1 GC 过程变慢导致阻塞应用线程。AppMaster 申请 CPU 个数硬编码为1,在上线 Cgroup 之后可用的 CPU 资源受到限制。解决该问题的方法为,支持 AppMaster 申请 CPU 个数参数化配置。

2.3 作业频繁失败

机器故障造成作业频繁失败,具体的场景也有两种:

场景一:磁盘问题导致作业持续调度失败。磁盘出问题导致一些 Buffer 文件找不到。又因为 TaskManager 不感知磁盘健康状况,会频繁调度作业到该 TaskManager,作业频繁失败。

场景二:某台机器有问题导致 TaskManager 在某台机器上频繁出 Core,陆续分配新的 TaskManager 到这台机器上,导致作业频繁失败。

针对机器故障问题解决方法:

  • 针对磁盘问题,TaskManager 增加 DiskChecker 磁盘健康检查,发现磁盘有问题 TaskManager 自动退出;
  • 针对有些机器频繁出现 TaskManager 出现问题,根据一定的策略将有问题机器加到黑名单中,然后通过软黑名单机制,告知 Yarn 尽量不要调度 Container 到该机器。

3.平台化建设

3.1 平台建设:

快手的平台化建设主要体现在青藤作业托管平台。通过该平台可进行作业操作、作业管理以及作业详情查看等。作业操作包括提交、停止作业。作业管理包括管理作业存活、性能报警,自动拉起配置等;详情查看,包括查看作业的各类 Metric 等。

上图为青藤作业托管平台的一些操作界面。

3.2 问题定位流程优化:

我们也经常需要给业务分析作业性能问题,帮助业务 debug 一些问题,过程相对繁琐。所以该部分我们也做了很多工作,尽量提供更多的信息给业务,方便业务自主分析定位问题。首先,我们将所有 Metric 入 Druid,通过 Superset 可从各个维度分析作业各项指标。第二,针对 Flink 的 WebUI 做了一些完善,支持 Web 实时打印 jstack,Web DAG 为各 Vertex 增加序号,Subtask 信息中增加各并发 SubtaskId。第三,丰富异常信息提示,针对机器宕机等特定场景信息进行明确提示。第四,新增各种 Metric。

三.未来计划

快手的未来规划主要分为两个部分:

第一,目前在建设的 Flink SQL 相关工作。因为 SQL 能够减少用户开发的成本,包括我们现在也在对接实时数仓的需求,所以 Flink SQL 是我们未来计划的重要部分之一。
第二,我们希望进行一些资源上的优化。目前业务在提作业时存在需求资源及并发预估不准确的情况,可能会过多申请资源导致资源浪费。另外如何提升整体集群资源的利用率问题,也是接下来需要探索的问题。


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518375.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css-字体样式

字体样式 <!--font-family: 字体font-size: 字体大小font-weight: 字体粗细color: 字体颜色--><style>body{font-family: "Arial Black", 楷体,serif;color: #cdbb21;}h1{font-size: 50px;}.p1{font-weight: bolder;}</style><!--字体风格 ob…

小网站的容器化(下):网站容器化的各种姿势,先跟着撸一波代码再说!

作者 | 王洪鹏责编 | Carol出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;封图| CSDN下载于视觉中国 上篇文章&#xff1a;小网站的容器化(上) 中我们大致描述了下个人网站在日常维护中的痛点&#xff0c;文章的后半部分我们添加了一个纯静态网站容器化的简…

阿里云应用高可用 AHAS 正式商用,可一键提升云上应用可用性

在分布式架构环境下&#xff0c;服务间的依赖日益复杂&#xff0c;可能没有人能说清单个故障对整个系统的影响&#xff0c;构建一个高可用的分布式系统面临着很大挑战。 7月17日&#xff0c;阿里云应用高可用服务AHAS 正式商用&#xff0c;包含架构感知、流控降级和故障演练三…

机器学习在高德起点抓路中的应用实践

导读&#xff1a;高德地图作为中国领先的出行领域解决方案提供商&#xff0c;导航是其核心用户场景。路线规划作为导航的前提&#xff0c;是根据起点、终点以及路径策略设置&#xff0c;为用户量身定制出行方案。 起点抓路&#xff0c;作为路线规划的初始必备环节&#xff0c;…

css-阴影和超链接伪类

阴影和超链接伪类 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>/* 默认的颜色 */a{text-decoration: none; /* 去掉下划线 */color: #000000; /* 修改颜色 */}…

时尚电商新赛道:揭秘 FashionAI 技术

雷音是阿里巴巴研究员、淘系技术部 FashionAI 负责人&#xff0c;在淘系技术嘉年华硅谷站&#xff0c;他分享了《时尚电商新赛道— FashionAI 中的技术》 &#xff0c;旨在揭秘&#xff1a;从面向机器学习的知识重建切入&#xff0c;提出了在 AI 能力的推动下&#xff0c;让人值…

MQ 技术产品井喷,今天来详聊一下腾讯开源消息中间件 TubeMQ | 原力计划

作者 | kimmking来源 | CSDN博客&#xff0c;责编 | 夕颜出品 | CSDN&#xff08;ID:CSDNnews&#xff09;随着分布式技术的发展&#xff0c;MQ技术产品也出现井喷。目前除了各类常用的MQ&#xff0c;比如Apache的ActiveMQ&#xff0c;Kafka&#xff0c;Pulsar&#xff0c;Rock…

MongoDB compact 命令详解

为什么需要 compact 一图胜千言 remove 与 drop 的区别 MongoDB 里删除一个集合里所有文档&#xff0c;有两种方式 db.collection.remove({}, {multi: true})&#xff0c;逐个文档从 btree 里删除&#xff0c;最后所有文档被删除&#xff0c;但文件物理空间不会被回收db.col…

css-背景图片和渐变

背景图片 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>/* 边框 border 1px粗细 solid样式 red颜色*/div{width: 1000px;height: 700px;border: 1px solid red;/*…

GitOps 初探

前言 GitOps 的概念最初来源于 Weaveworks 的联合创始人 Alexis 在 2017 年 8 月发表的一篇博客 GitOps - Operations by Pull Request。文章介绍了 Weaveworks 的工程师如何以 Git 作为事实的唯一真实来源&#xff0c;部署、管理和监控基于 Kubernetes 的 SaaS 应用。 随后&…

老码农吐血建议:2020年,低于1w的程序员要注意了...

最近在知乎上&#xff0c;关于AI的这个话题又被顶起来&#xff0c;其中&#xff0c;这条回答让人印象深刻&#xff1a;在这短短的一条信息里&#xff0c;无疑显示出&#xff1a;AI行业缺人&#xff0c;高端岗位80万年薪恐怕也招不来&#xff01;小编上周在一个AI群里&#xff0…

重磅!容器集群监控利器 阿里云Prometheus 正式免费公测

Prometheus 作为容器生态下集群监控的首选方案&#xff0c;是一套开源的系统监控报警框架。它启发于 Google 的 borgmon 监控系统&#xff0c;并于 2015 年正式发布。2016 年&#xff0c;Prometheus 正式加入 Cloud Native Computing Foundation&#xff0c;成为受欢迎度仅次于…

Archsummit 2019重磅分享|闲鱼Flutter&FaaS云端一体化架构

作者&#xff1a;闲鱼技术&#xff0d;国有   讲师介绍 国有&#xff0c;闲鱼架构团队负责人。在7月13号落幕的2019年Archsummit峰会上就近一年来闲鱼在Flutter&FaaS一体化项目上的探索和实践进行了分享。 传统NativeWeb服务端混合开发的挑战 随着无线&#xff0c;Io…

Spring Cloud 云架构下的微服务架构:部门微服务(Dept)

作者 | springML来源 | CSDN 博客 责编 | Carol出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcloud&#xff09;封图| CSDN下载于视觉中国 对于 Rest 基础架构实现处理是 SpringCloud 核心所在&#xff0c;其基本操作形式在 SpringBoot 之中已经有了明确的讲解&#xff0c;…

并发模式与 RPS 模式之争,性能压测领域的星球大战

本文是《如何做好性能压测》系列专题分享的第四期&#xff0c;该专题将从性能压测的设计、实现、执行、监控、问题定位和分析、应用场景等多个纬度对性能压测的全过程进行拆解&#xff0c;以帮助大家构建完整的性能压测的理论体系&#xff0c;并提供有例可依的实战。 该系列专…

Akka in Schedulerx2.0

1. 前言 Schedulerx2.0是阿里中间件自研的基于akka架构的新一代分布式任务调度平台&#xff0c;提供定时、任务编排、分布式跑批等功能&#xff0c;具有高可靠、海量任务、秒级调度等能力。 本篇文章以Schedulerx2.0为例子&#xff0c;介绍akka的应用场景&#xff0c;希望能给…

java.sql.SQLException: The server time zone value ‘???ú±ê×??±??‘ is unrecognized or represents more

【报错信息】 【百度翻译】 服务器时区值???????无法识别或表示多个时区。如果要利用时区支持&#xff0c;必须配置服务器或JDBC驱动程序&#xff08;通过ServerTimeZone配置属性&#xff09;&#xff0c;以使用更具体的时区值 【解决方法】 数据库连接配置conf.xml(…

【从入门到放弃-Java】并发编程-锁-synchronized

简介 上篇【从入门到放弃-Java】并发编程-线程安全中&#xff0c;我们了解到&#xff0c;可以通过加锁机制来保护共享对象&#xff0c;来实现线程安全。 synchronized是java提供的一种内置的锁机制。通过synchronized关键字同步代码块。线程在进入同步代码块之前会自动获得锁…

Redis 学习之一招击穿自己的系统,附送 N 个击穿解决大礼包 | 原力计划

作者 | Mark_MMXI来源 | CSDN博客&#xff0c;责编 | 夕颜出品 | CSDN&#xff08;ID:CSDNnews&#xff09;缓存的存在是为了在高并发情形下&#xff0c;缓解DB压力&#xff0c;提高业务系统体验。业务系统访问数据&#xff0c;先去缓存中进行查询&#xff0c;假如缓存存在数据…

阿里巴巴王坚:用数据来改变世界

“传统信息化建设都是从无到有&#xff0c;加了杆子和机器&#xff0c;但是新一代数字建设就是从有到无&#xff0c;缴费的机器没有了&#xff0c;你回家缴&#xff0c;杆子没有了&#xff0c;你回家缴。” 7月21日&#xff0c;阿里巴巴技术委员会主席王坚在2019年中国电子政务…