Flink on k8s之historyServer

1.Flink HistoryServer用途
HistoryServer可以在Flink 作业终止运行(Flink集群关闭)之后,还可以查询已完成作业的统计信息。此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。

2.部署Flink HistoryServer
1、创建 flink historyserver pvc,保存Flink作业归档数据。

[root@k8s-demo001 ~]# cat flink-historyserver-pvc.yaml
#Flink Historyserver 持久化存储pvc
apiVersion: v1
kind: PersistentVolumeClaim
metadata:name: flink-historyserver-pvc  # historyserver pvc名称namespace: flink   # 指定归属的名命空间
spec:storageClassName: nfs-storage   #sc名称,更改为实际的sc名称accessModes:- ReadWriteMany   #采用ReadWriteMany的访问模式resources:requests:storage: 1Gi    #存储容量,根据实际需要更改
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-pvc.yaml

2、配置flink historyserver,创建flink historyserver configmap

[root@k8s-demo001 ~]# cat flink-historyserver-conf.yaml
kind: ConfigMap
apiVersion: v1
metadata:name: flink-historyserver-confnamespace: flinkannotations:kubesphere.io/creator: admin
data:flink-conf.yaml: |blob.server.port: 6124kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:2kubernetes.jobmanager.replicas: 1kubernetes.jobmanager.cpu: 1.0$internal.flink.version: v1_13kubernetes.taskmanager.cpu: 1.0jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122kubernetes.service-account: flinkkubernetes.cluster-id: flink-historyserverkubernetes.container.image: flink-hdfs:1.13.6parallelism.default: 2kubernetes.namespace: flinktaskmanager.numberOfTaskSlots: 2kubernetes.rest-service.exposed.type: ClusterIPkubernetes.operator.reconcile.interval: 15 skubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTEkubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactoryjobmanager.memory.process.size: 1024mtaskmanager.memory.process.size: 1024mkubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypointkubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_17272077926352838674.yamlexecution.target: kubernetes-sessionjobmanager.archive.fs.dir: file:///opt/flink/flink_historyhistoryserver.archive.fs.dir: file:///opt/flink/flink_historyhistoryserver.archive.fs.refresh-interval: 10000historyserver.web.port: 8082web.tmpdir: /opt/flink/webuploadweb.upload.dir: /opt/flink/webuploadweb.cancel.enable: falseinternal.cluster.execution-mode: NORMALqueryable-state.proxy.ports: 6125state.checkpoints.dir: file:///opt/flink/checkpointslog4j.properties: |# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.monitorInterval=30# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.file.ref = MainAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFOlogger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3logger.shaded_zookeeper.level = INFO# Log all infos in the given fileappender.main.name = MainAppenderappender.main.type = RollingFileappender.main.append = trueappender.main.fileName = ${sys:log.file}appender.main.filePattern = ${sys:log.file}.%iappender.main.layout.type = PatternLayoutappender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.main.policies.type = Policiesappender.main.policies.size.type = SizeBasedTriggeringPolicyappender.main.policies.size.size = 100MBappender.main.policies.startup.type = OnStartupTriggeringPolicyappender.main.strategy.type = DefaultRolloverStrategyappender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFFlog4j-console.properties: |# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF# Flink Deployment Logging Overrides# rootLogger.level = DEBUG
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver-conf.yaml

检查
在这里插入图片描述
3、创建Historyserver服务

