在Amazon EMR上运行Hadoop MapReduce作业

不久前,我发布了如何使用CLI设置EMR群集的信息。 在本文中,我将展示如何使用适用于AWS的Java SDK来设置集群。 展示如何使用Java AWS开发工具包执行此操作的最佳方法是展示完整的示例,因此,让我们开始吧。

    • 设置一个新的Maven项目

为此,我创建了一个新的默认Maven项目。 您可以运行该项目中的主类来启动EMR集群并执行我在本文中创建的MapReduce作业:

package net.pascalalma.aws.emr;import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.*;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;/*** Created with IntelliJ IDEA.* User: pascal* Date: 22-07-13* Time: 20:45*/
public class MyClient {private static final String HADOOP_VERSION = "1.0.3";private static final int INSTANCE_COUNT = 1;private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();private static final UUID RANDOM_UUID = UUID.randomUUID();private static final String FLOW_NAME = "dictionary-" + RANDOM_UUID.toString();private static final String BUCKET_NAME = "map-reduce-intro";private static final String S3N_HADOOP_JAR ="s3n://" + BUCKET_NAME + "/job/MapReduce-1.0-SNAPSHOT.jar";private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/log/";private static final String[] JOB_ARGS =new String[]{"s3n://" + BUCKET_NAME + "/input/input.txt","s3n://" + BUCKET_NAME + "/result/" + FLOW_NAME};private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(new JobFlowExecutionState[]{JobFlowExecutionState.COMPLETED,JobFlowExecutionState.FAILED,JobFlowExecutionState.TERMINATED});static AmazonS3 s3;static AmazonElasticMapReduceClient emr;private static void init() throws Exception {AWSCredentials credentials = new PropertiesCredentials(MyClient.class.getClassLoader().getResourceAsStream("AwsCredentials.properties"));s3 = new AmazonS3Client(credentials);emr = new AmazonElasticMapReduceClient(credentials);emr.setRegion(Region.getRegion(Regions.EU_WEST_1));}private static JobFlowInstancesConfig configInstance() throws Exception {// Configure instances to useJobFlowInstancesConfig instance = new JobFlowInstancesConfig();instance.setHadoopVersion(HADOOP_VERSION);instance.setInstanceCount(INSTANCE_COUNT);instance.setMasterInstanceType(INSTANCE_TYPE);instance.setSlaveInstanceType(INSTANCE_TYPE);// instance.setKeepJobFlowAliveWhenNoSteps(true);// instance.setEc2KeyName("4synergy_palma");return instance;}private static void runCluster() throws Exception {// Configure the job flowRunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, configInstance());request.setLogUri(S3N_LOG_URI);// Configure the Hadoop jar to useHadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);jarConfig.setArgs(ARGS_AS_LIST);try {StepConfig enableDebugging = new StepConfig().withName("Enable debugging").withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(new StepFactory().newEnableDebuggingStep());StepConfig runJar =new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1),jarConfig);request.setSteps(Arrays.asList(new StepConfig[]{enableDebugging, runJar}));//Run the job flowRunJobFlowResult result = emr.runJobFlow(request);//Check the status of the running jobString lastState = "";STATUS_LOOP:while (true) {DescribeJobFlowsRequest desc =new DescribeJobFlowsRequest(Arrays.asList(new String[]{result.getJobFlowId()}));DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);for (JobFlowDetail detail : descResult.getJobFlows()) {String state = detail.getExecutionStatusDetail().getState();if (isDone(state)) {System.out.println("Job " + state + ": " + detail.toString());break STATUS_LOOP;} else if (!lastState.equals(state)) {lastState = state;System.out.println("Job " + state + " at " + new Date().toString());}}Thread.sleep(10000);}} catch (AmazonServiceException ase) {System.out.println("Caught Exception: " + ase.getMessage());System.out.println("Reponse Status Code: " + ase.getStatusCode());System.out.println("Error Code: " + ase.getErrorCode());System.out.println("Request ID: " + ase.getRequestId());}}public static boolean isDone(String value) {JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);return DONE_STATES.contains(state);}public static void main(String[] args) {try {init();runCluster();} catch (Exception e) {e.printStackTrace();  }}
}

