Istio Pilot 源码分析(一)

张海东, ‍多点生活(成都)云原生开发工程师。

Istio 作为目前 Servic Mesh 方案中的翘楚,吸引着越来越多的企业及开发者。越来越多的团队想将其应用于微服务的治理,但在实际落地时却因为不了解 Istio 黑盒中的运行机制而左右为难,本文将基于 1.7 的源码讲解 Istio 的核心组件 Pilot 的结构及运行流程,希望对读者应用 Istio 有所助益。

注:本文基于 istio release-1.7 分支分析,其他版本的代码结构会有所不同。

背景

随着 Istio 1.7 的发布,内部组件精简后的 istiod 日趋稳定,越来越多的公司将其应用到自身微服务的流量治理、安全通信及监测中。多点也不例外,应用 Istio 来落地业务系统所有 Dubbo 服务的网格化,下沉 SDK 逻辑,解决基础中间件与业务系统过于耦合等痛点。目前,我们是通过自己开发的 Controller 组件对接 Zookeeper 等注册中心,将注册到 Zookeeper 的节点实时转化为 ServiceEntry 及 WorkloadEntry 等 Istio 配置类型写入 kube-apiserver,再由 Pilot 转化为 xDS 协议下发至数据面,同时对集群、虚拟机中的服务进行治理。随着公司服务网格化的逐步落地,对 Istio 及数据面组件源码级掌握的诉求越来越高,没有足够的深度及广度很难解决开发过程中遇到的难题,让我们一起揭开 Istio 神秘的面纱,看看黑箱内部是如何运作的。

本文作为 Istio 控制面组件 Pilot 的源码分析系列,主要面向刚接触 Istio 或仅停留在使用 Istio 基本配置类型(如 VirtualServiceDestinationRule 等)的同学,需要熟悉 Istio 的一些 基础概念及名词[1] 。文章会涉及较多的代码细节,我们会以不同的篇幅分别介绍以下内容:

1.pilot-discovery 宏观架构及启动流程梳理2.pilot-discovery 接口设计及关键接口分析3.pilot-discovery xDS 生成及下发流程梳理4.pilot-agent 流程梳理5.pilot 中的身份认证及安全通信解析

相信通过源码一步一步分析,能消除读者对 Pilot 的陌生感,在基于 Pilot 做适配开发时会更加清楚的了解其底层运行逻辑,碰到问题时也能更好的定位。

Pilot 的代码主要分为两部分:

pilot-discoverypilot-agent

其中 pilot-agent 负责数据面 Sidecar 实例的生命周期管理,而 pilot-discovery 负责控制面流量管理配置及路由规则的生成和下发。

宏观架构

pilot-discovery 的核心组件如图:

pilot-discovery-struct

其中 Server 为 pilot-discovery 的主服务,包含了三个比较重要的组件:

Config Controller:从不同来源接收流量控制和路由规则等 Istio 的配置,并响应各类事件。•Service Controller:从不同注册中心同步服务及实例,并响应各类事件。•EnvoyXdsServer:核心的 xDS 协议推送服务,根据上面组件的数据生成 xDS 协议并下发。

Config Controller 比较核心的就是对接 Kubernetes,从 kube-apiserver 中 Watch 集群中的 VirtualServiceServiceEntryDestinationRules 等配置信息,有变化则生成 PushRequest 推送至 EnvoyXdsServer 中的推送队列。除此之外,还支持对接 MCP(Mesh Configuration Protocol) 协议的 gRPC Server,如 Nacos 的 MCP 服务等,只需要在 meshconfig 中配置 configSources 即可。最后一种是基于内存的 Config Controller 实现,通过 Watch 一个文件目录,加载目录中的 yaml 文件生成配置数据,主要用来测试。

Service Controller 目前原生支持 Kubernetes 和 Consul,注册在这些注册中心中的服务可以无痛接入 Mesh,另外一种比较特殊,就是 ServiceEntryStore,它本质是储存在 Config Controller 中的 Istio 配置数据,但它描述的却是集群外部的服务信息,详情可阅读文档 ServiceEntry[2]Istio 通过它将集群外部,如部署在虚拟机中的服务、非 Kubernetes 的原生服务同步到 Istio 中,纳入网格统一进行流量控制和路由,所以 ServiceEntryStore 也可以视为一种注册中心。还有一种就是 Mock Service Registry,主要用来测试。

ServiceEntryStore 从 Config Controller 到 Service Controller 的转化流程大致如图(后续会做详细的代码分析,这里简单了解一下即可):

pilot-discovery-serviceentrystore

ConfigStores 是一个列表,里面存储了各类 Istio 配置文件,包括 ServiceEntry 、WorkloadEntry 等服务数据,也包括 VirtualServiceDestinationRulesSidecar 等流量控制、路由规则的配置数据,pilot-discovery 将这些 ConfigStores 聚合成一个 configController 统一进行管理,之后再从其中衍生出 IstioConfigStore,将其作为 serviceEntryStore 的配置源。serviceEntryStore 其实就是 ServiceEntry Controller,响应 ServiceEntry 和 WorkloadEntry 这类服务信息的变化。

