Flink Rest Basic Auth - 安全认证

背景

公司目前需要将Flink实时作业云化,构建多租户实时计算平台。目前考虑为了资源高效利用,并不打算为每个租户部署一套独立的Kubernetes集群。也就意味着多个租户的作业可能会运行在同一套kubernets集群中。此时实时作业的任务就变的很危险,因为网络可能是通的,就会存在危险的REST API暴露出去,被一些不坏好意的人利用,从而影响其他租户的作业。鉴于此考虑给Flink的作业添加一个认证方式,可以是Kerberos或者是Http 用户名密码Baisc认证。各种搜索和询问,最终发现了一些线索FLIP-181: Custom netty HTTP request inbound/outbound handlers 这里描述了为何flink官方否定这个诉求。当然不要着急,笔者在flink-basic-auth-handler上找到了方案,并且成功将方案迁移到了flink-1.17.2版本中。

改造步骤

Flink 的JobManager/SQLGateway是基于Netty实现的一套轻量级的web服务接口,这些接口都实现了RestServerEndpoint抽象类。因此我们可以看看这个类start方法中可以看到在启动的代码中可以看到InboundChannelHandlerFactory这个东西,通过改Factory创建一个Inbound的hander。

public final void start() throws Exception {synchronized (lock) {Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");log.info("Starting rest endpoint.");final Router router = new Router();final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();handlers = initializeHandlers(restAddressFuture);/* sort the handlers such that they are ordered the following:* /jobs* /jobs/overview* /jobs/:jobid* /jobs/:jobid/config* /:**/Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);checkAllEndpointsAndHandlersAreUnique(handlers);handlers.forEach(handler -> registerHandler(router, handler, log));ChannelInitializer<SocketChannel> initializer =new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws ConfigurationException {RouterHandler handler = new RouterHandler(router, responseHeaders);// SSL should be the first handler in the pipelineif (isHttpsEnabled()) {ch.pipeline().addLast("ssl",new RedirectingSslHandler(restAddress,restAddressFuture,sslHandlerFactory));}ch.pipeline().addLast(new HttpServerCodec()).addLast(new FileUploadHandler(uploadDir)).addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders));for (InboundChannelHandlerFactory factory :inboundChannelHandlerFactories) {Optional<ChannelHandler> channelHandler =factory.createHandler(configuration, responseHeaders);if (channelHandler.isPresent()) {ch.pipeline().addLast(channelHandler.get());}}ch.pipeline().addLast(new ChunkedWriteHandler()).addLast(handler.getName(), handler).addLast(new PipelineErrorHandler(log, responseHeaders));}};NioEventLoopGroup bossGroup =new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));NioEventLoopGroup workerGroup =new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(initializer);Iterator<Integer> portsIterator;try {portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);} catch (IllegalConfigurationException e) {throw e;} catch (Exception e) {throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);}int chosenPort = 0;while (portsIterator.hasNext()) {try {chosenPort = portsIterator.next();final ChannelFuture channel;if (restBindAddress == null) {channel = bootstrap.bind(chosenPort);} else {channel = bootstrap.bind(restBindAddress, chosenPort);}serverChannel = channel.syncUninterruptibly().channel();break;} catch (final Exception e) {// syncUninterruptibly() throws checked exceptions via Unsafe// continue if the exception is due to the port being in use, fail early// otherwiseif (!(e instanceof java.net.BindException)) {throw e;}}}if (serverChannel == null) {throw new BindException("Could not start rest endpoint on any port in port range "+ restBindPortRange);}log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();final String advertisedAddress;if (bindAddress.getAddress().isAnyLocalAddress()) {advertisedAddress = this.restAddress;} else {advertisedAddress = bindAddress.getAddress().getHostAddress();}port = bindAddress.getPort();log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();restAddressFuture.complete(restBaseUrl);state = State.RUNNING;startInternal();}}

然后在构造函数中可以发现inboundChannelHandlerFactories对象是通过SPI方案加载进来的。

 public RestServerEndpoint(Configuration configuration)throws IOException, ConfigurationException {Preconditions.checkNotNull(configuration);RestServerEndpointConfiguration restConfiguration =RestServerEndpointConfiguration.fromConfiguration(configuration);Preconditions.checkNotNull(restConfiguration);this.configuration = configuration;this.restAddress = restConfiguration.getRestAddress();this.restBindAddress = restConfiguration.getRestBindAddress();this.restBindPortRange = restConfiguration.getRestBindPortRange();this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();this.uploadDir = restConfiguration.getUploadDir();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/24168.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

106.网络游戏逆向分析与漏洞攻防-装备系统数据分析-在UI中显示装备与技能信息

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 如果看不懂、不知道现在做的什么&#xff0c;那就跟着做完看效果&#xff0c;代码看不懂是正常的&#xff0c;只要会抄就行&#xff0c;抄着抄着就能懂了 内容…

新媒体暴力起号必备因素!沈阳新媒体运营培训学校

1周涨粉10w&#xff1f;这对普通人来说可以说是天文数字&#xff0c;但只要掌握方式方法&#xff0c;普通人也能做到&#xff01; 面试经验丰富的人都深知&#xff0c;给面试官留下的第一印象相当重要&#xff0c;几乎决定了80%的面试机会。标题也是如此&#xff0c;在完成一篇…

Android SBL是什么

Android SBL&#xff08;Secondary Bootloader&#xff09;是Android系统中一个关键的组成部分&#xff0c;它属于Bootloader的二级引导程序。以下是关于Android SBL的详细解释&#xff1a; 定义&#xff1a; SBL是Secondary Bootloader的缩写&#xff0c;中文称为第二级引导程…

中国剩余定理学习

中国剩余定理( C R T CRT CRT)及其扩展( E X C R T EXCRT EXCRT)详解 基本形式 中国剩余定理给出了以下的一元线性同余方程组的解&#xff1a; { x ≡ a 1 ( m o d m 1 ) x ≡ a 2 ( m o d m 2 ) ⋮ x ≡ a n ( m o d m n ) \begin{cases} x \equiv a_1 \pmod{m_1} \\ x \eq…

力扣209.长度最小的数组

力扣209.长度最小的数组 模版滑窗求最小 class Solution {public:int minSubArrayLen(int target, vector<int>& nums) {int n nums.size(),resn1;int sum 0;for(int i0,j0;i<n;i){sum nums[i];//尽可能缩小区间while(sum - nums[j] > target){sum - num…

[经验] 蝉联一词的含义是什么 #知识分享#职场发展

蝉联一词的含义是什么 蝉联这个词起源于古代中国&#xff0c;最初是指天子连续两年以上的年号相同。后来&#xff0c;这个词被用于形容某个人或某个团体连续多次获得某种荣誉或奖项的情况。在现代生活中&#xff0c;我们常常听到某个体育运动员蝉联冠军、某个企业蝉联业绩排行榜…

荆州餐饮环保在行动:清洗油烟净化器,守护城市环境

我最近分析了餐饮市场的油烟净化器等产品报告&#xff0c;解决了餐饮业厨房油腻的难题&#xff0c;更加方便了在餐饮业和商业场所有需求的小伙伴们。 在荆州&#xff0c;餐饮业不仅是美食爱好者的天堂&#xff0c;更是城市生活的重要组成部分。然而&#xff0c;随着餐饮业的发…

基于拓扑漏洞分析的网络安全态势感知模型

漏洞态势分析是指通过获取网络系统中的漏洞信息、拓扑信息、攻击信息等&#xff0c;分析网络资产可能遭受的安全威胁以及预测攻击者利用漏洞可能发动的攻击&#xff0c;构建拓扑漏洞图&#xff0c;展示网络中可能存在的薄弱环节&#xff0c;以此来评估网络安全状态。 在网络安…

科普|大数据风险检测对申贷人有哪些好处?

大数据风险检测可以极大地提高金融机构在用户肖像、反欺诈和信用评级等方面的效率和风险控制能力&#xff0c;这是金融企业发展过程中必须结合的一种科技技术。大数据风险检测覆盖信贷领域的所有流程&#xff0c;从客户获取到身份验证&#xff0c;再到信贷中和信贷后。因此&…

Ruby语言与Python:深度比较与独特魅力探索

Ruby语言与Python&#xff1a;深度比较与独特魅力探索 在编程语言的浩瀚海洋中&#xff0c;Ruby和Python无疑是两颗璀璨的明星。它们各自拥有独特的魅力和广泛的应用领域。本文将从四个方面、五个方面、六个方面和七个方面&#xff0c;深入探讨Ruby语言和Python的异同&#xf…

sudo快可以在Windows中使用了,以下是它的内容和使用方法

sudo命令受到Linux用户的尊敬。它允许你以另一个用户的身份运行命令,通常是管理员(或者Linux中的root用户),所以正如你所想象的,它几乎经常被开发人员、技术支持代理和系统管理员使用。现在,你可以在Windows中使用它! 为什么sudo如此受人尊敬 sudo命令允许你运行任务,…

电赛报告书写

一、总体要求 &#xff08;1&#xff09;摘要&#xff1a;一页&#xff0c;小于300字 &#xff08;2&#xff09;正文&#xff1a;不超过8页 &#xff08;3&#xff09;附录&#xff1a;可以没有&#xff0c;但是不能超过2页 二、摘要书写 摘要要小于等于300字&#xff0c…

使用kubectl apply deployment 支持指定deployment 的更新脚本

使用kubectl apply deployment &#xff0c;指定deployment&#xff0c;默认更新他第一个container的镜像&#xff0c;并且给出了更新该container某些env的样例&#xff1a; DEPLOYMENT_NAME"xxx" NAMESPACEXXX xxx_ENV_VALxxxx IMAGE_VERSIONV1.0 container_name&q…

实现飞书机器人推送消息到指定群组或者用户

实现飞书机器人推送消息到指定群组或者用户 1 简介2 创建飞书应用2.1 注册登录2.2 创建应用2.3 添加应用能力2.4 权限管理3 发布应用4 代码示例4.1 获取应用ID与token4.2 使用Python SDK4.3 简单示例4.4 获取用户或机器人所在的群列表4.5 通过手机号或邮箱获取用户 ID4.6 给群组…

生活使用英语口语柯桥外语学校成人英语学习

● “自来水”英语怎么说&#xff1f; ● “自来水”的英语表达是&#xff1a;Running water或者Tap water. 例句&#xff1a; There are hot and cold running water in all the bedrooms. 所有的卧室里都有冷热自来水。 ● “热水”英文怎么水&#xff1f; ● 我们不管…

C++设计模式——Adapter适配器模式

一&#xff0c;适配器模式简介 适配器模式是一种结构型设计模式&#xff0c;用于将已有接口转换为调用者所期望的另一种接口。 适配器模式让特定的API接口可以适配多种场景。例如&#xff0c;现有一个名为"Reader()"的API接口只能解析txt格式的文件&#xff0c;给这…

【css3】png图片实现动态动画

.border_style {width: 400px;height: 400px;background-color: black;margin: auto;}keyframes sprite-animation {0% {background-position: 0 0;}100% {background-position: 0 -2064px;/* 假设每个图像的宽度为100px */}}.wrj_box {width: 86px;height: 86px;background-im…

Android的SELinux详解

标签: Android的SELinux详解; SELinux;Enforcing; Android的SELinux详解 概述 SELinux(Security-Enhanced Linux)是一个Linux内核模块和用户空间工具的集合,提供强制访问控制(MAC)机制。Android引入SELinux以增强系统的安全性,通过限制进程的权限来减少安全漏洞的…

STL中stack和queue模拟实现+容器适配器

目录 容器适配器 STL标准库中stack和queue的底层结构 deque的简单介绍 deque的缺陷 为什么选择deque作为stack和queue的底层默认容器 stack的模拟实现 queue的模拟实现 容器适配器 适配器是一种设计模式&#xff08;设计模式是一套被反复使用的&#xff0c;多数人知晓…

OpenAI模型规范概览

这是OpenAI对外分享的模型规范文档&#xff08;Model Spec&#xff09;&#xff0c;它定义了OpenAI希望在API接口和ChatGPT&#xff08;含GPT系列产品&#xff09;中模型的行为方式&#xff0c;这也是OpenAI超级对齐团队奉行的行为准则&#xff0c;希望能对国内做RLHF的同学有帮…