hadoop emr_在Amazon EMR上运行Hadoop MapReduce作业

hadoop emr

不久前,我发布了如何使用CLI设置EMR群集的信息。 在本文中,我将展示如何使用适用于AWS的Java SDK来设置集群。 展示如何使用Java AWS开发工具包执行此操作的最佳方法是展示我认为完整的示例,因此让我们开始吧。

    • 设置一个新的Maven项目

为此,我创建了一个新的默认Maven项目。 您可以运行该项目中的主类来启动EMR集群并执行我在本文中创建的MapReduce作业:

package net.pascalalma.aws.emr;import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.*;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;/*** Created with IntelliJ IDEA.* User: pascal* Date: 22-07-13* Time: 20:45*/
public class MyClient {private static final String HADOOP_VERSION = "1.0.3";private static final int INSTANCE_COUNT = 1;private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();private static final UUID RANDOM_UUID = UUID.randomUUID();private static final String FLOW_NAME = "dictionary-" + RANDOM_UUID.toString();private static final String BUCKET_NAME = "map-reduce-intro";private static final String S3N_HADOOP_JAR ="s3n://" + BUCKET_NAME + "/job/MapReduce-1.0-SNAPSHOT.jar";private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/log/";private static final String[] JOB_ARGS =new String[]{"s3n://" + BUCKET_NAME + "/input/input.txt","s3n://" + BUCKET_NAME + "/result/" + FLOW_NAME};private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(new JobFlowExecutionState[]{JobFlowExecutionState.COMPLETED,JobFlowExecutionState.FAILED,JobFlowExecutionState.TERMINATED});static AmazonS3 s3;static AmazonElasticMapReduceClient emr;private static void init() throws Exception {AWSCredentials credentials = new PropertiesCredentials(MyClient.class.getClassLoader().getResourceAsStream("AwsCredentials.properties"));s3 = new AmazonS3Client(credentials);emr = new AmazonElasticMapReduceClient(credentials);emr.setRegion(Region.getRegion(Regions.EU_WEST_1));}private static JobFlowInstancesConfig configInstance() throws Exception {// Configure instances to useJobFlowInstancesConfig instance = new JobFlowInstancesConfig();instance.setHadoopVersion(HADOOP_VERSION);instance.setInstanceCount(INSTANCE_COUNT);instance.setMasterInstanceType(INSTANCE_TYPE);instance.setSlaveInstanceType(INSTANCE_TYPE);// instance.setKeepJobFlowAliveWhenNoSteps(true);// instance.setEc2KeyName("4synergy_palma");return instance;}private static void runCluster() throws Exception {// Configure the job flowRunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, configInstance());request.setLogUri(S3N_LOG_URI);// Configure the Hadoop jar to useHadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);jarConfig.setArgs(ARGS_AS_LIST);try {StepConfig enableDebugging = new StepConfig().withName("Enable debugging").withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(new StepFactory().newEnableDebuggingStep());StepConfig runJar =new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1),jarConfig);request.setSteps(Arrays.asList(new StepConfig[]{enableDebugging, runJar}));//Run the job flowRunJobFlowResult result = emr.runJobFlow(request);//Check the status of the running jobString lastState = "";STATUS_LOOP:while (true) {DescribeJobFlowsRequest desc =new DescribeJobFlowsRequest(Arrays.asList(new String[]{result.getJobFlowId()}));DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);for (JobFlowDetail detail : descResult.getJobFlows()) {String state = detail.getExecutionStatusDetail().getState();if (isDone(state)) {System.out.println("Job " + state + ": " + detail.toString());break STATUS_LOOP;} else if (!lastState.equals(state)) {lastState = state;System.out.println("Job " + state + " at " + new Date().toString());}}Thread.sleep(10000);}} catch (AmazonServiceException ase) {System.out.println("Caught Exception: " + ase.getMessage());System.out.println("Reponse Status Code: " + ase.getStatusCode());System.out.println("Error Code: " + ase.getErrorCode());System.out.println("Request ID: " + ase.getRequestId());}}public static boolean isDone(String value) {JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);return DONE_STATES.contains(state);}public static void main(String[] args) {try {init();runCluster();} catch (Exception e) {e.printStackTrace();  }}
}

