Apisix-Ingress服务发现详解

apisix

Apache APISIX 是一个基于微服务 API 网关,其不仅可以处理南北向的流量,也可以处理东西向的流量即服务之间的流量。Apache APISIX 集成了控制面板和数据面,与其他 API 网关相比,Apache APISIX 的上游、路由、插件全是动态的,修改这些东西时都不用重启。并且 Apache APISIX 的插件也是热加载,可以随时插拔、修改插件。

Apache APISIX 其设计理念是基于API 网关的数据平面和控制平面分离。控制平面不仅仅能够控制 Apache APISIX ,同时其还能够控制其他组件;数据平面不仅仅能够被自身的控制平面控制,还能被其他组件所控制。由于其基于ETCD 来存储和分发路由数据,默认具备高可用,无单点故障风险。除此之外,其能够友好地支持 Prometheus、SkyWalking 动态追踪、流量复制、故障注入等相关功能。

apisix ingress

在 K8s 生态中,Ingress 作为表示 K8s 流量入口的一种资源,想要让其生效,就需要有一个 Ingress Controller 去监听 K8s 中的 Ingress 资源,并对这些资源进行相应规则的解析和实际承载流量。在当下趋势中,像 Kubernetes Ingress Nginx 就是使用最广泛的 Ingress Controller 实现。

而 APISIX Ingress 则是另一种 Ingress Controller 的实现。跟 Kubernetes Ingress Nginx 的区别主要在于 APISIX Ingress 是以 Apache APISIX 作为实际承载业务流量的数据面。

Apache APISIX Ingress Controller 除了覆盖 NGINX Ingress Controller 已有的能力外,还解决了一些 Nginx Ingress Controller 的痛点。具体如下:

  • 1、配置的动态化加载
    通常情况下,作为接入层的 Ingress Controller ,其承载着服务的入口流量引入,在生产环境中,我们的业务对系统的可靠性有着更高的要求,然而,基于 Apache APISIX Ingress Controller 其能够支持动态配置,即时生效,降低生产事故的意外及风险,有助于提高运维可维护性。

  • 2、较强的灰度能力
    在实际的业务场景中,有的时候,往往会依据某些特定的需求进行权重调整,结合业务需求按比例进行流量控制,Apache APISIX Ingress Controller 可以支持 Service和 Pod 级别的权重调整,配置清晰而且可读性更强。
    除此,相对于NGINX Ingress Controller 中通过 Annotation 的方式提供 Canary 灰度方案,Apache APISIX Ingress Controller 能够解决其缺陷,从而能够更好的提供灰度策略。

  • 3、较好的扩展能力
    基于 Apache APISIX 强大的插件能力,Apache APISIX Ingress Controller 通过动态绑定插件来增强功能。Apache APISIX 通过插件封装逻辑,易于管理;完善的文档,易于使用和理解。Apache APISIX Ingress Controller 通过配置即可绑定和解绑插件,无需操作脚本。

APISIX Ingress 目前已经支持的自定义资源主要是以下 5 类,涉及到路由、上游、消费者、证书相关和集群公共配置的相关类别。

内置服务发现

APISIX 内置了下面这些服务发现机制:

  • 基于 Eureka 的服务发现

  • 基于 Nacos 的服务发现

  • 基于 Consul 的服务发现

  • 基于 Consul KV 的服务发现

  • 基于 DNS 的服务发现

  • 基于 APISIX-Seed 架构的控制面服务发现

  • 基于 Kubernetes 的服务发现

上面介绍的这些都是基于数据面apisix配置信息手动变更操作的方案集成。其实,在基于k8s的云原生场景下,apisix还提供了一个控制面组件来对apisix的服务发现进行自动管理,那就是apisix ingress

下面就从源码的角度来看看apisix ingress是怎么做到自动的服务发现的

apisix ingress启动

main.go启动入口一路跟踪,进入providers/controller.gorun方法:

