k8s之flink的几种创建方式

在此之前需要部署一下私人docker仓库,教程搭建 Docker 镜像仓库

注意:每台节点的daemon.json都需要配置"insecure-registries": ["http://主机IP:8080"] 并重启

一、session 模式

Session 模式是指在 Kubernetes 上启动一个共享的 Flink 集群(由 JobManager 和多个 TaskManagers 组成),然后多个 Flink 作业可以提交到这个共享集群上运行。这个模式下的集群会长期运行,直到用户手动停止它。这种模式适合多个作业需要频繁启动和停止,且对集群资源的利用率要求较高的场景。


Kubernetes 中的 Flink Session 集群部署至少包含三个组件:

  • 运行JobManager的部署

  • TaskManagers池的部署

  • 暴露JobManager的 REST 和 UI 端口的服务

1.1 Native Kubernetes 模式

Flink 的 Native Kubernetes 模式允许用户将 Apache Flink 无缝集成至 Kubernetes 环境中,实现在 Kubernetes 上运行 Flink 作业和应用程序。这种模式的主要优点是 Flink 能够利用 Kubernetes 提供的资源编排和管理能力,简化 Flink 集群的部署和管理。

在 Native Kubernetes 模式下,Flink 集群的部署和管理是通过 Flink 的 Kubernetes Operator 或者是直接使用 kubectl 命令行工具来完成的。Flink 的每个组件都被作为 Kubernetes 资源(如Pods, Services等)来管理。

1.1.1 构建镜像 Dockerfile

1.创建dockerfileFROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-82.开始构建镜像
docker build -t  192.168.20.62:2333/bigdata/flink-session:1.16.23.上传镜像
docker push 192.168.20.62:2333/bigdata/flink-session:1.16.2

1.1.2 创建命名空间和 serviceaccount

# 创建namespace
kubectl create ns flink# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

1.1.3 创建 flink 集群

./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster  \
-Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-session:1.16.2 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort

1.1.4 提交任务

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar \
-Dkubernetes.taskmanager.cpu=2000m \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
-Dkubernetes.taskmanager.cpu=2000m \

1.1.5 删除 flink 集群

kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force

1.2 Standalone 模式

Standalone 模式通常指的是在 Kubernetes 集群上运行 Flink 的一个单独集群环境,但它不是专门为 Kubernetes 设计的。在 Kubernetes 上使用 Standalone 模式意味着你将手动设置 Flink 集群(包括 JobManager 和 TaskManagers),而不是通过 Kubernetes Operator 或者其他 Kubernetes 原生的资源调度和管理机制。换句话说,在这个模式下,Flink 集群的各个组件(JobManager和TaskManagers)运行在 Kubernetes Pod 中,但是它们的生命周期管理并不是通过 Kubernetes 原生的支持来实现的,而是类似于在任何其他环境中部署 Flink 的传统方式。

1.2.1 创建docker-entrypoint.sh脚本

#!/usr/bin/env bash###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"drop_privs_cmd() {if [ $(id -u) != 0 ]; then# Don't need to drop privs if EUID != 0returnelif [ -x /sbin/su-exec ]; then# Alpineecho su-exec adminelse# Othersecho gosu adminfi
}copy_plugins_if_required() {if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; thenreturn 0fiecho "Enabling required built-in plugins"for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); doecho "Linking ${target_plugin} to plugin directory"plugin_name=${target_plugin%.jar}mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; thenecho "Plugin ${target_plugin} does not exist. Exiting."exit 1elseln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"echo "Successfully enabled ${target_plugin}"fidone
}set_config_option() {local option=$1local value=$2# escape periods for usage in regular expressionslocal escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")# either override an existing entry, or append a new oneif grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; thensed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"elseecho "${option}: ${value}" >> "${CONF_FILE}"fi
}prepare_configuration() {set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}set_config_option blob.server.port 6124set_config_option query.server.port 6125if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; thenset_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}fiif [ -n "${FLINK_PROPERTIES}" ]; thenecho "${FLINK_PROPERTIES}" >> "${CONF_FILE}"fienvsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}maybe_enable_jemalloc() {if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; thenJEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"if [ -f "$JEMALLOC_PATH" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATHelif [ -f "$JEMALLOC_FALLBACK" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACKelseif [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; thenMSG_PATH=$JEMALLOC_PATHelseMSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"fiecho "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."fifi
}maybe_enable_jemalloccopy_plugins_if_requiredprepare_configurationargs=("$@")
if [ "$1" = "help" ]; thenprintf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"printf "    Or $(basename "$0") help\n\n"printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"exit 0
elif [ "$1" = "jobmanager" ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; thenargs=("${args[@]:1}")echo "Starting History Server"exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; thenargs=("${args[@]:1}")echo "Starting Task Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fiargs=("${args[@]}")# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"

1.2.2 编排 Dockerfile

FROM centos:7.9.2009USER root# 安装常用工具
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof# 设置时区,默认是UTC时区
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezoneRUN mkdir -p /opt/apacheADD jdk-8u231-linux-x64.tar.gz /opt/apache/ADD flink-1.16.2-bin-scala_2.12.tgz  /opt/apache/ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH# 创建用户应用jar目录
RUN mkdir $FLINK_HOME/usrlib/#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.shRUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin adminRUN chown -R admin:admin /opt/apache#设置的工作目录
WORKDIR $FLINK_HOME# 对外暴露端口
EXPOSE 6123 8081# 执行脚本,构建镜像时不执行,运行实例才会执行
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]

