京东:Flink SQL 优化实战

简介: 本文着重从 shuffle、join 方式的选择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 任务方面做的优化措施。

本文作者为京东算法服务部的张颖和段学浩,并由 Apache Hive PMC,阿里巴巴技术专家李锐帮忙校对。主要内容为:

  1. 背景
  2. Flink SQL 的优化
  3. 总结

一、背景

img

目前,京东搜索推荐的数据处理流程如上图所示。可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm。

这就造成了以下现象:在一个业务引擎里,用户需要维护两套环境、两套代码,许多共性不能复用,数据的质量和一致性很难得到保障。且因为流批底层数据模型不一致,导致需要做大量的拼凑逻辑;甚至为了数据一致性,需要做大量的同比、环比、二次加工等数据对比,效率极差,并且非常容易出错。

而支持批流一体的 Flink SQL 可以很大程度上解决这个痛点,因此我们决定引入 Flink 来解决这种问题。

在大多数作业,特别是 Flink 作业中,执行效率的优化一直是 Flink 任务优化的关键,在京东每天数据增量 PB 级情况下,作业的优化显得尤为重要。

写过一些 SQL 作业的同学肯定都知道,对于 Flink SQL 作业,在一些情况下会造成同一个 UDF 被反复调用的情况,这对一些消耗资源的任务非常不友好;此外,影响执行效率大致可以从 shuffle、join、failover 策略等方面考虑;另外,Flink 任务调试的过程也非常复杂,对于一些线上机器隔离的公司来说尤甚。

为此,我们实现了内嵌式的 Derby 来作为 Hive 的元数据存储数据库 (allowEmbedded);在任务恢复方面,批式作业没有 checkpoint 机制来实现failover,但是 Flink 特有的 region 策略可以使批式作业快速恢复;此外,本文还介绍了对象重用等相关优化措施。

二、 Flink SQL 的优化

1. UDF 重用

在 Flink SQL 任务里会出现以下这种情况:如果相同的 UDF 既出现在 LogicalProject 中,又出现在 Where 条件中,那么 UDF 会进行多次调用 (见https://issues.apache.org/jira/browse/FLINK-20887)。但是如果该 UDF 非常耗 CPU 或者内存,这种多余的计算会非常影响性能,为此我们希望能把 UDF 的结果缓存起来下次直接使用。在设计的时候需要考虑:(非常重要:请一定保证 LogicalProject 和 where 条件的 subtask chain 到一起)

  • 一个 taskmanager 里面可能会有多个 subtask,所以这个 cache 要么是 thread (THREAD LOCAL) 级别要么是 tm 级别;
  • 为了防止出现一些情况导致清理 cache 的逻辑走不到,一定要在 close 方法里将 cache 清掉;
  • 为了防止内存无限增大,选取的 cache 最好可以主动控制 size;至于 “超时时间”,建议可以配置一下,但是最好不要小于 UDF 先后调用的时间;
  • 上文有提到过,一个 tm 里面可能会有多个 subtask,相当于 tm 里面是个多线程的环境。首先我们的 cache 需要是线程安全的,然后可根据业务判断需不需要锁。

根据以上考虑,我们用 guava cache 将 UDF 的结果缓存起来,之后调用的时候直接去cache 里面拿数据,最大可能降低任务的消耗。下面是一个简单的使用(同时设置了最大使用 size、超时时间,但是没有写锁):

public class RandomFunction extends ScalarFunction {private static Cache<String, Integer> cache = CacheBuilder.newBuilder().maximumSize(2).expireAfterWrite(3, TimeUnit.SECONDS).build();public int eval(String pvid) {profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());Integer result = cache.getIfPresent(pvid);if (null == result) {int tmp = (int)(Math.random() * 1000);cache.put("pvid", tmp);return tmp;}return result;}@Overridepublic void close() throws Exception {super.close();cache.cleanUp();}
}

2. 单元测试

大家可能会好奇为什么会把单元测试也放到优化里面,大家都知道 Flink 任务调试过程非常复杂,对于一些线上机器隔离的公司来说尤甚。京东的本地环境是没有办法访问任务服务器的,因此在初始阶段调试任务,我们耗费了很多时间用来上传 jar 包、查看日志等行为。