EnvoyXdsServer 比较核心,一切与 xDS 协议相关的接收、转换、下发操作都由它完成。EnvoyXdsServer 对接所有集群中的边车代理,如 EnvoyMOSN 等,当配置或服务发生变化时主动推送,也会响应代理发送的请求,依据请求的信息下发相应的 xDS 配置。

理解了这三个核心组件的定义,就能比较好的理解下面分析的各类流程了。

pilot-discovery 的整个业务流程梳理如下,可以先大概浏览一遍,之后我们逐一进行分析:

pilot-discovery-sequence-all

启动流程梳理

首先详细看一下 pilot-discovery 的启动流程。pilot-discovery 组件的入口代码在 istio/pilot/cmd/pilot-discovery 中。该目录中包含两个文件: main.go 和 request.gomain.go 中定义了 pilot-discovery 根命令及 discovery 命令,是启动服务发现及配置下发的主流程; 另一个文件 request.go 中定义了 request 命令,用来请求 Pilot 中的 metrics/debug 接口,多用来调试。

main.go 中 discoveryCmd的 RunE 函数定义了启动过程,代码如下:

// 创建一个接收空结构的 stop channel 用来停止所有 servers
stop := make(chan struct{})
// 创建服务发现的 Server
discoveryServer, err := bootstrap.NewServer(serverArgs)
if err != nil {return fmt.Errorf("failed to create discovery service: %v", err)
}
// 运行 Server 中注册的所有服务
if err := discoveryServer.Start(stop); err != nil {return fmt.Errorf("failed to start discovery service: %v", err)
}
// 等待 SIGINT 和 SIGTERM 信号并关闭 stop channel
cmd.WaitSignal(stop)

启动流程如图所示:

pilot-discovery-init

初始化流程

接下来介绍 discoveryServer ,即 pilot-discovery 组件的核心。在这之前先看下 Server 的结构,代码位于 istio/pilot/pkg/bootstrap/server.go 文件中。

Server 的关键字段如下:

type Server struct {XDSServer *xds.DiscoveryServer  // Xds 服务environment *model.Environment  // Pilot 环境所需的 API 集合kubeRegistry *kubecontroller.Controller   // 处理 Kubernetes 主集群的注册中心multicluster *kubecontroller.Multicluster // 处理 Kubernetes 多个集群的注册中心configController  model.ConfigStoreCache  // 统一处理配置数据(如 VirtualService 等) 的 ControllerConfigStores      []model.ConfigStoreCache // 不同配置信息的缓存器,提供 Get、List、Create 等方法serviceEntryStore *serviceentry.ServiceEntryStore // 单独处理 ServiceEntry 的 ControllerfileWatcher filewatcher.FileWatcher // 文件监听器,主要 watch meshconfig 和 networks 配置文件等startFuncs []startFunc // 保存了上述所有服务的启动函数,便于在 Start() 方法中批量启动及管理
}

再看 NewServer() 方法中的内容,有以下几个关键步骤:

image.png

我们对每个步骤逐一进行分析:

初始化 Environment

什么是 Environment 呢?根据定义 Environment 为 Pilot 提供了一个汇总的、运行中所需的 API 集合。 Environment 中字段(接口)如下:

type Environment struct {ServiceDiscovery  // 服务发现的接口模型,主要列出 services 和 instancesIstioConfigStore  // Istio 配置文件的存储器,主要列出 ServiceEntry 等配置mesh.Watcher      // mesh config 文件的监听器mesh.NetworksWatcher // mesh network config 文件的监听器PushContext *PushContext // 在推送(下发 xDS)生成期间保存信息的上下文DomainSuffix string // istio server 默认的后缀域名
}

其中 PushContext 是 Pilot 在推送 xDS 前,生成配置期间保存相关信息的上下文的地方,在全量推送配置和配置发生改变时重置。它会保存所有的错误和统计信息,并缓存一些配置的计算信息。 ServiceDiscovery 提供了枚举 Istio 中服务和实例的方法。 mesh.Watcher 和 mesh.NetworksWatcher 负责监听 istiod 启动时挂载的两个配置文件,这两个配置文件是通过 configmap 映射到 Pod 的文件系统中的,监听器将在监听到配置文件变化时运行预先注册的 Handler 。文件挂载参考 istiod 的配置文件:

apiVersion: v1
kind: Pod
metadata:name: istiod-56c488887d-z9k5cnamespace: istio-system
spec:containers:volumeMounts:- mountPath: /etc/istio/configname: config-volumevolumes:- configMap:defaultMode: 420name: istioname: config-volume

相应的配置存储在 istio-system/istio 这个 configmap 中,里面保存了 mesh 和 meshNetworks 两种配置,样例如下:

