基于YARN集群构建运行PySpark Application

文章转载:https://www.tuicool.com/articles/eaYVN3v

Spark Application可以直接运行在YARN集群上,这种运行模式,会将资源的管理与协调统一交给YARN集群去处理,这样能够实现构建于YARN集群之上Application的多样性,比如可以运行MapReduc程序,可以运行HBase集群,也可以运行Storm集群,还可以运行使用Python开发机器学习应用程序,等等。

我们知道,Spark on YARN又分为client模式和cluster模式:在client模式下,Spark Application运行的Driver会在提交程序的节点上,而该节点可能是YARN集群内部节点,也可能不是,一般来说提交Spark Application的客户端节点不是YARN集群内部的节点,那么在客户端节点上可以根据自己的需要安装各种需要的软件和环境,以支撑Spark Application正常运行。在cluster模式下,Spark Application运行时的所有进程都在YARN集群的NodeManager节点上,而且具体在哪些NodeManager上运行是由YARN的调度策略所决定的。

对比这两种模式,最关键的是Spark Application运行时Driver所在的节点不同,而且,如果想要对Driver所在节点的运行环境进行配置,区别很大,但这对于PySpark Application运行来说是非常关键的。

PySpark是Spark为使用Python程序编写Spark Application而实现的客户端库,通过PySpark也可以编写Spark Application并在Spark集群上运行。Python具有非常丰富的科学计算、机器学习处理库,如numpy、pandas、scipy等等。为了能够充分利用这些高效的Python模块,很多机器学习程序都会使用Python实现,同时也希望能够在Spark集群上运行。

PySpark Application运行原理

理解PySpark Application的运行原理,有助于我们使用Python编写Spark Application,并能够对PySpark Application进行各种调优。PySpark构建于Spark的Java API之上,数据在Python脚本里面进行处理,而在JVM中缓存和Shuffle数据,数据处理流程如下图所示(来自Apache Spark Wiki):
这里写图片描述

Spark Application会在Driver中创建pyspark.SparkContext对象,后续通过pyspark.SparkContext对象来构建Job DAG并提交DAG运行。使用Python编写PySpark Application,在Python编写的Driver中也有一个pyspark.SparkContext对象,该pyspark.SparkContext对象会通过Py4J模块启动一个JVM实例,创建一个JavaSparkContext对象。PY4J只用在Driver上,后续在Python程序与JavaSparkContext对象之间的通信,都会通过PY4J模块来实现,而且都是本地通信。

PySpark Application中也有RDD,对Python RDD的Transformation操作,都会被映射到Java中的PythonRDD对象上。对于远程节点上的Python RDD操作,Java PythonRDD对象会创建一个Python子进程,并基于Pipe的方式与该Python子进程通信,将用户编写Python处理代码和数据发送到Python子进程中进行处理。

下面,我们基于Spark on YARN模式,并根据当前企业所具有的实际集群运行环境情况,来说明如何在Spark集群上运行PySpark Application,大致分为如下3种情况:

YARN集群配置Python环境
这种情况,如果是初始安装YARN、Spark集群,并考虑到了当前应用场景需要支持Python程序运行在Spark集群之上,这时可以准备好对应Python软件包、依赖模块,在YARN集群中的每个节点上进行安装。这样,YARN集群的每个NodeManager上都具有Python环境,可以编写PySpark Application并在集群上运行。目前比较流行的是直接安装Python虚拟环境,使用Anaconda等软件,可以极大地简化Python环境的管理工作。

这种方式的缺点是,如果后续使用Python编写Spark Application,需要增加新的依赖模块,那么就需要在YARN集群的每个节点上都进行该新增模块的安装。而且,如果依赖Python的版本,可能还需要管理不同版本Python环境。因为提交PySpark Application运行,具体在哪些NodeManager上运行该Application,是由YARN的调度器决定的,必须保证每个NodeManager上都具有Python环境(基础环境+依赖模块)。

YARN集群不配置Python环境
这种情况,更适合企业已经安装了规模较大的YARN集群,并在开始使用时并未考虑到后续会使用基于Python来编写Spark Application,并且不想在YARN集群的NodeManager上安装Python环境依赖依赖模块。我们参考了Benjamin Zaitlen的博文(详见后面参考链接),并基于Anaconda软件环境进行了实践和验证,具体实现思路如下所示:

在任意一个LInux OS的节点上,安装Anaconda软件
通过Anaconda创建虚拟Python环境
在创建好的Python环境中下载安装依赖的Python模块
将整个Python环境打成zip包
提交PySpark Application时,并通过–archives选项指定zip包路径
下面进行详细说明:

首先,我们在CentOS 7.2上,基于Python 2.7,下载了Anaconda2-5.0.0.1-Linux-x86_64.sh安装软件,并进行了安装。Anaconda的安装路径为/root/anaconda2。

然后,创建一个Python虚拟环境,执行如下命令:

conda create -n mlpy_env –copy -y -q python=2 numpy pandas scipy
上述命令创建了一个名称为mlpy_env的Python环境,–copy选项将对应的软件包都安装到该环境中,包括一些C的动态链接库文件。同时,下载numpy、pandas、scipy这三个依赖模块到该环境中。

接着,将该Python环境打包,执行如下命令:

cd /root/anaconda2/envs
zip -r mlpy_env.zip mlpy_env
该zip文件大概有400MB左右,将该zip压缩包拷贝到指定目录中,方便后续提交PySpark Application:

cp mlpy_env.zip /tmp/
最后,我们可以提交我们的PySpark Application,执行如下命令:

PYSPARK_PYTHON=./ANACONDA/mlpy_env/bin/python spark-submit \
–conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/mlpy_env/bin/python \
–master yarn-cluster \
–archives /tmp/mlpy_env.zip#ANACONDA \
/var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies.py
上面的test_pyspark_dependencies.py文件中,使用了numpy、pandas、scipy这三个依赖包的函数,通过上面提到的YARN集群的cluster模式可以运行在Spark集群上。

可以看到,上面的依赖zip压缩包将整个Python的运行环境都包含在里面,在提交PySpark Application时会将该环境zip包上传到运行Application的所在的每个节点上,并解压缩后为Python代码提供运行时环境。如果不想每次都从客户端将该环境文件上传到集群中运行PySpark Application的节点上,也可以将zip包上传到HDFS上,并修改–archives参数的值为hdfs:///tmp/mlpy_env.zip#ANACONDA,也是可以的。

另外,需要说明的是,如果我们开发的/var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies.py文件中,也依赖的一些我们自己实现的处理函数,具有多个Python依赖的文件,想要通过上面的方式运行,必须将这些依赖的Python文件拷贝到我们创建的环境中,对应的目录为mlpy_env/lib/python2.7/site-packages/下面。

基于混合编程语言环境
假如我们还是希望使用Spark on YARN模式来运行PySpark Application,但并不将Python程序提交到YARN集群上运行。这时,我们可以考虑使用混合编程语言的方式,来处理数据任务。比如,机器学习Application具有迭代计算的特性,更适合在一个高配的节点上运行;而普通的ETL数据处理具有多机并行处理的特点,适合放到集群上进行分布式处理。

一个完整的机器学习Application的设计与构建,可以将算法部分和数据准备部分分离出来,使用Scala/Java进行数据预处理,输出一个机器学习算法所需要(更便于迭代、寻优计算)的输入数据格式,这会极大地压缩算法输入数据的规模,从而使算法迭代计算充分利用单机本地的资源(内存、CPU、网络),这可能会比直接放到集群中计算要快得多。

因此,我们在对机器学习Application准备数据时,使用原生的Scala编程语言实现Spark Application来处理数据,包括转换、统计、压缩等等,将满足算法输入格式的数据输出到HDFS指定目录中。在性能方面,对数据规模较大的情况下,在Spark集群上处理数据,Scala/Java实现的Spark Application运行性能要好一些。然后,算法迭代部分,基于丰富、高性能的Python科学计算模块,使用Python语言实现,其实直接使用PySpark API实现一个机器学习PySpark Application,运行模式为YARN client模式。这时,就需要在算法运行的节点上安装好Python环境及其依赖模块(而不需要在YARN集群的节点上安装),Driver程序从HDFS中读取输入数据(缓存到本地),然后在本地进行算法的迭代计算,最后输出模型。

总结