为了降低任务的调试时间、增加代码开发人员的开发效率,实现了内嵌式的 Derby 来作为 Hive 的元数据存储数据库 (allowEmbedded),这算是一种优化开发时间的方法。具体思路如下:

首先创建 Hive Conf:

public static HiveConf createHiveConf() {ClassLoader classLoader = new HiveOperatorTest().getClass().getClassLoader();HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));try {TEMPORARY_FOLDER.create();String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);HiveConf hiveConf = new HiveConf();hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);hiveConf.set("datanucleus.connectionPoolingType", "None");hiveConf.set("hive.metastore.schema.verification", "false");hiveConf.set("datanucleus.schema.autoCreateTables", "true");return hiveConf;} catch (IOException e) {throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);}
}

接下来创建 Hive Catalog:(利用反射的方式调用 embedded 的接口)

public static void createCatalog() throws Exception{Class clazz = HiveCatalog.class;Constructor c1 = clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});c1.setAccessible(true);hiveCatalog = (HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});hiveCatalog.open();
}

创建 tableEnvironment:(同官网)

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
TableConfig tableConfig = tableEnv.getConfig();
Configuration configuration = new Configuration();
configuration.setInteger("table.exec.resource.default-parallelism", 1);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());

最后关闭 Hive Catalog:

public static void closeCatalog() {if (hiveCatalog != null) {hiveCatalog.close();}
}

此外,对于单元测试,构建合适的数据集也是一个非常大的功能,我们实现了 CollectionTableFactory,允许自己构建合适的数据集,使用方法如下:

CollectionTableFactory.reset();
CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
StringBuilder sbFilesSource = new StringBuilder();
sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + "  `pvid` string) with ('connector.type'='COLLECTION','is-bounded' = 'true')");
tableEnv.executeSql(sbFilesSource.toString());

3. join 方式的选择

传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

效率空间备注
Nested-loop Join占用大
Sort-Merge Join有sort merge开销占用小有序数据集的一种优化措施
Hash Join占用大适合大小表
  • Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 的时间效率以及空间效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 来禁用。

    以下两张图片是禁用前和禁用后的效果 (如果你的禁用没有生效,先看一下是不是 Equi-Join):

    img

    img

  • Sort-Merge Join 分为 Sort 和 Merge 两个阶段:首先将两个数据集进行分别排序,然后再对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。(Sort-Merge Join 要求对两个数据集进行排序,但是如果两个输入是有序的数据集,则可以作为一种优化方案)。
  • Hash Join 同样分为两个阶段:首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。

    • 第一阶段和第一个数据集分别称为 build 阶段和 build table;
    • 第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。

    Hash Join 效率较高但是对空间要求较大,通常是作为 Join 其中一个表为适合放入内存的小表的情况下的优化方案 (并不是不允许溢写磁盘)。

注意:Sort-Merge Join 和 Hash Join 只适用于 Equi-Join ( Join 条件均使用等于作为比较算子)。

Flink 在 join 之上又做了一些细分,具体包括:

特点使用
Repartition-Repartition strategy对数据集分别进行分区和shuffle,如果数据集大的时候效率极差两个数据集相差不大
Broadcast-Forward strategy将小表的数据全部发送到大表数据的机器上两个数据集有较大的差距
  • Repartition-Repartition strategy:Join 的两个数据集分别对它们的 key 使用相同的分区函数进行分区,并经过网络发送数据;
  • Broadcast-Forward strategy:大的数据集不做处理,另一个比较小的数据集全部复制到集群中一部分数据的机器上。

众所周知,batch 的 shuffle 非常耗时间。

  • 如果两个数据集有较大差距,建议采用 Broadcast-Forward strategy;
  • 如果两个数据集差不多,建议采用 Repartition-Repartition strategy。

可以通过:table.optimizer.join.broadcast-threshold 来设置采用 broadcast 的 table 大小,如果设置为 “-1”,表示禁用 broadcast。

下图为禁用前后的效果:

​ 

img

​ 

img

4. multiple input