apiVersion: v1
kind: ConfigMap
metadata:name: istionamespace: istio-system
data:mesh: |-accessLogEncoding: TEXTaccessLogFile: ""accessLogFormat: ""defaultConfig:binaryPath: /usr/local/bin/mosnconcurrency: 2configPath: ./etc/istio/proxy...meshNetworks: 'networks: {}'

再回头看 Environment 的初始化:

e := &model.Environment{PushContext:  model.NewPushContext(),DomainSuffix: args.RegistryOptions.KubeOptions.DomainSuffix,
}
ac := aggregate.NewController(aggregate.Options{MeshHolder: e,
})
e.ServiceDiscovery = ac

首先是初始化了一份 PushContext ,创建 PushContext 所需的各种列表和 Map 。其次是初始化了一个聚合所有注册中心的 Controller 作为 Environment 中的 ServiceDiscovery 。该 Controller 提供从所有注册中心(如 Kubernetes, Consul, MCP 等)获取服务和实例列表的方法。这里传入了一个参数 MeshHolder 是想利用 Environment 中的 mesh.Watcher 将 mesh 这个配置同步过去。

初始化 Server

Server 的结构之前分析过,这里将之前初始化的 Environment 传入后,开始初始化 XDSServer 。

s := &Server{clusterID:       getClusterID(args),environment:     e,XDSServer:       xds.NewDiscoveryServer(e, args.Plugins), // 初始化 XDSServerfileWatcher:     filewatcher.NewWatcher(),httpMux:         http.NewServeMux(),monitoringMux:   http.NewServeMux(),readinessProbes: make(map[string]readinessProbe),
}

XDSServer 相关的代码在 istio/pilot/pkg/xds/discovery.go 中,对应为 DiscoveryServer ,该服务为 Envoy xDS APIs的 gRPC 实现。 DiscoveryServer 关键定义如下:

type DiscoveryServer struct {Env *model.Environment   // 即上述 pilot server 中的 EnvironmentConfigGenerator core.ConfigGenerator  // 控制面 Istio 配置的生成器,如 VirtualService 等Generators map[string]model.XdsResourceGenerator // 针对不同配置类型的定制化生成器concurrentPushLimit chan struct{}// 不同服务所有实例的集合,增量更新,key 为 service 和 namespace// EndpointShards 中是以不同的注册中心名为 key 分组保存实例EndpointShardsByService map[string]map[string]*EndpointShards pushChannel chan *model.PushRequest  // 接收 push 请求的 channelpushQueue *PushQueue     // 防抖之后,真正 Push xDS 之前所用的缓冲队列adsClients      map[string]*Connection  // ADS 和 EDS 的 gRPC 连接StatusReporter DistributionStatusCache  // 监听 xDS ACK 和连接断开// xDS 状态更新的生成器(更新 connect, disconnect, nacks, acks)// 状态更新后向所有 connection 推送 DiscoveryResponseInternalGen *InternalGen serverReady bool     // 表示缓存已同步,server 可以接受请求debounceOptions debounceOptions  // 防抖设置cache Cache // xDS 资源的缓存,目前仅适用于 EDS,线程安全
}

初始化 MeshConfig 、 KubeClient 、 MeshNetworks 和 MeshHandlers

s.initMeshConfiguration(args, s.fileWatcher)
if err := s.initKubeClient(args); err != nil {return nil, fmt.Errorf("error initializing kube client: %v", err)
}
s.initMeshNetworks(args, s.fileWatcher)
s.initMeshHandlers()

这几个初始化函数比较好理解, initMeshConfiguration 和 initMeshNetworks 都是通过 fileWatcher 对 istiod 从 configmap 中挂载的两个配置文件 mesh 和 meshNetworks 进行监听。当配置文件发生变化时重载配置并触发相应的 Handlers 。

filewatcher 的代码在另一个管理通用工具包的项目里: github.com/istio/pkg/filewatcher ,感兴趣的同学可以再详细研究下,底层使用到了 fsnotify[3] 这个库来推送文件变化事件。

initMeshHandlers 为上述两个配置文件注册了两个 Handler ,当配置文件发生变化时触发全量 xDS 下发。

初始化 Controllers

这部分比较核心,初始化了三种控制器分别处理证书、配置信息和注册信息,证书及安全相关的内容本篇先暂不讨论。主要来看 initConfigController 和 initServiceControllers 。

func (s *Server) initControllers(args *PilotArgs) error {log.Info("initializing controllers")if err := s.initCertController(args); err != nil {return fmt.Errorf("error initializing certificate controller: %v", err)}if err := s.initConfigController(args); err != nil {return fmt.Errorf("error initializing config controller: %v", err)}if err := s.initServiceControllers(args); err != nil {return fmt.Errorf("error initializing service controllers: %v", err)}return nil
}

