性能提升约7倍!Apache Flink 与 Apache Hive 的集成

导读:随着 Flink 在流式计算的应用场景逐渐成熟和流行,如果 Flink 能同时把批量计算的应用场景处理好,就能减少用户在使用 Flink 时开发和维护的成本,并且能够丰富 Flink 的生态。SQL 是批计算中比较常用的工具,所以 Flink 针对于批计算也以 SQL 为主要接口。本次分享主要介绍 Flink 对批处理的设计与 Hive 的集成。主要分为下面三点展开:

  1. 设计架构
  2. 项目进展
  3. 性能测试

设计架构

首先和大家分享一下 Flink 批处理的设计架构。

1. 背景

640.jpeg

Flink 提升批处理的主要原因是为了减少客户的维护成本和更新成本和更好的完善 Flink 生态环境。SQL 是批计算场景中一个非常重要的工具,所以希望以 SQL 作为在批计算场景的主要接口,为此我们着重优化了 Flink SQL 的功能。当前 Flink SQL 主要有下面几点需要优化:

  • 需要完整的元数据管理体制。
  • 缺少对 DDL(数据定义语言 DDL 用来创建数据库中的各种对象,如表、视图、索引、同义词、聚簇等)的支持。
  • 与外部系统进行对接不是很方便,尤其是 Hive, 因为 Hive 是大数据领域最早的 SQL 引擎,所以 Hive 的用户基础非常广泛,新的一些 SQL 工具,如 Spark SQL、Impala 都提供了与 Hive 对接的功能,这样用户才能更好地将其应用从 Hive 迁移过来,所以与 Hive 对接对 Flink SQL 而言也十分重要。

2. 目标

640-2.jpeg

所以我们要完成以下目标:

  • 定义统一的 Catalog 接口,这个是 Flink SQL 更方便与外部对接的前提条件。如果大家用过 Flink 的 TableSource 和 TableSink 来对接外部的系统的表,会发现不管是通过写程序还是配置 yaml 文件会跟传统的 SQL 使用方式会有些不同。所以我们肯定不希望 Hive 的用户迁移 Flink SQL 需要通过定义 TableSouces 和 TableSink 的方式来与 Hive 进行交互。因此我们提供了一套新的 Catalog 接口以一种更接近传统 SQL 的方式与 Hive 进行交互。
  • 提供基于内存和可持久化的实现。基于内存就是 Flink 原有的方式,用户所有的元数据的生命周期是跟他的 Session(会话)绑定的,Session(会话)结束之后所有的元数据都没有了。因为要跟 Hive 交互所以肯定还要提供一个持久化的 Catalog。
  • 支持 Hive 的互操作。有了 Catalog 之后用户就可以通过 Catalog 访问 Hive 的元数据,提供 Data Connector 让用户能通过 Flink 读写 Hive 的实际数据,实现 Flink 与 Hive 的交互。
  • 支持 Flink 作为 Hive 的计算引擎(长期目标),像 Hive On Spark,Hive On Tez。

3. 全新设计的 Catalog API(FlIP-30)

640-3.jpeg

用户通过 SQL Client 或者 Table API 提交请求,Flink 会创建 TableEnvironment, TableEnvironment 会创建 CatalogManager 加载并配置 Catalog 实例,并且 Catalog 支持多种元数据类型 table、database、function、view、partition 等,在 1.9.0 的版本当中 Catalog 会有两个实现:

  • 一个是基于内存的 GenericinMemoryCatalog。
  • 另一是 HiveCatalog,HiveCatalog 通过 HiveShim 与 Hive Metasotre 交互来操作 Hive 元数据,HiveShim 的作用是处理 Hive 在大版本中 Hive Metastore 不兼容的问题。

从这种实现的方式可以看出,用户可以创建多个 Catalog,也可以访问多个 Hive Metastore,来达到跨 Catalog 查询的操作。

4. 读写 Hive 数据

640-4.jpeg