[root@k8s-demo001 ~]# cat flink-historyserver.yaml
apiVersion: apps/v1
kind: Deployment
metadata:namespace: flinklabels:app: flink-historyservername: flink-historyservername: flink-historyserver
spec:replicas: 1selector:matchLabels:name: flink-historyservertemplate:metadata:namespace: flinklabels:app: flink-historyservername: flink-historyserverspec:hostAliases:  # hosts配置- ip: "172.16.252.129"hostnames:- "Kafka-01"- ip: "172.16.252.130"hostnames:- "Kafka-02"- ip: "172.16.252.131"hostnames:- "Kafka-03"containers:- name: flink-historyserverenv:- name: TZvalue: Asia/Shanghaiimage: flink:1.13.6command: [ 'sh','-c','/docker-entrypoint.sh history-server' ]ports:- containerPort: 8082volumeMounts:- name: flink-historyserver-confmountPath: /opt/flink/conf/flink-conf.yamlsubPath: flink-conf.yaml- name: flink-historyserver-confmountPath: /opt/flink/conf/log4j.propertiessubPath: log4j.properties- name: flink-historyserver-confmountPath: /opt/flink/conf/log4j-console.propertiessubPath: log4j-console.properties- name: flink-historyservermountPath: /opt/flink/flink_historyvolumes:  # 挂载卷配置- name: flink-historyserver-confconfigMap:name: flink-historyserver-conf- name: flink-historyserverpersistentVolumeClaim:claimName: flink-historyserver-pvc
# ---
# kind: Service
# apiVersion: v1
# metadata:
#   namespace: flink
#   name: flink-historyserver
# spec:
#   type: NodePort
#   ports:
#     - port: 8082
#       nodePort: 31082
#   selector:
#     name: flink-historyserver# ingress按实际情况配置
---
apiVersion: v1
kind: Service
metadata:labels:app: flink-historyservername: flink-historyservername: flink-historyservernamespace: flink
spec:selector:app: flink-historyserverports:- port: 8082protocol: TCPtargetPort: 8082
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:namespace: flinkname: flink-historyserverannotations:nginx.ingress.kubernetes.io/default-backend: ingress-nginx-controllernginx.ingress.kubernetes.io/use-regex: 'true'
spec:ingressClassName: nginxrules:- host: "flink-hs.k8s.io"http:paths:- pathType: Prefixpath: "/"backend:service:name: flink-historyserverport:number: 8082
[root@k8s-demo001 ~]# kubectl apply -f flink-historyserver.yaml

验证:
在这里插入图片描述
访问Flink UI:
http://flink-hs.k8s.io/
在这里插入图片描述
3.提交flink作业
1、编写提交作业的yaml

这里需要挂在Historyserver的pvc,并配置Historyserver的归档路径到pvc挂载路径

[root@k8s-demo001 ~]# cat application-deployment-checkpoint-ha-hs.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:namespace: flinkname: application-deployment-checkpoint-ha-hs  # flink 集群名称
spec:image: flink:1.13.6  # flink基础镜像flinkVersion: v1_13  # flink版本,选择1.13imagePullPolicy: IfNotPresent  # 镜像拉取策略,本地没有则从仓库拉取ingress:   # ingress配置,用于访问flink web页面template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"className: "nginx"annotations:nginx.ingress.kubernetes.io/rewrite-target: "/$2"flinkConfiguration:taskmanager.numberOfTaskSlots: "2"state.checkpoints.dir: file:///opt/flink/checkpointshigh-availability.type: kuberneteshigh-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory # JobManager HAhigh-availability.storageDir: file:///opt/flink/flink_recovery  # JobManager HA数据保存路径jobmanager.archive.fs.dir: file:///opt/flink/flink_history      # JobManager 归档路径historyserver.archive.fs.dir: file:///opt/flink/flink_history      # Historyserver 归档路径historyserver.archive.fs.refresh-interval: "10000"              # Historyserver 文件刷新间隔serviceAccount: flinkjobManager:replicas: 2  # HA下, jobManger的副本数要大于1resource:memory: "1024m"cpu: 1taskManager:resource:memory: "1024m"cpu: 1podTemplate:spec:hostAliases:- ip: "172.16.252.129"hostnames:- "Kafka-01"- ip: "172.16.252.130"hostnames:- "Kafka-02"- ip: "172.16.252.131"hostnames:- "Kafka-03"containers:- name: flink-main-containerenv:- name: TZvalue: Asia/ShanghaivolumeMounts:- name: flink-jar  # 挂载nfs上的jarmountPath: /opt/flink/jar- name: flink-checkpoints  # 挂载checkpoint pvcmountPath: /opt/flink/checkpoints- name: flink-log  # 挂载日志 pvcmountPath: /opt/flink/log- name: flink-ha    # HA pvc配置mountPath: /opt/flink/flink_recovery- name: flink-historyservermountPath: /opt/flink/flink_historyvolumes:- name: flink-jarpersistentVolumeClaim:claimName: flink-jar-pvc- name: flink-checkpointspersistentVolumeClaim:claimName: flink-checkpoint-application-pvc- name: flink-logpersistentVolumeClaim:claimName: flink-log-pvc- name: flink-hapersistentVolumeClaim:claimName: flink-ha-pvc- name: flink-historyserverpersistentVolumeClaim:claimName: flink-historyserver-pvcjob:jarURI: local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar # 使用pv方式挂载jar包entryClass: org.fblinux.StreamWordCountWithCPargs:   # 传递到作业main方法的参数- "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092"- "flink_test"- "172.16.252.113"- "3306"- "flink_test"- "wc"- "file:///opt/flink/checkpoints"- "10000"- "1"parallelism: 1upgradeMode: stateless
[root@k8s-demo001 ~]# kubectl apply -f application-deployment-checkpoint-ha-hs.yaml