在此类中,我首先声明一些常量,我认为这些常量是显而易见的。 在init()方法中,我使用添加到项目中的凭据属性文件。 我将此文件添加到了Maven项目的'/ main / resources'文件夹中。 它包含我的访问密钥和秘密密钥。
我还将EMR客户的区域设置为“ EU-WEST”。
下一个方法是“ configInstance()”。 在这种方法中,我通过设置Hadoop版本,实例数,实例大小等来创建和配置JobFlowInstance。您还可以配置'keepAlive'设置,以在作业完成后使集群保持活动状态。 在某些情况下这可能会有所帮助。 如果要使用此选项,则还可以设置要用于访问集群的密钥对,这可能很有用,因为如果不设置此密钥就无法访问集群。 方法“ runCluster()”是集群实际运行的地方。 它创建启动集群的请求。 在此请求中,添加了必须执行的步骤。 在我们的例子中,其中一个步骤是运行在先前步骤中创建的JAR文件。 我还添加了一个调试步骤,以便在集群完成并终止后我们可以访问调试日志记录。 我们可以简单地访问我用常量'S3N_LOG_URI'设置的S3存储桶中的日志文件。 创建此请求后,我们将基于此请求启动集群。 然后,我们每隔10秒钟拉动一次,以查看作业是否完成,并在控制台上显示一条消息,指示作业的当前状态。 要执行第一次运行,我们必须准备输入。

    • 准备输入

作为作业的输入(有关此示例作业的更多信息,请参见此),我们必须使字典内容可用于EMR群集。 此外,我们必须使JAR文件可用,并确保输出和日志目录存在于我们的S3存储桶中。 有几种方法可以执行此操作:您还可以通过使用SDK以编程方式来执行此操作,也可以通过从命令行使用S3cmd来执行此操作,或者使用AWS管理控制台来执行此操作 。 只要最终得到类似的设置,就可以了:

  • s3:// map-reduce-intro
  • s3:// map-reduce-intro / input
  • s3://map-reduce-intro/input/input.txt
  • s3:// map-reduce-intro / job
  • s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
  • s3:// map-reduce-intro / log
  • s3:// map-reduce-intro / result

或在使用S3cmd时如下所示:

s3cmd-1.5.0-alpha1$ s3cmd ls --recursive s3://map-reduce-intro/
2013-07-20 13:06    469941   s3://map-reduce-intro/input/input.txt
2013-07-20 14:12      5491   s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
2013-08-06 14:30         0   s3://map-reduce-intro/log/
2013-08-06 14:27         0   s3://map-reduce-intro/result/

在上面的示例中,我已经在代码中引入了S3客户端。 您还可以使用它来准备输入或获取输出,作为客户工作的一部分。

    • 运行集群

一切就绪后,我们就可以运行作业。 我只是在IntelliJ中运行'MyClient'的主要方法,并在控制台中获得以下输出:

Job STARTING at Tue Aug 06 16:31:55 CEST 2013
Job RUNNING at Tue Aug 06 16:36:18 CEST 2013
Job SHUTTING_DOWN at Tue Aug 06 16:38:40 CEST 2013
Job COMPLETED: {JobFlowId: j-JDB14HVTRC1L,Name: dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43,LogUri: s3n://map-reduce-intro/log/,AmiVersion: 2.4.0,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:14 CEST 2013,ReadyDateTime: Tue Aug 06 16:36:14 CEST 2013,EndDateTime: Tue Aug 06 16:39:02 CEST 2013,LastStateChangeReason: Steps completed},Instances: {MasterInstanceType: m1.small,MasterPublicDnsName: ec2-54-216-104-11.eu-west-1.compute.amazonaws.com,MasterInstanceId: i-93268ddf,InstanceCount: 1,InstanceGroups: [{InstanceGroupId: ig-2LURHNAK5NVKZ,Name: master,Market: ON_DEMAND,InstanceRole: MASTER,InstanceType: m1.small,InstanceRequestCount: 1,InstanceRunningCount: 0,State: ENDED,LastStateChangeReason: Job flow terminated,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:34:28 CEST 2013,ReadyDateTime: Tue Aug 06 16:36:10 CEST 2013,EndDateTime: Tue Aug 06 16:39:02 CEST 2013}],NormalizedInstanceHours: 1,Ec2KeyName: 4synergy_palma,Placement: {AvailabilityZone: eu-west-1a},KeepJobFlowAliveWhenNoSteps: false,TerminationProtected: false,HadoopVersion: 1.0.3},Steps: [{StepConfig: {Name: Enable debugging,ActionOnFailure: TERMINATE_JOB_FLOW,HadoopJarStep: {Properties: [],Jar: s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args: [s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch]}},ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:12 CEST 2013,EndDateTime: Tue Aug 06 16:36:40 CEST 2013,}}, {StepConfig: {Name: /map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar,ActionOnFailure: TERMINATE_JOB_FLOW,HadoopJarStep: {Properties: [],Jar: s3n://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar,Args: [s3n://map-reduce-intro/input/input.txt, s3n://map-reduce-intro/result/dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43]}},ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:40 CEST 2013,EndDateTime: Tue Aug 06 16:38:10 CEST 2013,}}],BootstrapActions: [],SupportedProducts: [],VisibleToAllUsers: false
,}
Process finished with exit code 0

