使用java远程提交flink任务到yarn集群

使用java远程提交flink任务到yarn集群

背景

由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。接下来就记录一下实现过程。这里用flink on yarn 的Application模式实现

环境准备

  • 大数据集群,只要有hadoop就行
  • 后端服务器,linux mac都行,windows不行

正式开始

1. 上传flink jar包到hdfs

去flink官网下载你需要的版本,我这里用的是flink-1.18.1,把flink lib目录下的jar包传到hdfs中。

在这里插入图片描述
其中flink-yarn-1.18.1.jar需要大家自己去maven仓库下载。

2. 编写一段flink代码

随便写一段flink代码就行,我们目的是测试

package com.azt;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.concurrent.TimeUnit;public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) throws Exception {String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};Random random = new Random();while (true) {ctx.collect(words[random.nextInt(words.length)]);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {}});source.print();env.execute();}
}

3. 打包第二步的代码,上传到hdfs

在这里插入图片描述

4. 拷贝配置文件

  • 拷贝flink conf下的所有文件到java项目的resource中
  • 拷贝hadoop配置文件到到java项目的resource中

具体看截图
在这里插入图片描述

5. 编写java远程提交任务的程序

这一步有个注意的地方就是,如果你跟我一样是windows电脑,那么本地用idea提交会报错;如果你是mac或者linux,那么可以直接在idea中提交任务。

