使用java远程提交flink任务到yarn集群

使用java远程提交flink任务到yarn集群

背景

由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。接下来就记录一下实现过程。这里用flink on yarn 的Application模式实现

环境准备

  • 大数据集群,只要有hadoop就行
  • 后端服务器,linux mac都行,windows不行

正式开始

1. 上传flink jar包到hdfs

去flink官网下载你需要的版本,我这里用的是flink-1.18.1,把flink lib目录下的jar包传到hdfs中。

在这里插入图片描述
其中flink-yarn-1.18.1.jar需要大家自己去maven仓库下载。

2. 编写一段flink代码

随便写一段flink代码就行,我们目的是测试

package com.azt;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.concurrent.TimeUnit;public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) throws Exception {String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};Random random = new Random();while (true) {ctx.collect(words[random.nextInt(words.length)]);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {}});source.print();env.execute();}
}

3. 打包第二步的代码,上传到hdfs

在这里插入图片描述

4. 拷贝配置文件

  • 拷贝flink conf下的所有文件到java项目的resource中
  • 拷贝hadoop配置文件到到java项目的resource中

具体看截图
在这里插入图片描述

5. 编写java远程提交任务的程序

这一步有个注意的地方就是,如果你跟我一样是windows电脑,那么本地用idea提交会报错;如果你是mac或者linux,那么可以直接在idea中提交任务。

package com.test;import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;/*** @date :2021/5/12 7:16 下午*/
public class Main {public static void main(String[] args) throws Exception {///home/root/flink/lib/libSystem.setProperty("HADOOP_USER_NAME","root");
//        String configurationDirectory = "C:\\project\\test_flink_mode\\src\\main\\resources\\conf";String configurationDirectory = "/export/server/flink-1.18.1/conf";org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());String flinkLibs = "hdfs://node1.itcast.cn/flink/lib";String userJarPath = "hdfs://node1.itcast.cn/flink/user-lib/original.jar";String flinkDistJar = "hdfs://node1.itcast.cn/flink/lib/flink-yarn-1.18.1.jar";YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();yarnClient.init(yarnConfiguration);yarnClient.start();YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);//获取flink的配置Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);flinkConfiguration.set(PipelineOptions.JARS,Collections.singletonList(userJarPath));YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,configurationDirectory);Path remoteLib = new Path(flinkLibs);flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(remoteLib.toString()));flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);//设置为application模式flinkConfiguration.set(DeploymentOptions.TARGET,YarnDeploymentTarget.APPLICATION.getName());//yarn application nameflinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");//设置配置,可以设置很多flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);flinkConfiguration.setInteger("parallelism.default", 4);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();//		设置用户jar的参数和主类ApplicationConfiguration appConfig = new ApplicationConfiguration(args,"com.azt.WordCount");YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration,yarnConfiguration,yarnClient,clusterInformationRetriever,true);ClusterClientProvider<ApplicationId> clusterClientProvider = null;try {clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification,appConfig);} catch (ClusterDeploymentException e){e.printStackTrace();}ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();System.out.println(clusterClient.getWebInterfaceURL());ApplicationId applicationId = clusterClient.getClusterId();System.out.println(applicationId);Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();int counts = 30;while (jobStatusMessages.size() == 0 && counts > 0) {Thread.sleep(1000);counts--;jobStatusMessages = clusterClient.listJobs().get();if (jobStatusMessages.size() > 0) {break;}}if (jobStatusMessages.size() > 0) {List<String> jids = new ArrayList<>();for (JobStatusMessage jobStatusMessage : jobStatusMessages) {jids.add(jobStatusMessage.getJobId().toHexString());}System.out.println(String.join(",",jids));}}
}

由于我这里是windows电脑,所以我打包放到服务器上去运行
执行命令 :

java -cp test_flink_mode-1.0-SNAPSHOT.jar com.test.Main

不出以外的话,会打印如下日志

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
http://node2:33811
application_1715418089838_0017
6d4d6ed5277a62fc9a3a274c4f34a468

复制打印的url连接,就可以打开flink的webui了,在yarn的前端页面中也可以看到flink任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/11847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LOTO示波器软件PC缓存(波形录制与回放)功能

当打开PC缓存功能后, 软件将采用先进先出的原则排队对示波器采集的每一帧数据, 进行帧缓存。 当发现屏幕中有感兴趣的波形掠过时, 鼠标点击软件的(暂停)按钮, 可以选择回看某一帧的波形。一帧数据的量 是 当前用户选择时基档位缓冲区总数据大小。不同时基档位缓冲区大小不同&am…

强化学习——马尔可夫过程的理解

目录 一、马尔可夫过程1.随机过程2.马尔可夫性质3.马尔可夫过程4.马尔可夫过程示例 参考文献 一、马尔可夫过程 1.随机过程 随机过程是概率论的“动态”版本。普通概率论研究的是固定不变的随机现象&#xff0c;而随机过程则专注于那些随时间不断变化的情况&#xff0c;比如天…

R语言两种方法实现随机分层抽样

为了减少数据分布的不平衡&#xff0c;提供高样本的代表性&#xff0c;可将数据按特征分层一定的层次&#xff0c;在每个层次抽取一定量的样本&#xff0c;为分层抽样。分层抽样的特点是将科学分组法与抽样法结合在一起&#xff0c;分组减小了各抽样层变异性的影响&#xff0c;…

C语言指针详解(三)

目录 前言 一. 回调函数是什么&#xff1f; 1.定义 2. 代码示例&#xff1a;计数器 2.1 使用回调函数改造前 2.2 使用回调函数改造后 二. qsort使用举例 1. qsort介绍 2. 使用qsort函数排序整型数据 3. 使用qsort排序结构体数据 三. qsort函数的模拟实现 四. sizeo…

代码随想录:螺旋矩阵II相关题目推荐(54、LCR146)

59.螺旋矩阵II 题目 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]] 代码&#xff08;新解法&am…

