【极数系列】Flink配置参数如何获取?(06)

文章目录

  • gitee码云地址
  • 简介概述
  • 01 配置值来自.properties文件
    • 1.通过路径读取
    • 2.通过文件流读取
    • 3.通过IO流读取
  • 02 配置值来自命令行
  • 03 配置来自系统属性
  • 04 注册以及使用全局变量
  • 05 Flink获取参数值Demo
    • 1.项目结构
    • 2.pom.xml文件如下
    • 3.配置文件
    • 4.项目主类
    • 5.运行查看相关日志

gitee码云地址

直接下载解压可用 https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:GetParamsStreamingJob

简介概述

​ 1.几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。

​ 2.为解决以上问题,Flink 提供一个名为 Parametertool 的简单公共类,其中包含了一些基本的工具。请注意,这里说的 Parametertool 并不是必须使用的。Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。

​ 3.**ParameterTool**定义了一组静态方法,用于读取配置信息。该工具类内部使用了 Map` 类型,这样使得它可以很容易地与你的配置集成在一起。

01 配置值来自.properties文件

1.通过路径读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式一:直接使用内置工具类
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
String jobName_01 = parameter_01.get("jobName");
logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);

2.通过文件流读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式二:使用文件
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
String jobName_02 = parameter_02.get("jobName");
logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);

3.通过IO流读取

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式三:使用IO流
InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
String jobName_03 = parameter_03.get("jobName");
logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);

02 配置值来自命令行

tips:在idea的命令行传参,格式:–jobName program_job_aurora

在这里插入图片描述

ParameterTool parameter_04 = ParameterTool.fromArgs(args);
String jobName_04 = parameter_04.get("jobName");
logger.info("方式四:命令行传参key值={}",jobName_04);

03 配置来自系统属性

tips:在idea的的jvm系统参数设置,格式:-Dinput=hdfs:///mydata

在这里插入图片描述

//方式五:获取jvm参数值
ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
String jobName_05 = parameter_05.get("input");
logger.info("方式五:获取jvm参数key值={}",jobName_05);

04 注册以及使用全局变量

注意:Flink全局变量仅支持在富函数中使用,即Rich开头的类使用

//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//直接使用内置工具类获取参数
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);//方式六:注册全局参数final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameter_01);//在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用//1.创建富函数RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {//获取运行环境ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//获取对应的值String jobName = parameters.getRequired("jobName");logger.info("方式六:获取全局注册参数key值={}",jobName_05);}};//2.创建数据集ArrayList<String> list = new ArrayList<>();list.add("001");list.add("002");list.add("003");//3.把有限数据集转换为数据源DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);//4.执行富文本处理dataStreamSource.flatMap(richFlatMap);//5.启动程序env.execute();

05 Flink获取参数值Demo

1.项目结构

在这里插入图片描述

2.pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

3.配置文件

(1)application.properties

jobName=job_aurora
jobMemory=1024
taskName=task_aurora

(2)log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

4.项目主类

package com.aurora;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;/*** @description flink获取外部参数作业** @author 浅夏的猫* @datetime 15:54 2024/1/28
*/
public class GetParamsStreamingJob {private static final Logger logger = LoggerFactory.getLogger(GetParamsStreamingJob.class);public static void main(String[] args) throws Exception {//定义文件路径String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式一:直接使用内置工具类ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);String jobName_01 = parameter_01.get("jobName");logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);//方式二:使用文件File propertiesFile = new File(propertiesFilePath);ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);String jobName_02 = parameter_02.get("jobName");logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);//方式三:使用IO流InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);String jobName_03 = parameter_03.get("jobName");logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);//方式四:命令行传参格式:--jobName program_job_auroraParameterTool parameter_04 = ParameterTool.fromArgs(args);String jobName_04 = parameter_04.get("jobName");logger.info("方式四:命令行传参key值={}",jobName_04);//方式五:获取jvm参数值ParameterTool parameter_05 = ParameterTool.fromSystemProperties();String jobName_05 = parameter_05.get("input");logger.info("方式五:获取jvm参数key值={}",jobName_05);//方式六:注册全局参数final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameter_01);//在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用//1.创建富函数RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {//获取运行环境ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//获取对应的值String jobName = parameters.getRequired("jobName");logger.info("方式六:获取全局注册参数key值={}",jobName_05);}};//2.创建数据集ArrayList<String> list = new ArrayList<>();list.add("001");list.add("002");list.add("003");//3.把有限数据集转换为数据源DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);//4.执行富文本处理dataStreamSource.flatMap(richFlatMap);//5.启动程序env.execute();}}

5.运行查看相关日志

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/653505.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sqli-labs第一关

1.判断是否存在注入&#xff0c;注入是字符型还是数字型? ?id1 and 11 ?id1 and 12 因为输入and 11与and 12 回显正常&#xff0c;所以该地方不是数字型。 ?id1 ?id1-- 输入单引号后报错&#xff0c;在单引号后添加--恢复正常&#xff0c;说明存在字符注入 2.猜解SQL查…

前端——JavaScript

目录 文章目录 前言 一. JavaScript基础 1.JavaScript基本结构 2. JavaScript 执行过程 3. JavaScript 引入方式 二. JavaScript 语法 1.数据类型 2.变量 2.1 var 关键字定义变量 2.2 let 关键字定义变量 2.3 var 与 let 的区别 3.字符串 3.1定义字符串 3.2 字…

Java中this引用详解

文章目录 一、 为什么要有this引用二、什么是this引用三、this引用的特性四、如何用好this关键字 一、 为什么要有this引用 我们先看一段代码 class Data {public int year;public int month;public int day;public void setDay(int y,int m,int d) {year y;month m;day d…

《动手学深度学习(PyTorch版)》笔记4.7

Chapter4 Multilayer Perceptron 4.7 Forward/Backward Propagation and Computational Graphs 本节将通过一些基本的数学和计算图&#xff0c;深入探讨反向传播的细节。首先&#xff0c;我们将重点放在带权重衰减&#xff08; L 2 L_2 L2​正则化&#xff09;的单隐藏层多层…

【教学类-44-04】20240128汉字字帖的字体(一)——文艺空心黑体

背景需求&#xff1a; 【教学类-XX -XX 】20240128名字字卡1.0&#xff08;15CM正方形手工纸、黑体&#xff0c;说明是某个孩子的第几个名字&#xff09;-CSDN博客文章浏览阅读254次&#xff0c;点赞4次&#xff0c;收藏2次。【教学类-XX -XX 】20240128名字字卡1.0&#xff0…

12.Elasticsearch应用(十二)

Elasticsearch应用&#xff08;十二&#xff09; 1.单机ES面临的问题 海量数据存储问题单点故障问题 2.ES集群如何解决上面的问题 海量数据存储解决问题&#xff1a; 将索引库从逻辑上拆分为N个分片&#xff08;Shard&#xff09;&#xff0c;存储到多个节点单点故障问题&a…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之CheckboxGroup组件

鸿蒙&#xff08;HarmonyOS&#xff09;项目方舟框架&#xff08;ArkUI&#xff09;之CheckboxGroup组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、CheckboxGroup组件 提供多选框组件&#xff0c;通常用于某选项的打开或关…

【python】GtkWindow程序

一、多个GtkWindow 在GTK中&#xff0c;并不推荐使用多个GtkWindow来创建多文档界面&#xff08;MDI&#xff09;&#xff0c;而是推荐使用单个GtkWindow内嵌入的小部件&#xff08;如GtkNotebook&#xff09;来实现类似的效果。然而&#xff0c;如果确实想要创建多个窗口的例…

教育能打破阶层固化吗

中式教育以应试为核心&#xff0c;强调知识的灌输和学生被动接受。随着社会的发展&#xff0c;中式教育的短板逐渐显现&#xff0c;创新能力的缺乏、对记忆的过度依赖、忽视个体差异等问题日益突出。 建议所有大学生都能去看看《上海交通大学生存手册》&#xff0c;它道出了中…

日常学习之:vue + django + docker + heroku 对后端项目 / 前后端整体项目进行部署

文章目录 使用 docker 在 heroku 上单独部署 vue 前端使用 docker 在 heroku 上单独部署 django 后端创建 heroku 项目构建 Dockerfile设置 settings.pydatabase静态文件管理安全设置applicaiton & 中间件配置 设置 requirements.txtheroku container 部署应用 前后端分别部…

(自用)learnOpenGL学习总结-高级OpenGL-模板测试

模板测试 模板测试简单来说就是一个mask&#xff0c;根据你的mask来保留或者丢弃片段。 那么可以用来显示什么功能呢&#xff1f;剪切&#xff0c;镂空、透明度等操作。 和深度缓冲的关系是&#xff1a; 先片段着色器&#xff0c;然后进入深度测试&#xff0c;最后加入模板测…

2023年中国工控自动化市场现状及竞争分析,美日占主角,国产品牌初崭头角

工控自动化是一种运用控制理论、仪器仪表理论、计算机和信息技术&#xff0c;对工业生产过程实现检测、控制、优化、调度、管理和决策&#xff0c;达到增加产量、提高质量、降低消耗、确保安全等目的综合性技术。产品应用领域广泛&#xff0c;可分为OEM型行业和项目型行业。 近…

2024年最新 MySQL的下载、安装、启动与停止

一、MySQL的下载 MySQL最常用的2个版本&#xff1a; 社区版&#xff1a;免费开源&#xff0c;自由下载&#xff0c;不提供官方技术支持&#xff0c;大多数普通用户选择这个即可。企业版&#xff1a;需要付费&#xff0c;不能在线下载&#xff0c;可以使用30天&#xff0c;提供…

aio-max-nr达到上限导致数据库性能问题

问题说明&#xff1a; rac数据库节点一表面上看由于归档等待事件导致业务性能问题。 问题分析过程&#xff1a; 查看awr报告top事件&#xff0c;等待事件主要为归档切换问题&#xff1a; 查看事件&#xff0c;归档等待达到20多分钟 检查节点alert日志发现&#xff0c;最…

Linux第37步_解决“Boot interface 6 not supported”之问题

在使用USB OTG将“自己移植的固件”烧写到eMMC中时&#xff0c;串口会输出“Boot interface 6 not supported”&#xff0c;发现很多人踩坑&#xff0c;我也一样。 见下图&#xff1a; 解决办法&#xff1a; 1、打开终端 输入“ls回车”&#xff0c;列出当前目录下所有的文件…

SpringCloud-高级篇(十六)

前面学习了Lua的语法&#xff0c;就可以在nginx去做编程&#xff0c;去实现nginx类里面的业务&#xff0c;查询Redis&#xff0c;查询tomcat等 &#xff0c;业务逻辑的编写依赖于其他组件&#xff0c;这些组件会用到OpenResty的工具去实现 &#xff08;1&#xff09;安装OpenRe…

C++(16)——vector的模拟实现

前面的文章中&#xff0c;给出了对于的模拟实现&#xff0c;本篇文章将给出关于的模拟实现。 目录 1.基本框架&#xff1a; 2. 返回值与迭代器&#xff1a; 2.1 返回值capacity与size: 2.2 两种迭代器iterator和const_iterator: 3. 扩容与push_back与pop_back&#xff1a…

后端学习:数据库MySQL学习

数据库简介 数据库&#xff1a;英文为 DataBase&#xff0c;简称DB&#xff0c;它是存储和管理数据的仓库。   接下来&#xff0c;我们来学习Mysql的数据模型&#xff0c;数据库是如何来存储和管理数据的。在介绍 Mysql的数据模型之前&#xff0c;需要先了解一个概念&#xf…

log4j2 配置入门介绍

配置 将日志请求插入到应用程序代码中需要进行大量的计划和工作。 观察表明&#xff0c;大约4%的代码专门用于日志记录。因此&#xff0c;即使是中等规模的应用程序也会在其代码中嵌入数千条日志记录语句。 考虑到它们的数量&#xff0c;必须管理这些日志语句&#xff0c;而…

【QT+QGIS跨平台编译】之十三:【giflib+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

文章目录 一、giflib介绍二、文件下载三、文件分析四、pro文件五、编译实践一、giflib介绍 GIFlib(又称为Libgif)是一个开源的C语言库,用于处理GIF图像格式。它提供了一组函数和工具,使得开发者可以读取、写入和操作GIF图像文件。 GIFlib支持GIF87a和GIF89a两种版本的GIF格…