当然,我们在S3存储桶中配置的“结果”文件夹中有一个结果:

屏幕截图-2013-08-06-at-19-39-15

我将结果转移到我的本地计算机上,并进行了查看:

屏幕截图-2013-08-06-at-19-41-44

这样就可以得出这个简单的结论,但我认为,这是创建Hadoop作业并在对它进行单元测试之后在群集上运行它的完整示例,就像对待所有软件一样。

以该设置为基础,可以轻松地提出更复杂的业务案例,并对其进行测试和配置以在AWS EMR上运行。

参考: The Pragmatic Integrator博客上的JCG合作伙伴 Pascal Alma在Amazon EMR上运行Hadoop MapReduce作业 。

翻译自: https://www.javacodegeeks.com/2013/09/run-your-hadoop-mapreduce-job-on-amazon-emr.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366940.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在商城系统开发时遇到商品的多级分类,为增强扩展性,子类可以任意添加,此类问题数据库如何设计...

表结构为&#xff1a; id&#xff08;编号&#xff09; name&#xff08;分类名&#xff09; parentID&#xff08;父类编号&#xff09; 简单举例如下&#xff1a; id name parentID 1 饮料 0&#xff08;为0表示第一大类&#xff09; 2 水果 …

[JSConf EU 2018] 大脑控制 Javascript

先解释&#xff0c;本人为前端菜鸟&#xff0c;之前也未参加过类似的活动&#xff0c;没有翻译过什么文章&#xff0c;此次是好奇心使然&#xff0c;也是想尝试下&#xff0c;学习学习&#xff0c;英文很烂&#xff0c;全靠有道&#xff0c;但是视频整个看下来&#xff0c;还是…

init tarray 太大_[NOIP 2001提高组T4]Car的旅行路线

[题目来源]&#xff1a;NOIP2001提高组T4[关键字]&#xff1a;最短路径[题目大意]&#xff1a;给定平面直角若干个矩形&#xff0c;计算(可经过其他矩形)两个矩形任意顶点间的最短路程费用。//[分析]&#xff1a;其实题目本事没有太大的难点&#xff0c;只需要对每两个点进行连…

Caffe Caffe2入门博客存档

caffe2 教程入门&#xff08;python版&#xff09; https://www.jianshu.com/p/5c0fd1c9fef9?fromtimeline caffe入门学习 https://blog.csdn.net/hjimce/article/details/48933813 运行caffe自带的两个简单例子 https://www.linuxidc.com/Linux/2016-11/136774p9.htm 关于caf…

JavaScript中不得不说的断言?

断言主要应用于“调试”与“测试” 一、前端中的断言 仔细地查找一下JavaScript中的API&#xff0c;实际上并没有多少关于断言的方法。唯一一个就是console.assert&#xff1a; // console.assert(condition, message)const a 1console.assert(typeof a number, a should be…

Java EE状态会话Bean(EJB)示例

在本文中&#xff0c;我们将了解如何在简单的Web应用程序中使用状态会话Bean来跟踪客户端会话中的状态。 1.简介 有状态会话Bean通常保存有关特定客户端会话的信息&#xff0c;并在整个会话中保留该信息&#xff08;与无状态会话Bean相对&#xff09;。 有状态EJB实例仅与一个…

计算机科学速成课16:软件工程

概念&#xff1a;建造标准或者大型软件的方法和工具代码打包成函数 面向过程函数打包成对象 面向对象对象层层打包Car.Engine.CruiseControl.SetCruiseSpeed(55)应用程序接口api集成开发环境IDE&#xff1a;code&#xff0c;整理&#xff0c;编译&#xff0c;测试注释和readme文…

牛客网NOIP赛前集训营-提高组(第六场)B-选择题[背包]

题意 题目链接 分析 直接背包之后可以 \(O(n)\) 去除一个物品的影响。注意特判 \([p1]\) 的情况。总时间复杂度为 \(O(n^2)\) 。代码 #include<bits/stdc.h> using namespace std; #define go(u) for(int ihead[u],ve[i].to;i;ie[i].last,ve[i].to) #define rep(i,a,b) f…

起点海外版 Hybrid App-内嵌页优化实践