配置信息大都是 Istio 定义的一系列 CRD(如 VirtualService 、 DestinationRules 等),一个控制面可以通过 MCP 同时接入多个 Kubernetes 之外的配置数据源,也可通过文件目录(主要用来调试)挂载,默认是读取 Kubernetes 中的配置数据:

func (s *Server) initK8SConfigStore(args *PilotArgs) error {configController, err := s.makeKubeConfigController(args)...s.initStatusController(args, features.EnableStatus) // 初始化上面提到的 StatusReporterreturn nil
}

配置数据包括以下类型,具体每个类型的含义 Istio 官网都有介绍及用例,这里不再赘述:

// PilotServiceApi contains only collections used by Pilot, including experimental Service Api.
PilotServiceApi = collection.NewSchemasBuilder().MustAdd(IstioNetworkingV1Alpha3Destinationrules).MustAdd(IstioNetworkingV1Alpha3Envoyfilters).MustAdd(IstioNetworkingV1Alpha3Gateways).MustAdd(IstioNetworkingV1Alpha3Serviceentries).MustAdd(IstioNetworkingV1Alpha3Sidecars).MustAdd(IstioNetworkingV1Alpha3Virtualservices).MustAdd(IstioNetworkingV1Alpha3Workloadentries).MustAdd(IstioNetworkingV1Alpha3Workloadgroups).MustAdd(IstioSecurityV1Beta1Authorizationpolicies).MustAdd(IstioSecurityV1Beta1Peerauthentications).MustAdd(IstioSecurityV1Beta1Requestauthentications).MustAdd(K8SServiceApisV1Alpha1Gatewayclasses).MustAdd(K8SServiceApisV1Alpha1Gateways).MustAdd(K8SServiceApisV1Alpha1Httproutes).MustAdd(K8SServiceApisV1Alpha1Tcproutes).Build()

详细看下 initK8SConfigStore 中的 makeKubeConfigController 方法,这里初始化了一个处理 Istio CRDs 的 Client ,实现 ConfigStoreCache 这个接口中增删改查等方法。

func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) {c, err := crdclient.New(s.kubeClient, buildLedger(args.RegistryOptions), args.Revision, args.RegistryOptions.KubeOptions)if err != nil {return nil, err}return c, nil
}

Client 定义如下:

type Client struct {schemas collection.Schemas  // Istio CRDs shemasdomainSuffix stringconfigLedger ledger.Ledgerrevision stringkinds map[resource.GroupVersionKind]*cacheHandler // 跟踪已知类型的所有缓存 handlerqueue queue.InstanceistioClient istioclient.InterfaceserviceApisClient serviceapisclient.Interface
}

再依次对这些类型创建 Informer 开启监听。回到 initConfigController ,创建好 ConfigStore 之后,再对其进一步包装:

// 将所有 ConfigStore 聚合并缓存
aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
// 通过 s.configController 统一操作上面聚合的 ConfigStores
s.configController = aggregateConfigController
// 将其包装为 IstioConfigStore 传入 environment,便于操作 ServiceEntry/Gateway 等资源
// IstioConfigStore 会在之后的 ServiceEntryStore 中用到
s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)

最后将该 Controller 的启动函数注册到 startFuncs 中:

s.addStartFunc(func(stop <-chan struct{}) error {go s.configController.Run(stop)return nil
})

再来看 initServiceControllers 处理服务发现的 Controller 初始化:

func (s *Server) initServiceControllers(args *PilotArgs) error {serviceControllers := s.ServiceController()for _, r := range args.RegistryOptions.Registries {// ...switch serviceRegistry {case serviceregistry.Kubernetes:if err := s.initKubeRegistry(serviceControllers, args); err != nil {return err}// ...}// ...
}

从之前初始化的 environment.ServiceDiscovery 中获取已注册的服务中心,如果是 Kubernetes 则执行 initKubeRegistry:

// initKubeRegistry creates all the k8s service controllers under this pilot
func (s *Server) initKubeRegistry(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) {// ...log.Infof("Initializing Kubernetes service registry %q", args.RegistryOptions.KubeOptions.ClusterID)kubeRegistry := kubecontroller.NewController(s.kubeClient, args.RegistryOptions.KubeOptions)s.kubeRegistry = kubeRegistryserviceControllers.AddRegistry(kubeRegistry)return
}

进一步初始化 Kubernetes 注册中心,方法为 NewController ,先看一下这个 Controller 的结构:

type Controller struct {client kubernetes.Interfacequeue queue.InstanceserviceInformer cache.SharedIndexInformerserviceLister   listerv1.ServiceListerendpoints kubeEndpointsControllernodeInformer cache.SharedIndexInformernodeLister   listerv1.NodeListerpods *PodCachemetrics         model.MetricsnetworksWatcher mesh.NetworksWatcherxdsUpdater      model.XDSUpdaterdomainSuffix    stringclusterID       stringserviceHandlers  []func(*model.Service, model.Event)instanceHandlers []func(*model.ServiceInstance, model.Event)workloadHandlers []func(*model.WorkloadInstance, model.Event)sync.RWMutexservicesMap map[host.Name]*model.ServicenodeSelectorsForServices map[host.Name]labels.InstancenodeInfoMap map[string]kubernetesNodeexternalNameSvcInstanceMap map[host.Name][]*model.ServiceInstanceworkloadInstancesByIP map[string]*model.WorkloadInstanceranger cidranger.RangernetworkForRegistry stringonce sync.Once
}

可以看到 Controller 对 Services 、 Nodes 、 Pods 等资源各自初始化了 Informer 、 Lister 以及对应的 Map,各类 Handlers 在 Informer 监听到增删改查时推送相应的事件到 queue ,再由 onServiceEvent 、 onNodeEvent 、 c.pods.onEvent 中更新对应的 Map 。

回到 initServiceControllers ,初始化完 Kubernetes 注册中心之后,还需要关注 Kubernetes 集群之外的服务,这些服务基本都是通过 ServiceEntry 注册到控制面的,所有 ServiceEntry 配置数据目前还都在之前初始化的 configController 配置中心控制器中,这里将 ServiceEntry 数据单独拎出来初始化一个 ServicEntry 注册中心,加入到 serviceControllers 中:

s.serviceEntryStore = serviceentry.NewServiceDiscovery(s.configController, s.environment.IstioConfigStore, s.XDSServer)
serviceControllers.AddRegistry(s.serviceEntryStore)

serviceEntryStore 相关的逻辑会在后续 xDS 下发流程的分析中再阐述。

最后将 serviceControllers 中所有的服务注册中心的 Controller 的启动函数都注册到 startFuncs 中:

s.addStartFunc(func(stop <-chan struct{}) error {go serviceControllers.Run(stop)return nil
})
// Run starts all the controllers
func (c *Controller) Run(stop <-chan struct{}) {for _, r := range c.GetRegistries() {go r.Run(stop)}<-stoplog.Info("Registry Aggregator terminated")
}

初始化 RegistryEventHandlers

initRegistryEventHandlers 设置了三个事件处理器 serviceHandler 、 instanceHandler 和 configHandler 分别响应服务、实例和配置数据的更新事件。

serviceHandler 如下:

serviceHandler := func(svc *model.Service, _ model.Event) {pushReq := &model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind:      gvk.ServiceEntry,Name:      string(svc.Hostname),Namespace: svc.Attributes.Namespace,}: {}},Reason: []model.TriggerReason{model.ServiceUpdate},}s.XDSServer.ConfigUpdate(pushReq)
}
if err := s.ServiceController().AppendServiceHandler(serviceHandler); err != nil {return fmt.Errorf("append service handler failed: %v", err)
}

可以看到当服务本身发生变化时,会触发 xDS 的全量下发,所有与该服务相关的代理都会收到推送。

实例的变动也会触发 xDS 的全量下发,不过仅在连接 Consul 时生效。Kubernetes 和 MCP 这两种服务发现的场景下,更新事件的 Handler 是在别的地方注册的。

instanceHandler := func(si *model.ServiceInstance, _ model.Event) {// TODO: This is an incomplete code. This code path is called for consul, etc.// In all cases, this is simply an instance update and not a config update. So, we need to update// EDS in all proxies, and do a full config push for the instance that just changed (add/update only).s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind:      gvk.ServiceEntry,Name:      string(si.Service.Hostname),Namespace: si.Service.Attributes.Namespace,}: {}},Reason: []model.TriggerReason{model.ServiceUpdate},})
}
// 跳过 Kubernetes 和 MCP
for _, registry := range s.ServiceController().GetRegistries() {// Skip kubernetes and external registries as they are handled separatelyif registry.Provider() == serviceregistry.Kubernetes ||registry.Provider() == serviceregistry.External {continue}if err := registry.AppendInstanceHandler(instanceHandler); err != nil {return fmt.Errorf("append instance handler to registry %s failed: %v", registry.Provider(), err)}
}

上一步初始化了 configController ,它操作的对象主要是像 VirtualService 、 DestinationRules 这些 Istio 定义的配置,这些配置的变化也会触发 xDS 的全量下发,所有与该配置相关的代理都会收到推送。不过 ServiceEntry 和 WorkloadEntry 除外,这两个资源的配置下发是由 ServiceEntryStore 管理的,之前在初始化 ServiceController 时定义的 s.serviceEntryStore 会处理,之后的篇幅再做详细介绍。

configHandler := func(_, curr model.Config, event model.Event) {pushReq := &model.PushRequest{Full: true,ConfigsUpdated: map[model.ConfigKey]struct{}{{Kind:      curr.GroupVersionKind,Name:      curr.Name,Namespace: curr.Namespace,}: {}},Reason: []model.TriggerReason{model.ConfigUpdate},}s.EnvoyXdsServer.ConfigUpdate(pushReq)
}

