52、Flink的应用程序参数处理-ParameterTool介绍及使用示例

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、应用程序参数处理
    • 1、用 ParameterTool 读取配置值
      • 1)、配置值来自 .properties 文件
      • 2)、配置值来自命令行
      • 3)、配置值来自系统属性
    • 2、在 Flink 程序中使用参数
      • 1)、直接从 ParameterTool 获取
      • 2)、注册全局参数
  • 二、示例:ParameterTool几种的应用示例
    • 1、maven依赖
    • 2、实现及验证
      • 1)、测试文件准备
      • 2)、实现


本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、应用程序参数处理

几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。

为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。

这里说的 Parametertool 并不是必须使用的。

Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。

1、用 ParameterTool 读取配置值

ParameterTool 定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map<string,string> 类型,这样使得它可以很容易地与你的配置集成在一起。

1)、配置值来自 .properties 文件

以下方法可以读取 Properties 文件并解析出键/值对:

String propertiesFilePath = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

2)、配置值来自命令行

以下方法可以从命令行中获取参数,如 --input hdfs:///mydata --elements 42。

public static void main(String[] args) {ParameterTool parameter = ParameterTool.fromArgs(args);// .. regular code ..}

3)、配置值来自系统属性

启动 JVM 时,可以将系统属性传递给 JVM:-Dinput=hdfs:///mydata。你也可以从这些系统属性初始化 ParameterTool:

ParameterTool parameter = ParameterTool.fromSystemProperties();

2、在 Flink 程序中使用参数

1)、直接从 ParameterTool 获取

ParameterTool 本身具有访问配置值的方法。

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters();
// .. there are more methods available.

你可以在提交应用程序时直接在客户端的 main() 方法中使用这些方法的返回值。例如,你可以这样设置算子的并行度:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

由于 ParameterTool 是序列化的,你可以将其传递给函数本身:

ParameterTool parameters = ParameterTool.fromArgs(args);
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

然后在函数内使用它以获取命令行的传递的参数。

2)、注册全局参数

从 JobManager web 界面和用户定义的所有函数中可以以配置值的方式访问在 ExecutionConfig 中注册的全局作业参数。

  • 注册全局参数
ParameterTool parameters = ParameterTool.fromArgs(args);// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
  • 在任意富函数中访问参数
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {ParameterTool parameters = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();parameters.getRequired("input");// .. do more ..

二、示例:ParameterTool几种的应用示例

本示例是将上述的内容以可运行的代码呈现。

1、maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version>
</properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!-- <scope>provided</scope> --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
</dependencies>

2、实现及验证

本处是通过ParameterTool读取配置文件的内容,其他命令行、系统参数可以在运行时直接设置,没有进行截图。

1)、测试文件准备

文件目录及名称:tablesql/src/main/resources/testproperties.properties

jobmanager.rpc.address=server1
jobmanager.rpc.port=6123
jobmanager.memory.process.size=1600m
taskmanager.memory.process.size=4096m
taskmanager.numberOfTaskSlots=3
parallelism.default=1high-availability=zookeeper
high-availability.storageDir=hdfs://HadoopHAcluster/flink13_5/ha/
high-availability.zookeeper.quorum=server1:2118,server2:2118,server3:2118##单位毫秒,checkpoint时间间隔
execution.checkpointing.interval=5000
##单位个,保存checkpoint的个数
state.checkpoints.num-retained=20execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATIONstate.savepoints.dir=hdfs:///flink/checkpoints
execution.checkpointing.timeout=600000
execution.checkpointing.min-pause=500
execution.checkpointing.max-concurrent-checkpoints=1state.backend=filesystem
#state.checkpoints.dir=hdfs://server1:8020/flink13_5-checkpoints
state.checkpoints.dir=hdfs://HadoopHAcluster/flink13_5-checkpoints
jobmanager.execution.failover-strategy=regionweb.submit.enable=truejobmanager.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/
historyserver.web.address=server1
historyserver.web.port=9082
#historyserver.archive.fs.dir=hdfs://server1:8020/flink13_5/completed-jobs/
historyserver.archive.fs.dir=hdfs://HadoopHAcluster/flink13_5/completed-jobs/historyserver.archive.fs.refresh-interval=10000

2)、实现