对于重度使用PySpark的情况,比如偏向机器学习,可以考虑在整个集群中都安装好Python环境,并根据不同的需要进行依赖模块的统一管理,能够=极大地方便PySpark Application的运行。

不在YARN集群上安装Python环境的方案,会使提交的Python环境zip包在YARN集群中传输带来一定开销,而且每次提交一个PySpark Application都需要打包一个环境zip文件,如果有大量的Python实现的PySpark Application需要在Spark集群上运行,开销会越来越大。另外,如果PySpark应用程序修改,可能需要重新打包环境。但是这样做确实不在需要考虑YARN集群集群节点上的Python环境了,任何版本Python编写的PySpark Application都可以使用集群资源运行。

关于该问题,SPARK-13587(详见下面参考链接)也在讨论如果优化该问题,后续应该会有一个比较合适的解决方案。

参考链接

https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
https://issues.apache.org/jira/browse/SPARK-13587
http://quasiben.github.io/blog/2016/4/15/conda-spark/
https://blog.cloudera.com/blog/2017/04/use-your-favorite-python-library-on-pyspark-cluster-with-cloudera-data-science-workbench/
https://repo.continuum.io/pkgs/misc/parcels/archive/
https://repo.continuum.io/pkgs/misc/parcels/
https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_python.html
https://docs.anaconda.com/anaconda/user-guide/tasks/integration/cloudera

本文基于 署名-非商业性使用-相同方式共享 4.0 许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/570768.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[luogu4133 BJOI2012] 最多的方案 (计数dp)

题目描述 第二关和很出名的斐波那契数列有关,地球上的OIer都知道:F11, F22, Fi Fi-1 Fi-2,每一项都可以称为斐波那契数。现在给一个正整数N,它可以写成一些斐波那契数的和的形式。如果我们要求不同的方案中不能有相同的斐波那契…

Apache nifi 集群安装

原文地址:https://pierrevillard.com/2016/08/13/apache-nifi-1-0-0-cluster-setup/ 文章写的很好了,步骤性的英文写得也比较易懂,原样搬过来了,没有再翻译 As you may know a version 1.0.0-BETA of Apache NiFi has been rele…

stixel提升思路总结

1.用psmnet获得更好的disparity 2.用edgebox获得整个rgb图片的边缘,然后通过原本的stixel的上下边缘去寻找最优,用两个的边缘去重新得到一个新的边缘,但获得的轮廓不仅仅是外轮廓还有内部的轮廓,得出的结果比之前没有太多提升. 目前可以尝试在disparity图求边缘这种方式 3.使用…

Hive分区表count(*)不起mapreduce的真相

问题背景: 在对Hive求count(*)时,发现有些表会启mapreduce计算、返回 结果,比较耗时,有的表1秒之内返回结果 刚开始以为刚刚执行过一次count()后会对结果进行缓存,不用再去跑mapreduce,但经进一步实验发现…

进阶攻略|前端完整的学习路线

最近写了一篇关于前端一些常见轻便耐用的UI框架的小总结,很多小伙伴私信问我,要怎么学习前端,没有明确的方向,为了感谢大家的关注点赞打赏和喜欢,决定把前端的学习进阶之路稍微整理一下,也为了自己能在工作…

Kylin下构建Cube第一步出错:shell-init: error retrieving current directory

问题背景: 生产环境部署的Kylin-2.1,官方发布的最新安装包并不支持更改hbase存储的namespace,修改源码后重新打包部署过程中,build cube第一步出错 大概错误信息是: OS command error exit with 5 – hive -e "…

python 字典练习 记录学生是否交作业的小程序

#记录学生是否交作业的小程序 #包括:学生名字、日期、状态 1 data{2 taotao:{3 2018-6-3:已交,4 2018-6-4:未交,5 2018-6-5:已交6 } 7 mingming:{8 2018-6-3:未交,9 2018-6-4:已交 10 } 11 } #1、判断名字和日期是否…

boost::timer库使用

boost::timer boost库定时器使用,需要在编译时加相关链接库 -lboost_timer -lboost_system boost::timer::cpu_timer 和boost::timer::auto_cpu_timer用于精确定时,有start(),elapsed(),is_stopped()等方法,elapsed()方法返回的时结构体boost…

