OpenYurt 之 Yurthub 数据过滤框架解析

简介:OpenYurt 是业界首个非侵入的边缘计算云原生开源项目,通过边缘自治,云边协同,边缘单元化,边缘流量闭环等能力为用户提供云边一体化的使用体验。在 Openyurt 里边缘网络可以使用数据过滤框架在不同节点池里实现边缘流量闭环能力。

作者:应健健,新华智云计算中心

OpenYurt 是业界首个非侵入的边缘计算云原生开源项目,通过边缘自治,云边协同,边缘单元化,边缘流量闭环等能力为用户提供云边一体化的使用体验。在 Openyurt 里边缘网络可以使用数据过滤框架在不同节点池里实现边缘流量闭环能力。

Yurthub 数据过滤框架解析

Yurthub 本质上是一层 kube-apiserver 的代理,在代理的基础上加了一层 cache,一来保证边缘节点离线的情况下可以使用本地 cache 保证业务稳定性,有效的解决了边缘自治的问题。二来可以降低大量的 list & watch 操作对云上 api 产生一定的负载。

Yurthub 的数据过滤通过节点上的 pod 以及 kubelet 的请求通过 Load Balancer 发送给 kube-apiserver,代理接收到响应消息进行数据过滤处理,之后再将过滤后的数据返回给请求方。如果节点是边缘节点会根据请求类型对响应请求体中的资源进行本地缓存,如果是云端节点考虑到网络状态良好不进行本地缓存。

Yurthub 的过滤框架实现原理图:

Yurthub 目前包含四种过滤规则,通过 addons 请求的 user-agent,resource,verb 判断经过那个过滤器进行相应的数据过滤。

四种过滤规则功能及实现

ServiceTopologyFilter

主要针对 EndpointSlice 资源进行数据过滤, 但 Endpoint Slice 特性需要在 Kubernetes v1.18 或以上版本才能支持,如果在 1.18 版本以下建议使用 endpointsFilter 过滤器。当经过该过滤器首先通过 kubernetes.io/service-name 找到 endpointSlice 资源所对应的 services 资源,之后判断 servces 资源是否存在 openyurt.io/topologyKeys 这个 Annotations,如果存在那么通过这个 Annotations 的值判断数据过滤规则,最后更新 response data 返回给 addons。

Annotations 的值分为两大类:

1、kubernetes.io/hostname:只过滤出相同节点的 endpoint ip

2、openyurt.io/nodepool 或者 kubernetes.io/zone: 通过这个 Annotations 获取对应节点池,最后遍历 endpointSlice 资源,通过 endpointSlice 里的 topology 字段中的 kubernetes.io/hostname 字段在 endpointSlice 对象里找到对应的 Endpoints,之后重组 endpointSlice 里的 Endpoints 后返回给 addons。 

代码实现:

func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {var serviceTopologyType string// get the service Topology typeif svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok {svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)if err != nil {klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)return endpointSlice}if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)return endpointSlice}}var newEps []discovery.Endpoint// if type of service Topology is 'kubernetes.io/hostname'// filter the endpoint just on the local hostif serviceTopologyType == AnnotationServiceTopologyValueNode {for i := range endpointSlice.Endpoints {if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {newEps = append(newEps, endpointSlice.Endpoints[i])}}endpointSlice.Endpoints = newEps} else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {// if type of service Topology is openyurt.io/nodepool// filter the endpoint just on the node which is in the same nodepool with current nodecurrentNode, err := fh.nodeGetter(fh.nodeName)if err != nil {klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)return endpointSlice}if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {nodePool, err := fh.nodePoolLister.Get(nodePoolName)if err != nil {klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)return endpointSlice}for i := range endpointSlice.Endpoints {if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {newEps = append(newEps, endpointSlice.Endpoints[i])}}endpointSlice.Endpoints = newEps}}return endpointSlice
}

EndpointsFilter

针对 endpoints 资源进行相应的数据过滤,首先判断 endpoint 是否存在对应的 service,通过 node 的 label: apps.openyurt.io/nodepool 获取节点池,之后获取节点池下的所有节点,遍历 endpoints.Subsets 下的资源找出同一个节点池的 Ready pod address 以及 NotReady pod address 重组成新的 endpoints 之后返回给 addons。