有了元数据之后我们就可以实现 Flink SQL 的 Data Connector 来真正的读写 Hive 实际数据。Flink SQL 写入的数据必须要兼容 Hive 的数据格式,也就是 Hive 可以正常读取 Flink 写入的数据,反过来也是一样的。为了实现这一点我们大量复用 Hive 原有的 Input/Output Format、SerDe 等 API,一是为了减少代码冗余,二是尽可能的保持兼容性。

在 Data Connect 中读取 Hive 表数据具体实现类为:HiveTableSource、HiveTableInputFormat。写 Hive 表的具体实现类为:HiveTableSink、HiveTableOutputFormat。

项目进展

其次和大家分享 Flink 1.9.0 的现状和 1.10.0 中的新特性还有未来工作。

1. Flink 1.9.0 的现状

640-5.jpeg

Flink SQL 作为 1.9.0 版本中作为试用功能发布的,它的功能还不是很完善:

  • 支持的数据类型还不全。(1.9.0 中带参数的数据类型基本上都不支持:如 DECIMAL,CHAR 等)
  • 对分区表的支持不完善,只能读取分区表,不能写分区表。
  • 不支持表的 INSERT OVERWRITE。

2. Flink 1.10.0 中的新特性

640-6.jpeg

Flink SQL 在 1.10.0 版本里我们做了比较多的进一步开发,与 Hive 集成的功能更加完整。

  • 支持读写静态分区和动态分区表。
  • 在表级别和分区级别都支持 INSERT OVERWRITE。
  • 支持了更多地数据类型。(除 UNION 类型都支持)
  • 支持更多地 DDL。(CREATE TABLE/DATABASE)
  • 支持在 Flink 中调用 Hive 的内置函数。(Hive 大约 200 多个内置函数)
  • 支持了更多的 Hive 版本。(Hive 的 1.0.0~3.1.1)
  • 做了很多性能优化如,Project/Predicate Pushdown,向量的读取 ORC 数据等。

3. Module 接口

640-7.jpeg

为了能让用户调用 Flink SQL 中调用 Hive 的内置函数,我们在 Flink 1.10 当中引入了一个 Module 接口。这个 Module 是为了让用户能够方便的把外部系统的内置函数接入到系统当中。

  • 使用方式和 Catalog 类似,用户可以通过 Table API 或 Yaml 文件来配置 Module。
  • Module 可以同时加载多个,Flink 解析函数的时候通过 Module 的加载顺序在多个 Module 中查找函数的解析。也就是如果两个 Module 包含名字相同的 Function,先加载的 Module 会提供 Function 的定义。
  • 目前 Module 有两个实现,CoreModule 提供了 Flink 原生的内置函数,HiveModule 提供了 Hive 的内置函数。

4. 未来工作

640-8.jpeg

未来的工作主要是先做功能的补全,其中包括:

  • View 的支持(有可能在 1.11 中完成)。
  • 持续改进 SQL CLI 的易用性,现在支持翻页显示查询结果,后续支持滚动显示。并支持 Hive 的 -e -f 这种非交互式的使用方式。
  • 支持所有的 Hive 常用 DDL,例如 CREATE TABLE AS。
  • 兼容 Hive 的语法,让原来在 Hive 上的工程在 Flink 的顺滑的迁移过来。
  • 支持 SQL CLI 的远程模式,类似 HiveServer2 的远程连接模式。
  • 支持流式的写入 Hive 数据。

性能测试

下面是 Flink 在批处理作业下与 HiveMR 对比测试的测试环境和结果。

1. 测试环境

640-9.jpeg

首先我们的测试环境使用了 21 个节点的物理机群,一个 Master 节点和 20 个 Slave 节点。节点的硬件配置是 32 核,64 个线程,256 内存,网络做了端口聚合,每个机器是 12 块的 HDD 硬盘。

2. 测试工具

640-10.jpeg

测试工具使用了 Hortonworks 的 hive-testbench,github 中一个开源的工具。我们使用这个工具生成了 10TB 的 TPC-DS 测试数据集,然后分别通过 Flink SQL 和 Hive 对该数据集进行 TPC-DS 的测试。