func (c *Controller) run(ctx context.Context) {log.Infow("controller tries to leading ...",zap.String("namespace", c.namespace),zap.String("pod", c.name),)var cancelFunc context.CancelFuncctx, cancelFunc = context.WithCancel(ctx)defer cancelFunc()// give up leaderdefer c.leaderContextCancelFunc()clusterOpts := &apisix.ClusterOptions{AdminAPIVersion: c.cfg.APISIX.AdminAPIVersion,Name: c.cfg.APISIX.DefaultClusterName,AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,MetricsCollector: c.MetricsCollector,}err := c.apisix.AddCluster(ctx, clusterOpts)if err != nil && err != apisix.ErrDuplicatedCluster {// TODO give up the leader rolelog.Errorf("failed to add default cluster: %s", err)return}if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {// TODO give up the leader rolelog.Errorf("failed to wait the default cluster to be ready: %s", err)// re-create apisix cluster, used in next c.runif err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {log.Errorf("failed to update default cluster: %s", err)return}return}// Creation Phasec.informers = c.initSharedInformers()common := &providertypes.Common{ControllerNamespace: c.namespace,ListerInformer: c.informers,Config: c.cfg,APISIX: c.apisix,KubeClient: c.kubeClient,MetricsCollector: c.MetricsCollector,Recorder: c.recorder,}c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg)if err != nil {ctx.Done()return}c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)if err != nil {ctx.Done()return}c.translator = translation.NewTranslator(&translation.TranslatorOptions{APIVersion: c.cfg.Kubernetes.APIVersion,EndpointLister: c.informers.EpLister,ServiceLister: c.informers.SvcLister,SecretLister: c.informers.SecretLister,PodLister: c.informers.PodLister,ApisixUpstreamLister: c.informers.ApisixUpstreamLister,PodProvider: c.podProvider,})c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)if err != nil {ctx.Done()return}c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)if err != nil {ctx.Done()return}c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)if err != nil {ctx.Done()return}if c.cfg.Kubernetes.EnableGatewayAPI {c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{Cfg: c.cfg,APISIX: c.apisix,APISIXClusterName: c.cfg.APISIX.DefaultClusterName,KubeTranslator: c.translator,RestConfig: nil,KubeClient: c.kubeClient.Client,MetricsCollector: c.MetricsCollector,NamespaceProvider: c.namespaceProvider,})if err != nil {ctx.Done()return}}// Init Phaseif err = c.namespaceProvider.Init(ctx); err != nil {ctx.Done()return}if err = c.apisixProvider.Init(ctx); err != nil {ctx.Done()return}// Run Phasee := utils.ParallelExecutor{}e.Add(func() {c.checkClusterHealth(ctx, cancelFunc)})e.Add(func() {c.informers.Run(ctx)})e.Add(func() {c.namespaceProvider.Run(ctx)})e.Add(func() {c.kubeProvider.Run(ctx)})e.Add(func() {c.apisixProvider.Run(ctx)})e.Add(func() {c.ingressProvider.Run(ctx)})if c.cfg.Kubernetes.EnableGatewayAPI {e.Add(func() {c.gatewayProvider.Run(ctx)})}e.Add(func() {c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration)})c.MetricsCollector.ResetLeader(true)log.Infow("controller now is running as leader",zap.String("namespace", c.namespace),zap.String("pod", c.name),)<-ctx.Done()e.Wait()for _, execErr := range e.Errors() {log.Error(execErr.Error())}if len(e.Errors()) > 0 {log.Error("Start failed, abort...")cancelFunc()}
}

上面代码逻辑大致如下:

  • 初始化apisix集群配置信息:用于跟数据面服务apisix通信进行相关的配置操作

  • 初始化k8s资源inforrmers信息:用户对k8s各个资源进行监听及获取资源信息

  • 监听k8s集群namespace资源并处理

  • 监听k8s集群pod资源并处理

  • 监听k8s集群ingress资源并处理

  • 监听k8s集群中apisix自定义资源并处理,比如:apisixRoute等

  • 监听k8s集群endpoint资源并处理

  • 监听k8s集群secret资源并处理

  • 监听k8s集群configmap资源并处理

  • 监听k8s集群gateway资源并处理

下面以处理endpoint资源为例进行说明,其他资源的监听处理类似,就不一一讲解了。

服务发现

进入到k8s/endpoint/provider.go中,我们先来看看实例初始化方法:

func NewProvider(common *providertypes.Common, translator translation.Translator, namespaceProvider namespace.WatchingNamespaceProvider) (Provider, error) {p := &endpointProvider{cfg: common.Config,}base := &baseEndpointController{Common: common,translator: translator,svcLister: common.SvcLister,apisixUpstreamLister: common.ApisixUpstreamLister,}if common.Kubernetes.WatchEndpointSlices {p.endpointSliceController = newEndpointSliceController(base, namespaceProvider)} else {p.endpointsController = newEndpointsController(base, namespaceProvider)}return p, nil
}
func newEndpointsController(base *baseEndpointController, namespaceProvider namespace.WatchingNamespaceProvider) *endpointsController {ctl := &endpointsController{baseEndpointController: base,workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "endpoints"),workers: 1,namespaceProvider: namespaceProvider,epLister: base.EpLister,epInformer: base.EpInformer,}ctl.epInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: ctl.onAdd,UpdateFunc: ctl.onUpdate,DeleteFunc: ctl.onDelete,},)return ctl
}

注意到最后代码的AddEventHandler,这里就是我们经常见到的informer的处理回调方法设置的地方。

我们看到针对endpoint资源的增删改,设置了对应的回调方法。这里来看看onAdd方法:

func (c *endpointsController) onAdd(obj interface{}) {key, err := cache.MetaNamespaceKeyFunc(obj)if err != nil {log.Errorf("found endpoints object with bad namespace/name: %s, ignore it", err)return}if !c.namespaceProvider.IsWatchingNamespace(key) {return}log.Debugw("endpoints add event arrived",zap.String("object-key", key))c.workqueue.Add(&types.Event{Type: types.EventAdd,// TODO pass key.Object: kube.NewEndpoint(obj.(*corev1.Endpoints)),})c.MetricsCollector.IncrEvents("endpoints", "add")
}

该方法的参数表示增加的endpoint资源对象信息。该方法主要是向endpoint的队列workqueue中增加一个事件对象:包含事件类型、增加的endpoint对象

在最开始main我们介绍provider的启动方法中提到:执行了每个provider的run方法,下面我们来看下endpoint的provider的run方法:

func (c *endpointsController) run(ctx context.Context) {log.Info("endpoints controller started")defer log.Info("endpoints controller exited")defer c.workqueue.ShutDown()if ok := cache.WaitForCacheSync(ctx.Done(), c.epInformer.HasSynced); !ok {log.Error("informers sync failed")return}handler := func() {for {obj, shutdown := c.workqueue.Get()if shutdown {return}err := c.sync(ctx, obj.(*types.Event))c.workqueue.Done(obj)c.handleSyncErr(obj, err)}}for i := 0; i < c.workers; i++ {go handler()}<-ctx.Done()
}
func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {ep := ev.Object.(kube.Endpoint)ns, err := ep.Namespace()if err != nil {return err}newestEp, err := c.epLister.GetEndpoint(ns, ep.ServiceName())if err != nil {if errors.IsNotFound(err) {return c.syncEmptyEndpoint(ctx, ep)}return err}return c.syncEndpoint(ctx, newestEp)
}
func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {log.Debugw("endpoint controller syncing endpoint",zap.Any("endpoint", ep),)namespace, err := ep.Namespace()if err != nil {return err}svcName := ep.ServiceName()svc, err := c.svcLister.Services(namespace).Get(svcName)if err != nil {if k8serrors.IsNotFound(err) {return c.syncEmptyEndpoint(ctx, ep)}log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err)return err}switch c.Kubernetes.APIVersion {case config.ApisixV2beta3:var subsets []configv2beta3.ApisixUpstreamSubsetsubsets = append(subsets, configv2beta3.ApisixUpstreamSubset{})auKube, err := c.apisixUpstreamLister.V2beta3(namespace, svcName)if err != nil {if !k8serrors.IsNotFound(err) {log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)return err}} else if auKube.V2beta3().Spec != nil && len(auKube.V2beta3().Spec.Subsets) > 0 {subsets = append(subsets, auKube.V2beta3().Spec.Subsets...)}clusters := c.APISIX.ListClusters()for _, port := range svc.Spec.Ports {for _, subset := range subsets {nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)if err != nil {log.Errorw("failed to translate upstream nodes",zap.Error(err),zap.Any("endpoints", ep),zap.Int32("port", port.Port),)}name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)for _, cluster := range clusters {if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {return err}}}}case config.ApisixV2:var subsets []configv2.ApisixUpstreamSubsetsubsets = append(subsets, configv2.ApisixUpstreamSubset{})auKube, err := c.apisixUpstreamLister.V2(namespace, svcName)if err != nil {if !k8serrors.IsNotFound(err) {log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)return err}} else if auKube.V2().Spec != nil && len(auKube.V2().Spec.Subsets) > 0 {subsets = append(subsets, auKube.V2().Spec.Subsets...)}clusters := c.APISIX.ListClusters()for _, port := range svc.Spec.Ports {for _, subset := range subsets {nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)if err != nil {log.Errorw("failed to translate upstream nodes",zap.Error(err),zap.Any("endpoints", ep),zap.Int32("port", port.Port),)}name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)for _, cluster := range clusters {if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {return err}}}}default:panic(fmt.Errorf("unsupported ApisixUpstream version %v", c.Kubernetes.APIVersion))}return nil
}