func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {svcName := endpoints.Name_, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)if err != nil {klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)return endpoints}// filter the endpoints on the node which is in the same nodepool with current nodecurrentNode, err := fh.nodeGetter(fh.nodeName)if err != nil {klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)return endpoints}if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {nodePool, err := fh.nodePoolLister.Get(nodePoolName)if err != nil {klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)return endpoints}var newEpSubsets []v1.EndpointSubsetfor i := range endpoints.Subsets {endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])}}endpoints.Subsets = newEpSubsetsif len(endpoints.Subsets) == 0 {// this endpoints has no nodepool valid addresses for ingress controller, return nil to ignore itreturn nil}}return endpoints
}

MasterServiceFilter

针对 services 下的域名进行 ip 以及端口替换,这个过滤器的场景主要在于边缘端的 pod 无缝使用 InClusterConfig 访问集群资源。

func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {list, err := fh.serializer.Decode(b)if err != nil || list == nil {klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err)return b, nil}// return data un-mutated if not ServiceListserviceList, ok := list.(*v1.ServiceList)if !ok {return b, nil}// mutate master servicefor i := range serviceList.Items {if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName {serviceList.Items[i].Spec.ClusterIP = fh.hostfor j := range serviceList.Items[i].Spec.Ports {if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName {serviceList.Items[i].Spec.Ports[j].Port = fh.portbreak}}klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req))break}}// return the mutated serviceListreturn fh.serializer.Encode(serviceList)
}

DiscardCloudService

该过滤器针对两种 service 其中的一种类型是 LoadBalancer,因为边缘端无法访问 LoadBalancer 类型的资源,所以该过滤器会将这种类型的资源直接过滤掉。另外一种是针对 kube-system 名称空间下的 x-tunnel-server-internal-svc,这个 services 主要存在 cloud 节点用于访问 yurt-tunnel-server,对于 edge 节点会直接过滤掉该 service。

func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {list, err := fh.serializer.Decode(b)if err != nil || list == nil {klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)return b, nil}serviceList, ok := list.(*v1.ServiceList)if ok {var svcNew []v1.Servicefor i := range serviceList.Items {nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)// remove lb serviceif serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)continue}}// remove cloud clusterIP serviceif _, ok := cloudClusterIPService[nsName]; ok {klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)continue}svcNew = append(svcNew, serviceList.Items[i])}serviceList.Items = svcNewreturn fh.serializer.Encode(serviceList)}return b, nil
}

过滤框架现状

目前的过滤框架比较僵硬,将资源过滤硬编码至代码中,只能是已注册的资源才能进行相应的过滤,为了解决这个问题,需要对过滤框架进行相应的改造。

解决方案

方案一:

使用参数或者环境变量的形式自定义过滤配置,但是这种方式有以下弊端:

1、配置复杂需要将所以需要自定义的配置写入到启动参数或者读取环境变量 例如下格式:

--filter_serviceTopology=coredns/endpointslices#list,kube-proxy/services#list;watch --filter_endpointsFilter=nginx-ingress-controller/endpoints#list;watch

2、无法热更新,每次修改配置都需要重启 Yurthub 生效。

方案二:

1、使用 configmap 的形式自定义过滤配置降低配置复杂度配置格式(user-agent/resource#list,watch) 多个资源通过逗号隔开。如下所示:

filter_endpoints: coredns/endpoints#list;watch,test/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""

2、利用 Informer 机制保证配置实时生效

综合以上两点在 OpenYurt 中我们选择了解决方案二。

开发过程中遇到的问题

在边缘端 Informer watch 的 api 地址是 Yurthub 的代理地址,那么 Yurthub 在启动代理端口之前都是无法保证 configmap 的数据是正常的。如果在启动完成之后 addons 的请求先于 configmap 数据更新 这个时候会导致数据在没有过滤的情况下就返回给了 addons,这样会导致很多预期以外的问题。

为了解决这个问题 我们需要在 apporve 中加入 WaitForCacheSync 保证数据同步完成之后才能返回相应的过滤数据,但是在 apporve 中加入 WaitForCacheSync 也直接导致 configmap 进行 watch 的时候也会被阻塞,所以需要在 WaitForCacheSync 之前加入一个白名单机制,当 Yurthub 使用 list & watch 访问 configmap 的时候我们直接不进行数据过滤,相应的代码逻辑如下:

func (a *approver) Approve(comp, resource, verb string) bool {if a.isWhitelistReq(comp, resource, verb) {return false}if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {panic("wait for configMap cache sync timeout")}a.Lock()defer a.Unlock()for _, requests := range a.nameToRequests {for _, request := range requests {if request.Equal(comp, resource, verb) {return true}}}return false
}

总结

1、通过上述的扩展能力可以看出,YurtHub 不仅仅是边缘节点上的带有数据缓存能力的反向代理。而是对 Kubernetes 节点应用生命周期管理加了一层新的封装,提供边缘计算所需要的核心管控能力。

2、YurtHub 不仅仅适用于边缘计算场景,其实可以作为节点侧的一个常备组件,适用于使用 Kubernetes 的任意场景。相信这也会驱动 YurtHub 向更高性能,更高稳定性发展。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/511076.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

金融核心系统云原生转型的三个挑战、六个误区和四个步骤

嘉宾 | 马振雄出品 | CSDN云原生近年来,云原生技术发展如火如荼,IT系统云原生转型已成共识,留给各行各业的只是何时云原生、怎样云原生的时间和方式问题。金融行业作为各类转型历来的排头兵,在推进云原生转型上同样不逞多让。金融…

Java 定时任务技术趋势

简介:定时任务是每个业务常见的需求,比如每分钟扫描超时支付的订单,每小时清理一次数据库历史数据,每天统计前一天的数据并生成报表等等。 作者:黄晓萌(学仁) Java 中自带的解决方案 使用 Ti…

从再造到赋能——360数科举办第二届技术开放日

8月31日,360数科在北京举办“从再造到赋能——2022年技术开放日”。这是360数科的第二届开放日,全面展示了公司在金融科技业务全环节的精细化运营成果,呈现了360数科从市场连接者到流程再造者的技术演化历程,展望了其成为行业赋能…

EventBridge 特性介绍|以 IaC 的方式使用 EventBridge

简介:本文将重点介绍 EventBridge 和 IaC 的重点概念和特性,然后演示如何应用 IaC 理念自动化部署 EventBridge 来使用这些概念和特性。 作者:王川(弗丁) 引言 EventBridge 作为构建 EDA 架构的基础设施&#xff0c…

如何使用 Serverless Devs 部署静态网站到函数计算(上)

简介:部署个静态网站到函数计算~ 前言 公司经常有一些网站需要发布上线,对比了几款不同的产品后,决定使用阿里云的函数计算(FC)来托管构建出来的静态网站。 FC 弹性实例自带的500 Mb 存储空间对静态网站来说简直是太充足了 。 函…

盛邦安全创始人权晓文入选IDC中国CSO名人堂十大人物

在近日举办的IDC 2022 CSO全球网络安全峰会(中国站)上,盛邦安全入选API领域推荐厂商,盛邦安全创始人权晓文入选“中国CSO名人堂(十大人物)”,展现了盛邦安全在网络安全创新方面的不断进取和突出…

硬核调试实操 | 手把手带你实现 Serverless 断点调试

简介:本文将借助 Serverless Devs 工具,对函数计算 (FC)应用的断点调试步骤进行详细指导,手把手带你实现 Serverless 的断点调试,并从以下四个方面为你厘清“硬核调试”的脉络步骤,干货满满。 导读&#x…

兑现 Service Mesh 的新价值:精确控制“爆炸半径”

简介:本文分享了阿里云内部所沉淀的全链路流量打标与路由的能力,做出服务网格技术新体验的同时,很好地兑现了服务网格的新价值。 作者:至简 软件是以持续迭代的方式去不断演进的。某种程度上,我们并不担心软件不完善…

15M安装包就能玩《原神》,带你了解云游戏背后的技术秘密

简介:对于大多数玩家来说,云游戏已经不是一个陌生的概念,它经常和秒玩、不吃设备、大屏临场感、上手门槛低、真香等字眼一起出现在评论留言区。的确,对于既想尝试高品质游戏大作又不想一直卷装备的玩家来说,云游戏做到…

ref绑定dom的三种写法

1、字符串形式 这种字符出串写法因为效率不好&#xff0c;所以不推荐使用 语法 标签上使用ref"name" 进行绑定 方法中this.refs.name拿到dom <input ref"input1" type"text" placeholder"点击按钮弹出内容" /> <button onC…

一文看懂边缘云在广电行业的应用

简介&#xff1a;随着中国广电的5G布局在不断加速&#xff0c;各地广电运营商均已开展面向边缘云建设和业务探索。边缘云作为5G网络架构中关键一环&#xff0c;具有广覆盖、低时延、大带宽的技术特点&#xff0c;是打通智慧广电建设的“经脉”&#xff0c;对未来开展4K/8K超高清…

2022 互联网中秋月饼大赏,腾讯送火腿,字节寓意圆满,你最钟爱哪款呢?(文末有抽奖)...

整理 | 梦依丹出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;配图来自视觉中国又是一年花好处&#xff0c;人月中秋两团圆&#xff01;今年的中秋&#xff0c;你是在家乡还是在他乡度过呢&#xff1f;无论在何处&#xff0c;只要心在一起&#xff0c;多远都不是距…

宜搭小技巧|自动计算日期时长,3个公式帮你敲定

简介&#xff1a;使用「时间函数」实现日期时长自动计算功能&#xff0c;让表单填写更轻松。 上一期&#xff0c;我们学会了如何巧用日期组件保证时间填写不出错。 今天&#xff0c;宜小搭要出差&#xff0c;由于公司要根据出差时长发放补贴&#xff0c;但手动计算出差天数太…

构造函数、实例、原型对象、继承

一、构造函数与原型对象之间的关系&#xff1a; 有一个Star构造函数&#xff0c;每一个构造函数里面都有一个原型对象&#xff0c;是通过构造函数的prototype指向这个原型对象的 同样在这个原型对象里面也有一个属性叫constructor&#xff0c;它又指回了构造函数 可以把构造函…

从中心走向边缘——深度解析云原生边缘计算落地痛点

简介&#xff1a;边缘计算平台的建设&#xff0c;以 Kubernetes 为核心的云原生技术体系&#xff0c;无疑是当前最佳的选择与建设路径&#xff1b;但是云原生体系庞大&#xff0c;组件复杂&#xff0c;将体系下沉至边缘会面临很大的挑战与困难&#xff0c;同时充满巨大的机遇及…

5G+元宇宙创新应用来了,第五届“绽放杯”5G 应用征集大赛云 XR 专题赛落下帷幕...

2022“云 XR 年度十大标杆案例”诞生&#xff01;8 月 31 日&#xff0c;第五届“绽放杯”5G 应用征集大赛云 XR 专题赛决赛在浙江杭州举办&#xff0c;经全天激烈角逐&#xff0c;“中国历代绘画大系 5G 云 XR 应用”等 10 个优秀项目脱颖而出荣获一等奖&#xff0c;并将经组委…

阿里云资深专家李国强:云原生的一些趋势和新方向

简介&#xff1a;云原生不仅仅是技术&#xff0c;更重要的是云原生技术需要和云计算进行结合&#xff0c;帮助用户构建云原生架构的应用。 2021 年 11 月 26 日&#xff0c;阿里云用户组&#xff08;AUG&#xff09;第 3 期活动在广州顺利举行。具有丰富的容器、微服务等领域经…

从建好到用好,阿里云原生微服务生态的演进

简介&#xff1a;随着微服务技术的成熟&#xff0c;微服务核心架构分层愈加清晰&#xff0c;技术标准化和产业化正在形成&#xff0c;服务治理成为用好、管好服务的必选项&#xff0c;服务网格则成为多语言微服务架构下的技术趋势&#xff0c;阿里云原生微服务生态的演进恰好映…

韵达基于云原生的业务中台建设 | 实战派

简介&#xff1a;本文将为大家分享韵达业务中台基于云原生的建设过程。主要分为三部分&#xff0c;第一部分是 IT 信息的发展规划&#xff0c;第二部分是韵达业务中台建设的详细过程&#xff0c;第三部分是对应云原生技术的支撑。 本文将为大家分享韵达业务中台基于云原生的建…

阿里云PolarDB开源数据库社区与 Tapdata 联合共建开放数据技术生态

简介&#xff1a;近日&#xff0c;阿里云PolarDB开源数据库社区宣布将与 Tapdata 联合共建开放数据技术生态。 近日&#xff0c;阿里云PolarDB开源数据库社区宣布将与 Tapdata 联合共建开放数据技术生态。在此之际&#xff0c;一直专注实时数据服务平台的 Tapdata &#xff0c…