一方面我们对比了 Flink 和 Hive 的性能,另一方面我们验证了 Flink SQL 能够很好的访问 Hive 的数据。测试用到了 Hive 版本是 3.1.1,Flink 用到的是 Master 分支代码。

3. 测试结果

640-11.jpeg

测试结果 Flink SQL 对比 Hive On MapReduce 取得了大约 7 倍的性能提升。这得益于 Flink SQL 所做的一系列优化,比如在调度方面的优化,以及执行计划的优化等。总体来说如果用的是 Hive On MapReduce,迁移到 Flink SQL 会有很大性能的提升。

附最新性能对比详情及思路解析:Flink 1.10 和 Hive 3.0 性能对比

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/516661.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

日均万亿条数据如何处理?爱奇艺实时计算平台这样做

摘要:本文由爱奇艺大数据服务负责人梁建煌分享,介绍爱奇艺如何基于 Apache Flink 技术打造实时计算平台,并通过业务应用案例分享帮助用户了解 Apache Flink 的技术特点及应用场景。提纲如下: 爱奇艺 Flink 服务现状Flink 改进实时…

进入编译器后,一个函数经历了什么?

来源 | 编程技术宇宙责编 | Carol封图 | CSDN 付费下载自视觉中国我是一个函数我是一个函数,名叫str_upper,我可以把输入的字符串从小写变成大写。不信你看,我长这样:char* str_upper(char* str, int len) {char upper[256];if (l…

docker sonarqube 7.7 sonar-scanner-4.6.2 maven 安装、搭建+实战

文章目录一、docker安装Mysql1. 映射目录2. 赋予权限3. 镜像拉取4. 运行容器5. 查看日志6. 创建数据库二、docker安装sonarqube2.1. 映射目录2.2.赋予权限2.3. 镜像拉取2.4. 运行容器2.5. 查看日志二、中文简体2.1. 版本对照2.2. download2.3. 重启容器三、规则添加3.1. 规则无…

使用CLONE TABLE方式实现同region不同可用区的MaxCompute

该文章主要针对于用户同region不同账户之间的MaxCompute数据迁移方式,属于迁移的方式可以有三种方式,一、添加MaxCompute数据源的方式进行数据迁移(该方式针对多个表配置同步节点较为繁琐);二、使用tunnel方式结合代码…

Fabric中的Transient Data与Private Data

在Hyperledger Fabric中有两个相关的概念:私有数据(Private Data)和暂态数据(Transient Data)。本文提供四个示例程序,分别对应私有数据和暂态数据的四种组合使用方式,并通过观察账本的交易以及…

窃隐私、放高利贷,输入法的骚操作真不少!

来源 | 编程技术宇宙责编 | 李雪敬封图 | CSDN 付费下载自视觉中国光说隐私泄露,人们总觉得似乎离自己很远,然而它早已像一个“地雷”,悄悄埋进了我们的生活中,不是不爆,时候未到。别认为自己只是社会中的一个小透明&a…

快速迁移 Next.js 应用到函数计算

首先介绍下在本文出现的几个比较重要的概念: 函数计算(Function Compute): 函数计算是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源&#xff…

为什么字节跳动、腾讯、阿里都在用Python??

Python 作为一种解释型技术脚本语言,越来越被认可为程序员新时代的风口语言。 无论是刚入门的程序员,还是年薪百万的 BATJ 的技术大牛都无可否认:Python的应用能力是成为一名码农大神的必要项。 而作为Python初学者来讲,最大的问题…

Need to upgrade docker package to 17.06.0+. Docker升级到最新版本

文章目录1. 现象2. 查找3. 在线卸载4. 升级docker5. 重启Docker6. 设置Docker开机自启7. 查看版本背景: 在搭建docker私有仓库的时候出现以下错误,版本太低 1. 现象 Need to upgrade docker package to 17.06.0.2. 查找 查找主机上关于Docker的软件包 …

云数据库RDS基础版的优势及适用场景

云栖号快速入门:【点击查看更多云产品快速入门】 不知道怎么入门?这里分分钟解决新手入门等基础问题,可快速完成产品配置操作! 阿里云的产品系列包括基础版、高可用版、集群版、三节点企业版,本文介绍基础版的相关信息…

docker Harbor2.3.4 http 搭建镜像仓库

文章目录一、环境准备1. 环境要求2. 节点总览3. 安装docker-compose二、安装harbor2.1. 下载2.2. 解压2.3. 调整配置2.5. 安装 harbor2.6. 效果验证三、Docker开启远程API3.1. 修改配置3.2. 支持http3.3. 重新启动Docker服务3.4. 防火墙管理3.5. 重新启动3.6. 监控状态四、测试…

下一代 IDE:Eclipse Che 究竟有什么奥秘?

来源 | CSDN(ID:CSDNnews)Eclipse Che被Eclipse官方称为下一代IDE,作为老牌的IDE,被其寄予厚望的Eclipse Che到底有什么特点,在这篇文章中我们来一探究竟。开发团队的Kuberentes原生IDEEclipse Che对开发团…

【开发者成长】阿里代码缺陷检测探索与实践

目前PRECFIX技术已经在阿里巴巴集团内部落地并获得好评;关于“PRECFIX”技术的论文被国际软件工程大会(ICSE)收录。 张昕东(别象) 阿里巴巴 云研发事业部 算法工程师 【以下为别象分享实录】 阿里巴巴在缺陷检测技术方面遇到的三个挑战 编码…

docker Harbor2.3.4 https 搭建镜像仓库

文章目录一、环境准备1. 环境要求2. 节点总览3. 安装docker-compose二、安装harbor2.1. 下载2.2. 解压2.3. 认证2.4. 调整配置2.5. 安装 harbor2.6. 配置host2.7. 效果验证三、客户端3.1. 证书保存3.2. 新建配置3.3. 登录harbor四、基本操作4.1. 下线4.2. 监控状态4.3. 重新部署…

OPPO实时数仓揭秘:从顶层设计实现离线与实时的平滑迁移

一、建设背景 关于 OPPO 移动互联网业务 大家都认为 OPPO 是一家手机公司,但大家可能并不清楚,其实 OPPO 也会做与移动互联网相关的业务。在 2019 年 12 月,OPPO 发布了自己定制的手机操作系统 ColorOS 7.0 版本。目前包括海外市场在内&…

十年技术骨干面试被开出一万五薪资,直呼 “这是对我的侮辱”

老周是我十多年前认识的同事,2012年前后,老周到北京工作的第一个任务就是为公司的产品开发IOS APP。2012年底,老周已经能熟练的驾驭苹果的cocoatouch和android核心组件。也正因为如此,老周的薪水直接翻倍,当时已经拿到…

登录 Harbor response from daemon: Get “https://192.168.92.129/v2/“: x509: cannot validate certificate

文章目录1. 现象2. 解决方案3. 重新登陆1. 现象 [rootlocalhost harbor]# docker login 192.168.92.129 Username: admin Password: Error response from daemon: Get "https://192.168.92.129/v2/": x509: cannot validate certificate for 192.168.92.129 becaus…

基于Flink的超大规模在线实时反欺诈系统的建设与实践

作者:关贺宇 在大数据时代,金融科技公司通常借助消费数据来综合评估用户的信用和还款能力。这个过程中,某些中介机构会搜集大量的号并进行“养号”工作,即在一年周期里让这些号形成正常的消费、通讯记录,目的是将这些…

别再被 Python 洗脑了!!

Python 作为一种解释型技术脚本语言,越来越被认可为程序员新时代的风口语言。无论是刚入门的程序员,还是年薪百万的 BATJ 的技术大牛都无可否认:Python的应用能力是成为一名码农大神的必要项。 而作为Python初学者来讲,最大的问题…

任务不再等待!玩转DataWorks资源组

引言 DataWorks提供了三种资源组的能力:独享资源组、自定义资源组和默认资源组,很多开发者在使用资源组时经常会碰到各类情况,到时候任务运行失败或者延迟,例如:1. 正在使用默认资源组,任务经常要等待2.购…