在此类中,我首先声明一些常量,我认为这些常量是显而易见的。 在init()方法中,我使用添加到项目中的凭据属性文件。 我将此文件添加到了Maven项目的'/ main / resources'文件夹中。 它包含我的访问密钥和秘密密钥。
我还将EMR客户的区域设置为“ EU-WEST”。
下一个方法是“ configInstance()”。 在这种方法中,我通过设置Hadoop版本,实例数,实例大小等来创建和配置JobFlowInstance。您还可以配置'keepAlive'设置,以在作业完成后使集群保持活动状态。 在某些情况下这可能会有所帮助。 如果要使用此选项,则也可以设置要用于访问集群的密钥对,这可能会很有用,因为如果不设置此密钥就无法访问集群。 方法“ runCluster()”是集群实际运行的地方。 它创建启动集群的请求。 在此请求中,添加了必须执行的步骤。 在我们的例子中,其中一个步骤是运行在先前步骤中创建的JAR文件。 我还添加了一个调试步骤,以便在集群完成并终止后我们可以访问调试日志记录。 我们可以简单地访问我用常量'S3N_LOG_URI'设置的S3存储桶中的日志文件。 创建此请求后,我们将基于此请求启动集群。 然后,我们每隔10秒钟拉动一次,以查看作业是否完成,并在控制台上显示一条消息,指示作业的当前状态。 要执行第一次运行,我们必须准备输入。

    • 准备输入

作为作业的输入(有关此示例作业的更多信息,请参见此),我们必须使字典内容可用于EMR群集。 此外,我们必须使JAR文件可用,并确保输出和日志目录存在于我们的S3存储桶中。 有几种方法可以执行此操作:您也可以通过使用SDK以编程方式执行此操作,也可以使用S3cmd从命令行执行此操作,也可以使用AWS管理控制台进行此操作 。 只要您得到类似的设置,就可以了:

  • s3:// map-reduce-intro
  • s3:// map-reduce-intro / input
  • s3://map-reduce-intro/input/input.txt
  • s3:// map-reduce-intro / job
  • s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
  • s3:// map-reduce-intro / log
  • s3:// map-reduce-intro / result

或在使用S3cmd时如下所示:

s3cmd-1.5.0-alpha1$ s3cmd ls --recursive s3://map-reduce-intro/
2013-07-20 13:06    469941   s3://map-reduce-intro/input/input.txt
2013-07-20 14:12      5491   s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
2013-08-06 14:30         0   s3://map-reduce-intro/log/
2013-08-06 14:27         0   s3://map-reduce-intro/result/

在上面的示例中,我已经在代码中引入了S3客户端。 您还可以使用它来准备输入或获取输出,作为客户工作的一部分。

    • 运行集群

当一切就绪后,我们就可以运行工作。 我只是在IntelliJ中运行'MyClient'的主要方法,并在控制台中获得以下输出:

Job STARTING at Tue Aug 06 16:31:55 CEST 2013
Job RUNNING at Tue Aug 06 16:36:18 CEST 2013
Job SHUTTING_DOWN at Tue Aug 06 16:38:40 CEST 2013
Job COMPLETED: {JobFlowId: j-JDB14HVTRC1L,Name: dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43,LogUri: s3n://map-reduce-intro/log/,AmiVersion: 2.4.0,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:14 CEST 2013,ReadyDateTime: Tue Aug 06 16:36:14 CEST 2013,EndDateTime: Tue Aug 06 16:39:02 CEST 2013,LastStateChangeReason: Steps completed},Instances: {MasterInstanceType: m1.small,MasterPublicDnsName: ec2-54-216-104-11.eu-west-1.compute.amazonaws.com,MasterInstanceId: i-93268ddf,InstanceCount: 1,InstanceGroups: [{InstanceGroupId: ig-2LURHNAK5NVKZ,Name: master,Market: ON_DEMAND,InstanceRole: MASTER,InstanceType: m1.small,InstanceRequestCount: 1,InstanceRunningCount: 0,State: ENDED,LastStateChangeReason: Job flow terminated,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:34:28 CEST 2013,ReadyDateTime: Tue Aug 06 16:36:10 CEST 2013,EndDateTime: Tue Aug 06 16:39:02 CEST 2013}],NormalizedInstanceHours: 1,Ec2KeyName: 4synergy_palma,Placement: {AvailabilityZone: eu-west-1a},KeepJobFlowAliveWhenNoSteps: false,TerminationProtected: false,HadoopVersion: 1.0.3},Steps: [{StepConfig: {Name: Enable debugging,ActionOnFailure: TERMINATE_JOB_FLOW,HadoopJarStep: {Properties: [],Jar: s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args: [s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch]}},ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:12 CEST 2013,EndDateTime: Tue Aug 06 16:36:40 CEST 2013,}}, {StepConfig: {Name: /map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar,ActionOnFailure: TERMINATE_JOB_FLOW,HadoopJarStep: {Properties: [],Jar: s3n://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar,Args: [s3n://map-reduce-intro/input/input.txt, s3n://map-reduce-intro/result/dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43]}},ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013,StartDateTime: Tue Aug 06 16:36:40 CEST 2013,EndDateTime: Tue Aug 06 16:38:10 CEST 2013,}}],BootstrapActions: [],SupportedProducts: [],VisibleToAllUsers: false
,}
Process finished with exit code 0