1.2.3 开始构建镜像

docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 . --no-cache# 上传镜像
docker push 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2

1.2.4 创建命名空间和 serviceaccount

# 创建namespace
kubectl create ns flink# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

1.2.5 编排 yaml 文件

1.2.5.1  flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 3200mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mparallelism.default: 2log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
1.2.5.2 jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager
1.2.5.3 jobmanager-rest-service.yaml

将 jobmanager rest端口公开为公共 Kubernetes 节点的端口

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager-rest
spec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager
1.2.5.4 taskmanager-query-state-service.yaml

公开 TaskManager 端口以访问可查询状态作为公共 Kubernetes 节点的端口

apiVersion: v1
kind: Service
metadata:name: flink-taskmanager-query-state
spec:type: NodePortports:- name: query-stateport: 6125targetPort: 6125nodePort: 30025selector:app: flinkcomponent: taskmanager
1.2.5.5  jobmanager-session-deployment-non-ha.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2args: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf/securityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
1.2.5.6  taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2args: ["taskmanager"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: query-statelivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf/securityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties

1.2.6 创建 flink 集群

kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink

1.2.7 提交任务

./bin/flink run -m 192.168.20.62:30081 ./examples/streaming/TopSpeedWindowing.jar

1.2.8 删除集群

kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment.yaml -n flink
kubectl delete ns flink --force

二 、application 模式(推荐)

Kubernetes 中一个基本的 Flink Application 集群部署包含三个组件

  • 运行JobManager的应用程序

  • TaskManagers池的部署

  • 暴露JobManager的 REST 和 UI 端口的服务

2.1 Native Kubernetes 模式(常用)

2.1.1 构建镜像 Dockerfile

FROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY   ./flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /opt/flink/usrlib/开始构建镜像docker build -t 192.168.20.62:2333/bigdata/flink-application:1.16.2 . --no-cache
docker push  192.168.20.62:2333/bigdata/flink-application:1.16.2

2.1.2 创建命名空间和 serviceacount

# 创建namespace
kubectl create ns flink
# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

2.1.3 创建 flink 集群并提交任务

./bin/flink run-application \--target kubernetes-application \-Dkubernetes.cluster-id=my-first-application-cluster  \-Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-application:1.16.2 \-Dkubernetes.jobmanager.replicas=1 \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \-Dexternal-resource.limits.kubernetes.cpu=2000m \-Dexternal-resource.limits.kubernetes.memory=2Gi \-Dexternal-resource.requests.kubernetes.cpu=1000m \-Dexternal-resource.requests.kubernetes.memory=1Gi \-Dkubernetes.rest-service.exposed.type=NodePort \local:///opt/flink/usrlib/TopSpeedWindowing.jar

local是application模式中唯一支持的方案。local 代表本地环境,这里即 pod 或者容器环境,并非宿主机。

2.1.4 删除 flink 集群

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force

2.2 Standalone 模式

在此之前需要使用nfs设置一个共享目录,配置文件中设置共享目录是/mnt/bigdata/flink/usrlib

NFS共享存储服务

2.2.1 创建启动脚本docker-entrypoint.sh

#!/usr/bin/env bash###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"drop_privs_cmd() {if [ $(id -u) != 0 ]; then# Don't need to drop privs if EUID != 0returnelif [ -x /sbin/su-exec ]; then# Alpineecho su-exec adminelse# Othersecho gosu adminfi
}copy_plugins_if_required() {if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; thenreturn 0fiecho "Enabling required built-in plugins"for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); doecho "Linking ${target_plugin} to plugin directory"plugin_name=${target_plugin%.jar}mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; thenecho "Plugin ${target_plugin} does not exist. Exiting."exit 1elseln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"echo "Successfully enabled ${target_plugin}"fidone
}set_config_option() {local option=$1local value=$2# escape periods for usage in regular expressionslocal escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")# either override an existing entry, or append a new oneif grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; thensed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"elseecho "${option}: ${value}" >> "${CONF_FILE}"fi
}prepare_configuration() {set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}set_config_option blob.server.port 6124set_config_option query.server.port 6125if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; thenset_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}fiif [ -n "${FLINK_PROPERTIES}" ]; thenecho "${FLINK_PROPERTIES}" >> "${CONF_FILE}"fienvsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}maybe_enable_jemalloc() {if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; thenJEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"if [ -f "$JEMALLOC_PATH" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATHelif [ -f "$JEMALLOC_FALLBACK" ]; thenexport LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACKelseif [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; thenMSG_PATH=$JEMALLOC_PATHelseMSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"fiecho "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."fifi
}maybe_enable_jemalloccopy_plugins_if_requiredprepare_configurationargs=("$@")
if [ "$1" = "help" ]; thenprintf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"printf "    Or $(basename "$0") help\n\n"printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"exit 0
elif [ "$1" = "jobmanager" ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; thenargs=("${args[@]:1}")echo "Starting Job Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; thenargs=("${args[@]:1}")echo "Starting History Server"exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; thenargs=("${args[@]:1}")echo "Starting Task Manager"exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fiargs=("${args[@]}")# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"

2.2.2 编排Dockerfile

FROM centos:7.9.2009USER root# 安装常用工具
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof# 设置时区,默认是UTC时区
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezoneRUN mkdir -p /opt/apacheADD jdk-8u231-linux-x64.tar.gz /opt/apache/ADD flink-1.16.2-bin-scala_2.12.tgz  /opt/apache/ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH# 创建用户应用jar目录
RUN mkdir $FLINK_HOME/usrlib/#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/COPY /RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin adminRUN chown -R admin:admin /opt/apache
RUN chmod +x /opt/apache/docker-entrypoint.sh#设置的工作目录
WORKDIR $FLINK_HOME# 对外暴露端口
EXPOSE 6123 8081# 执行脚本,构建镜像时不执行,运行实例才会执行
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2

2.2.3 创建命名空间和 serviceacount

# 创建namespace
kubectl create ns flink# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

2.2.4 flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 3200mtaskmanager.memory.process.size: 2728mtaskmanager.memory.flink.size: 2280mparallelism.default: 2log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF

2.2.5 jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager

2.2.6  jobmanager-rest-service.yaml

将 jobmanager rest 端口公开为公共 Kubernetes 节点的端口。

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager-rest
spec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager

2.2.7 taskmanager-query-state-service.yaml

公开 TaskManager 端口以访问可查询状态作为公共 Kubernetes 节点的端口。

apiVersion: v1
kind: Service
metadata:name: flink-taskmanager-query-state
spec:type: NodePortports:- name: query-stateport: 6125targetPort: 6125nodePort: 30025selector:app: flinkcomponent: taskmanager

2.2.8 jobmanager-application-non-ha.yaml

apiVersion: batch/v1
kind: Job
metadata:name: flink-jobmanager
spec:template:metadata:labels:app: flinkcomponent: jobmanagerspec:restartPolicy: OnFailurecontainers:- name: jobmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2env:args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","--output","/tmp/result"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf- name: job-artifacts-volumemountPath: /opt/apache/flink-1.16.2/usrlibsecurityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties- name: job-artifacts-volumehostPath:path: /mnt/bigdata/flink/usrlib

2.2.9 taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2env:args: ["taskmanager"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: query-statelivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/apache/flink-1.16.2/conf- name: job-artifacts-volumemountPath: /opt/apache/flink-1.16.2/usrlibsecurityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties- name: job-artifacts-volumehostPath:path: /mnt/bigdata/flink/usrlib

2.2.10 创建 flink 集群并提交任务

kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink# Create the deployments for the cluster
kubectl create -f  jobmanager-application-non-ha.yaml -n flink
kubectl create -f  taskmanager-job-deployment.yaml -n flink

2.2.11 删除 flink 集群

kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flinkkubectl delete ns flink --force

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/603554.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧旅游景区解决方案:PPT全文49页,附下载

关键词&#xff1a;智慧景区建设&#xff0c;智慧旅游平台&#xff0c;智慧旅游运营检测系统项目&#xff0c;智慧文旅&#xff0c;智慧景区开发与管理&#xff0c;智慧景区建设核心&#xff0c;智慧景区开发与管理 一、智慧景区建设现状 1、基础设施建设&#xff1a;智慧景区…

推荐收藏!万字长文带入快速使用 keras

这些年&#xff0c;有很多感悟&#xff1a;一个人精力是有限的&#xff0c;一个人视野也有有限的&#xff0c;你总会不经意间发现优秀人的就在身边。 看我文章的小伙伴应该经常听我说过的一句话&#xff1a;技术要学会交流、分享&#xff0c;不建议闭门造车。一个人可以走的很…

Keil5,ARM编译器 软件优化注意事项

优化C代码中的环路终止 循环是大多数程序中的常见结构。由于大量的执行时间通常花费在循环中&#xff0c;因此值得关注时间关键循环。 如果不谨慎地编写&#xff0c;环路终止条件可能会导致大量开销。在可能的情况下&#xff1a; 使用简单的终止条件。 写入倒计时到零循环。…

MySQL三种常见存储引擎【理论】【需动手操作】

先放一个大佬的博客 等以后有时间按大佬写的 动手操作一下 链接 MySOL 的存储引擎是指 MySOL 数据库管理系统中用于处理数据存诸和检索的组件。 MySOL 常用的存储引擎有以下几个: InnoDB: InnoDB 是 MySQL(5.5)的默认存储引擎&#xff0c;支持事务处理、行级锁定和物理外键约…

2024年超详细的Python3学习路径规划

前言 基于Python3.5 1.第一阶段基础&#xff08;必须&#xff09; Python3 环境搭建Python3 基础语法Python3 基本数据类型Python3 数据类型转换Python3 解释器Python3 注释Python3 运算符Python3 数字(Number)Python3 字符串Python3 列表Python3 元组Python3 字典Python3 集…

dnSpy调试工具二次开发1-新增菜单

测试环境&#xff1a; window 10 visual studio 2019 版本号&#xff1a;16.11.15 .net framework 4.8 开发者工具包 下载 .NET Framework 4.8 | 免费官方下载 .net 5开发者工具包 下载 .NET 5.0 (Linux、macOS 和 Windows) 利用git拉取代码(源码地址&#xff1a;Gi…

启动IDEA报错,web servcer failed to start.port 8080 was already in use.

启动IDEA报错&#xff0c;web servcer failed to start.port 8080 was already in use. 问题现状 启动IDEA失败&#xff0c;端口被占用。 解决办法&#xff1a; 使用netstat -ano指令&#xff0c;查看端口占用情况 因为我是win11的系统&#xff0c;使用指令时出现如下提示。…

【IC设计】移位寄存器

目录 理论讲解背景介绍什么是移位寄存器按工作模式分类verilog语法注意事项 设计实例循环移位寄存器算术双向移位寄存器5位线性反馈移位寄存器伪随机码发生器3位线性反馈移位寄存器32位线性反馈移位寄存器串行移位寄存器&#xff08;打4拍&#xff09;双向移位寄存器&#xff1…

c语言题目之统计二级制数中1的个数

文章目录 题目一、方法1二、方法2三&#xff0c;方法3总结 题目 统计二进制数中1的个数 输入一行&#xff0c;输出一行 输入&#xff1a; 输入一个整数 输出&#xff1a; 输出存储在内存中二进制的1的个数 一、方法1 之前的文章中&#xff0c;小编写了有关于内存在二进制中的存…

Fiddler工具 — 8.会话列表(Session List)

1、会话列表说明 Fiddler抓取到的每条HTTP请求&#xff08;每一条称为一个session&#xff09;。 主要包含了请求的ID编号、状态码、协议、主机名、URL、内容类型、body大小、进程信息、自定义备注等信息。如下图&#xff1a; 说明&#xff1a; 名称含义#抓取HTTP Request的顺…

Ribbon相关问题及答案(2024)

1、Ribbon是什么&#xff0c;它在微服务架构中扮演什么角色&#xff1f; Ribbon是一个客户端负载均衡器&#xff0c;它在微服务架构中扮演着关键性的角色。Ribbon的设计理念是在客户端进行服务发现和负载均衡&#xff0c;这种方式不同于传统的通过中心化的负载均衡器&#xff…

YHZ018 Python 运算符优先级

资源编号&#xff1a;YHZ018 配套视频&#xff1a;https://www.bilibili.com/video/BV1zy4y1Z7nk?p19 YHZ018&#xff1a;运算符优先级 &#x1fabf; 运算符优先级 Python支持多种运算符&#xff0c;下表按照优先级从高到低的顺序列出了所有运算符。运算符的优先级决定了在表…

面试算法90:环形房屋偷盗

题目 一条环形街道上有若干房屋。输入一个数组表示该条街道上的房屋内财产的数量。如果这条街道上相邻的两幢房屋被盗就会自动触发报警系统。请计算小偷在这条街道上最多能偷取的财产的数量。例如&#xff0c;街道上5家的财产用数组[2&#xff0c;3&#xff0c;4&#xff0c;5…

js实现全选按钮,反选

点击全选按钮&#xff0c;下面的按钮全部选中&#xff1b;再次点击&#xff0c;全部取消选择。 点击下面的按钮时&#xff0c;检查下面的按钮是不是全部都选中&#xff0c;如果全部选中了&#xff0c;需要修改全选按钮的选中状态为ture。 全选反选 <!DOCTYPE html> <…

Linux系统IO—探索输入输出操作的奥秘

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;HEART BEAT—YOASOBI 2:20━━━━━━️&#x1f49f;──────── 5:35 &#x1f504; ◀️ ⏸ ▶️ ☰ …

c++之迭代器

目录 一、迭代器 二、几种常见的迭代器类型 三、使用迭代器时注意事项 一、迭代器 在C中&#xff0c;迭代器是一种用于遍历容器元素的对象。迭代器提供了一种通用的方式来访问各种不同类型的容器&#xff0c;如数组、向量、列表、集合和映射等。 使用迭代器可以避免直接操作…

三、Qt核心与Qt类库

一、Qt核心&#xff1a;元对象系统 1、Qt核心特点 Qt对标准C进行了扩展&#xff0c;引入了一些新的概念和功能元对象编译器&#xff08;MOC&#xff09;是一个预处理器&#xff0c;先将Qt的特性程序转为标准C程序&#xff0c;再由标准C编译器进行编译Qt为C语言增加的特性在Qt…

提升开发效率:npm包管理器的使用技巧

文章目录 一、npm简介二、npm的基本操作1. 安装Node.js和npm2. 创建和管理项目3. 安装依赖4. 卸载依赖5. 更新依赖 三、npm的高级特性1. 使用不同版本的依赖项2. 查看已安装的依赖项和它们的版本信息3. 运行脚本命令 《Node.js从入门到精通&#xff08;软件开发视频大讲堂&…

09-生成器模式(Builder)模式

意图 将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。 理解 如果构建一个对象的的过程会比较复杂&#xff0c;或者说在写代码的过程中&#xff0c;需要比较频繁地构建某个对象&#xff0c;那么可以针对这个对象写一个专门用于构建这…

2024前端炫酷源码分享(附效果图及在线演示)

分享10款非常有趣的前端特效源码 其中包含css动画特效、js原生特效、svg特效以及小游戏等 下面我会给出特效样式图或演示效果图 但你也可以点击在线预览查看源码的最终展示效果及下载源码资源 GSAP-火箭动画特效 GSAP 火箭动画 当氮气充足的情况下 火箭会冲出 并继续飞行 图片…