下面是跳过 ServiceEntry 和 WorkloadEntry 的代码:

for _, schema := range schemas {// This resource type was handled in external/servicediscovery.go, no need to rehandle here.if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind() {continue}if schema.Resource().GroupVersionKind() == collections.IstioNetworkingV1Alpha3Workloadentries.Resource().GroupVersionKind() {continue}s.configController.RegisterEventHandler(schema.Resource().GroupVersionKind(), configHandler)
}

初始化 DiscoveryService

func (s *Server) initDiscoveryService(args *PilotArgs) error {log.Infof("starting discovery service")// Implement EnvoyXdsServer grace shutdowns.addStartFunc(func(stop <-chan struct{}) error {s.EnvoyXdsServer.Start(stop)return nil})s.initGrpcServer(args.KeepaliveOptions)grpcListener, err := net.Listen("tcp", args.ServerOptions.GRPCAddr)if err != nil {return err}s.GRPCListener = grpcListenerreturn nil
}

这里将 EnvoyXdsServer 的启动添加至 startFuncs 中,便于后续统一启动。并初始化 gRPC 服务器,监听对应的端口。

初始化 gRPC 服务器,并注册 xDS V2 和 xDS V3 的 ADS 服务到 gRPC 服务器上:

func (s *Server) initGrpcServer(options *istiokeepalive.Options) {grpcOptions := s.grpcServerOptions(options)s.grpcServer = grpc.NewServer(grpcOptions...)s.EnvoyXdsServer.Register(s.grpcServer)reflection.Register(s.grpcServer)
}
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {// Register v2 and v3 serversdiscovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)discoveryv2.RegisterAggregatedDiscoveryServiceServer(rpcs, s.createV2Adapter())
}

可以看到 ADS 的 gRPC 服务包含两个流式方法,一个是全量推送,一个是增量推送。

var _AggregatedDiscoveryService_serviceDesc = grpc.ServiceDesc{ServiceName: "envoy.service.discovery.v3.AggregatedDiscoveryService",HandlerType: (*AggregatedDiscoveryServiceServer)(nil),Methods:     []grpc.MethodDesc{},Streams: []grpc.StreamDesc{{StreamName:    "StreamAggregatedResources",Handler:       _AggregatedDiscoveryService_StreamAggregatedResources_Handler,ServerStreams: true,ClientStreams: true,},{StreamName:    "DeltaAggregatedResources",Handler:       _AggregatedDiscoveryService_DeltaAggregatedResources_Handler,ServerStreams: true,ClientStreams: true,},},Metadata: "envoy/service/discovery/v3/ads.proto",
}

注册 kubeClient.RunAndWait

将 kubeClient.RunAndWait 方法注册至 startFuncs 中, RunAndWait 启动后所有 Informer 将开始缓存,并等待它们同步完成。之所以在最后运行,可以保证所有的 Informer 都已经注册。

if s.kubeClient != nil {s.addStartFunc(func(stop <-chan struct{}) error {s.kubeClient.RunAndWait(stop)return nil})
}

启动过程

启动流程比较简单,核心是依次启动初始化过程中注册到 startFuncs 中的启动函数:

for _, fn := range s.startFuncs {if err := fn(stop); err != nil {return err}
}

然后调用 waitForCache 等待需要监听资源的 Informer 缓存完毕,完成后开启 HTTP 服务响应 readiness 事件。

至此 pilot-discovery 的启动流程就结束了,有了大概了解后,可以大致归纳出整个 Pilot 的接口架构。

接口设计

在接口设计方面,Pilot 主要有两类接口:一种是 Store 类接口,定义对资源的增删改查等方法;另一种是 Controller 类接口,定义了 RegisterEventHandler 和 Run 方法。

Store 类接口主要指 ConfigStore 接口,以及它衍生出的 IstioConfigStore,后者操作的对象为 Istio 定义的配置类型,如 VirtualServiceServiceEntry 等。

而 Controller 类接口指基于 ConfigStore 定义的 ConfigStoreCache 接口,这个接口在哪里用到了呢?之前讨论初始化流程的时候,分析过 Pilot 的 Server 的结构,其中用到该接口的有如下几个字段:

type Server struct {configController  model.ConfigStoreCacheConfigStores      []model.ConfigStoreCacheserviceEntryStore *serviceentry.ServiceEntryStore
}
type ServiceEntryStore struct {store model.IstioConfigStore
}

可以看到 ConfigStores 是存储所有配置类数据的 Controller 的地方,ConfigStores 都是在哪里添加的呢?之前分析 initConfigController 方法中提到过,可以再对照代码看一下调用的地方:

image.png

都添加完毕后,会把这些 ConfigStoreCache 都聚合到 Server.configController 中统一处理。

    // Wrap the config controller with a cache.aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)if err != nil {return err}s.configController = aggregateConfigController