在 Flink SQL 任务里,降低 shuffle 可以有效的提高 SQL 任务的吞吐量,在实际的业务场景中经常遇到这样的情况:上游产出的数据已经满足了数据分布要求 (如连续多个 join 算子,其中 key 是相同的),此时 Flink 的 forward shuffle 是冗余的 shuffle,我们希望将这些算子 chain 到一起。Flink 1.12 引入了 mutiple input 的特性,可以消除大部分没必要的 forward shuffle,把 source 的算子 chain 到一起。

table.optimizer.multiple-input-enabled:true

下图为开了 multiple input 和没有开的拓扑图 ( operator chain 功能已经打开):

​ 

img

​ 

img

5. 对象重用

上下游 operator 之间会经过序列化 / 反序列化 / 复制阶段来进行数据传输,这种行为非常影响 Flink SQL 程序的性能,可以通过启用对象重用来提高性能。但是这在 DataStream 里面非常危险,因为可能会发生以下情况:在下一个算子中修改对象意外影响了上面算子的对象。

但是 Flink 的 Table / SQL API 中是非常安全的,可以通过如下方式来启用:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

或者是通过设置:pipeline-object-reuse:true

为什么启用了对象重用会有这么大的性能提升?在 Blink planner 中,同一任务的两个算子之间的数据交换最终将调用 BinaryString#copy,查看实现代码,可以发现 BinaryString#copy 需要复制底层 MemorySegment 的字节,通过启用对象重用来避免复制,可以有效提升效率。

下图为没有开启对象重用时相应的火焰图:

img

6. SQL 任务的 failover 策略

batch 任务模式下 checkpoint 以及其相关的特性全部都不可用,因此针对实时任务的基于 checkpoint 的 failover 策略是不能应用在批任务上面的,但是 batch 任务允许 Task 之间通过 Blocking Shuffle 进行通信,当一个 Task 因为任务未知的原因失败之后,由于 Blocking Shuffle 中存储了这个 Task 所需要的全部数据,所以只需要重启这个 Task 以及通过 Pipeline Shuffle 与其相连的全部下游任务即可:

jobmanager.execution.failover-strategy:region (已经 finish 的 operator 可直接恢复)

table.exec.shuffle-mode:ALL_EDGES_BLOCKING (shuffle 策略)。

7. shuffle

Flink 里的 shuffle 分为 pipeline shuffle 和 blocking shuffle。

  • pipeline shuffle 性能好,但是对资源的要求高,而且容错比较差 (会将该 operator 分到前面的一个 region 里面,对于 batch 任务来说,如果这个算子出问题,将从上一个 region 恢复);
  • blocking shuffle 就是传统的 batch shuffle,会将数据落盘,这种 shuffle 的容错好,但是会产生大量的磁盘、网络 io (如果为了省心的话,建议用 blocking suffle)。blocking shuffle 又分为 hash shuffle 和 sort shuffle,

    • 如果你的磁盘是 ssd 并且并发不太大的话,可以选择使用 hash shuffle,这种 shuffle 方式产生的文件多、随机读多,对磁盘 io 影响较大;
    • 如果你是 sata 并且并发比较大,可以选择用 sort-merge shuffle,这种 shuffle 产生的数据少,顺序读,不会产生大量的磁盘 io,不过开销会更大一些 (sort merge)。

相应的控制参数:

table.exec.shuffle-mode,该参数有多个参数,默认是 ALL_EDGES_BLOCKING,表示所有的边都会用 blocking shuffle,不过大家可以试一下 POINTWISE_EDGES_PIPELINED,表示 forward 和 rescale edges 会自动开始 pipeline 模式。

taskmanager.network.sort-shuffle.min-parallelism ,将这个参数设置为小于你的并行度,就可以开启 sort-merge shuffle;这个参数的设置需要考虑一些其他的情况,具体的可以按照官网设置。

三、总结

本文着重从 shuffle、join 方式的选择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 任务方面做的优化措施。另外,感谢京东实时计算研发部付海涛等全部同事的支持与帮助。

原文链接
本文为阿里云原创内容,未经允许不得转载。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512715.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot参数校验以及分组校验的使用

简介&#xff1a; 做web开发基本上每个接口都要对参数进行校验&#xff0c;如果参数比较少&#xff0c;还比较容易处理&#xff0c;一但参数比较多了的话代码中就会出现大量的if-else语句。虽然这种方式简单直接&#xff0c;但会大大降低开发效率和代码可读性。所以我们可以使用…