/** @Author: alanchan* @LastEditors: alanchan* @Description: */
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TestParameterToolDemo {static void test1() throws Exception {String propertiesFilePath = "tablesql/src/main/resources/testproperties.properties";// 方式一:直接通过配置文件的路径获取ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);// 方式二:通过配置文件路径构造File的方式获取File propertiesFile = new File(propertiesFilePath);ParameterTool parameter2 = ParameterTool.fromPropertiesFile(propertiesFile);// 方式三:通过配置文件路径构造InputStream的方式获取InputStream propertiesFileInputStream = new FileInputStream(propertiesFilePath);ParameterTool parameter3 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);// 遍历配置文件内容Map<String, String> parameterMap = parameter.toMap();for (String key : parameterMap.keySet()) {System.out.println("parameter :" + key + " = " + parameterMap.get(key));}// 获取必须的参数,否则会出现异常System.out.println("jobmanager.rpc.address--->" + parameter.getRequired("jobmanager.rpc.address"));// 获取参数,并设有默认值System.out.println("state.checkpoints.num-retained--->" + parameter.get("state.checkpoints.num-retained", "30"));// 获取Long类型的参数,并设置默认值System.out.println("parallelism.default--->" + parameter.getLong("parallelism.default", 4L));// 获取配置文件中有效参数的总行数System.out.println("getNumberOfParameters--->" + parameter.getNumberOfParameters());// 运行输出:// parameter :historyserver.web.address = server1// parameter :state.checkpoints.num-retained = 20// parameter :historyserver.web.port = 9082// parameter :jobmanager.execution.failover-strategy = region// parameter :jobmanager.rpc.address = server1// parameter :state.savepoints.dir = hdfs:///flink/checkpoints// parameter :high-availability.storageDir =// hdfs://HadoopHAcluster/flink13_5/ha/// parameter :parallelism.default = 1// parameter :taskmanager.numberOfTaskSlots = 3// parameter :historyserver.archive.fs.dir =// hdfs://HadoopHAcluster/flink13_5/completed-jobs/// parameter :jobmanager.archive.fs.dir =// hdfs://HadoopHAcluster/flink13_5/completed-jobs/// parameter :execution.checkpointing.mode = EXACTLY_ONCE// parameter :taskmanager.memory.process.size = 4096m// parameter :jobmanager.memory.process.size = 1600m// parameter :historyserver.archive.fs.refresh-interval = 10000// parameter :jobmanager.rpc.port = 6123// parameter :execution.checkpointing.timeout = 600000// parameter :execution.checkpointing.interval = 5000// parameter :high-availability.zookeeper.quorum =// server1:2118,server2:2118,server3:2118// parameter :high-availability = zookeeper// parameter :execution.checkpointing.externalized-checkpoint-retention =// RETAIN_ON_CANCELLATION// parameter :web.submit.enable = true// parameter :state.backend = filesystem// parameter :execution.checkpointing.min-pause = 500// parameter :execution.checkpointing.max-concurrent-checkpoints = 1// parameter :state.checkpoints.dir =// hdfs://HadoopHAcluster/flink13_5-checkpoints// jobmanager.rpc.address--->server1// state.checkpoints.num-retained--->20// parallelism.default--->1// getNumberOfParameters--->26}static void test2() throws Exception {ParameterTool parameter = ParameterTool.fromSystemProperties();// 遍历配置系统属性内容Map<String, String> parameterMap = parameter.toMap();for (String key : parameterMap.keySet()) {System.out.println("parameter :" + key + " = " + parameterMap.get(key));}}static void test3(String[] args) throws Exception {ParameterTool parameter = ParameterTool.fromArgs(args);// 遍历配配置值来自命令行内容Map<String, String> parameterMap = parameter.toMap();for (String key : parameterMap.keySet()) {System.out.println("parameter :" + key + " = " + parameterMap.get(key));}}static void test4(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ParameterTool parameters = ParameterTool.fromArgs(args);// 遍历配配置值来自命令行内容Map<String, String> parameterMap = parameters.toMap();for (String key : parameterMap.keySet()) {System.out.println("parameter :" + key + " = " + parameterMap.get(key));}// 获取命令行参数mapParallelism的值(默认设置为2)并设置map的并行度int parallelism = parameters.getInt("mapParallelism", 2);DataStream<String> source = env.socketTextStream("192.168.10.42", 8888).map(o -> {String[] lines = o.split(",");return "name:" + lines[0] + " age: " + lines[1];}).setParallelism(parallelism);source.print();env.execute();}// 注册全局参数static void test5(String[] args) throws Exception {ParameterTool parameters = ParameterTool.fromArgs(args);// 遍历配配置值来自命令行内容Map<String, String> parameterMap = parameters.toMap();for (String key : parameterMap.keySet()) {System.out.println("parameter :" + key + " = " + parameterMap.get(key));}// set up the execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameters);DataStream<String> source = env.socketTextStream("192.168.10.42", 8888).map(new RichMapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();// 获取命令行中的 prefix 参数值String prefix = parameters.getRequired("prefix");String[] lines = value.split(",");// 将prefix加在name的前面进行拼接return "name:" + prefix + "_" + lines[0] + " age: " + lines[1];}});source.print();env.execute();}public static void main(String[] args) throws Exception {test2();}
}

以上,本文介绍了ParameterTool 的获取以及在应用程序中的使用方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/600808.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode第102题 - 二叉树的层序遍历

题目 解答 class Solution {List<List<Integer>> nodeLevels new ArrayList<>();public List<List<Integer>> levelOrder(TreeNode root) {levelOrder(root, 0);return nodeLevels;}public void levelOrder(TreeNode root, int k) {if (root …

噬菌体序列分析工具PhaVa的使用和使用方法

github: 25280841/PhaVa: Adapting the phasefinder approach for identifying phase variation to long reads (github.com) 挺简单的&#xff0c;这里就不翻译了&#xff0c;大家看着直接用吧。 PhaVa PhaVa is an approach for finding potentially Phase Variable invert…

第7章-第1节-Java中的异常处理

1、异常Exception概述&#xff1a; 1&#xff09;、异常的概念&#xff1a; 现实生活中万物在发展和变化会出现各种各样不正常的现象。 例如&#xff1a;人的成长过程中会生病。 实际工作中&#xff0c;遇到的情况不可能是非常完美的。 比如&#xff1a;你写的某个模块&…

使用jmeter从0开始完成性能测试

使用JMeter从0开始完成性能测试 介绍 在软件开发过程中&#xff0c;性能测试是一项关键任务&#xff0c;它可以帮助我们评估系统在不同负载条件下的性能表现&#xff0c;发现潜在的性能瓶颈。JMeter是一款功能强大且易于使用的性能测试工具&#xff0c;它可以帮助我们完成各种…

欧洲最好的AI大模型:Mistral 7B!(开源、全面超越Llama 2)

你可能已经听说过Meta&#xff08;原Facebook&#xff09;的Llama 2&#xff0c;这是一款拥有13亿参数的语言模型&#xff0c;能够生成文本、代码、图像等多种内容。 但是你知道吗&#xff0c;有一家法国的创业公司Mistral AI&#xff0c;推出了一款只有7.3亿参数的语言模型&am…

GitHub Copilot 最佳免费平替:阿里通义灵码

之前分享了不少关于 GitHub Copilot 的文章&#xff0c;不少粉丝都评论让我试试阿里的通义灵码&#xff0c;这让我对通义灵码有了不少的兴趣。 今天&#xff0c;阿七就带大家了解一下阿里的通义灵码&#xff0c;我们按照之前 GitHub Copilot 的顺序分享通义灵码在相同场景下的…

Vue 之 修饰符汇总

一、简介 在Vue中&#xff0c;修饰符是一种特殊的语法&#xff0c;用于修改指令或事件绑定的行为&#xff0c;它们以点号&#xff08;.&#xff09;的形式添加到指令或事件的后面&#xff0c;并可以改变其默认行为或添加额外的功能&#xff0c;如&#xff1a;禁止事件冒泡、数…

java 中数组常用排序方法举例说明

java 中数组常用排序方法举例说明 在Java中&#xff0c;数组的排序是常见的操作之一&#xff0c;而Java提供了多种排序方法来满足不同场景的需求。下面详细介绍5种常用的数组排序方法&#xff1a; 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff1a; 冒泡排序是一种简单…

【mars3d】new mars3d.layer.GeoJsonLayer(实现环状面应该怎么传data

问题&#xff1a;【mars3d】new mars3d.layer.GeoJsonLayer(实现环状面应该怎么传data 解决方案&#xff1a; 1.在示例中修改showDraw()方法的data数据&#xff0c;实现以下环状面效果 2.示例链接&#xff1a; 功能示例(Vue版) | Mars3D三维可视化平台 | 火星科技 export f…

Ubuntu20.04安装ROS2 Foxy

Ubuntu20.04安装ROS2 Foxy 实操安装 安装ROS2的教程在网上很多&#xff0c;但是我操作之后都有问题&#xff0c;大部分的问题是在 sudo apt update 时访问packages.ros.org无法成功&#xff0c;主要的原因是没有外网&#xff0c;而自己整一个外网代理又非常麻烦&#xff0c;所…

读书之深入理解ffmpeg_简单笔记3(初步)

通读完只能对书中内容有大概的了解&#xff0c;具体的细节还得一一实践攻克。 10: libavformat接口使用 媒体流&#xff0c;文件等封装&#xff0c;解封装&#xff0c;转封装 视频截取&#xff0c;AVFormatContext,AVPacket等介绍 11&#xff1a;libavcodec接口使用 视频&…

Android开发中“真正”的仓库模式

原文地址&#xff1a;https://proandroiddev.com/the-real-repository-pattern-in-android-efba8662b754原文发表日期&#xff1a;2019.9.5作者&#xff1a;Denis Brandi翻译&#xff1a;tommwq翻译日期&#xff1a;2024.1.3 Figure 1: 仓库模式 多年来我见过很多仓库模式的实…

pytest安装失败,报错Could not find a version that satisfies the requirement pytest

问题 安装pytest失败&#xff0c;尝试使用的命令有 pip install pytest pip3 install pytest pip install -U pytest pip install pytest -i https://pypi.tuna.tsinghua.edu.cn/simple但是都会报同样的错&#xff1a; 解决方案 发现可能是挂了梯子的原因&#xff0c;关掉…

代码随想录算法训练营Day20|654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树

目录 654.最大二叉树 前言 递归法 617.合并二叉树 前言 递归法 700.二叉搜索树中的搜索 前言 递归法 递归法 98.验证二叉搜索树 前言 递归法 迭代法 总结 654.最大二叉树 题目链接 文章链接 前言 本题延续昨天最后一题&#xff0c;依然是一道构造二叉树的题目…

烟花燃放如何管控?智能分析网关V4烟火检测保障烟火安全

一、方案背景 随着元旦佳节的热潮退去&#xff0c;春节也即将来临&#xff0c;在众多传统的中国节日里&#xff0c;烟花与烧纸祭祀都是必不可少的&#xff0c;一方面表达了人们对节日的庆祝的期许&#xff0c;另一方面也是一种对故者思念的寄托。烟花爆竹的燃放不仅存在着巨大的…

Node.js中的模块,常用模块具体代码示例

核心模块&#xff1a;https://blog.csdn.net/kkkys_kkk/article/details/135409851?spm1001.2014.3001.5501 目录 第三方模块 代码示例 Express示例 Lodash示例 MongoDB示例 Async示例 Request示例 发送GET 发送POST请求 自定义模块 创建步骤 常见示例 导出一个函数&a…

【PHP】TP5 使用模型一对一关联查询,条件筛选及字段过滤

目录 方法一&#xff1a;使用Eloquent ORM的with关联查询 方法二&#xff1a;使用JOIN进行查询 方法一&#xff1a;使用Eloquent ORM的with关联查询 在 ThinkPHP5 中&#xff0c;可以使用模型关联和条件查询来实现一对一关联查询。以下是一个示例&#xff1a; 假设有两个表&a…

跑通大模型领域的 hello world

跑通书生浦语大模型的 3 个趣味 demo&#xff08;InternLM-Chat-7B 智能对话、Lagent工具调用解简单数学题、浦语灵笔多模态图文创作和理解&#xff09;视频和文档。 1、两个框架 InternLM 是⼀个开源的轻量级训练框架&#xff0c;旨在⽀持⼤模型训练⽽⽆需⼤量的依赖。 Lage…

Underactuated Robotics - 欠驱动机器人学(一)- 全驱动与欠驱动系统

系列文章目录 前言 如今的机器人行动过于保守&#xff0c;只能完成机械性能所能完成的一小部分任务&#xff0c;实现一小部分性能。在某些情况下&#xff0c;我们仍然从根本上受限于在结构化工厂环境中成熟的刚性机械臂控制技术&#xff0c;在这种环境中&#xff0c;可以使用大…

烧录FRU方法

烧录FRU ipmitool远程命令示例: ipmitool -I lanp -H 127.0.0.1 -U admin -P admin write FRUID FRUfilename 1、修改Chassis PN ipmitool fru edit 0 field c 0 01234567892、修改 Board PN ipmitool fru edit 0 field b 3 01234567893、修改 Product PN ipmitool fru ed…