而 ServiceEntryStore 中用到的 IstioConfigStore 也是在这里得到的:

s.environment.IstioConfigStore = model.MakeIstioStore(s.configController)

以上,当服务启动后,会逐个调用这些 ConfigStoreCache 中的 Run 方法处理资源的增删改事件。

总结

pilot-discovery 的启动流程初看是比较复杂,但理清楚中间核心的步骤后结构也比较清晰。有了本篇的介绍,之后再走读几遍代码,相信就能很好的掌握 pilot-discovery 初始化的流程。

Pilot 源码分析的第一部分就到这里,后续会针对重要的组件和接口做更细致的分析,如 EnvoyXdsServer 、ServiceEntryStore 等,以及梳理 xDS 协议的生成和下发流程,会比 pilot-discovery 的启动流程复杂的多,敬请期待。

参考

•Istio Pilot 代码深度解析 - 赵化冰[4]

引用链接

[1] 基础概念及名词: https://istio.io/latest/zh/docs/concepts/traffic-management/
[2] ServiceEntry: https://istio.io/latest/docs/reference/config/networking/service-entry/
[3] fsnotify: https://github.com/fsnotify/fsnotify
[4] Istio Pilot 代码深度解析 - 赵化冰: https://zhaohuabing.com/post/2019-10-21-pilot-discovery-code-analysis/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/307841.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

结营啦!有缘相聚于青训,未来高处见呀~~

&#x1f4f8;叮&#xff01; 记 字节跳动第一届青训营顺利结营啦&#xff01; 从8月份的青训营&#xff0c;到9月份的实训营&#xff0c;搁置了许久的结营心得终于拾起来辽&#xff01; &#x1f3ac;开营进行时 从答疑会开始&#xff0c;负责人仔细的阐述了本次训练营的…

MVC三层架构(详解)

1:初始MVC (1):三层架构 三层架构是指&#xff1a;视图层 View、服务层 Service&#xff0c;与持久层 Dao。它们分别完成不同的功能。 View 层&#xff1a;用于接收用户提交请求的代码在这里编写。 Service 层&#xff1a;系统的业务逻辑主要在这里完成。 Dao 层&#xff1a;…

「offer来了」保姆级巩固你的js知识体系(4.0w字)

「面试专栏」前端面试之JavaScript篇&#x1f9d0;序言&#x1f973;思维导图环节&#x1f60f;一、JS规范1、说几条JavaScript的基本规范。2、对原生JavaScript的了解。3、说下对JS的了解吧。4、JS原生拖拽节点5、谈谈你对ES6的理解6、知道ES6的class嘛&#xff1f;7、说说你对…

写作是人生最大的杠杆

职场&认知洞察 丨 作者 / 易洋 这是findyi公众号的第71篇原创文章不知不觉&#xff0c;公众号写作已经持续了9个月了。去年11月底&#xff0c;心血来潮写了第一篇文章&#xff0c;更多是为了复盘过去的一些工作经历。在前几天&#xff0c;读者数突破了3万&#xff0c;虽然…

拥塞控制(详解)

一&#xff1a;TCP的拥塞控制 1:是什么 (1):是什么(拥塞现象) 网络的 吞吐量 与 通信子网 负荷(即通信子网中正在传输的分组数)有着密切的关系。当 通信子网 负荷比较小时,网络的 吞吐量 (分组数/秒)随网络负荷(每个 节点 中分组的平均数)的增加而线性增加。当网络负荷增加到…

解决 WPF 绑定集合后数据变动界面却不更新的问题(使用 ObservableCollection)

解决 WPF 绑定集合后数据变动界面却不更新的问题独立观察员 2020 年 9 月 9 日在 .NET Core 3.1 的 WPF 程序中打算用 ListBox 绑定显示一个集合&#xff08;满足需求即可&#xff0c;无所谓什么类型的集合&#xff09;&#xff0c;以下是 Xaml 代码&#xff08;瞟一眼就行&…

Kubernetes Liveness and Readiness Probes

在设计关键任务、高可用应用程序时&#xff0c;弹性是要考虑的最重要因素之一。当应用程序可以快速从故障中恢复时&#xff0c;它便具有弹性。云原生应用程序通常设计为使用微服务架构&#xff0c;其中每个组件都位于容器中。为了确保Kubernetes托管的应用程序高可用&#xff0…

「offer来了」2种递进学习思维,24道计网题目,保姆级巩固你的计网知识体系

「面试专栏」前端面试之计算机网络篇⚾序言&#x1f3d0;一、基础知识环节1、专栏学习2、书籍学习⚽二、思维导图环节&#x1f3b3;三、OSI七层模型1、OSI模型是什么&#xff1f;2、OSI七层模型遵循原则&#x1f3cf;四、TCP与UDP1、TCP与UDP的区别2、TCP/UDP的优缺点&#xff…

leetcode236. 二叉树的最近公共祖先