当然,我们在S3存储桶中配置的“结果”文件夹中有一个结果:

屏幕截图-2013-08-06-at-19-39-15

我将结果传输到我的本地计算机上,并进行了查看:

屏幕截图-2013-08-06-at-19-41-44

这样就可以得出这个简单的结论,但我认为,这是创建Hadoop作业并在对它进行单元测试之后在群集上运行它的完整示例,就像我们对所有软件所做的那样。

以该设置为基础,可以轻松地提出更复杂的业务案例,并对其进行测试和配置以在AWS EMR上运行。

参考: The Pragmatic Integrator博客上的JCG合作伙伴 Pascal Alma在Amazon EMR上运行Hadoop MapReduce作业 。

翻译自: https://www.javacodegeeks.com/2013/09/run-your-hadoop-mapreduce-job-on-amazon-emr.html

hadoop emr

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/347009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apache Spark中的自定义日志

您是否曾经对运行了几个小时的Spark作业感到沮丧&#xff0c;但由于基础设施问题而失败了。 您会很晚才知道此故障&#xff0c;并浪费了数小时的时间&#xff0c;当Spark UI日志也无法用于事后检查时&#xff0c;它会更加痛苦。 你不是一个人&#xff01; 在这篇文章中&…

【MFC系列1】之简单Win32程序

一、位运算 标志位组合&#xff1a;wParam MK_RBUTTON|MK_CONTROL|MK_SHIFT 标志位的判断&#xff1a;wParam & MK_LBUTTON 标志位的分离&#xff1a;wParam & ~MK_RBUTTON; 二、调用约定 __cdecl_stdcall是C Declaration的缩写&#xff08;declaration&#xff0c…

【MFC系列2】Win32项目转换为MFC项目

关注公号【逆向通信猿】更精彩&#xff01;&#xff01;&#xff01; 一、MFC的组成 a)必须有一个CWinApp的派生类&#xff1b; b)必须用派生类在全局定义一个对象theApp; c)在派生类中必须重写InitInstance虚函数&#xff1b; d)MFC程序就用CWinApp派生类中的InitInstance虚…

servlet面试常问问题_50个Servlet面试问答

servlet面试常问问题Servlet是Java EE的一个非常重要的主题&#xff0c;所有Web应用程序框架&#xff08;例如Spring和Struts&#xff09;都建立在它之上。 这使servlet成为Java访谈中的热门话题。 在这里&#xff0c;我提供了50个servlet面试问题的列表&#xff0c;并提供了答…

Win10图片查看器打不开图片,报错内存不足

关注公号【逆向通信猿】更精彩&#xff01;&#xff01;&#xff01; 按如下设置即可

学习Java的最佳方法

Java是某些人可能会说很难学习的语言之一&#xff0c;而其他人则认为它与其他语言具有相同的学习曲线。 两种观察都是正确的。 但是&#xff0c;由于Java具有平台无关性&#xff0c;因此在语言方面具有相当大的优势。 Java是一种低级语言&#xff0c;它以一种简单的方式来实现…

【MFC系列3】永远点不到的按钮

变量的声明与定义 全局变量theApp的定义 CXXXXApp theApp; // 定义在其他类中想要访问全局变量theApp时&#xff0c;需在stdafx.h中进行声明 #include "XXXX.h" extern CXXXXApp theApp; // 声明三个被系统接管的消息&#xff08;缺省&#xff0c;虚函数回调&…

CSAPP:Attack lab

关注公号【逆向通信猿】更精彩&#xff01;&#xff01;&#xff01; 原文地址&#xff1a;https://www.jianshu.com/p/db731ca57342 本文介绍的是CSAPP书籍中的第三个lab: Attack lab。通过这个lab我们能够更加清楚和深入的了解到缓冲区溢出的隐患&#xff0c;以及如何利用缓…

antlr java_使用ANTLR和Java创建外部DSL

antlr java在我以前的文章中&#xff0c;有一段时间我写了关于使用Java的内部DSL的文章。 在Martin Fowler撰写的《 领域特定语言 》一书中&#xff0c;他讨论了另一种称为外部DSL的DSL&#xff0c;其中DSL是用另一种语言编写的&#xff0c;然后由宿主语言进行解析以填充语义模…

