PyFlink 开发环境利器:Zeppelin Notebook

简介: 在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中。

PyFlink 作为 Flink 的 Python 语言入口,其 Python 语言的确很简单易学,但是 PyFlink 的开发环境却不容易搭建,稍有不慎,PyFlink 环境就会乱掉,而且很难排查原因。今天给大家介绍一款能够帮你解决这些问题的 PyFlink 开发环境利器:Zeppelin Notebook。主要内容为:

  1. 准备工作
  2. 搭建 PyFlink 环境
  3. 总结与未来

也许你早就听说过 Zeppelin,但是之前的文章都偏重讲述如何在 Zeppelin 里开发 Flink SQL,今天则来介绍下如何在 Zeppelin 里高效的开发 PyFlink Job,特别是解决 PyFlink 的环境问题。

一句来总结这篇文章的主题,就是在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中,你无需手动在集群上去安装任何 PyFlink 的包,并且你可以在一个 Yarn 集群里同时使用互相隔离的多个版本的 PyFlink。最后你能看到的效果就是这样:

1. 能够在 PyFlink 客户端使用第三方 Python 库,比如 matplotlib:

img

2. 可以在 PyFlink UDF 里使用第三方 Python 库,如:

img

接下来看看如何来实现。

一、准备工作

Step 1.

准备好最新版本的 Zeppelin 的搭建,这个就不在这边展开了,如果有问题可以加入 Flink on Zeppelin 钉钉群 (34517043) 咨询。另外需要注意的是,Zeppelin 部署集群需要是 Linux,如果是 Mac 的话,会导致在 Mac 机器上打的 Conda 环境无法在 Yarn 集群里使用 (因为 Conda 包在不同系统间是不兼容的)。

Step 2.

下载 Flink 1.13, 需要注意的是,本文的功能只能用在 Flink 1.13 以上版本,然后:

  • 把 flink-Python-*.jar 这个 jar 包 copy 到 Flink 的 lib 文件夹下;
  • 把 opt/Python 这个文件夹 copy 到 Flink 的 lib 文件夹下。

Step 3.

安装以下软件 (这些软件是用于创建 Conda env 的):

  • miniconda:https://docs.conda.io/en/latest/miniconda.html
  • conda pack:https://conda.github.io/conda-pack/
  • mamba:https://github.com/mamba-org/mamba

二、搭建 PyFlink 环境

接下来就可以在 Zeppelin 里搭建并且使用 PyFlink 了。

Step 1. 制作 JobManager 上的 PyFlink Conda 环境

因为 Zeppelin 天生支持 Shell,所以可以在 Zeppelin 里用 Shell 来制作 PyFlink 环境。注意这里的 Python 第三方包是在 PyFlink 客户端 (JobManager) 需要的包,比如 Matplotlib 这些,并且确保至少安装了下面这些包:

  • 某个版本的 Python (这里用的是 3.7)
  • apache-flink (这里用的是 1.13.1)
  • jupyter,grpcio,protobuf (这三个包是 Zeppelin 需要的)

剩下的包可以根据需要来指定:

%sh# make sure you have conda and momba installed.
# install miniconda: https://docs.conda.io/en/latest/miniconda.html
# install mamba: https://github.com/mamba-org/mambaecho "name: pyflink_env
channels:- conda-forge- defaults
dependencies:- Python=3.7- pip- pip:- apache-flink==1.13.1- jupyter- grpcio- protobuf- matplotlib- pandasql- pandas- scipy- seaborn- plotnine" > pyflink_env.ymlmamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml

运行下面的代码打包 PyFlink 的 Conda 环境并且上传到 HDFS (注意这里打包出来的文件格式是 tar.gz):

%shrm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gzhadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz

Step 2. 制作 TaskManager 上的 PyFlink Conda 环境

运行下面的代码来创建 TaskManager 上的 PyFlink Conda 环境,TaskManager 上的 PyFlink 环境至少包含以下 2 个包:

  • 某个版本的 Python (这里用的是 3.7)
  • apache-flink (这里用的是 1.13.1)

剩下的包是 Python UDF 需要依赖的包,比如这里指定了 pandas:

echo "name: pyflink_tm_env
channels:- conda-forge- defaults
dependencies:- Python=3.7- pip- pip:- apache-flink==1.13.1- pandas" > pyflink_tm_env.ymlmamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.yml

运行下面的代码打包 PyFlink 的 conda 环境并且上传到 HDFS (注意这里使用的是 zip 格式)

%shrm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.ziphadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip

Step 3. 在 PyFlink 中使用 Conda 环境