MyBatis——MyBatis 参数处理

一、单个简单类型参数 简单类型包括&#xff1a; byte short int long float double char Byte Short Integer Long Float Double Character String java.util.Date java.sql.Date parameterType 属性&#xff1a;告诉 MyBatis 参数的类型 MyBatis 自带类型自动推断机制…

LLM应用-prompt提示:生成搜索相关问题、生成回答格式包含参考资料

参考: https://isou.chat/ (AI回答与相关问题都是根据问题的搜索引擎结果结合大模型生成的) prompt参考: https://github.com/yokingma/search_with_ai/blob/6d32aa8f05f5f6ee12b5204787035b3f7797c22a/src/prompt.ts#L8 ##rag 根据搜索结果知识回答RagQueryPrompt = ` …

程控水冷阻性负载主要工作方式

程控水冷阻性负载是一种先进的电力设备&#xff0c;主要用于电力系统的测试和研究。它的主要工作方式是通过控制水冷系统的温度&#xff0c;来模拟不同的阻性负载条件&#xff0c;从而对电力设备进行各种性能测试。 首先&#xff0c;我们需要了解什么是阻性负载。阻性负载是指那…

代码随想录算法训练营Day 42| 动态规划part04 | 01背包问题理论基础I、01背包问题理论基础II、416. 分割等和子集

代码随想录算法训练营Day 42| 动态规划part04 | 01背包问题理论基础I、01背包问题理论基础II、416. 分割等和子集 文章目录 代码随想录算法训练营Day 42| 动态规划part04 | 01背包问题理论基础I、01背包问题理论基础II、416. 分割等和子集01背包问题理论基础一、01背包问题二、…

Redis教程——哨兵

在上篇文章我们学习了Redis教程——主从复制&#xff0c;这篇文章我们学习Redis教程——哨兵监控。 在主从复制中如果主机发生宕机&#xff0c;从机Redis会一直等到主机的恢复&#xff0c;这样会导致只能进行读操作&#xff0c;不能进行写操作&#xff0c;这大大降低了系统的高…

