Yarn 监控 - 监控任务运行状态 (包括Spark,MR 所有在Yarn中运行的任务)

目录

Maven pom引用

配置文件

代码


    平时开发中可以在yarn的web页面查看应用程序运行状态,如下图

 

下面代码实现了,代码监控Yarn运行程序,可以对部分任务进行实时监控

Maven pom引用

这里Demo使用的hadoop版本是 3.0.0

                <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-api</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-client</artifactId><version>${hadoop.version}</version></dependency>

配置文件

yarn-site.xml 放在resources目录下

代码

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;public class YarnMonitor {/*** 获取任务的applicationId* @return String* @param jobName* @return*/public static String getAppId(String jobName) {YarnClient client = YarnClient.createYarnClient();Configuration conf = new Configuration();client.init(conf);client.start();EnumSet<YarnApplicationState> appStates = EnumSet.noneOf(YarnApplicationState.class);if (appStates.isEmpty()) {appStates.add(YarnApplicationState.RUNNING);appStates.add(YarnApplicationState.ACCEPTED);appStates.add(YarnApplicationState.SUBMITTED);}List<ApplicationReport> appsReport = null;try {//返回EnumSet<YarnApplicationState>中个人任务状态的所有任务appsReport = client.getApplications(appStates);} catch (YarnException | IOException e) {e.printStackTrace();}assert appsReport != null;for (ApplicationReport appReport : appsReport) {//获取任务名String jn = appReport.getName();String applicationType = appReport.getApplicationType();if (jn.equals(jobName) && "Apache Flink".equals(applicationType)) {try {client.close();} catch (IOException e) {e.printStackTrace();}return appReport.getApplicationId().toString();}}try {client.close();} catch (IOException e) {e.printStackTrace();}return null;}/*** 根据任务的applicationId去获取任务的状态* @return YarnApplicationState* @param appId* @return*/public static YarnApplicationState getState(String appId) {YarnClient client = YarnClient.createYarnClient();Configuration conf = new Configuration();client.init(conf);client.start();ApplicationId applicationId = ApplicationId.fromString(appId);//	        ApplicationId appId = ConverterUtils.toApplicationId(appId);YarnApplicationState yarnApplicationState = null;try {ApplicationReport applicationReport = client.getApplicationReport(applicationId);yarnApplicationState = applicationReport.getYarnApplicationState();} catch (YarnException | IOException e) {e.printStackTrace();}try {client.close();} catch (IOException e) {e.printStackTrace();}return yarnApplicationState;}public static void main(String[] args) throws IOException, InterruptedException {while(true) {TimeUnit.SECONDS.sleep(3);boolean yarnIsContains = yarnIsContains("Spark Pi");System.out.println(yarnIsContains);}}/*** 判断任务名为appName的任务,是否在yarn中运行,状态为RUNNING* @return boolean* @param appName* @return*/public static boolean yarnIsContains(String appName) {Configuration conf = new YarnConfiguration();YarnClient yarnClient = YarnClient.createYarnClient();yarnClient.init(conf);yarnClient.start();boolean isContains = false;List<ApplicationReport> applications = new ArrayList<ApplicationReport>();try {//applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.FINISHED));applications = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));for(ApplicationReport application:applications) {String name = application.getName();if(name.equals(appName)) {System.out.println("ApplicationId ============> "+application.getApplicationId());System.out.println("name ============> "+application.getName());System.out.println("queue ============> "+application.getQueue());System.out.println("queue ============> "+application.getUser());System.out.println(applications);isContains = true;}}/** if(applications.contains(appName)) {* System.out.println("ApplicationId ============> "+applications.get(0).* getApplicationId());* System.out.println("name ============> "+applications.get(0).getName());* System.out.println("queue ============> "+applications.get(0).getQueue());* System.out.println("queue ============> "+applications.get(0).getUser());* System.out.println(applications); }*/} catch (YarnException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {yarnClient.stop();} return isContains;}
}

Yarn的状态有以下几种,可根据个人需要进行使用,上面Demo我们是判断任务名为 Spark Pi 的程序是否处于运行状态,如果是则返回true。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509792.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HugeGraph 图数据库索引介绍 - 范围索引,全文索引

目录 HugeGraph 索引介绍 二级索引 组合索引 范围索引 全文索引 HugeGraph 索引介绍 二级索引 创建schema和添加数据 schema.propertyKey("name").asText().ifNotExist().create();schema.propertyKey("uid").asLong().ifNotExist().create();schem…

Hbase JMX 监控 - Region

获取Region监控信息页面&#xff1a; http://regionServerName:16030/jmx?qryHadoop:serviceHBase,nameRegionServer,subRegions 获得数据如下 参数代表含义 *** 为前缀代表&#xff1a;Namespace_${namespace}_table_${tableName}_region_${regionName} ***_metric_storeCo…

c++ 之类的前置声明

转自&#xff1a;http://blog.csdn.net/fjb2080/archive/2010/04/27/5533514.aspx 作者&#xff1a;清林&#xff0c;博客名&#xff1a;飞空静渡 刚开始学习c的人都会遇到这样的问题&#xff1a; 定义一个类 class A&#xff0c;这个类里面使用了类B的对象b&#xff0c;然后定…

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

目录 类的关系图 ​ RatioBasedCompactionPolicy selectCompaction 方法 getCurrentEligibleFiles方法 skipLargeFiles方法 createCompactionRequest方法 filterBulk方法 applyCompactionPolicy方法 removeExcessFiles方法 setIsMajor方法 其他相关文章 Hbase Compa…

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

目录 CompactSplitThread requestCompactionInternal方法 selectCompaction方法 requestCompaction方法 其他相关文章 Hbase Compaction 源码分析 - CompactionChecker Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略 Hbase Compaction 源码分析 - CompactS…

Hbase Compaction 队列数量较大分析

目录 问题 问题原因分析 总结建议 问题 前几天朋友公司Hbase集群出现Compaction队列持续处于比较大的情况&#xff0c;并且mem flush队列也比较大&#xff0c;一起看了下问题&#xff0c;大概情况如下图 从图中可以看出来压缩队列总和持续在1000-2000&#xff0c;平对压缩队列…

Hbase 2.x Region in transition (永久RIT) 异常解决

环境 Hbase 版本&#xff1a;2.0 问题原因 hbase长时间出现RIT&#xff0c;并且发生RIT的Region是已经删除了的Hbase表&#xff0c;表未删除的情况下执行assgin可以消除该问题 Hbase Region in transition (RIT) 异常解决&#xff1a;https://datamining.blog.csdn.net/artic…

sigslot库源码分析

言归正传&#xff0c;sigslot是一个用标准C语法实现的信号与槽机制的函数库&#xff0c;类型和线程安全。提到信号与槽机制&#xff0c;恐怕最容易想到的就是大名鼎鼎的Qt所支持的对象之间通信的模式吧。不过这里的信号与槽虽然在概念上等价与Qt所实现的信号与槽&#xff0c;但…

Hue开发指南 - 提交 Spark 程序

目录 Hue开发指南 1.Spark文件打包成一个Jar包提交Hue运行 1.1 上传Spark Jar包至HDFS文件系统 1.2.Hue中创建Spark任务 2.多jar包导入执行&#xff08;依赖jar包与主程序jar包分开打包&#xff09; 2.1 修改worksapce 2.2 添加程序依赖jar包 Hue开发指南 Hue是面向 Had…

如何缩小码农和高手的差距

为什么同样的时间有的人可以漂亮的完成工作&#xff0c;而有些人废了很大的力气也没有完成&#xff1f;前者我们常常称之为“大牛”&#xff0c;后者我们常常叫他们“菜鸟”。当然“大牛”都是相对而言的&#xff0c;“大牛”也不可能方方面面都非常厉害&#xff0c;换句话说大…

OpenResty 安装,收集日志保存到文本文件

目录 安装 1.安装相关类库 2.安装编译openresty 3.编写配置启动openresty服务 4.通过 openresty 保存日志数据到系统 安装 1.安装相关类库 yum install -y readline-devel pcre-devel openssl-devel gcc 2.安装编译openresty wget https://openresty.org/download/open…

Hadoop Yarn REST API未授权漏洞利用挖矿分析

目录 一、背景情况 二、 漏洞说明 攻击步骤&#xff1a; 三、入侵分析 四、安全建议 清理病毒 安全加固 五、IOCs 一、背景情况 5月5日腾讯云安全曾针对攻击者利用Hadoop Yarn资源管理系统REST API未授权漏洞对服务器进行攻击&#xff0c;攻击者可以在未授权的情况…

Linux shell编程学习总结

主要内容&#xff1a; shell编程sed命令awk命令crontab定时器 什么是Shell&#xff1f; Shell是用户与内核进行交互操作的一种接口&#xff0c;目前最流行的Shell称为bash Shell Shell也是一门编程语言<解释型的编程语言>&#xff0c;即shell脚本 一个系统可以存在多…

Flink ProcessFunction 介绍使用

目录 实现功能 代码 测试 问题 官网描述&#xff1a;https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html The ProcessFunction is a low-level stream processing operation, giving access to the basic build…

Flink keyby 数据倾斜问题处理

上一篇我们使用keyby后发现数据严重倾斜 https://datamining.blog.csdn.net/article/details/105316728 大概看下问题所在&#xff0c;大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题 之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计&a…

linux中iptables对防火墙的操作

Iptables教程 1. iptables防火墙简介 Iptables也叫netfilter是Linux下自带的一款免费且优秀的基于包过滤的防火墙工具&#xff0c;它的功能十分强大&#xff0c;使用非常灵活&#xff0c;可以对流入、流出、流经服务器的数据包进行精细的控制。iptables是Linux2.4及2.6内核中…

Web Components入门不完全指北

目前流行的各类前端框架&#xff0c;不管是react, angular还是vue&#xff0c;都有一个共同点&#xff0c;那就是支持组件化开发&#xff0c;但事实上随着浏览器的发展&#xff0c;现在浏览器也原生支持组件式开发&#xff0c;本文将通过介绍Web Components 的三个主要概念&…

Flink 1.9 CDH 6.3 集成

目录 1.下载准备文件 2.felink csa jar包准备 3.将 Flink Parcel放入httpd目录下 4.配置CDH Flink Parcel 5.安装Flink 1.下载准备文件 https://archive.cloudera.com/csa/1.0.0.0/csd/FLINK-1.9.0-csa1.0.0.0-cdh6.3.0.jarhttps://archive.cloudera.com/csa/1.0.0.0/parc…

ssh免密登陆机制示意图

ssh免密登陆机制示意图

CDH 6.x 安装 Phoenix 服务

最近有个新项目启动&#xff0c;版本升级到6.3&#xff0c;发现CDH6.2 版本已经支持Phoenix parcel安装 一、准备文件 下载 https://archive.cloudera.com/phoenix/6.2.0/csd/PHOENIX-1.0.jar 下载parcel #目录 https://archive.cloudera.com/phoenix/6.2.0/parcels/ #根据…