package com.test;import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;/*** @date :2021/5/12 7:16 下午*/
public class Main {public static void main(String[] args) throws Exception {///home/root/flink/lib/libSystem.setProperty("HADOOP_USER_NAME","root");
//        String configurationDirectory = "C:\\project\\test_flink_mode\\src\\main\\resources\\conf";String configurationDirectory = "/export/server/flink-1.18.1/conf";org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());String flinkLibs = "hdfs://node1.itcast.cn/flink/lib";String userJarPath = "hdfs://node1.itcast.cn/flink/user-lib/original.jar";String flinkDistJar = "hdfs://node1.itcast.cn/flink/lib/flink-yarn-1.18.1.jar";YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();yarnClient.init(yarnConfiguration);yarnClient.start();YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);//获取flink的配置Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);flinkConfiguration.set(PipelineOptions.JARS,Collections.singletonList(userJarPath));YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,configurationDirectory);Path remoteLib = new Path(flinkLibs);flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(remoteLib.toString()));flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);//设置为application模式flinkConfiguration.set(DeploymentOptions.TARGET,YarnDeploymentTarget.APPLICATION.getName());//yarn application nameflinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");//设置配置,可以设置很多flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);flinkConfiguration.setInteger("parallelism.default", 4);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();//		设置用户jar的参数和主类ApplicationConfiguration appConfig = new ApplicationConfiguration(args,"com.azt.WordCount");YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration,yarnConfiguration,yarnClient,clusterInformationRetriever,true);ClusterClientProvider<ApplicationId> clusterClientProvider = null;try {clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification,appConfig);} catch (ClusterDeploymentException e){e.printStackTrace();}ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();System.out.println(clusterClient.getWebInterfaceURL());ApplicationId applicationId = clusterClient.getClusterId();System.out.println(applicationId);Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();int counts = 30;while (jobStatusMessages.size() == 0 && counts > 0) {Thread.sleep(1000);counts--;jobStatusMessages = clusterClient.listJobs().get();if (jobStatusMessages.size() > 0) {break;}}if (jobStatusMessages.size() > 0) {List<String> jids = new ArrayList<>();for (JobStatusMessage jobStatusMessage : jobStatusMessages) {jids.add(jobStatusMessage.getJobId().toHexString());}System.out.println(String.join(",",jids));}}
}

由于我这里是windows电脑,所以我打包放到服务器上去运行
执行命令 :

java -cp test_flink_mode-1.0-SNAPSHOT.jar com.test.Main

不出以外的话,会打印如下日志

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
http://node2:33811
application_1715418089838_0017
6d4d6ed5277a62fc9a3a274c4f34a468

复制打印的url连接,就可以打开flink的webui了,在yarn的前端页面中也可以看到flink任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/11847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LOTO示波器软件PC缓存(波形录制与回放)功能

当打开PC缓存功能后, 软件将采用先进先出的原则排队对示波器采集的每一帧数据, 进行帧缓存。 当发现屏幕中有感兴趣的波形掠过时, 鼠标点击软件的(暂停)按钮, 可以选择回看某一帧的波形。一帧数据的量 是 当前用户选择时基档位缓冲区总数据大小。不同时基档位缓冲区大小不同&am…

谈谈std::map的lower_bound

我们知道std::map内部是一个红黑树&#xff0c;放到std::map里的数据等有一个能比较大小的方法。它相当于java里面的TreeMap。 它里面有个lower_bound方法&#xff0c;返回一个迭代器&#xff0c;它指向map里第一个大于等于参数的元素。 方法的签名很简单&#xff0c;但是在不同…

富格林:有效预防黑幕阻挠被骗

富格林指出&#xff0c;在投资领域&#xff0c;现货黄金是一种备受推崇的贵金属投资品种。倘若能有效预防黑幕阻挠被骗的情况&#xff0c;事实上现货黄金是很多投资者的“理想型”。然而要想有效地预防黑幕阻挠被骗&#xff0c;就需要掌握足够多的投资技巧。为此&#xff0c;富…

Milvus 基本概念

Milvus 是一个开源的向量数据库&#xff0c;专门用于高效地存储、管理和检索大规模向量数据。它基于 Apache 许可证 2.0 版本发布&#xff0c;由 Zilliz 公司开源并维护。 Milvus 的设计理念是为了解决向量数据存储和检索的挑战。在许多应用中&#xff0c;向量数据是一种重要的…

强化学习——马尔可夫过程的理解

目录 一、马尔可夫过程1.随机过程2.马尔可夫性质3.马尔可夫过程4.马尔可夫过程示例 参考文献 一、马尔可夫过程 1.随机过程 随机过程是概率论的“动态”版本。普通概率论研究的是固定不变的随机现象&#xff0c;而随机过程则专注于那些随时间不断变化的情况&#xff0c;比如天…

C# 使用channel 实现Plc 异步任务之间的通信

channel 通信的例子: using ConsoleApp2; using System.Collections.Concurrent; using System.Threading.Channels;var queue = new BlockingCollection<Message>(new ConcurrentQueue<Message>());var opt = new BoundedChannelOptions(10) {FullMode = BoundedC…

Linux环境快速部署mysql5.7

1 网络下载rpm包 wget -c https://repo.huaweicloud.com/mysql/Downloads/MySQL-5.7/mysql-5.7.37-1.el7.x86_64.rpm-bundle.tar2 解压 tar xf mysql-5.7.37-1.el7.x86_64.rpm-bundle.tar3 数据库之间会冲突因此需要卸载mariadb-libs yum remove mariadb-libs4 安装 如果没有…

R语言两种方法实现随机分层抽样

为了减少数据分布的不平衡&#xff0c;提供高样本的代表性&#xff0c;可将数据按特征分层一定的层次&#xff0c;在每个层次抽取一定量的样本&#xff0c;为分层抽样。分层抽样的特点是将科学分组法与抽样法结合在一起&#xff0c;分组减小了各抽样层变异性的影响&#xff0c;…

HTTP协议及Python实现

最近的项目需要频繁在前后端之间传输数据&#xff0c;本篇主要介绍HTTP协议以及数据传输方法。 1 HTTP协议 1.1 http协议简介 HTTP(Hypertext Transfer Protocol)是一种用于传输超文本数据的应用层协议。它是万维网上数据交换的基础&#xff0c;定义了客户端和服务器之间进行通…

C语言指针详解(三)

目录 前言 一. 回调函数是什么&#xff1f; 1.定义 2. 代码示例&#xff1a;计数器 2.1 使用回调函数改造前 2.2 使用回调函数改造后 二. qsort使用举例 1. qsort介绍 2. 使用qsort函数排序整型数据 3. 使用qsort排序结构体数据 三. qsort函数的模拟实现 四. sizeo…

代码随想录:螺旋矩阵II相关题目推荐(54、LCR146)

59.螺旋矩阵II 题目 给你一个正整数 n &#xff0c;生成一个包含 1 到 n2 所有元素&#xff0c;且元素按顺时针顺序螺旋排列的 n x n 正方形矩阵 matrix 。 示例 1&#xff1a; 输入&#xff1a;n 3 输出&#xff1a;[[1,2,3],[8,9,4],[7,6,5]] 代码&#xff08;新解法&am…

MyBatis——MyBatis 参数处理

一、单个简单类型参数 简单类型包括&#xff1a; byte short int long float double char Byte Short Integer Long Float Double Character String java.util.Date java.sql.Date parameterType 属性&#xff1a;告诉 MyBatis 参数的类型 MyBatis 自带类型自动推断机制…

LLM应用-prompt提示:生成搜索相关问题、生成回答格式包含参考资料

参考: https://isou.chat/ (AI回答与相关问题都是根据问题的搜索引擎结果结合大模型生成的) prompt参考: https://github.com/yokingma/search_with_ai/blob/6d32aa8f05f5f6ee12b5204787035b3f7797c22a/src/prompt.ts#L8 ##rag 根据搜索结果知识回答RagQueryPrompt = ` …

在Go语言中,可以这样使用Json

在Go语言中&#xff0c;处理JSON数据通常涉及编码&#xff08;将Go结构体转换为JSON字符串&#xff09;和解码&#xff08;将JSON字符串转换为Go结构体&#xff09;。Go标准库中的encoding/json包提供了这些功能。第三方插件可以使用"github.com/goccy/go-json"也有同…

Git | git log 和 git status 的区别

如是我闻&#xff1a; git log和git status是Git中的两个非常有用的命令&#xff0c;它们用于不同的目的&#xff0c;并提供不同类型的信息。 git log git log命令用于显示一个或多个分支的提交历史记录。这个命令会列出提交历史&#xff0c;包括每次提交的SHA-1哈希值、提交…

程控水冷阻性负载主要工作方式

程控水冷阻性负载是一种先进的电力设备&#xff0c;主要用于电力系统的测试和研究。它的主要工作方式是通过控制水冷系统的温度&#xff0c;来模拟不同的阻性负载条件&#xff0c;从而对电力设备进行各种性能测试。 首先&#xff0c;我们需要了解什么是阻性负载。阻性负载是指那…

博弈智能的特点

博弈智能是指通过算法和模型对博弈过程进行分析和决策的智能系统。在博弈中&#xff0c;各方参与者追求自身利益和目标&#xff0c;会采取各种策略来达到自己的目标。其中&#xff0c;包括了一些不正当手段&#xff0c;如诡计和欺骗&#xff08;诡&#xff09;&#xff08;诈&a…

代码随想录算法训练营Day 42| 动态规划part04 | 01背包问题理论基础I、01背包问题理论基础II、416. 分割等和子集

代码随想录算法训练营Day 42| 动态规划part04 | 01背包问题理论基础I、01背包问题理论基础II、416. 分割等和子集 文章目录 代码随想录算法训练营Day 42| 动态规划part04 | 01背包问题理论基础I、01背包问题理论基础II、416. 分割等和子集01背包问题理论基础一、01背包问题二、…

WSL设置启动时自动启动docker服务或其他服务

方式一: Windows系统的WSL,当windows关机再开机后,WSL等于是重新开机的,默认情况下,不会启动Docker服务。例如在Ubuntu 22.04中,需要使用命令 service docker start来启动。由于我习惯关机断电,因此每天开机打开WSL后都要手动输入这个命令,非常麻烦。所以找了一个方法…

Redis教程——哨兵

在上篇文章我们学习了Redis教程——主从复制&#xff0c;这篇文章我们学习Redis教程——哨兵监控。 在主从复制中如果主机发生宕机&#xff0c;从机Redis会一直等到主机的恢复&#xff0c;这样会导致只能进行读操作&#xff0c;不能进行写操作&#xff0c;这大大降低了系统的高…