一:题目 二:上码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode(int x) : val(x), left(NULL), right(NULL) {}* };*/ class Solution { public:/**思路:1.这里我们需要的是从底向上开…

进击吧! Blazor !第二期 页面制作

Blazor 是一个 Web UI 框架&#xff0c;可通过 WebAssembly 在任意浏览器中运行 .Net 。Blazor 旨在简化快速的单页面 .Net 浏览器应用的构建过程&#xff0c;它虽然使用了诸如 CSS 和 HTML 之类的 Web 技术&#xff0c;但它使用 C&#xff03;语言和 Razor 语法代替 JavaScrip…

「软件项目管理」一文详解软件项目管理概述

一文详解软件项目管理概述&#x1f6b5;前言&#x1f93d;一、项目与软件项目1、项目的定义2、项目的特征3、项目与日常运作举例&#xff08;1&#xff09;判断哪些活动是项目&#xff08;2&#xff09;举例结果&#xff08;3&#xff09;项目与日常运作区别总结4、软件项目的特…

初识ABP vNext(9):ABP模块化开发-文件管理

点击上方蓝字"小黑在哪里"关注我吧创建模块模块开发应用服务运行模块单元测试模块使用前言在之前的章节中介绍过ABP扩展实体&#xff0c;当时在用户表扩展了用户头像字段&#xff0c;用户头像就涉及到文件上传和文件存储。文件上传是很多系统都会涉及到的一个基础功能…

「offer来了」浏览器原理被问懵?5大知识板块巩固你的http知识体系(3.6w字)

「面试专栏」前端面试之浏览器原理篇&#x1f3d4;️序言&#x1f304;一、http和https协议&#xff08;一&#xff09;http和https之间的关系&#x1f9ed;1、http和https是什么&#xff1f;2、http和https的区别&#xff08;二&#xff09;http协议&#x1f9ed;1、http1.0、…

使用Azure DevOps Pipeline实现.Net Core程序的CD

上一次我们讲了使用Azure DevOps Pipeline实现.Net Core程序的CI。这次我们来演示下如何使用Azure DevOps实现.Net Core程序的CD。实现本次目标我们除了Azure DevOps外还需要&#xff1a;一台安装了Docker的主机一个 Docker Hub 账号上一次我们的CI实现了&#xff1a;发布>编…

TCP四次挥手(详解)

一:TCP四次挥手 1:图示 二:TCP四次挥手的过程 所谓的四次挥手即TCP连接的释放(解除)。连接的释放必须是一方主动释放&#xff0c;另一方被动释放。挥手之前主动释放连接的客户端结束ESTABLISHED阶段。随后开始“四次挥手”&#xff1a; a:首先客户端想要释放连接&#xff0c…

「软件项目管理」项目初始——项目确立与生存期模型

「软件项目管理」项目初始阶段——项目确立与生存期模型&#x1f6f0;️序言Preface&#x1f680;一、项目评估1、评估内容2、净利润与投资回报率3、举例阐述&#x1fa90;二、项目立项1、立项流程2、Make or Buy决策3、Make or Buy决策实例&#x1f6f8;三、项目招投标1、项目…

双城生活,一种相对无奈且幸福的选择

这是头哥侃码的第215篇原创我小时候经常被人问到一个问题&#xff1a;“你喜欢夏天还是冬天&#xff1f;”“夏天啊&#xff01;因为夏天可以有两个月的暑假&#xff0c;而且还可以玩水&#xff0c;还有清凉的盐水棒冰、短裤和凉拖&#xff0c;还可以在空调间里打游戏&#xff…

leetcode450. 删除二叉搜索树中的节点(详解)

一:题目 二:上码 1.确定递归函数和参数 TreeNode* deleteNode(TreeNode* root, int key) 这里的返回参数 我们也用一个指针接住&#xff0c;反正我们最终是返回的整棵树&#xff08;如果没找到那就是空&#xff09; 2.确定递归函数的终止条件 if(root NULL) return root; 3.确…

「offer来了」进程线程有啥关系?10个知识点带你巩固操作系统基础知识

「面试专栏」前端面试之操作系统篇&#x1f3b9;序言&#x1f3b8;一、思维导图&#x1f3ba;二、常见面试题1、进程和线程以及它们的区别2、进程间通信的几种方式&#xff08;1&#xff09;管道(pipe)及命名管道(named pipe)&#xff08;2&#xff09;信号(signal)&#xff08…

推荐一个IT老鸟肝了2月有余的免费开源WPF企业级开发框架

JHRS一个新学WPF的IT老鸟&#xff0c;肝了2个月做了这么一个WPF企业级开发框架&#xff0c;站长clone学习&#xff0c;觉得甚是不错。这是一个使用了Prism搭建的插件式框架&#xff0c;封装了DataGrid的使用&#xff0c;使整个框架子模块简单易学、易扩展&#xff0c;特别是作者…