Kylin 2.0升级总结

文章转载,原文地址:https://blog.bcmeng.com/post/kylin-upgrade.html #6-给kylin社区的建议 引用于个人自查、学习 Kylin 2.0的升级节奏 升级的大原则 升级的目标 1 Kylin 2.0 升级流程 1.1 Kylin 2.0 代码合入 1.2 配置更新和梳理 1.3 兼容性测…

Html5 学习笔记 --》html基础 css 基础

HTML5 功能 HTML5特点 <!DOCTYPE html> <html lang"zh-cn"> <head><meta charset"utf-8"><title>基本格式</title> </head> <body><a href"http://www.baidu.com">百度</a> </b…

Kylin修改默认hbase namespace命名空间default的解决方案

问题及背景&#xff1a;同一用户的三家公司的物理集群合并&#xff0c;合并后用dataspacekerberos控制不同公司对集群资料的访问权限&#xff0c;三家公司分别使用独立的kerberos票据访问&#xff0c;特定的namespace,而生产环境部署的kylin-2.0/2.1只能保存cuboid到hbase 的 d…

pip download timeout 下载慢,超时解决方法

更换国内的pypi源&#xff1a; 如&#xff1a; pip install -i https://pypi.tuna.tsinghua.edu.cn/simple –upgrade tensorflow-gpu

test'

message.info(Click on left button.);直接弹出提示信息console.log(click left button, e); 后台输出区别 import { Pagination } from antd;function onShowSizeChange(current, pageSize) {console.log(current, pageSize); }ReactDOM.render(<Pagination showSizeChang…

Error:-81024 LR_VUG:The 'QTWeb' type is not supported on win32 platforms

在LR的bin目录下&#xff0c;选择Wlrun.exe文件&#xff0c;右键单击&#xff0c;选择属性&#xff1b;在兼容性里面把兼容性模式改为Windows XP (Service Pack 3),应用保存&#xff1b;然后再关闭controller&#xff0c;重新打开运行就可以了&#xff1b;

VMware仅主机模式访问外网

原文转载至&#xff1a;https://blog.csdn.net/eussi/article/details/79054622 保证VMware Network Adapter VMnet1是启用状态 将可以连接外网的连接共享属性设置成如下图所示 将VMware Network Adapter VMnet1的IP地址设置成与本机IP不同的网段即可 VMware虚拟网络编辑器VMne…

Spark学习之RDD的概念

RDD又叫弹性分布式数据集&#xff0c;是Spark数据的基础单元&#xff0c;Spark编程是围绕着在RDD上创建和执行操作来进行的。它们是跨集群进行分区的不可变集合&#xff08;immutable collection&#xff09;&#xff0c;如果某个分区丢失&#xff0c;这些分区可以重建&#xf…

我的ELK搭建笔记(阿里云上部署)

文章转载&#xff1a;http://www.jianshu.com/p/797073c1913f 仅用作个人学习&#xff0c;收藏 我的 ELK 搭建笔记&#xff08;基于阿里云&#xff09; “不是最好的&#xff0c;但一定是有良心的操作记录。”目录一览 0 重不重要都得有的开头 1 安装配置 1.1 CentOS 7…

HBase regions分布不均匀的解决

1、先确定master页面是否还有region in transition,如果有并且长时间未变化&#xff0c;可以考虑重启master&#xff0c;重新触发容灾。 2、region都加载后进入hbase shell balance_swith ture 开启balancer balancer 手动触发balance 即可。

莫队分块

今天兔哥讲了一波莫队&#xff0c;比较有趣&#xff0c;先加一个链接,这是她的教程 rabbithu.cnblogs.com 这里就不详细说了&#xff0c;其实就是两个指针来优化的暴力。一开始排序函数有问题&#xff0c;没用上莫队的核心思想&#xff1a;把查询区间先排序&#xff0c;第一关键…

Linux Kettle 闪退问题解决方案

我们在搭建kettle平台时&#xff0c;往往会搭建两种平台&#xff0c;一种win、一种是linux。在windows上进行kettle ETL测试工作&#xff0c;测试成功之后&#xff0c;会发布到linux服务器上&#xff0c;这就出现了一下问题——linux执行ktr文件&#xff0c;界面闪退&#xff0…