k8s 搭建基于session模式的flink集群

1.flink集群搭建

不废话直接上代码,都是基于官网的,在此记录一下 Kubernetes | Apache Flink

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122jobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mparallelism.default: 2    log4j-console.properties: |+# This affects logging for both user code and FlinkrootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging#logger.flink.name = org.apache.flink#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on# log level INFO. The root logger does not override this. You have to manually# change the log levels here.logger.pekko.name = org.apache.pekkologger.pekko.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# Log all infos to the consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# Log all infos in the given rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# Suppress the irrelevant (wrong) warnings from the Netty channel handlerlogger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF    

jobmanager-service.yaml Optional service, which is only necessary for non-HA mode.

apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager

Session cluster resource definitions #

jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:latestargs: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: apache/flink:latestargs: ["taskmanager"]ports:- containerPort: 6122name: rpclivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/securityContext:runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessaryvolumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties

 kubectl apply -f xxx.yaml 或者 kubectl apply -f ./flink  flink为文件夹,存放的是以上这几个.yaml文件

为flink的ui界面添加nodeport即可外部访问

2. demo代码测试

创建一个maven工程,pom.xml引入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>test-platform</artifactId><groupId>com.test</groupId><version>2.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>flink-demo</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version><log4j.version>2.20.0</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><scope>compile</scope><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><scope>compile</scope><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><scope>compile</scope><version>${log4j.version}</version></dependency></dependencies></project>

log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5"><Properties><property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /><!-- LOG_LEVEL 配置你需要的日志输出级别       --><property name="LOG_LEVEL" value="INFO" /></Properties><appenders><console name="Console" target="SYSTEM_OUT"><PatternLayout pattern="${LOG_PATTERN}"/><ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/></console></appenders><loggers><root level="${LOG_LEVEL}"><appender-ref ref="Console"/></root></loggers></configuration>

计数代码:

package com.test.flink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountUnboundStreamDemo {public static void main(String[] args) throws Exception {// TODO 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
//                3, // 尝试重启的次数
//                Time.of(10, TimeUnit.SECONDS) // 间隔
//        ));// TODO 2.读取数据DataStreamSource<String> lineDS = env.socketTextStream("192.168.0.28", 7777);// TODO 3.处理数据: 切分、转换、分组、聚合// TODO 3.1 切分、转换SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS //<输入类型, 输出类型>.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按照 空格 切分String[] words = value.split(" ");for (String word : words) {// 转换成 二元组 (word,1)Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);// 通过 采集器 向下游发送数据out.collect(wordsAndOne);}}});// TODO 3.2 分组KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});// TODO 3.3 聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);// TODO 4.输出数据sumDS.print("接收到的数据=======").setParallelism(1);// TODO 5.执行:类似 sparkstreaming最后 ssc.start()env.execute(sumDS.getClass().getSimpleName());}}

打成jar包导入flink dashboard:

在另一台机器上运行 nc -lk -p 7777,如果出现连接拒绝,查看是否放开端口号

k8s查看读取到的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/69470.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

postgresql-条件表达式

postgresql-条件表达式 简单Case表达式搜索Case表达式缩写函数总结 简单Case表达式 select e.first_name , e.last_name , e.department_id , case e.department_id when 90 then 管理when 60 then 开发else 其他end as "部门" from cps.public.employees e ;-- 统…

面试:谈一下你对Nginx的理解

Nginx是什么&#xff1a;Nginx是一个高性能、开源的Web服务器和反向代理服务器&#xff0c;以其卓越的性能和可扩展性而闻名。它通常用于将客户端请求转发到后端服务器、提供静态文件服务和负载均衡。 高性能和高并发&#xff1a;Nginx的异步事件驱动架构使其能够有效地处理大…

Ubuntu22.04安装ROS

Ubuntu22.04安装ROS_笔记大全_设计学院 Excerpt 在安装ROS之前&#xff0c;需要先安装Ubuntu22.04操作系统。您可以从Ubuntu官网下载Ubuntu22.04的最新版本镜像文件&#xff0c;并创建一个可启动的USB。您可以参考以下步骤&#xff1a; 一、安装Ubuntu22.04操作系统 在安装ROS…

Vue笔记

第一章&#xff1a;Vue环境搭建 1.搭建Vue环境 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><!-- 1.引入Vue.js--><script src"1.vue.js"></scr…

软件生命周期及流程

软件生命周期&#xff1a; 软件生命周期(SDLC&#xff0c;Systems Development Life Cycle)是软件开始研制到最终被废弃不用所经历的各个阶段. 需求分析阶段--输出需求规格说明书&#xff08;原型图&#xff09; 测试介入的晚--回溯成本高 敏捷开发模型&#xff1a; 从1990年…

php常用加密算法大全aes、3des、rsa等

目录 一、可解密加解密算法 1、aes 加解密算法 2、旧3des加解密方法 3、新3des加解密方法 4、rsa公私钥加解密、签名验签方法 5、自定义加密算法1 6、自定义加密算法2 7、自定义加密算法3 二、不可解密加密算法 1、md5算法 2、crypt算法 3、sha1算法 5、hash 算…

ifstream之seekg/tellg

声明&#xff1a;我个人特别讨厌&#xff1a;收费专栏、关注博主才可阅读等行为&#xff0c;推崇知识自由分享&#xff0c;推崇开源精神&#xff0c;呼吁你一起加入&#xff0c;大家共同成长进步&#xff01; 在文件读写的时候&#xff0c;一般需要借助fstream来进行文件操作&a…

STM32CUBEMX_创建时间片轮询架构的软件框架

STM32CUBEMX_创建时间片轮询架构的软件框架 说明&#xff1a; 1、这种架构避免在更新STM32CUBEMX配置后把用户代码清除掉 2、利用这种时间片的架构可以使得代码架构清晰易于维护 创建步骤&#xff1a; 1、使用STM32CUBEMX创建基础工程 2、新建用户代码目录 3、构建基础的代码框…

LeetCode 2511. 最多可以摧毁的敌人城堡数目

【LetMeFly】2511.最多可以摧毁的敌人城堡数目 力扣题目链接&#xff1a;https://leetcode.cn/problems/maximum-enemy-forts-that-can-be-captured/ 给你一个长度为 n &#xff0c;下标从 0 开始的整数数组 forts &#xff0c;表示一些城堡。forts[i] 可以是 -1 &#xff0c…

OA与CRM与ORACLE

办公自动化&#xff08;Office Automation&#xff0c;简称OA&#xff09;&#xff0c;是将计算机、通信等现代化技术运用到传统办公方式&#xff0c;进而形成的一种新型办公方式。办公自动化利用现代化设备和信息化技术&#xff0c;代替办公人员传统的部分手动或重复性业务活动…

OpenLdap +PhpLdapAdmin + Grafana docker-compose部署安装

目录 一、OpenLdap介绍 二、PhpLdapAdmin介绍 三、使用docker-compose进行安装 1. docker-compose.yml 2. grafana配置文件 3. provisioning 四、安装openldap、phpldapadmin、grafana 五、配置OpenLDAP 1. 登陆PhpLdapAdmin web管理 2. 需要注意的细节 内容介绍参考…

Java作业3

1.下面代码的运行结果是&#xff08;C&#xff09; public static void main(String[] args){String s;System.out.println("s"s);}A.代码编程成功&#xff0c;并输出”s” B.代码编译成功&#xff0c;并输出”snull” C.由于String s没有初始化&#xff0c;代码不…

spingboot按照依赖包除了maven还有Gradle,两者的区别?

Maven和Gradle是两种常用的构建工具&#xff0c;用于管理Java项目的依赖关系和构建过程。它们之间的区别如下&#xff1a; 语法&#xff1a;Maven使用XML作为构建文件的格式&#xff0c;而Gradle使用基于Groovy或Kotlin的领域特定语言&#xff08;DSL&#xff09;。 灵活性&am…

运维监控背景信息

监控需求来源 刚开始的需求就是出了问题&#xff0c;我们可以精确感知到。 后来的需求扩展为&#xff1a; 通过监控了解数据趋势&#xff0c;知道系统在未来的某个时刻可能出问题&#xff0c;预知问题。 通过监控了解系统的水位情况&#xff0c;为服务扩缩容提供数据支撑。 通…

python基础运用例子

python基础运用例子 1、⼀⾏代码交换 a , b &#xff1a;a, b b, a2、⼀⾏代码反转列表 l[::-1]3、合并两个字典 res {**dict1, **dict2}**操作符合并两个字典for循环合并dict(a, **b) 的方式dict(a.items() b.items()) 的方式dict.update(other_dict) 的方式 4、⼀⾏代码列…

Golang - go build打包文件

Go编译打包文件 1、简单打包 程序 main1.go&#xff1a; package mainimport "fmt"func main() {fmt.Println("Hello World!") } 打包&#xff1a; # 在linux服务上执行下面的3个命令 # linux平台,生成main1可执行程序 CGO_ENABLED0 GOOSlinux GOARCHam…

Tomcat 集群介绍

一.Tomcat 集群介绍 在实际生产环境中&#xff0c;单台 Tomcat 服务器的负载能力或者说并发能力在四五百左右。大 部分情况下随着业务增长&#xff0c;访问量的增加(并发量不止四五百)&#xff0c;单台 Tomcat 服务器是 无法承受的。这时就需要将多台 Tomcat 服务器组织起来&a…

【多尺度双域引导网络:Pan-sharpening】

Multi-Scale Dual-Domain Guidance Network for Pan-sharpening &#xff08;用于泛锐化的多尺度双域引导网络&#xff09; 全色锐化的目标是在纹理丰富的全色图像的指导下&#xff0c;通过超分辨低空间分辨率多光谱图像&#xff08;LRMS&#xff09;的对应物产生高空间分辨率…

语音特征提取与预处理

导入相关包 import librosa import librosa.display import soundfile as sf import numpy as np import matplotlib.pyplot as plt from playsound import playsound 语音读取与显示 file_path test1.wav data, fs librosa.load(file_path, srNone, monoTrue) librosa.d…

数学建模--整数规划匈牙利算法的Python实现

目录 1.算法流程简介 2.算法核心代码 3.算法效果展示 1.算法流程简介 #整数规划模型--匈牙利算法求解 """ 整数规划模型及概念&#xff1a;规划问题的数学模型一般由三个因素构成 决策变量 目标函数 约束条件&#xff1b;线性规划即以线性函数为目标函数&a…