接下来就可以在 Zeppelin 中使用上面创建的 Conda 环境了,首先需要在 Zeppelin 里配置 Flink,主要配置的选项有:

  • flink.execution.mode 为 yarn-application, 本文所讲的方法只适用于 yarn-application 模式;
  • 指定 yarn.ship-archives,zeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 来配置 JobManager 侧的 PyFlink Conda 环境;
  • 指定 Python.archives 以及 Python.executable 来指定 TaskManager 侧的 PyFlink Conda 环境;
  • 指定其他可选的 Flink 配置,比如这里的 flink.jm.memory 和 flink.tm.memory。
%flink.confflink.execution.mode yarn-applicationyarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/Python
zeppelin.interpreter.conda.env.name pyflink_env.tar.gzPython.archives hdfs:///tmp/pyflink_tm_env.zip
Python.executable  pyflink_tm_env.zip/bin/Python3.7flink.jm.memory 2048
flink.tm.memory 2048

接下来就可以如一开始所说的那样在 Zeppelin 里使用 PyFlink 以及指定的 Conda 环境了。有 2 种场景:

  • 下面的例子里,可以在 PyFlink 客户端 (JobManager 侧) 使用上面创建的 JobManager 侧的 Conda 环境,比如下边使用了 Matplotlib。

    img

  • 下面的例子是在 PyFlink UDF 里使用上面创建的 TaskManager 侧 Conda 环境里的库,比如下面在 UDF 里使用 Pandas。

    img

三、总结与未来

本文内容就是在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中,无需手动在集群上去安装任何 Pyflink 的包,并且可以在一个 Yarn 集群里同时使用多个版本的 PyFlink。

每个 PyFlink 的环境都是隔离的,而且可以随时定制更改 Conda 环境。可以下载下面这个 note 并导入到 Zeppelin,就可以复现今天讲的内容:http://23.254.161.240/#/notebook/2G8N1WTTS

此外还有很多可以改进的地方:

  • 目前我们需要创建 2 个 conda env ,原因是 Zeppelin 支持 tar.gz 格式,而 Flink 只支持 zip 格式。等后期两边统一之后,只要创建一个 conda env 就可以;
  • apache-flink 现在包含了 Flink 的 jar 包,这就导致打出来的 conda env 特别大,yarn container 在初始化的时候耗时会比较长,这个需要 Flink 社区提供一个轻量级的 Python 包 (不包含 Flink jar 包),就可以大大减小 conda env 的大小。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512562.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android自动化打包工具,利用Jenkins实现Android自动化打包

Jenkins简介What is Jenkins?Jenkins is a self-contained, open source automation server which can be used to automate all sorts of tasks related to building, testing, and delivering or deploying software.Jenkins can be installed through native system packag…

为什么简单的删除集合中的元素竟然报错了?

作者 | 七十一来源 | 程序员巴士前言什么是快速失败:fail-fast 机制是java集合(Collection)中的一种错误机制。它只能被用来检测错误,因为JDK并不保证fail-fast机制一定会发生。当多个线程对同一个集合的内容进行操作时,就可能会产生fail-fas…

一文详解Redis中BigKey、HotKey的发现与处理

简介: 在Redis的使用过程中,我们经常会遇到BigKey(下文将其称为“大key”)及HotKey(下文将其称为“热key”)。大Key与热Key如果未能及时发现并进行处理,很可能会使服务性能下降、用户体验变差&a…

阿里云CDN操控2.0版本正式发布

简介: 2021年8月,阿里云边缘云CDN完成过去3年来最大的一次版本升级。 2021年8月,阿里云边缘云CDN完成过去3年来最大的一次版本升级。本次升级根据上万企业客户的使用反馈和行业应用特征,从简单开通到个性化定制,从内容…

向xxxhub发了一个数据包,发现了···

作者 | 轩辕之风来源 | 编程技术宇宙那天,我突然想到一个问题:当我访问那个让万千宅男程序员为之着迷的GitHub时,我电脑发出的数据包是如何抵达大洋彼岸的GitHub服务器的呢,这中间又要经过哪些节点呢?让我们一起来探究…

使用 Flink Hudi 构建流式数据湖

简介: 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型的不断优化演进。 本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型不断优化演进。用户可以通过 Flink SQL 将 CDC 数据实时写入 Hudi 存储,且在即将…

android获取版本号报错,Android开发:获取安卓App版本号的方法步骤

在Android开发过程中,想要开发一个完整功能的App,各个地方的内容都要涉及到,比如获取App的系统版本号就是必须要有的功能。Android的App版本号相关内容比iOS的App版本号内容要多,而且iOS版的App版本信息跟Android的还不一样。本篇…

运营也用的起来的数据分析工具:Quick BI即席分析详解

