Flink source code analysis: how the `flink` command starts

Flink version: flink-1.12.1

Source location: flink-dist/src/main/flink-bin/bin/flink

Source of the `flink` script:

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

Let's start with the first segment:

target="$0"

# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

In day-to-day work, many people like to create a symbolic link (soft link) from the original command to a new file.

For example:

 ln -s /opt/soft/flink-1.12.1/bin/flink /home/aaa

This makes /home/aaa a symlink to /opt/soft/flink-1.12.1/bin/flink, so aaa can stand in for the flink command. For instance, /opt/soft/flink-1.12.1/bin/flink run can be replaced by /home/aaa run.
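A minimal sketch of this substitution, using a throwaway script in a temp directory instead of the real flink installation (all paths here are made up for the demo):

```shell
#!/usr/bin/env bash
# Demo: a symlink stands in for the script it points to.
dir=$(mktemp -d)

# A stand-in for the real flink script.
cat > "$dir/flink" <<'EOF'
#!/usr/bin/env bash
echo "real flink script, args: $@"
EOF
chmod +x "$dir/flink"

# aaa -> flink; running the link runs the original script.
ln -s "$dir/flink" "$dir/aaa"
out=$("$dir/aaa" run)
echo "$out"   # prints: real flink script, args: run

rm -rf "$dir"
```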

The snippet from the flink script above resolves the symlink information of the aaa command to find the original location of the flink command. The core is these two lines:

    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`

ls -ld prints the directory entry for the current "aaa" command; for a symlink, the last part of that output generally looks like:

aaa -> /opt/soft/flink-1.12.1/bin/flink

The expr command then parses that string, extracts the part after the arrow, /opt/soft/flink-1.12.1/bin/flink, and assigns this location of the flink command back to target.
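The extraction can be seen in isolation by feeding expr a hand-written ls line (the permissions, owner, and date below are fabricated for the demo):

```shell
#!/usr/bin/env bash
# Simulated `ls -ld` output for a symlink (fields before the arrow are made up).
ls_output='lrwxrwxrwx 1 user user 32 Jan  1 12:00 /home/aaa -> /opt/soft/flink-1.12.1/bin/flink'

# Same expr expression as the flink script: capture everything after " -> ".
target=`expr "$ls_output" : '.* -> \(.*\)$'`
echo "$target"   # prints /opt/soft/flink-1.12.1/bin/flink
```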

Note that because a symlink can itself point to another symlink, the whole resolution is wrapped in a while loop.

while [ -L "$target" ]

This condition checks whether the current target is still a symlink. If it is, the loop continues, following the link toward the location of the flink command; if not, target already is the flink command and the loop exits.

This matters because a symlink can be created on top of another symlink; in other words, symlinks can be nested or chained. For example:

 ln -s /home/bbb /home/aaa
 ln -s /home/ccc /home/bbb

The iteration variable guards against exactly this situation: if the links nest more than 100 deep (for instance because of a cyclic symlink), the loop stops.
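The whole loop can be exercised against a two-level chain built in a temp directory (aaa -> bbb -> a real file; the names mirror the example above but the paths are generated):

```shell
#!/usr/bin/env bash
# Demo: resolve a chain of symlinks with the same loop the flink script uses.
dir=$(mktemp -d)
touch "$dir/real"
ln -s "$dir/real" "$dir/bbb"   # bbb -> real
ln -s "$dir/bbb"  "$dir/aaa"   # aaa -> bbb

target="$dir/aaa"
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done
echo "$target"   # the real file, reached after two hops

rm -rf "$dir"
```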

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

Once the real flink command has been located, this segment parses and assembles the various configuration variables.

Note that many of the variables here, such as FLINK_IDENT_STRING, are never declared in this file, while the lines above read:

# get flink config
. "$bin"/config.sh

so presumably they are set up in config.sh. Here is config.sh:

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

constructFlinkClassPath() {
    local FLINK_DIST
    local FLINK_CLASSPATH

    while read -d '' -r jarfile ; do
        if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
            FLINK_DIST="$FLINK_DIST":"$jarfile"
        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
            FLINK_CLASSPATH="$jarfile";
        else
            FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
        fi
    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)

    if [[ "$FLINK_DIST" == "" ]]; then
        # write error message to stderr since stdout is stored as the classpath
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
        # exit function with empty classpath to force process failure
        exit 1
    fi

    echo "$FLINK_CLASSPATH""$FLINK_DIST"
}

findFlinkDistJar() {
    local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`"

    if [[ "$FLINK_DIST" == "" ]]; then
        # write error message to stderr since stdout is stored as the classpath
        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
        # exit function with empty classpath to force process failure
        exit 1
    fi

    echo "$FLINK_DIST"
}

# These are used to mangle paths that are passed to java when using
# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
# "cygpath" can do the conversion.
manglePath() {
    UNAME=$(uname -s)
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -w "$1"`
    else
        echo $1
    fi
}

manglePathList() {
    UNAME=$(uname -s)
    # a path list, for example a java classpath
    if [ "${UNAME:0:6}" == "CYGWIN" ]; then
        echo `cygpath -wp "$1"`
    else
        echo $1
    fi
}

# Looks up a config value by key from a simple YAML-style key-value map.
# $1: key to look up
# $2: default value to return if key does not exist
# $3: config file to read from
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`

    [ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

########################################################################################################################
# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
# -or- the respective environment variables are not set.
########################################################################################################################

# WARNING !!! , these values are only used if there is nothing else is specified in
# conf/flink-conf.yaml

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=10                              # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_JAVA_OPTS_CLI=""                        # Optional JVM args (Client)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary
DEFAULT_HBASE_CONF_DIR=""                           # HBase Configuration Directory, if necessary

########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
########################################################################################################################

KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"

KEY_ENV_PID_DIR="env.pid.dir"
KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_HIGH_AVAILABILITY="high-availability"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"

########################################################################################################################
# PATHS AND CONFIG
########################################################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path and resolve directory symlinks
bin=`dirname "$target"`
SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`

# Define the main directory of the flink installation
# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
if [ -z "$_FLINK_HOME_DETERMINED" ]; then
    FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
fi
FLINK_LIB_DIR=$FLINK_HOME/lib
FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
FLINK_OPT_DIR=$FLINK_HOME/opt

# These need to be mangled because they are directly passed to java.
# The above lib path is used by the shell script to retrieve jars in a
# directory, so it needs to be unmangled.
FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
FLINK_CONF_FILE="flink-conf.yaml"
YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}

### Exported environment variables ###
export FLINK_CONF_DIR
export FLINK_BIN_DIR
export FLINK_PLUGINS_DIR
# export /lib dir to access it during deployment of the Yarn staging files
export FLINK_LIB_DIR
# export /opt dir to access it for the SQL client
export FLINK_OPT_DIR

########################################################################################################################
# ENVIRONMENT VARIABLES
########################################################################################################################

# read JAVA_HOME from config with no default value
MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
# check if config specified JAVA_HOME
if [ -z "${MY_JAVA_HOME}" ]; then
    # config did not specify JAVA_HOME. Use system JAVA_HOME
    MY_JAVA_HOME=${JAVA_HOME}
fi
# check if we have a valid JAVA_HOME and if java is not available
if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
    echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
    exit 1
else
    JAVA_HOME=${MY_JAVA_HOME}
fi

UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
    JAVA_RUN=java
else
    if [[ -d $JAVA_HOME ]]; then
        JAVA_RUN=$JAVA_HOME/bin/java
    else
        JAVA_RUN=java
    fi
fi

# Define HOSTNAME if it is not already set
if [ -z "${HOSTNAME}" ]; then
    HOSTNAME=`hostname`
fi

IS_NUMBER="^[0-9]+$"

# Verify that NUMA tooling is available
command -v numactl >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
    FLINK_TM_COMPUTE_NUMA="false"
else
    # Define FLINK_TM_COMPUTE_NUMA if it is not already set
    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
        FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
    fi
fi

if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
    MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
    export MAX_LOG_FILE_NUMBER
fi

if [ -z "${FLINK_LOG_DIR}" ]; then
    FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
fi

if [ -z "${YARN_CONF_DIR}" ]; then
    YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HADOOP_CONF_DIR}" ]; then
    HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${HBASE_CONF_DIR}" ]; then
    HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_PID_DIR}" ]; then
    FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
fi

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
    FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
    FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_SSH_OPTS}" ]; then
    FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
fi

# Define ZK_HEAP if it is not already set
if [ -z "${ZK_HEAP}" ]; then
    ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
fi

# High availability
if [ -z "${HIGH_AVAILABILITY}" ]; then
    HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
    if [ -z "${HIGH_AVAILABILITY}" ]; then
        # Try deprecated value
        DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
        if [ -z "${DEPRECATED_HA}" ]; then
            HIGH_AVAILABILITY="none"
        elif [ ${DEPRECATED_HA} == "standalone" ]; then
            # Standalone is now 'none'
            HIGH_AVAILABILITY="none"
        else
            HIGH_AVAILABILITY=${DEPRECATED_HA}
        fi
    fi
fi

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -n "$HADOOP_HOME" ]; then
        # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
        if [ -d "$HADOOP_HOME/conf" ]; then
            # It's Hadoop 1.x
            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
        fi
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
            # It's Hadoop 2.2+
            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
        fi
    fi
fi

# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
    if [ -d "/etc/hadoop/conf" ]; then
        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
        HADOOP_CONF_DIR="/etc/hadoop/conf"
    fi
fi

# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -n "$HBASE_HOME" ]; then
        # HBASE_HOME is set.
        if [ -d "$HBASE_HOME/conf" ]; then
            HBASE_CONF_DIR="$HBASE_HOME/conf"
        fi
    fi
fi

# try and set HBASE_CONF_DIR to some common default if it's not set
if [ -z "$HBASE_CONF_DIR" ]; then
    if [ -d "/etc/hbase/conf" ]; then
        echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
        HBASE_CONF_DIR="/etc/hbase/conf"
    fi
fi

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

if [ -n "${HBASE_CONF_DIR}" ]; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxilliary function which extracts the name of host from a line which
# also potentially includes topology information and the taskManager type
extractHostName() {
    # handle comments: extract first part of string (before first # character)
    WORKER=`echo $1 | cut -d'#' -f 1`

    # Extract the hostname from the network hierarchy
    if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
        WORKER=${BASH_REMATCH[1]}
    fi

    echo $WORKER
}

readMasters() {
    MASTERS_FILE="${FLINK_CONF_DIR}/masters"

    if [[ ! -f "${MASTERS_FILE}" ]]; then
        echo "No masters file. Please specify masters in 'conf/masters'."
        exit 1
    fi

    MASTERS=()
    WEBUIPORTS=()

    MASTERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOSTWEBUIPORT=$( extractHostName $line)

        if [ -n "$HOSTWEBUIPORT" ]; then
            HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
            WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
            MASTERS+=(${HOST})

            if [ -z "$WEBUIPORT" ]; then
                WEBUIPORTS+=(0)
            else
                WEBUIPORTS+=(${WEBUIPORT})
            fi

            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                MASTERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$MASTERS_FILE"
}

readWorkers() {
    WORKERS_FILE="${FLINK_CONF_DIR}/workers"

    if [[ ! -f "$WORKERS_FILE" ]]; then
        echo "No workers file. Please specify workers in 'conf/workers'."
        exit 1
    fi

    WORKERS=()

    WORKERS_ALL_LOCALHOST=true
    GOON=true
    while $GOON; do
        read line || GOON=false
        HOST=$( extractHostName $line)
        if [ -n "$HOST" ] ; then
            WORKERS+=(${HOST})
            if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
                WORKERS_ALL_LOCALHOST=false
            fi
        fi
    done < "$WORKERS_FILE"
}

# starts or stops TMs on all workers
# TMWorkers start|stop
TMWorkers() {
    CMD=$1

    readWorkers

    if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
        # all-local setup
        for worker in ${WORKERS[@]}; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else
        # non-local setup
        # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
        command -v pdsh >/dev/null 2>&1
        if [[ $? -ne 0 ]]; then
            for worker in ${WORKERS[@]}; do
                ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}

runBashJavaUtilsCmd() {
    local cmd=$1
    local conf_dir=$2
    local class_path=$3
    local dynamic_args=${@:4}
    class_path=`manglePathList "${class_path}"`

    local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
        # Print the output in case the user redirect the log to console.
        echo "$output" 1>&2
        exit 1
    fi

    echo "$output"
}

extractExecutionResults() {
    local output="$1"
    local expected_lines="$2"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
    local execution_results
    local num_lines

    execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
    num_lines=$(echo "${execution_results}" | wc -l)
    # explicit check for empty result, becuase if execution_results is empty, then wc returns 1
    if [[ -z ${execution_results} ]]; then
        echo "[ERROR] The execution result is empty." 1>&2
        exit 1
    fi
    if [[ ${num_lines} -ne ${expected_lines} ]]; then
        echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
        echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
        echo "$output" 1>&2
        exit 1
    fi

    echo "${execution_results//${EXECUTION_PREFIX}/}"
}

extractLoggingOutputs() {
    local output="$1"
    local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"

    echo "${output}" | grep -v ${EXECUTION_PREFIX}
}

parseJmArgsAndExportLogs() {
    java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
    logging_output=$(extractLoggingOutputs "${java_utils_output}")
    params_output=$(extractExecutionResults "${java_utils_output}" 2)

    if [[ $? -ne 0 ]]; then
        echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
        echo "[ERROR] Raw output from BashJavaUtils:"
        echo "$java_utils_output"
        exit 1
    fi

    jvm_params=$(echo "${params_output}" | head -n1)
    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
    export DYNAMIC_PARAMETERS=$(IFS=" " echo "${params_output}" | tail -n1)

    export FLINK_INHERITED_LOGS="
$FLINK_INHERITED_LOGS

JM_RESOURCE_PARAMS extraction logs:
jvm_params: $jvm_params
logs: $logging_output
"
}

Sure enough, config.sh contains the parsing and construction of all those variables; the details are in the code above and won't be repeated here.
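One piece worth singling out is readFromConfig, the sed pipeline that pulls key: value pairs out of flink-conf.yaml. Here it is extracted from config.sh and run against a small made-up config file (the keys and values below are just for the demo):

```shell
#!/usr/bin/env bash
# readFromConfig, copied from config.sh: look up a key in a YAML-style
# key-value file, falling back to a default when the key is absent.
readFromConfig() {
    local key=$1
    local defaultValue=$2
    local configFile=$3

    # first extract the value with the given key (1st sed), then trim the result (2nd sed)
    # if a key exists multiple times, take the "last" one (tail)
    local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`

    [ -z "$value" ] && echo "$defaultValue" || echo "$value"
}

conf=$(mktemp)
cat > "$conf" <<'EOF'
env.log.max: 5   # keep five old log files
jobmanager.rpc.address: localhost
EOF

readFromConfig "env.log.max" "10" "$conf"    # prints 5 (comment is stripped)
readFromConfig "env.pid.dir" "/tmp" "$conf"  # prints /tmp (key absent, default used)

rm -f "$conf"
```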

At the very end, the flink script executes a single line:

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
From config.sh we know that $JAVA_RUN is simply the java command, and the other variables are just parameters. The core is the class org.apache.flink.client.cli.CliFrontend: this is the Java class that actually runs once the flink command starts. This post only covers the flink script itself; CliFrontend will be covered in a dedicated post later.