本文作者&#xff1a;刘文涛 原创声明&#xff1a;本文为阅文前端团队 YFE 成员出品&#xff0c;请尊重原创&#xff0c;转载请联系公众号 (id: yuewen_YFE) 获取授权&#xff0c;并注明作者、出处和链接。 今年年初我司开启了起点品牌的海外之旅&#xff0c;名为「 Webnovel 」…

aix 卸载mysql_AIX 删除数据库及集群软件

一、 删除数据库1、用dbca自动删库在CRT上无法打开dbca图形界面&#xff0c;要安装一个Xmanage软件&#xff0c;用Xstart连接终端&#xff0c;并修改oracle用户的.profile&#xff0c;加上“export DISPLAY192.168.8.120:0.0”Xstart配置信息如下&#xff1a;2、手工删除数据库…

简单轻量级池实现

对象池是包含指定数量的对象的容器。 从池中获取对象时&#xff0c;在将对象放回之前&#xff0c;该对象在池中不可用。 池中的对象具有生命周期&#xff1a;创建&#xff0c;验证&#xff0c;销毁等。池有助于更好地管理可用资源。 有许多使用示例。 特别是在应用程序服务器中…

如何在github中的readme.md加入项目截图

1. 先在之前的本地项目文件夹里创建一个存放截图的文件夹。&#xff08;如img文件夹&#xff09; 2. 将新增的内容通过github desktop上传到github中 3. 在github中立马能看到刚刚上传的图片&#xff0c;打开图片&#xff0c;点击Download 4. 直接复制地址栏的网址 5. 最后在RE…

记表格设计规范整理与页面可视化生成工具开发

前言 公司有一个项目在维护&#xff0c;大概有300左右&#xff0c;其中表单与表格的页面占比大概百分之五六十&#xff0c;为了节省开发时间&#xff0c;避免多人协作时&#xff0c;出现多套冗余代码&#xff0c;我们尝试写了一下表单和表格的生成工具&#xff0c;从梳理到规范…

java仿qq空间音乐播放_完美实现仿QQ空间评论回复特效

评论回复是个很常见的东西&#xff0c;但是各大网站实现的方式却不尽相同。大体上有两种方式1.像优酷这种最常见&#xff0c;在输入框中要回复的人&#xff0c;这种方式下&#xff0c;用www.cppcns.com户可以修改。新浪微博则是在这个基础上&#xff0c;弹出好友菜单。这种方式…

使用签名保护基于HTTP的API

我在EMC上的一个平台上可以构建SaaS解决方案。 与越来越多的其他应用程序一样&#xff0c;该平台具有基于RESTful HTTP的API。 使用像JAX-RS这样的开发框架&#xff0c;构建这样的API相对容易。 但是&#xff0c; 正确构建它们并不容易。 建立基于HTTP的API的问题 问题不仅…

Python开发【模块】:Celery 分布式异步消息任务队列

前言&#xff1a; Celery 是一个 基于python开发的分布式异步消息任务队列&#xff0c;通过它可以轻松的实现任务的异步处理&#xff0c; 如果你的业务场景中需要用到异步任务&#xff0c;就可以考虑使用celery&#xff0c; 举几个实例场景中可用的例子: 你想对100台机器执行一…

iOS开发者的一些前端感悟

很多前端工程师会把自己比作“魔法师”&#xff0c;而对于JavaScript这门语言&#xff0c;我也想把它唤作一门“有魔力的语言”。因为这群有无限想法的人&#xff0c;真的在用它创造各种让你惊叹的事物。 Web三件套一、前言 几年前&#xff0c;笔者还是一名初涉编程的学生&…

windows下github 出现Permission denied (publickey)

github教科书传送门:http://www.liaoxuefeng.com/wiki/0013739516305929606dd18361248578c67b8067c8c017b000 再学习到"添加远程仓库"的时候遇到了 Permission denied (publickey) 这个问题&#xff0c; 总结来说以前的步骤如下所示&#xff1a; 1、git config --glo…

php如何逐条读取数据库,php从数据库中读取特定的行(实例)

有的时候我们需要从数据库中读取特定的数据&#xff0c;来检验用户的输入&#xff0c;这个时候需要执行的sql语句是&#xff1a;select * from table_name where idnum;需要执行这样的一个语句。那么怎样来执行这样的语句。为了执行sql语句&#xff0c;我们需要和数据库相连接$…

Java 7 vs Groovy 2.1性能比较

自从我与Grails上一次接触以来&#xff0c;我已经有两年没有使用Groovy了。 我陷入&#xff08;硬&#xff09;核心企业Java中&#xff0c;并且在后台遇到了一些性能方面的问题。 我几乎错过了学习Spock的机会&#xff0c;但是幸运的是&#xff0c; 华沙Java用户组帮助我摆脱了…