作业提交之后,可以手动往Kafka 写入一些数据,然后关闭作业
作业运行中historyserver是没有信息的,作业终止后history service才会查询到相关信息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/676356.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索设计模式的魅力:代理模式揭秘-软件世界的“幕后黑手”

设计模式专栏:http://t.csdnimg.cn/U54zu 目录 引言 一、魔法世界 1.1 定义与核心思想 1.2 静态代理 1.3 动态代理 1.4 虚拟代理 1.5 代理模式结构图 1.6 实例展示如何工作(场景案例) 不使用模式实现 有何问题 使用模式重构示例 二、…

STM32 适合人群

STM32 适合各种需要进行嵌入式系统开发的人群,具体如下: 嵌入式系统工程师:嵌入式系统工程师可以使用 STM32 进行系统设计、硬件和软件编程、测试和部署等工作。学生和研究人员:学生和研究人员可以使用 STM32 进行实验、学习和研…

【Java万花筒】实时洞察与智能分析:构建成熟的Java应用程序监控与日志处理方案

全方位监控与可视化:JMX、Spring Boot Admin和Kibana的强大功能与实践技巧 欢迎订阅专栏:Java万花筒 文章目录 全方位监控与可视化:JMX、Spring Boot Admin和Kibana的强大功能与实践技巧前言1. JMX(Java Management Extensions&a…

SECS/GEM300需要实现哪些内容

GEM300实现设备全自动化,也是金南瓜已经全面支持功能,作为国内首家和最好的300mm标准软件。 GEM300包含E4、E5、E30、E37、E39、E40、E84、E87、E90、E94、E116等 CJob全称Conrtol Job 1. 控制设备作业的控制 2. 包括队列、开始、暂停、继续、完成等等…

STL之stack+queue的使用及其实现

STL之stackqueue的使用及其实现 1. stack,queue的介绍与使用1.1stack的介绍1.2stack的使用1.3queue的介绍1.4queue的使用 2.stack,queue的模拟实现2.1stack的模拟是实现2.2queue的模拟实现 3.总结 所属专栏:C“嘎嘎" 系统学习❤️ &…

SPSS基础操作:对数据按照样本观测值进行排序

在整理数据资料或者查看分析结果时,我们通常希望样本观测值能够按照某一变量的大小进行升序或者降序排列,比如我们想按照学生的学习成绩进行排序,按照销售额的大小对各个便利店进行排序等。以本章附带的数据4为例,如果要按照y4体重…

139. 单词拆分

139. 单词拆分 题目链接&#xff1a;139. 单词拆分 代码如下&#xff1a; //动态规划 //参考leetcode官方题解 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {unordered_set<string> word(wordDict.begin(),wordDict.end())…

mybatis-plus的批量修改源码遇到的问题

看了下mybatis-plus的批量修改源码&#xff0c;然后就理解了mybatis的一级&#xff0c;二级缓存 1.一级缓存是放在sqlsession中的&#xff0c;他里面有一个map对象&#xff0c;key是et字符串&#xff0c;value就是对象 2.每一个sqlsession都是私有的&#xff0c;不会共享 3.当…

ChatGPT升级至GPT-4 Turbo:性能升级同时更为经济

ChatGPT升级至GPT-4 Turbo&#xff1a;性能升级同时更为经济 随着技术的进步&#xff0c;ChatGPT迎来了其最新的迭代版本——GPT-4 Turbo。这一最新版本不仅在性能上得到了显著提升&#xff0c;而且在成本效益上也更加引人注目。 性能提升 GPT-4 Turbo带来了诸多改进&#x…

Linux探秘之旅:透彻理解路径、命令与系统概念

目录 如何远程连接 远程登录简明指南 linux区别 1.严格区分大小写 2.linux的命令返回结果判断 3.如何查看网络信息 4.关于后缀名&#xff08;Linux不关心文件后缀&#xff09; 4.1 需要记忆的后缀 5.echo命令 6.linux一切皆文件 6.1比如磁盘的文件 6.2可执行文件 …

SpringBoot配置文总结

官网配置手册 官网&#xff1a;https://spring.io/ 选择SpringBoot 选择LEARN 选择 Application Properties 配置MySQL数据库连接 针对Maven而言&#xff0c;会搜索出两个MySQL的连接驱动。 com.mysql mysql-connector-j 比较新&#xff0c;是在mysql mysql-connect…

【微机原理与单片机接口技术】MCS-51单片机的引脚功能介绍

前言 MCS-51是指由美国Intel公司生产的一系列单片机的总称。MCS-51系列单片机型号有很多&#xff0c;按功能分位基本型和增强型两大类&#xff0c;分别称为8051系列单片机和8052系列单片机&#xff0c;两者以芯片型号中的末位数字区分&#xff0c;1为基本型&#xff0c;2为增强…

springboot167基于springboot的医院后台管理系统的设计与实现

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

Sklearn、TensorFlow 与 Keras 机器学习实用指南第三版(七)

原文&#xff1a;Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 第十六章&#xff1a;使用 RNN 和注意力进行自然语言处理 当艾伦图灵在 1950 年想象他著名的Turing 测试时&#xff0c;他提出了…

linux(redhat)重置root密码

首先将root密码改成几乎不可能记住的密码 [rootexample ~]# echo fheowafuflaeijifehowf|passwd --stdin root Changing password for user root. passwd: all authentication tokens updated successfully.重启系统&#xff0c;进入救援模式 出现此页面&#xff0c;按e键 lin…

剑指offer——二维数组中的查找(杨氏矩阵)

目录 1. 题目描述2. 常见错误思路3. 分析3.1 特例分析3.2 规律总结 4. 完整代码 1. 题目描述 在一个二维数组中&#xff0c;每一行都按照从左到右递增的顺序排序&#xff0c;每一列都按照从上到下递增的顺序排序。请完成一个函数&#xff0c;输入这样的一个二维数组和一个整数&…

【Linux】——期末复习题(六)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

java——学习并推荐java8

学习并推荐java8 为什么要学习java8我推荐使用lambda Java 8的核心新特性:Lambda(匿名函数)、流、默认方法。 Java 8于2019年1月停止「免费维护」&#xff0c;java的生态已经发生翻天覆地的变化&#xff0c;应该努力拥抱变化&#xff0c;才能避免被时代淘汰。 为什么要学习java…

【精选】java初识多态 多态调用成员的特点

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…

【Redis笔记】分布式锁及4种常见实现方法

线程锁 主要用来给方法、代码块加锁。当某个方法或代码使用锁&#xff0c;在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果&#xff0c;因为线程锁的实现在根本上是依靠线程之间共享内存实现的&#xff0c;比如Synchronized、Lock等。 进程锁 控制同…