简介: 数据部门是一个容易被投诉的“高危”部门,需求响应慢、数据准确性不高会影响业务的发展。 然而数据分析师每周动辄就有几十个需求在手,无限的加班也无法解决所有问题,到底怎样才能改变BI分析师的需求响应问题呢?…

【产品动态】解读Dataphin流批一体的实时研发

简介: Dataphin作为一款企业级智能数据构建与管理产品,具备全链路实时研发能力,从2019年开始就支撑可集团天猫双11的实时计算需求,文章将详细介绍Dataphin实时计算的能力。 背景 每当双11全球购物狂欢节钟声响起,上千…

Aruba与中国电信国际有限公司达成战略合作 助力中国企业扬帆出海

2022年1月12日,慧与科技公司 (NYSE: HPE) 旗下Aruba日前宣布,与中国电信国际有限公司(CTG)签署MSP(托管服务运营商)战略合作伙伴协议,Aruba的产品将纳入中国电信国际有限公司的主营产品线。协议…

模仿Spring实现一个类管理容器

简介: 项目的初衷是独立作出一个成熟的有特色的IOC容器,但由于过程参考Spring太多,而且也无法作出太多改进,于是目的变为以此项目作为理解Spring的一个跳板,与网上的一些模仿Spring的框架不同,本项目主要是针对注解形式 概述 项目的初衷是独立作出一个成熟的有特色…

湖仓一体化的路,很多人都只走了一半

2022已至,如果回看2021,这一年无疑是数据的价值进一步体现的一年。数据应用场景不断丰富,从工业、交通、金融到制造,几乎无处不在。当然,数据价值的迅速提升也给开发者和相关企业带来了新的问题。数据量的爆发让存储成…

学术顶会再突破!计算平台MaxCompute论文入选国际顶会VLDB 2021

简介: VLDB 2021上,阿里云计算平台MaxCompute参与的论文入选,核心分布式调度执行引擎Fangorn、基于TVR Cost模型的通用增量计算优化器框架Tempura等分别被Industry Track、Research Track录取。 一、顶会概览 VLDB 2021上,阿里云…

技术干货 | 应用性能提升 70%,探究 mPaaS 全链路压测的实现原理和实施路径

简介: 全链路压测方案下,非加密场景下至少有 70% 的性能提升,加密场景下 10%的性能提升,并在 MGS 扩容完成后可实现大幅的性能提升,调优的结果远超预期。 业务背景 随着移动开发行业的步入存量时代,App 整…

投稿指南 | 云计算领域最前沿资讯、技术,期待您的专业解读!

我们是谁?CSDN云计算是CSDN旗下官方账号,提供云计算、大数据、虚拟化、数据中心、OpenStack、CloudStack、机器学习、智能算法等相关云计算观点、云计算技术、云计算平台、云计算实践、云计算产业咨询等服务。内容平台方面,我们的目标读者主要…

DataWorks 功能实践速览03期 — 生产开发环境隔离

简介: DataWorks功能实践系列,帮助您解析业务实现过程中的痛点,提高业务功能使用效率! 往期回顾: DataWorks 功能实践速览01期——数据同步解决方案:为您介绍不同场景下可选的数据同步方案。DataWorks 功…

鸿蒙手表esim,鸿蒙手表终于来了!或将支持 eSIM,实现独立通话

原标题:鸿蒙手表终于来了!或将支持 eSIM,实现独立通话根据此前的爆料消息,华为将于 6 月份带来与鸿蒙相关的产品发布会,备受瞩目的平板、手表等新品也将亮相。临近产品发布,华为官方也开始了新品的预热。今…

Pull or Push?监控系统如何选型

简介: 对于建设一套公司内部使用的监控系统平台,相对来说可选的方案还是非常多的,无论是用开源方案自建还是使用商业的SaaS化产品,都有比较多的可选项。但无论是开源方案还是商业的SaaS产品,真正实施起来都需要考虑如何…

k8s 集群居然可以图形化安装了?

作者 | 小碗汤来源 | 我的小碗汤今天分享一个可以图形化搭建k8s集群的项目,不妨试一试~本项目是基于 Kubespray 提供图形化的 K8S 集群离线安装、维护工具。Kubespray:https://github.com/kubernetes-sigs/kubesprayKuboard-SprayKuboard-Spray 是一款可…

poi excel导入 判断合并单元格_Excel合并单元格,你需要知道的那些事

合并单元格,是我们经常使用的一个功能。借助合并单元格功能,我们可以制作跨列表头,可以对数据进行显示上的分类,使数据看起来更加清晰明了,让我们的Excel表格看起来更加专业。找到菜单栏的合并单元格功能,我…