上面代码主要逻辑就是:

  • 从endpoint的队列workqueue中获取事件对象

  • 根据endpoint信息从k8s集群中获取最新的namespace和service等信息

  • 根据namespace和servicename从k8s集群中获取apisix upstream资源信息

  • 对每一个service端口,向数据面服务apisix发送配置更新请求

https://xiaorui.cc/archives/7369

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/109193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣 095. 最长公共子序列(C语言+动态规划)

1. 题目 给定两个字符串 text1 和 text2&#xff0c;返回这两个字符串的最长 公共子序列 的长度。如果不存在 公共子序列 &#xff0c;返回 0 。 一个字符串的 子序列 是指这样一个新的字符串&#xff1a;它是由原字符串在不改变字符的相对顺序的情况下删除某些字符&#xff08…

什么是云原生?零基础学云原生难吗?

伴随着云计算的浪潮&#xff0c;云原生概念也应运而生&#xff0c;而且火得一塌糊涂&#xff0c;但真正谈起“云原生”&#xff0c;大多数非 IT 从业者的认知往往仅限于将服务应用放入云端&#xff0c;在云上处理业务。实际上&#xff0c;云原生远不止于此。 现在越来越多的企…

使用Git在本地创建一个仓库并将其推送到GitHub

前记&#xff1a; git svn sourcetree gitee github gitlab gitblit gitbucket gitolite gogs 版本控制 | 仓库管理 ---- 系列工程笔记. Platform&#xff1a;Windows 10 Git version&#xff1a;git version 2.32.0.windows.1 Function&#xff1a; 使用Git在本地创建一个…

redis知识总结

一、什么是Redis Redis是一个开源的内存数据存储系统&#xff0c;也是一个高性能的键值存储数据库。它支持多种数据结构&#xff0c;包括字符串、哈希表、列表、集合以及有序集合。Redis的特点是快速、灵活和可扩展。 首先&#xff0c;Redis是一个内存数据存储系统&#xff0c…

SVN服务端客户端安装配置

SVN服务端客户端安装配置 1、服务端下载安装1.1 软件下载1.2 软件安装 2、客户端下载安装2.1 软件下载2.2 软件安装2.2.1 安装2.2.2. 汉化 3、SVN配置3.1 SVN服务器端配置3. 2 SVN客户端配置 1、服务端下载安装 1.1 软件下载 百度网盘链接&#xff1a;VisualSVN-Server-5.3.0…

SAP-QM-采购过程模式与特性检验不匹配QD244

在创建物理样本&#xff0c;维护检验计划的采样过程时报错&#xff0c;从报错看&#xff0c;这个采样过程的评估模式和检验特性不匹配&#xff0c; 评估模式为700 700是根据特征上下线&#xff0c;计算平均值进行评估&#xff0c;是定量特性&#xff0c;经过测试 是检验特性的…

Window 窗口函数 (Spark Sql)

在 Spark SQL 中&#xff0c;Window 函数是一种用于在查询结果集中执行聚合、排序和分析操作的强大工具。它允许你在查询中创建一个窗口&#xff0c;然后对窗口内的数据进行聚合计算。 import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions…

认识SQL注入

认识SQL注入 什么是SQL注入&#xff08;SQL Injection&#xff09; 我们先从字面意思去理解&#xff0c;SQL是一种数据库查询语言&#xff0c;他是用来与数据库交互的语言&#xff0c;而注入就是向内部强行添加内容&#xff0c;他们组合起来&#xff0c;就是在SQL语句中强行注…

Typora使用教程

