目录
Maven pom引用
配置文件
代码
平时开发中可以在yarn的web页面查看应用程序运行状态,如下图
下面的代码实现了通过程序监控 Yarn 上运行的应用,可以对部分任务进行实时监控。
Maven pom引用
这里Demo使用的hadoop版本是 3.0.0
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-api</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
配置文件
yarn-site.xml 放在resources目录下
代码
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;public class YarnMonitor {/*** 获取任务的applicationId* @return String* @param jobName* @return*/public static String getAppId(String jobName) {YarnClient client = YarnClient.createYarnClient();Configuration conf = new Configuration();client.init(conf);client.start();EnumSet<YarnApplicationState> appStates = EnumSet.noneOf(YarnApplicationState.class);if (appStates.isEmpty()) {appStates.add(YarnApplicationState.RUNNING);appStates.add(YarnApplicationState.ACCEPTED);appStates.add(YarnApplicationState.SUBMITTED);}List<ApplicationReport> appsReport = null;try {//返回EnumSet<YarnApplicationState>中个人任务状态的所有任务appsReport = client.getApplications(appStates);} catch (YarnException | IOException e) {e.printStackTrace();}assert appsReport != null;for (ApplicationReport appReport : appsReport) {//获取任务名String jn = appReport.getName();String applicationType = appReport.getApplicationType();if (jn.equals(jobName) && "Apache Flink".equals(applicationType)) {try {client.close();} catch (IOException e) {e.printStackTrace();}return appReport.getApplicationId().toString();}}try {client.close();} catch (IOException e) {e.printStackTrace();}return null;}/*** 根据任务的applicationId去获取任务的状态* @return YarnApplicationState* @param appId* @return*/public static YarnApplicationState getState(String appId) {YarnClient client = YarnClient.createYarnClient();Configuration conf = new Configuration();client.init(conf);client.start();ApplicationId applicationId = ApplicationId.fromString(appId);// ApplicationId appId = ConverterUtils.toApplicationId(appId);YarnApplicationState yarnApplicationState = null;try {ApplicationReport applicationReport = client.getApplicationReport(applicationId);yarnApplicationState = applicationReport.getYarnApplicationState();} catch (YarnException | IOException e) {e.printStackTrace();}try {client.close();} catch (IOException e) {e.printStackTrace();}return yarnApplicationState;}public static void main(String[] args) throws 
IOException, InterruptedException {while(true) {TimeUnit.SECONDS.sleep(3);boolean yarnIsContains = yarnIsContains("Spark Pi");System.out.println(yarnIsContains);}}/*** 判断任务名为appName的任务,是否在yarn中运行,状态为RUNNING* @return boolean* @param appName* @return*/public static boolean yarnIsContains(String appName) {Configuration conf = new YarnConfiguration();YarnClient yarnClient = YarnClient.createYarnClient();yarnClient.init(conf);yarnClient.start();boolean isContains = false;List<ApplicationReport> applications = new ArrayList<ApplicationReport>();try {//applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.FINISHED));applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));for(ApplicationReport application:applications) {String name = application.getName();if(name.equals(appName)) {System.out.println("ApplicationId ============> "+application.getApplicationId());System.out.println("name ============> "+application.getName());System.out.println("queue ============> "+application.getQueue());System.out.println("queue ============> "+application.getUser());System.out.println(applications);isContains = true;}}/** if(applications.contains(appName)) {* System.out.println("ApplicationId ============> "+applications.get(0).* getApplicationId());* System.out.println("name ============> "+applications.get(0).getName());* System.out.println("queue ============> "+applications.get(0).getQueue());* System.out.println("queue ============> "+applications.get(0).getUser());* System.out.println(applications); }*/} catch (YarnException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {yarnClient.stop();} return isContains;}
}
Yarn 的应用状态有以下几种,可根据个人需要使用。上面的 Demo 判断任务名为 Spark Pi 的程序是否处于 RUNNING 状态,如果是则返回 true。