长文解析:作为容器底层技术的半壁江山, cgroup如何突破并发创建瓶颈?

简介&#xff1a; io_uring 作为一种新型高性能异步编程框架&#xff0c;代表着 Linux 内核未来的方向&#xff0c;当前仍处于快速发展中。阿里云联合 InfoQ 发起《io_uring 介绍及应用实践》的技术公开课&#xff0c;围绕 OpenAnolis 龙蜥社区 Anolis OS 8 全方位解析高性能存…

Orion:谷歌的新一代SDN控制器

作者 | 魏煌松来源 | 鲜枣课堂时至今日&#xff0c;谷歌在2015年公布的成果&#xff0c;“利用SDN将广域网带宽利用率提升至接近100%”&#xff0c;仍然是SDN的一个标杆案列&#xff0c;也是难以逾越的巅峰。但事实上&#xff0c;当时使用的SDN控制器Onix&#xff0c;早已退出了…

移动云正式发布基于龙蜥 Anolis OS 的 BC-Linux V8.2 通用版操作系统

简介&#xff1a; 2020年12月CentOS项目组宣布CentOS 8将于2021年12月31日结束支持&#xff0c;这意味着从2022年开始&#xff0c;使用CentOS 8的用户&#xff0c;将无法得到来自官方的新硬件支持、bug修复和安全补丁。针对这一情况&#xff0c;移动云大云操作系统团队基于国内…

干掉讨厌的 CPU 限流,让容器跑得更快

简介&#xff1a; 让人讨厌的 CPU 限流影响容器运行&#xff0c;有时人们不得不牺牲容器部署密度来避免 CPU 限流出现。本文介绍的 CPU Burst 技术可以帮助您既能保证容器运行服务质量&#xff0c;又不降低容器部署密度。文章分为上下两篇&#xff0c;该文为上篇&#xff0c;下…

微弱信号检测_机动车检测线常用传感器介绍

机动车检测线中经常会运用到各种传感器&#xff0c;这些传感器相当于车辆检测系统的“眼睛”、“鼻子”和“耳朵”&#xff0c;通过台体装置和装在台体中的传感器&#xff0c;能够把车辆的性能数据转换成计算机系统能够识别的信号&#xff0c;供计算机处理和计算&#xff0c;最…

赋能开发者,英特尔发布oneAPI 2022工具包

英特尔发布了oneAPI 2022工具包。此次发布的最新增强版工具包扩展了跨架构开发的特性&#xff0c;为开发者提供更强的实用性和更丰富的架构选择&#xff0c;用以加速计算。 英特尔公司首席技术官、高级副总裁、软件和先进技术事业部总经理 Greg Lavender表示&#xff1a;“我十…

Quick BI V4.0功能“炸弹”来袭,重磅推出即席分析、模板市场、企业微信免密登录等强势功能

简介&#xff1a; 2021年7月&#xff0c;Quick BI公共云版本迭代新功能&#xff1a;重磅推出即席分析、模板市场&#xff0c;分析门槛再降低&#xff1b;推出企业微信无缝对接&#xff0c;移动端类目个性配置及管理提升多端能力&#xff1b;数据建模配置交互升级至拖拽模式提升…

打印速度快点的打印机_瞒着领导偷偷给你们发两台打印机

前几次小粉笔组织的活动都被“投诉”&#xff01;说我们打印机太少~小粉笔心领神会&#xff0c;在这个月的活动预算费用上悄咪咪加了【两台打印机】~(看小粉笔多疼你们&#xff01;)希望知道的笔芯不要把这条推文转发给我领导(要不然你们以后就没有打印机了~哼&#xff01;)现在…

数据库误操作后悔药来了:AnalyticDB PostgreSQL教你实现分布式一致性备份恢复

简介&#xff1a; 本文将介绍AnalyticDB PostgreSQL版备份恢复的原理与使用方法。 一、背景 AnalyticDB PostgreSQL版&#xff08;简称ADB PG&#xff09;是阿里云数据库团队基于PostgreSQL内核&#xff08;简称PG&#xff09;打造的一款云原生数据仓库产品。在数据实时交互式…

与变异风险词赛跑 阿里探索AI治理网络风险