相关介绍 Typora是一款所写即所得的Markdown编辑器&#xff0c;支持跨平台Window,Linux和Mac都可以。 相关链接 Typora安装包历史版本地址&#xff1a;官网历史版本地址 问题及知识点汇总 关于图片的问题-设置图像自动复制到自己指定文件夹下 问题描述 在写笔记的过程中&am…

【yolov8系列】yolov8的目标检测、实例分割、关节点估计的原理解析

1 YOLO时间线 这里简单列下yolo的发展时间线&#xff0c;对每个版本的提出有个时间概念。 2 yolov8 的简介 工程链接&#xff1a;https://github.com/ultralytics/ultralytics 2.1 yolov8的特点 采用了anchor free方式&#xff0c;去除了先验设置可能不佳带来的影响借鉴Genera…

目标检测YOLO实战应用案例100讲-基于YOLOv5_tiny算法的路面裂缝智能检测

目录 前言 国内外研究现状 公路路面裂缝检测方法现状 基于深度学习检测算法现状

MongoDB 未授权访问漏洞

简介 MongoDB是一个基于分布式文件存储的数据库&#xff0c;是一个介于关系数据库和非关系数据库之间的产品&#xff0c;它的特点是高性能、易部署、易使用&#xff0c;存储数据非常方便&#xff0c;默认情况下是没有认证的这就导致不熟悉它的研发人员部署后没有做访问控制导致…

智能化安全巡更巡查系统—提升安全管理效率

传统的巡检都是手工完成&#xff0c;记录、拍照&#xff0c;回到办公室打印表单再交给作业队伍整改&#xff0c;再去现场核实复查&#xff0c;流程繁琐&#xff0c;效率低。而且大部分工地为了减少麻烦&#xff0c;人员往往都是口头沟通&#xff0c;存在很大质量风险&#xff0…

模型量化笔记--KL散度量化

KL散度量化 前面介绍的非对称量化中&#xff0c;是将数据中的min值和max值直接映射到[-128, 127]。 同样的&#xff0c;前面介绍的对称量化是将数据的最大绝对值 ∣ m a x ∣ |max| ∣max∣直接映射到127。 上面两种直接映射的方法比较粗暴&#xff0c;而TensorRT中的int8量化…

家中种绿植有什么风水讲究?

现在越来越多的人&#xff0c;都居住在小区高楼里&#xff0c;与绿植的接触也越来越少&#xff0c; 因此&#xff0c;很多人会选择在自己家中种上几株绿植。在家里种植植物&#xff0c;不仅美观&#xff0c;陶冶情操&#xff0c;还能净化空气&#xff0c;为家中增添好的风水。 …

凉鞋的 Unity 笔记 109. 专题一 小结

109. 专题一 小结 在这一篇&#xff0c;我们来对第一个专题做一个小的总结。 到目前为止&#xff0c;大家应该能够感受到此教程的基调。 内容的难度非常简单&#xff0c;接近于零基础的程度&#xff0c;不过通过这些零基础内容所介绍的通识内容其实是笔者好多年的时间一点点…

下拉选择器的树状结构图

类似&#xff1a;【Vue-Treeselect 和 vue3-treeselect】树形下拉框 一&#xff1a;图 二&#xff1a;如果有多层级的数据结构&#xff0c;可以用treeselect插件实现 1、安装&#xff1a; npm install --save riophae/vue-treeselect 2、实现&#xff1a; <el-form ref&qu…

树叶识别系统python+Django网页界面+TensorFlow+算法模型+数据集+图像识别分类

一、介绍 树叶识别系统。使用Python作为主要编程语言开发&#xff0c;通过收集常见的6中树叶&#xff08;‘广玉兰’, ‘杜鹃’, ‘梧桐’, ‘樟叶’, ‘芭蕉’, ‘银杏’&#xff09;图片作为数据集&#xff0c;然后使用TensorFlow搭建ResNet50算法网络模型&#xff0c;通过对…

vue3弹窗中循环生成表单的校验和重置问题

应用场景&#xff1a; 1、弹框里的表单是根据后台返回的时段生成的&#xff0c;后台返回几个时段&#xff0c;就渲染几组表单。 -1- 重置&#xff1a;遍历每个表单&#xff0c;获取当前表单的引用&#xff0c;在resetFields() -2- 校验&#xff1a;创建一个数组来存储每个表单的…

java线程

1. 总体路线 pom依赖 <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies><dependency> <groupId>org.projectlombo…