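Cloud Native: An In-Depth Look at Flink on k8s Run Modes and Hands-On Operation

I. Overview

  • At its core, Flink is a streaming dataflow execution engine. On a single Flink runtime it supports both stream-processing and batch-processing applications, and for distributed computation over data streams it provides data distribution, data communication, and fault tolerance.
  • Flink official website
  • Documentation for different versions
  • Official Flink on k8s documentation
  • GitHub repository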

II. Flink run modes

  • Official documentation
  • Flink on yarn has three run modes:
    • yarn-session mode (Session Mode)
    • yarn-cluster mode (Per-Job Mode)
    • Application mode (Application Mode)

  • Note: Per-Job mode is deprecated. It is supported only by YARN, was deprecated in Flink 1.15, and is to be dropped in FLINK-26000.

III. Flink on k8s in practice

① Download flink

  • Download address:
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz

② Build the base image

docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12

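③ session mode

  • A Flink Session cluster runs as a long-running Kubernetes Deployment. Multiple Flink jobs can run on one Session cluster, and each job is submitted to the cluster after the cluster has been deployed.
  • A Flink Session cluster deployment in Kubernetes has at least three components:
    • a Deployment that runs the JobManager;
    • a Deployment for a pool of TaskManagers;
    • a Service that exposes the JobManager's REST and UI ports.

(A) Native Kubernetes mode

  • Configuration parameters: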
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace
  • Dockerfile for building the image:
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# ENV persists into the running container; a variable exported inside RUN is lost when that build step ends
ENV LANG=zh_CN.UTF-8
  • Build the image:
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
  • Create the flink cluster:
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster  \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort
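
The -D options above are plain key=value pairs, and several of them (cluster id, namespace, service account) are needed again when a job is submitted to the session. A minimal sketch of keeping them in one place, assuming a hypothetical helper named flink_d_opts (it is not part of Flink); the command is only printed, not executed:

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flink): turn key=value pairs into -D options.
flink_d_opts() {
    local kv
    for kv in "$@"; do
        printf -- '-D%s\n' "$kv"
    done
}

# Settings copied from the kubernetes-session.sh command above.
SESSION_OPTS=(
    kubernetes.cluster-id=my-first-flink-cluster
    kubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
    kubernetes.namespace=flink
    kubernetes.jobmanager.service-account=flink-service-account
    kubernetes.rest-service.exposed.type=NodePort
)

# Print the command instead of running it; drop the leading "echo" to execute.
echo ./bin/kubernetes-session.sh $(flink_d_opts "${SESSION_OPTS[@]}")
```

Printing first makes it easy to compare the generated command with the hand-written one before touching the cluster.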

  • Submit a job (mind the JDK version; JDK 8 currently works):
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar
# ./examples/streaming/WordCount.jar can be submitted the same way.
# Resource parameters (flink run options must come before the jar path):
# -Dkubernetes.taskmanager.cpu=2000m \
# -Dexternal-resource.limits.kubernetes.cpu=4000m \
# -Dexternal-resource.limits.kubernetes.memory=10Gi \
# -Dexternal-resource.requests.kubernetes.cpu=2000m \
# -Dexternal-resource.requests.kubernetes.memory=8Gi \
  • Check:
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink

  • Delete the flink cluster:
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force

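(B) Standalone mode

  • Build the image: the default user is flink; here it is changed to admin (pick whatever user your organization requires). The script can be copied out of the pod started above. Startup script docker-entrypoint.sh: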
#!/usr/bin/env bash

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
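
The set_config_option function in the script either overrides an existing key in flink-conf.yaml or appends a new one. A small sketch that exercises it against a throwaway file may make that behavior easier to see; the temporary CONF_FILE and the sample values are assumptions for illustration, and sed -i is used in its GNU form, as in the script:

```shell
#!/usr/bin/env bash
# Demo only: CONF_FILE is a temporary file, not a real Flink configuration.
CONF_FILE="$(mktemp)"
printf 'jobmanager.rpc.address: localhost\n' > "${CONF_FILE}"

# Same logic as set_config_option in docker-entrypoint.sh.
set_config_option() {
    local option=$1
    local value=$2

    # escape periods so they match literally in the regular expression
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

set_config_option jobmanager.rpc.address flink-jobmanager   # key exists: overridden
set_config_option blob.server.port 6124                     # key missing: appended

cat "${CONF_FILE}"
# prints:
# jobmanager.rpc.address: flink-jobmanager
# blob.server.port: 6124
```

Because the key is matched from the start of the line, a commented-out entry such as "# blob.server.port: ..." would not be treated as existing.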
  • Write the Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009
USER root
# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN mkdir -p /opt/apache
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz  /opt/apache/
ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH
# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/
#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
# Set the working directory
WORKDIR $FLINK_HOME
# Expose ports
EXPOSE 6123 8081
# Entry script: not executed at build time, only when a container starts
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
  • Build the image:
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Remove the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
  • Write the yaml files:
    • flink-configuration-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    • jobmanager-service.yaml, an optional service that is needed only in non-HA mode:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
    • jobmanager-rest-service.yaml, an optional service that exposes the jobmanager rest port as a port on the Kubernetes nodes:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
    • taskmanager-query-state-service.yaml, an optional service that exposes the TaskManager port used for queryable state as a port on the Kubernetes nodes:
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
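
flink-conf.yaml in the ConfigMap sets both taskmanager.memory.process.size and taskmanager.memory.flink.size, so the remainder has to cover JVM metaspace and JVM overhead. The arithmetic can be sketched as follows, assuming the default metaspace size (256m) and the default JVM overhead bounds (192m to 1g); these defaults are not set in the ConfigMap and should be checked against the configuration reference for the Flink version in use:

```shell
#!/usr/bin/env bash
# Values from flink-conf.yaml in the ConfigMap, in MiB.
process_mb=2728   # taskmanager.memory.process.size
flink_mb=2280     # taskmanager.memory.flink.size

# Assumed Flink defaults (not set in the ConfigMap; verify for your version).
metaspace_mb=256      # taskmanager.memory.jvm-metaspace.size
overhead_min_mb=192   # taskmanager.memory.jvm-overhead.min
overhead_max_mb=1024  # taskmanager.memory.jvm-overhead.max

# Whatever is left after Flink memory and metaspace is JVM overhead.
overhead_mb=$(( process_mb - flink_mb - metaspace_mb ))
echo "derived JVM overhead: ${overhead_mb}m"
# prints: derived JVM overhead: 192m

if (( overhead_mb < overhead_min_mb || overhead_mb > overhead_max_mb )); then
    echo "overhead outside [${overhead_min_mb}m, ${overhead_max_mb}m]" >&2
fi
```

Under these assumptions the derived overhead sits exactly on the lower bound, so lowering process.size or raising flink.size without adjusting the other is likely to produce a configuration that Flink rejects.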
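  • The configuration files above are common to both deployment types; the session cluster additionally uses the following:
    • jobmanager-session-deployment-non-ha.yaml: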
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
    • taskmanager-session-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
  • Create the flink cluster:
    • As follows:
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
    • Reverse-engineer a Dockerfile from an image:
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
    • Check:
kubectl get pods,svc -n flink -owide

    • Web UI address.
  • Submit a job:
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink

  • Delete the flink cluster:
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
  • Access the flink web UI:
    • The port is the nodePort set in jobmanager-rest-service.yaml:
http://192.168.182.110:30081/#/overview
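
The address above is simply a node IP plus the nodePort of the rest Service, and the same base URL also serves Flink's REST API. A minimal sketch, assuming two hypothetical helper names (flink_rest_url and wait_for_flink) and that curl is available where the polling function is run:

```shell
#!/usr/bin/env bash
# Build the Web UI / REST base URL from a node IP and the Service nodePort.
flink_rest_url() {
    local node_ip=$1
    local node_port=${2:-30081}   # nodePort from jobmanager-rest-service.yaml
    printf 'http://%s:%s\n' "$node_ip" "$node_port"
}

# Poll the REST endpoint /overview until the JobManager answers.
# Needs curl and network access to the node, so it is not called here.
wait_for_flink() {
    local url=$1
    local tries=${2:-30}
    local i
    for ((i = 0; i < tries; i++)); do
        if curl -sf "${url}/overview" > /dev/null; then
            return 0
        fi
        sleep 2
    done
    return 1
}

flink_rest_url 192.168.182.110
# Example call against a live cluster:
# wait_for_flink "$(flink_rest_url 192.168.182.110)" && echo "JobManager is up"
```

Waiting for the REST endpoint before running ./bin/flink run -m avoids submitting a job while the JobManager pod is still starting.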

④ application mode (recommended)

  • A basic Flink Application cluster deployment in Kubernetes has three components:
    • an application that runs the JobManager;
    • a Deployment for a pool of TaskManagers;
    • a Service that exposes the JobManager's REST and UI ports.

(A) Native Kubernetes mode (commonly used)

  • Dockerfile for building the image:
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# ENV persists into the running container; a variable exported inside RUN is lost when that build step ends
ENV LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY  TopSpeedWindowing.jar $FLINK_HOME/usrlib/
  • Build the image:
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
# Remove the image
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
  • Create the flink cluster and submit the job:
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
-Dkubernetes.jobmanager.replicas=1 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dexternal-resource.limits.kubernetes.cpu=2000m \
-Dexternal-resource.limits.kubernetes.memory=2Gi \
-Dexternal-resource.requests.kubernetes.cpu=1000m \
-Dexternal-resource.requests.kubernetes.memory=1Gi \
-Dkubernetes.rest-service.exposed.type=NodePort \
local:///opt/flink/usrlib/TopSpeedWindowing.jar
  • local is the only scheme supported in Application mode. It refers to the local environment, which here means the pod or container, not the host machine. Check:
kubectl get pods,svc -n flink

kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
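
Since the jar has to be addressed with the local scheme, that is, as a path inside the image, a wrapper script can reject anything else before calling flink run-application. A minimal sketch, assuming a hypothetical function name is_local_jar_uri (it is not a Flink command):

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check: accept only local:// URIs that point at a jar.
is_local_jar_uri() {
    case "$1" in
        local:///*.jar) return 0 ;;
        *)              return 1 ;;
    esac
}

JAR_URI="local:///opt/flink/usrlib/TopSpeedWindowing.jar"
if is_local_jar_uri "$JAR_URI"; then
    echo "ok: $JAR_URI"
else
    echo "rejected: $JAR_URI (the jar must already be inside the image)" >&2
fi
```

The check only looks at the URI string; whether the file really exists at that path inside the image still has to be verified when the image is built.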

  • Delete the flink cluster:

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force

(B) Standalone mode

  • Build the image. Startup script docker-entrypoint.sh:
#!/usr/bin/env bash

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
  • Write the Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009
USER root
# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN mkdir -p /opt/apache
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz  /opt/apache/
ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH
# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/
#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
# The script was copied to /opt/apache/, so it is made executable there
RUN chmod +x /opt/apache/docker-entrypoint.sh
# Set the working directory
WORKDIR $FLINK_HOME
# Expose ports
EXPOSE 6123 8081
# Entry script: not executed at build time, only when a container starts
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
  • Build the image:
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Remove the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
  • Write the yaml files:
    • flink-configuration-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    • jobmanager-service.yaml, an optional service that is needed only in non-HA mode:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
    • jobmanager-rest-service.yaml, an optional service that exposes the jobmanager rest port as a port on the Kubernetes nodes:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
    • taskmanager-query-state-service.yaml, an optional service that exposes the TaskManager port used for queryable state as a port on the Kubernetes nodes:
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
    • jobmanager-application-non-ha.yaml, non-HA (note the hostPath mount /mnt/nfsdata/flink/application/job-artifacts used here; a shared directory is best, because a hostPath is local to each node and the pods may be scheduled on different nodes):
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        env:
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount","--output","/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
    • taskmanager-job-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
  • Create the flink cluster and submit the job:
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f  jobmanager-application-non-ha.yaml -n flink
kubectl create -f  taskmanager-job-deployment.yaml -n flink
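
The create commands above follow a fixed order: configuration first, then the services, then the workloads. A minimal sketch of a wrapper that keeps that order in one list and stops at the first failure; DRY_RUN, run and create_all are names invented for this sketch, and by default the commands are only printed:

```shell
#!/usr/bin/env bash
NAMESPACE="flink"

# Manifests in the order used above.
MANIFESTS=(
    flink-configuration-configmap.yaml
    jobmanager-service.yaml
    jobmanager-rest-service.yaml
    taskmanager-query-state-service.yaml
    jobmanager-application-non-ha.yaml
    taskmanager-job-deployment.yaml
)

# Print the command when DRY_RUN is 1 (the default here), execute it otherwise.
run() {
    if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "$@"
    else
        "$@"
    fi
}

# Create each manifest in turn; stop at the first one that fails.
create_all() {
    local f
    for f in "$@"; do
        run kubectl create -f "$f" -n "$NAMESPACE" || return 1
    done
}

create_all "${MANIFESTS[@]}"
```

Running the script with DRY_RUN=0 executes the same commands against the cluster; iterating over the list in reverse with kubectl delete would give the matching teardown.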
  • Check:
kubectl get pods,svc -n flink

  • Delete the flink cluster:
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
  • Check:
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