最近&#xff0c;阿里安全一线风控小二可粒发现&#xff0c;在禁售的风险防控库里&#xff0c;有人试图“上新”新品种&#xff0c;不法份子借助在社交媒体上走红的“魔法改运”等说辞&#xff0c;引人入玄学骗局。 尽量提前发现风险问题&#xff0c;提早布防是阿里安全风控部…

高效研发运维体系构建的流程和方法论

简介&#xff1a; 云计算产品大多都会与云原生发生关联&#xff0c;云原生正在重塑整个软件的生命周期。但到底什么是云原生&#xff1f;云原生带来的最大技术创新和未来机会是什么&#xff1f;围绕云原生&#xff0c;是否可以构建出一套云上的开发&运维体系&#xff0c;打…

Colima:MacOS 上的极简容器运行时和 Kubernetes

作者 | Addo Zhang来源 | 云原生指北Colima 是一个以最小化设置来在MacOS上运行容器运行时和 Kubernetes 的工具。支持 m1&#xff0c;同样也支持 Linux。Colima 的名字取自 Container on Lima。Lima 是一个虚拟机工具&#xff0c;可以实现自动的文件共享、端口转发以及 contai…

当容器应用越发广泛,我们又该如何监测容器?

简介&#xff1a; 随着容器技术蓬勃发展与落地推行&#xff0c;越来越多企业的业务运行于容器中。作为主流部署方式之一&#xff0c;容器将团队的任务和关注点分割开&#xff0c;开发团队只需关注应用程序逻辑和依赖项&#xff0c;而运维团队只需关注部署和管理&#xff0c;无需…

内含福利|CSDN携手字节跳动:云原生Meetup北京站报名热烈启动,1月8日见!

伴随云原生技术的成熟与落地&#xff0c;越来越多框架、中间件等开源项目相继涌现&#xff0c;帮助开发者和企业有效解决业务问题。2022年1月8日&#xff0c;CSDN携手字节跳动基础架构&#xff0c;将在北京举办第四场云原生线下Meetup。在这里&#xff0c;您可以与众多开源技术…

Flink CDC 2.0 正式发布,详解核心改进

简介&#xff1a; 本文由社区志愿者陈政羽整理&#xff0c;内容来源自阿里巴巴高级开发工程师徐榜江 (雪尽) 7 月 10 日在北京站 Flink Meetup 分享的《详解 Flink-CDC》。深入讲解了最新发布的 Flink CDC 2.0.0 版本带来的核心特性&#xff0c;包括&#xff1a;全量数据的并发…

unity三维地图的经纬度如何在二维地图上表示_接入C++版本recastnavigation寻路库到Unity/服务端中...

前言因为Unity版本的更新迭代&#xff0c;老版本的A*插件在新版本Unity已经无法正常使用&#xff0c;包括一些运行时代码也已经过时&#xff0c;重新接入要花费很多时间&#xff0c;干脆接入一个新的寻路方案吧。这里选择的是久负盛名的https://github.com/recastnavigation/re…

Dataphin功能:集成——如何将业务系统的数据抽取汇聚到数据中台

简介&#xff1a; 数据集成是简单高效的数据同步平台&#xff0c;致力于提供具有强大的数据预处理能力、丰富的异构数据源之间数据高速稳定的同步能力&#xff0c;为数据中台的建设打好坚实的数据基座。 数据中台是当下大数据领域最前沿的数据建设体系, 它并不是从零开始, 无中…

5G专网,路在何方?

作者 | 蜉蝣采采来源 | 无线深海话说你平常打电话、刷视频、玩游戏的4G和5G&#xff0c;一般也被叫做“公网”。这个“公”字的含义正是公开&#xff0c;公用的意思。也就是说&#xff0c;这个网络&#xff0c;不但你能用&#xff0c;你隔壁的张三也能用&#xff0c;张三的老乡…

如何开发 Node.js Native Add-on?

简介&#xff1a; 来一起为 Node.js 的 add-on 生态做贡献吧~ 作者 | 吴成忠(昭朗)这篇文章是由 Chengzhong Wu (legendecas)&#xff0c;Gabriel Schulhof (gabrielschulhof) &#xff0c;Jim Schlight (jimschlight)&#xff0c;Kevin Eady&#xff0c;Michael Dawson (mhdaw…