资料同化 | 搭建docker环境-1

Community Gridpoint Statistical Interpolation (GSI) system DTC 是一个分布式设施&#xff0c;NWP 社区可以在这里测试和评估用于研究和操作的新模型和技术。 DTC的目标包括&#xff1a; 链接研究和操作社区 研究成果转化为实际操作的速度 加快改善天气预报 开发和测试有…

Cocos Creator 3.8.x 透明带滚动功能的容器

ScrollView 是一种带滚动功能的容器 1、删除ScrollView下Sprite组件的SpriteFrame 2、ScrollView下scrollBar的Sprite组件的Color设为&#xff1a;FFFFFF00 3、ScrollView下view的Graphics组件的FillColor设为&#xff1a;FFFFFF00

IP代理如何帮助SEO进行优化?

IP代理在SEO优化中扮演着重要的角色&#xff0c;它通过多种方式帮助提升网站的搜索排名和可见性。以下是IP代理如何帮助SEO进行优化的详细阐述&#xff1a; 第一点&#xff0c;数据采集与分析&#xff1a;在SEO过程中&#xff0c;大量的数据是必不可少的。通过使用IP代理&…

c++ std::shared_ptr学习

背景 c中智能指针shared_ptr用于自动管理资源&#xff0c;通过引用计数来记录资源被多少出地方使用。在不使用资源时&#xff0c;减少引用计数&#xff0c;如果引用计数为0&#xff0c;表示资源不会再被使用&#xff0c;此时会释放资源。本文记录对c中std::shared_ptr的源码学习…

攻防世界PHP2

1、打开靶机链接http://61.147.171.105:49513/&#xff0c;没有发现任何线索 2、尝试访问http://61.147.171.105:49513/index.php&#xff0c;页面没有发生跳转 3、尝试将访问 尝试访问http://61.147.171.105:49513/index.phps index.php 和 index.phps 文件之间的主要区别在于…

GNU Radio创建时间戳 C++ OOT块

文章目录 前言一、创建自定义的 C OOT 块1、创建 timestamp_sender C OOT 模块①、创建 timestamp_sender OOT 块②、修改 C 代码 2、创建 timestamp_receiver C OOT 模块①、创建 timestamp_receiver OOT 块②、修改 C 代码 3、创建 delayMicroSec C OOT 模块①、创建 delayMi…

Vue3实战笔记(20)—封装头部导航组件

文章目录 前言一、封装头部导航栏二、使用步骤总结 前言 Vue 3 封装头部导航栏有助于提高代码复用性、统一风格、降低维护成本、提高可配置性和模块化程度&#xff0c;同时还可以实现动态渲染等功能&#xff0c;有利于项目开发和维护。 一、封装头部导航栏 封装头部导航栏&am…

HFSS学习-day4-建模操作

通过昨天的学习&#xff0c;我们已经熟悉了HFSS的工作环境&#xff1b;今天我们来讲解HFSS中创建物体模型的县体步骤和相关操作。物体建模是HFSS仿真设计工作的第一步&#xff0c;HFSS中提供了诸如矩形、圆面、长方体圆柱体和球体等多种基本模型(Primitive)&#xff0c;这些基本…

新书速览|MATLAB科技绘图与数据分析

提升你的数据洞察力&#xff0c;用于精确绘图和分析的高级MATLAB技术。 本书内容 《MATLAB科技绘图与数据分析》结合作者多年的数据分析与科研绘图经验&#xff0c;详细讲解MATLAB在科技图表制作与数据分析中的使用方法与技巧。全书分为3部分&#xff0c;共12章&#xff0c;第1…

精英都是时间控!职场精英的完美一天~~~谷歌FB都在用的时间管理术!

如何超高效使用24小时 每个人的一天都只有24小时&#xff0c;使用时间的方法将决定整个人生。时间管理术并不提倡把自己忙死榨干&#xff0c;而是通过在合适的时间做合适的事情&#xff0c;把大脑机能发挥到极致&#xff0c;从而提高效率&#xff0c;节省下更多时间用于生活与…