【MFC系列-第7天】MFC类库封装原理

关注公号【逆向通信猿】更精彩&#xff01;&#xff01;&#xff01; 运算符重载 operator RECT* () {return this; }CString类库 例1 CString str;int n str.GetLength();::GetSystemDirectory(str.GetBuffer(1000), 1000);n str.GetLength();str.ReleaseBuffer();//必须…

【MFC系列-第8天】小型软件项目开发

第8天 小型软件项目开发 8.1 记事本开发 小技巧&#xff1a;用VC6新建工程&#xff0c;以资源方式打开系统自带notepad.exe中的MENU资源&#xff0c;加入到自己新建的工程中&#xff1b;然后再添加到VS工程中&#xff0c;即可获取现有exe的菜单资源。 EndDialog中传入的参数…

Spring休眠教程

1.简介 在本文中&#xff0c;我们将演示如何利用最流行的ORM&#xff08;对象关系映射&#xff09;工具之一的Hibernate的功能 &#xff0c;该工具可将面向对象的域模型转换为传统的关系数据库。 Hibernate是目前最流行的Java框架之一。 由于这个原因&#xff0c;我们在Java Co…

【MFC系列-第9天】MFC消息映射机制的原理

关注公号【逆向通信猿】更精彩&#xff01;&#xff01;&#xff01; 第9天 MFC消息映射机制的原理 9.1 对话框常用的回调函数 a)窗口创建时的消息和虚函数包括&#xff1a;WM_CREATE&#xff0c;WM_INITDIALOG,和PreSubclassWindow等&#xff1b; b)窗口关闭时的消息和虚函…

无状态会话的ejb_Java EE状态会话Bean(EJB)示例

无状态会话的ejb在本文中&#xff0c;我们将了解如何在简单的Web应用程序中使用状态会话Bean来跟踪客户端会话中的状态。 1.简介 有状态会话Bean通常保存有关特定客户端会话的信息&#xff0c;并在整个会话中保留该信息&#xff08;与无状态会话Bean相对&#xff09;。 有状态…

ArrayList源码学习笔记(3)

时隔两年&#xff0c;重新读ArrayList源码&#xff0c;轻松了很多&#xff0c;以问题的方式记录一下收获 装饰器模式 注释中提到ArrayList本身不是线程安全的&#xff0c;注释如下&#xff1a; * <p><strong>Note that this implementation is not synchronized.&…

【MFC系列-第10天】非模式对话框开发

10.1 程序左上角图标设置 通过SendMessage发送WM_SETICON消息来设置 10.2 纯Win32程序开发和技巧&#xff08;借助MFC源码&#xff09; 10.3 非模式对话框的调用 a)调用CDialog::Create函数来创建&#xff0c;并且调用ShowWindow来显示&#xff1b; b)单例模式每次判断句柄…

Maven教程之春

1.简介 在本文中&#xff0c;我们将演示如何针对非常特定的用例对Spring使用Maven依赖项。 我们使用的所有库的最新版本都可以在Maven Central上找到。 对于一个有效的构建周期而言&#xff0c;了解Maven依赖项的工作方式以及如何对其进行管理非常重要&#xff0c;并且对于在我…

【MFC系列-第11天】CWinApp类成员分析

11.1 资源管理器开发&#xff08;C语言&#xff09; 三种位运算 //#include <AtlBase.h> //混合 c_file.attrib | _A_HIDDEN|_A_RDONLY; //判断使用if(c_file.attrib & _A_HIDDEN) //删除属性c_file.attrib&~_A_HIDDENT;11.2 资源管理器开发&#xff08;API&a…

【MFC系列-第12天】Windows系统对话框

12.1 INI配置文件 UINT GetProfileInt( LPCTSTR lpszSection, LPCTSTR lpszEntry, int nDefault ); 从应用程序的配置文件&#xff08;.INI&#xff09;的一个配置项中获取一个整数 CString GetProfileString(LPCTSTR szSection, LPCTSTR szEntry, LPCTSTR szDefault NULL )…

【BCH码2】BCH码的快速BM迭代译码原理详解及MATLAB实现(不使用MATLAB库函数【全部代码需私信另外付费获取】)

理论基础 订阅《信道编码》专栏,首先查阅各子程序的详解 【有限域生成】本原多项式生成有限域的原理及MATLAB实现 【有限域除法】二元多项式除法电路原理及MATLAB详解 【有限域元素加法和乘法】有限域元素加法和乘法的原理及MATLAB实现 【多元域乘法】多项式乘法电路原理…