Istio Pilot源码学习(一):Pilot-Discovery启动流程、ConfigController配置规则发现

本文基于Istio 1.18.0版本进行源码学习

1、Pilot-Discovery工作原理

Pilot-Discovery是Istio控制面的核心,负责服务网格中的流量管理以及控制面和数据面之间的配置下发

Pilot-Discovery从注册中心(如Kubernetes)获取服务信息并汇集,从Kubernetes API Server中获取配置规则,将服务信息和配置数据转换为xDS接口的标准数据结构,通过GRPC下发到数据面的Envoy

2、Pilot-Discovery代码结构

在这里插入图片描述

Pilot-Discovery的入口函数为:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中创建了Pilot Server,Pilot Server中主要包含三部分逻辑:

  • ConfigController:管理各种配置数据,包括用户创建的流量管理规则和策略
  • ServiceController:获取Service Registry中的服务发现数据
  • DiscoveryService:主要包含下述逻辑:
    • 启动GRPC Server并接收来自Envoy端的连接请求
    • 接收Envoy端的xDS请求,从ConfigController和ServiceController中获取配置和服务信息,生成响应消息发送给Envoy
    • 监听来自ConfigController的配置变化消息和ServiceController的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy

3、Pilot-Discovery启动流程

创建Pilot Server代码如下:

// pilot/pkg/bootstrap/server.go
func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {e := model.NewEnvironment()e.DomainSuffix = args.RegistryOptions.KubeOptions.DomainSuffixe.SetLedger(buildLedger(args.RegistryOptions))ac := aggregate.NewController(aggregate.Options{MeshHolder: e,})e.ServiceDiscovery = acs := &Server{clusterID:               getClusterID(args),environment:             e,fileWatcher:             filewatcher.NewWatcher(),httpMux:                 http.NewServeMux(),monitoringMux:           http.NewServeMux(),readinessProbes:         make(map[string]readinessProbe),readinessFlags:          &readinessFlags{},workloadTrustBundle:     tb.NewTrustBundle(nil),server:                  server.New(),shutdownDuration:        args.ShutdownDuration,internalStop:            make(chan struct{}),istiodCertBundleWatcher: keycertbundle.NewWatcher(),webhookInfo:             &webhookInfo{},}// Apply custom initialization functions.for _, fn := range initFuncs {fn(s)}// Initialize workload Trust Bundle before XDS Servere.TrustBundle = s.workloadTrustBundle// 初始化discoveryServers.XDSServer = xds.NewDiscoveryServer(e, args.PodName, s.clusterID, args.RegistryOptions.KubeOptions.ClusterAliases)prometheus.EnableHandlingTimeHistogram()// make sure we have a readiness probe before serving HTTP to avoid marking ready too soons.initReadinessProbes()// 初始化http和grpc server,向grpc server注册discoveryServers.initServers(args)if err := s.initIstiodAdminServer(args, s.webhookInfo.GetTemplates); err != nil {return nil, fmt.Errorf("error initializing debug server: %v", err)}if err := s.serveHTTP(); err != nil {return nil, fmt.Errorf("error serving http: %v", err)}// Apply the arguments to the configuration.if err := s.initKubeClient(args); err != nil {return nil, fmt.Errorf("error initializing kube client: %v", err)}// used for both initKubeRegistry and initClusterRegistriesargs.RegistryOptions.KubeOptions.EndpointMode = kubecontroller.DetectEndpointMode(s.kubeClient)s.initMeshConfiguration(args, s.fileWatcher)spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())s.initMeshNetworks(args, s.fileWatcher)s.initMeshHandlers()s.environment.Init()if err := s.environment.InitNetworksManager(s.XDSServer); err != nil {return nil, err}// Options based on the current 'defaults' in istio.caOpts := &caOptions{TrustDomain:      s.environment.Mesh().TrustDomain,Namespace:        args.Namespace,DiscoveryFilter:  args.RegistryOptions.KubeOptions.GetFilter(),ExternalCAType:   ra.CaExternalType(externalCaType),CertSignerDomain: features.CertSignerDomain,}if caOpts.ExternalCAType == ra.ExtCAK8s {// Older environment variable preserved for backward compatibilitycaOpts.ExternalCASigner = k8sSigner}// CA signing certificate must be created first if needed.if err := s.maybeCreateCA(caOpts); err != nil {return nil, err}// 初始化configController和serviceControllerif err := s.initControllers(args); err != nil {return nil, err}s.XDSServer.InitGenerators(e, args.Namespace, s.internalDebugMux)// Initialize workloadTrustBundle after CA has been initializedif err := s.initWorkloadTrustBundle(args); err != nil {return nil, err}// Parse and validate Istiod Address.istiodHost, _, err := e.GetDiscoveryAddress()if err != nil {return nil, err}// Create Istiod certs and setup watches.if err := s.initIstiodCerts(args, string(istiodHost)); err != nil {return nil, err}// Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert.if err := s.initSecureDiscoveryService(args); err != nil {return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)}// common https server for webhooks (e.g. injection, validation)if s.kubeClient != nil {s.initSecureWebhookServer(args)wh, err := s.initSidecarInjector(args)if err != nil {return nil, fmt.Errorf("error initializing sidecar injector: %v", err)}s.webhookInfo.mu.Lock()s.webhookInfo.wh = whs.webhookInfo.mu.Unlock()if err := s.initConfigValidation(args); err != nil {return nil, fmt.Errorf("error initializing config validator: %v", err)}}// This should be called only after controllers are initialized.// 向configController和serviceController注册事件回调函数s.initRegistryEventHandlers()// 设置discoveryServer启动函数s.initDiscoveryService()// Notice that the order of authenticators matters, since at runtime// authenticators are activated sequentially and the first successful attempt// is used as the authentication result.authenticators := []security.Authenticator{&authenticate.ClientCertAuthenticator{},}if args.JwtRule != "" {jwtAuthn, err := initOIDC(args)if err != nil {return nil, fmt.Errorf("error initializing OIDC: %v", err)}if jwtAuthn == nil {return nil, fmt.Errorf("JWT authenticator is nil")}authenticators = append(authenticators, jwtAuthn)}// The k8s JWT authenticator requires the multicluster registry to be initialized,// so we build it later.if s.kubeClient != nil {authenticators = append(authenticators,kubeauth.NewKubeJWTAuthenticator(s.environment.Watcher, s.kubeClient.Kube(), s.clusterID, s.multiclusterController.GetRemoteKubeClient, features.JwtPolicy))}if len(features.TrustedGatewayCIDR) > 0 {authenticators = append(authenticators, &authenticate.XfccAuthenticator{})}if features.XDSAuth {s.XDSServer.Authenticators = authenticators}caOpts.Authenticators = authenticators// Start CA or RA server. This should be called after CA and Istiod certs have been created.s.startCA(caOpts)// TODO: don't run this if galley is started, one ctlz is enoughif args.CtrlZOptions != nil {_, _ = ctrlz.Run(args.CtrlZOptions, nil)}// This must be last, otherwise we will not know which informers to registerif s.kubeClient != nil {s.addStartFunc(func(stop <-chan struct{}) error {s.kubeClient.RunAndWait(stop)return nil})}return s, nil
}

NewServer()方法中核心逻辑如下:

  1. 初始化DiscoveryServer
  2. 初始化HTTP和GRPC Server,向GRPC Server注册DiscoveryServer
  3. 初始化ConfigController和ServiceController
  4. 向ConfigController和ServiceController注册事件回调函数,有配置和服务信息变更时会通知DiscoveryServer
  5. 设置DiscoveryServer启动函数

Pilot Server定义如下:

// pilot/pkg/bootstrap/server.go
type Server struct {// discoveryServerXDSServer *xds.DiscoveryServerclusterID cluster.ID// pilot环境所需的api集合environment *model.Environment// 处理kubernetes主集群的注册中心kubeClient kubelib.Client// 处理kubernetes多个集群的注册中心multiclusterController *multicluster.Controller// 统一处理配置规则的controllerconfigController model.ConfigStoreController// 配置规则缓存ConfigStores []model.ConfigStoreController// 负责serviceEntry的服务发现serviceEntryController *serviceentry.ControllerhttpServer  *http.Server // debug, monitoring and readiness Server.httpAddr    stringhttpsServer *http.Server // webhooks HTTPS Server.grpcServer        *grpc.ServergrpcAddress       stringsecureGrpcServer  *grpc.ServersecureGrpcAddress string// monitoringMux listens on monitoringAddr(:15014).// Currently runs prometheus monitoring and debug (if enabled).monitoringMux *http.ServeMux// internalDebugMux is a mux for *internal* calls to the debug interface. That is, authentication is disabled.internalDebugMux *http.ServeMux// httpMux listens on the httpAddr (8080).// If a Gateway is used in front and https is off it is also multiplexing// the rest of the features if their port is empty.// Currently runs readiness and debug (if enabled)httpMux *http.ServeMux// httpsMux listens on the httpsAddr(15017), handling webhooks// If the address os empty, the webhooks will be set on the default httpPort.httpsMux *http.ServeMux // webhooks// fileWatcher used to watch mesh config, networks and certificates.fileWatcher filewatcher.FileWatcher// certWatcher watches the certificates for changes and triggers a notification to Istiod.cacertsWatcher *fsnotify.WatcherdnsNames       []stringCA *ca.IstioCARA ra.RegistrationAuthority// TrustAnchors for workload to workload mTLSworkloadTrustBundle     *tb.TrustBundlecertMu                  sync.RWMutexistiodCert              *tls.CertificateistiodCertBundleWatcher *keycertbundle.Watcher// pilot的所有组件都注册启动任务到此对象,便于在Start()方法中批量启动及管理server server.InstancereadinessProbes map[string]readinessProbereadinessFlags  *readinessFlags// duration used for graceful shutdown.shutdownDuration time.Duration// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in// favor of AddStartFunc. This is only required if we *must* start something outside of this process.// For example, everything depends on mesh config, so we use it there rather than trying to sequence everything// in AddStartFuncinternalStop chan struct{}webhookInfo *webhookInfostatusReporter *distribution.ReporterstatusManager  *status.Manager// RWConfigStore is the configstore which allows updates, particularly for status.RWConfigStore model.ConfigStoreController
}

Pilot-Discovery启动流程如下图:

在这里插入图片描述

3、配置规则发现:ConfigController

Pilot的配置规则指网络路由规则及网络安全规则,包含Virtualservice、Destinationrule、Gateway、PeerAuthentication、RequestAuthentication等资源。目前支持三种类型的ConfigController:

  • MCP:是一种服务网格配置传输协议,用于隔离Pilot和底层平台(Kubernetes、文件系统或者其他注册中心),使得Pilot无需感知底层平台的差异,更专注于Envoy xDS配置的生成与分发
  • Kubernetes:基于Kubernetes的Config发现利用了Kubernetes Informer的List-Watch能力。在Kubernetes集群中,Config以CustomResource的形式存在。Pilot通过配置控制器(CRD Controller)监听Kubernetes APIServer配置规则资源,维护所有资源的缓存,并触发事件处理回调函数
  • File:通过文件监视器周期性地读取本地配置文件,将配置规则缓存在内存中,并维护配置的增加、更新、删除事件,当缓存有变化时,异步通知内存控制器执行事件回调函数

1)、ConfigController的核心接口

ConfigController实现了ConfigStoreController接口:

// pilot/pkg/model/config.go
type ConfigStoreController interface {// 配置缓存接口ConfigStore// 注册事件处理函数// RegisterEventHandler adds a handler to receive config update events for a// configuration typeRegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)// 运行控制器// Run until a signal is received.// Run *should* block, so callers should typically call `go controller.Run(stop)`Run(stop <-chan struct{})// 配置缓存是否已同步// HasSynced returns true after initial cache synchronization is completeHasSynced() bool
}

ConfigStoreController继承ConfigStore接口,ConfigStore为控制器核心的资源缓存接口提供了对Config资源的增删改查功能:

// pilot/pkg/model/config.go
type ConfigStore interface {// Schemas exposes the configuration type schema known by the config store.// The type schema defines the bidirectional mapping between configuration// types and the protobuf encoding schema.Schemas() collection.Schemas// Get retrieves a configuration element by a type and a keyGet(typ config.GroupVersionKind, name, namespace string) *config.Config// List returns objects by type and namespace.// Use "" for the namespace to list across namespaces.List(typ config.GroupVersionKind, namespace string) []config.Config// Create adds a new configuration object to the store. If an object with the// same name and namespace for the type already exists, the operation fails// with no side effects.Create(config config.Config) (revision string, err error)// Update modifies an existing configuration object in the store.  Update// requires that the object has been created.  Resource version prevents// overriding a value that has been changed between prior _Get_ and _Put_// operation to achieve optimistic concurrency. This method returns a new// revision if the operation succeeds.Update(config config.Config) (newRevision string, err error)UpdateStatus(config config.Config) (newRevision string, err error)// Patch applies only the modifications made in the PatchFunc rather than doing a full replace. Useful to avoid// read-modify-write conflicts when there are many concurrent-writers to the same resource.Patch(orig config.Config, patchFn config.PatchFunc) (string, error)// Delete removes an object from the store by key// For k8s, resourceVersion must be fulfilled before a deletion is carried out.// If not possible, a 409 Conflict status will be returned.Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}

2)、ConfigController的初始化

Kubernetes ConfigController实际上是一个CRD Operator,它从Kubernetes API Server监听所有的Istio API资源,其初始化过程如下:

crdclient.New()方法代码如下:

// pilot/pkg/config/kube/crdclient/client.go
func New(client kube.Client, opts Option) (*Client, error) {schemas := collections.Pilotif features.EnableGatewayAPI {schemas = collections.PilotGatewayAPI()}return NewForSchemas(client, opts, schemas)
}

collections.Pilot中定义了Istio所有的Config资源类型,代码如下:

// pkg/config/schema/collections/collections.gen.go// Pilot contains only collections used by Pilot.Pilot = collection.NewSchemasBuilder().MustAdd(AuthorizationPolicy).MustAdd(DestinationRule).MustAdd(EnvoyFilter).MustAdd(Gateway).MustAdd(PeerAuthentication).MustAdd(ProxyConfig).MustAdd(RequestAuthentication).MustAdd(ServiceEntry).MustAdd(Sidecar).MustAdd(Telemetry).MustAdd(VirtualService).MustAdd(WasmPlugin).MustAdd(WorkloadEntry).MustAdd(WorkloadGroup).Build()

crdclient.New()方法中调用了NewForSchemas()方法:

// pilot/pkg/config/kube/crdclient/client.go
func NewForSchemas(client kube.Client, opts Option, schemas collection.Schemas) (*Client, error) {schemasByCRDName := map[string]resource.Schema{}for _, s := range schemas.All() {// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())schemasByCRDName[name] = s}// 实例化crd clientout := &Client{domainSuffix:     opts.DomainSuffix,schemas:          schemas,schemasByCRDName: schemasByCRDName,revision:         opts.Revision,queue:            queue.NewQueue(1 * time.Second),kinds:            map[config.GroupVersionKind]*cacheHandler{},handlers:         map[config.GroupVersionKind][]model.EventHandler{},client:           client,// 创建crdWatcher,监听crd的创建crdWatcher:       crdwatcher.NewController(client),logger:           scope.WithLabels("controller", opts.Identifier),namespacesFilter: opts.NamespacesFilter,crdWatches: map[config.GroupVersionKind]*waiter{gvk.KubernetesGateway: newWaiter(),gvk.GatewayClass:      newWaiter(),},}// 添加回调函数,当crd创建时调用handleCRDAdd方法out.crdWatcher.AddCallBack(func(name string) {handleCRDAdd(out, name)})// 获取集群中当前所有的crdknown, err := knownCRDs(client.Ext())if err != nil {return nil, err}// 遍历istio所有的config资源类型for _, s := range schemas.All() {// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."name := fmt.Sprintf("%s.%s", s.Plural(), s.Group())if s.IsBuiltin() {handleCRDAdd(out, name)} else {// istio config资源类型对应crd已创建,调用handleCRDAdd方法if _, f := known[name]; f {handleCRDAdd(out, name)} else {out.logger.Warnf("Skipping CRD %v as it is not present", s.GroupVersionKind())}}}return out, nil
}

NewForSchemas()方法中实例化了CRD Client,CRD Client定义如下:

// pilot/pkg/config/kube/crdclient/client.go
type Client struct {// schemas defines the set of schemas used by this client.// Note: this must be a subset of the schemas defined in the codegenschemas collection.Schemas// domainSuffix for the config metadatadomainSuffix string// revision for this control plane instance. We will only read configs that match this revision.revision string// kinds keeps track of all cache handlers for known types// 记录所有资源类型对应的informer控制器kinds   map[config.GroupVersionKind]*cacheHandlerkindsMu sync.RWMutex// 事件处理队列queue queue.Instance// handlers defines a list of event handlers per-type// 资源类型及对应的事件处理回调函数handlers map[config.GroupVersionKind][]model.EventHandler// crd相关的schemaschemasByCRDName map[string]resource.Schema// kubernetes客户端,包含istioClient操作istio api对象,istio informer监听istio api对象变更事件client kube.Client// 监听crd的创建crdWatcher *crdwatcher.Controllerlogger     *log.Scope// namespacesFilter is only used to initiate filtered informer.namespacesFilter func(obj interface{}) bool// crdWatches notifies consumers when a CRD is presentcrdWatches map[config.GroupVersionKind]*waiterstop       <-chan struct{}
}

3)、ConfigController的工作机制

Kubernetes ConfigController为每种Config资源都创建了一个Informer,用于监听所有Config资源并注册EventHandler

NewForSchemas()方法中,如果Istio Config资源类型对应CRD已创建或者crdWatcher监听CRD创建后,都会调用handleCRDAdd()方法:

// pilot/pkg/config/kube/crdclient/client.go
func handleCRDAdd(cl *Client, name string) {cl.logger.Debugf("adding CRD %q", name)s, f := cl.schemasByCRDName[name]if !f {cl.logger.Debugf("added resource that we are not watching: %v", name)return}resourceGVK := s.GroupVersionKind()gvr := s.GroupVersionResource()cl.kindsMu.Lock()defer cl.kindsMu.Unlock()if _, f := cl.kinds[resourceGVK]; f {cl.logger.Debugf("added resource that already exists: %v", resourceGVK)return}var i informers.GenericInformervar ifactory startervar err error// 根据api group添加到不同的sharedInformerFactory中switch s.Group() {case gvk.KubernetesGateway.Group:ifactory = cl.client.GatewayAPIInformer()i, err = cl.client.GatewayAPIInformer().ForResource(gvr)case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:ifactory = cl.client.KubeInformer()i, err = cl.client.KubeInformer().ForResource(gvr)case gvk.CustomResourceDefinition.Group:ifactory = cl.client.ExtInformer()i, err = cl.client.ExtInformer().ForResource(gvr)default:ifactory = cl.client.IstioInformer()i, err = cl.client.IstioInformer().ForResource(gvr)}if err != nil {// Shouldn't happencl.logger.Errorf("failed to create informer for %v: %v", resourceGVK, err)return}_ = i.Informer().SetTransform(kube.StripUnusedFields)// 调用createCacheHandler方法,为informer添加事件回调函数cl.kinds[resourceGVK] = createCacheHandler(cl, s, i)if w, f := cl.crdWatches[resourceGVK]; f {cl.logger.Infof("notifying watchers %v was created", resourceGVK)w.once.Do(func() {close(w.stop)})}// Start informer. In startup case, we will not start here as// we will start all factories once we are ready to initialize.// For dynamically added CRDs, we need to start immediately thoughif cl.stop != nil {// 启动informerifactory.Start(cl.stop)}
}

每种Informer的事件回调函数均通过createCacheHandler()方法注册,代码如下:

// pilot/pkg/config/kube/crdclient/cache_handler.go
func createCacheHandler(cl *Client, schema resource.Schema, i informers.GenericInformer) *cacheHandler {scope.Debugf("registered CRD %v", schema.GroupVersionKind())h := &cacheHandler{client: cl,schema: schema,// 创建informer,支持配置namespace级别隔离informer: kclient.NewUntyped(cl.client, i.Informer(), kclient.Filter{ObjectFilter: cl.namespacesFilter}),}kind := schema.Kind()// 添加事件回调函数h.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj any) {incrementEvent(kind, "add")// 创建任务对象并将其发送到任务队列中cl.queue.Push(func() error {return h.onEvent(nil, obj, model.EventAdd)})},UpdateFunc: func(old, cur any) {incrementEvent(kind, "update")cl.queue.Push(func() error {return h.onEvent(old, cur, model.EventUpdate)})},DeleteFunc: func(obj any) {incrementEvent(kind, "delete")cl.queue.Push(func() error {return h.onEvent(nil, obj, model.EventDelete)})},})return h
}

当Config资源在Kubernetes中创建、更新和删除时,EventHandler会创建任务对象并将其发送到任务列中,然后由任务处理协程处理。处理资源变化的onEvent()方法代码如下:

// pilot/pkg/config/kube/crdclient/cache_handler.go
func (h *cacheHandler) onEvent(old any, curr any, event model.Event) error {currItem := controllers.ExtractObject(curr)if currItem == nil {return nil}// 进行对象转换currConfig := TranslateObject(currItem, h.schema.GroupVersionKind(), h.client.domainSuffix)var oldConfig config.Configif old != nil {oldItem, ok := old.(runtime.Object)if !ok {log.Warnf("Old Object can not be converted to runtime Object %v, is type %T", old, old)return nil}oldConfig = TranslateObject(oldItem, h.schema.GroupVersionKind(), h.client.domainSuffix)}if h.client.objectInRevision(&currConfig) {// 执行事件处理回调函数h.callHandlers(oldConfig, currConfig, event)return nil}// Check if the object was in our revision, but has been moved to a different revision. If so,// it has been effectively deleted from our revision, so process it as a delete event.if event == model.EventUpdate && old != nil && h.client.objectInRevision(&oldConfig) {log.Debugf("Object %s/%s has been moved to a different revision, deleting",currConfig.Namespace, currConfig.Name)// 执行事件处理回调函数h.callHandlers(oldConfig, currConfig, model.EventDelete)return nil}log.Debugf("Skipping event %s for object %s/%s from different revision",event, currConfig.Namespace, currConfig.Name)return nil
}func (h *cacheHandler) callHandlers(old config.Config, curr config.Config, event model.Event) {// TODO we may consider passing a pointer to handlers instead of the value. While spec is a pointer, the meta will be copied// 执行该资源类型对应的事件处理回调函数for _, f := range h.client.handlers[h.schema.GroupVersionKind()] {f(old, curr, event)}
}

onEvent()方法中通过TranslateObject()方法进行对象转换,然后执行该资源类型对应的事件处理回调函数

h.client.handlers是各种资源类型的处理函数集合,是通过ConfigController的RegisterEventHandler()注册的,注册代码如下:

// pilot/pkg/bootstrap/server.go
func (s *Server) initRegistryEventHandlers() {...if s.configController != nil {configHandler := func(prev config.Config, curr config.Config, event model.Event) {defer func() {// 状态报告if event != model.EventDelete {s.statusReporter.AddInProgressResource(curr)} else {s.statusReporter.DeleteInProgressResource(curr)}}()log.Debugf("Handle event %s for configuration %s", event, curr.Key())// For update events, trigger push only if spec has changed.// 对于更新事件,仅当对象的spec发生变化时才触发xds推送if event == model.EventUpdate && !needsPush(prev, curr) {log.Debugf("skipping push for %s as spec has not changed", prev.Key())return}// 触发xds全量更新pushReq := &model.PushRequest{Full:           true,ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.MustFromGVK(curr.GroupVersionKind), Name: curr.Name, Namespace: curr.Namespace}),Reason:         []model.TriggerReason{model.ConfigUpdate},}s.XDSServer.ConfigUpdate(pushReq)}schemas := collections.Pilot.All()if features.EnableGatewayAPI {schemas = collections.PilotGatewayAPI().All()}for _, schema := range schemas {// This resource type was handled in external/servicediscovery.go, no need to rehandle here.// 下面3种类型在serviceEntry controller中处理,这里不用为其注册事件处理函数if schema.GroupVersionKind() == gvk.ServiceEntry {continue}if schema.GroupVersionKind() == gvk.WorkloadEntry {continue}if schema.GroupVersionKind() == gvk.WorkloadGroup {continue}// 注册其他所有api对象的事件处理函数s.configController.RegisterEventHandler(schema.GroupVersionKind(), configHandler)}...
}

完整的Config事件处理流程如下图所示:

  1. EventHandler构造任务(Task),任务实际上是对onEvent函数的封装
  2. EventHandler将任务推送到任务队列中
  3. 任务处理协程阻塞式地读取任务队列,执行任务,通过onEvent方法处理事件,并通过ConfigHandler触发xDS的更新

参考:

《Istio权威指南 下》

3.深入Istio源码:Pilot配置规则ConfigController

Istio Pilot代码深度解析

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/8157.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

直播带货app开发开发流程分析

随着小视频管理体系愈来愈变成人们的生活中的一部分&#xff0c;也随之短视频卖货逐步形成岗位内主流的转现方式&#xff0c;将短视频平台生产制造变成短视频带货体系计划愈来愈多&#xff0c;那样&#xff0c;把小视频管理体系开发设计变成短视频带货体系必须两步&#xff1f;…

谷粒商城篇章5 ---- P173-P192 ---- 检索服务【分布式高级篇二】

目录 1 检索服务 1.1 搭建页面环境 1.1.1 引入依赖 1.1.2 将检索页面放到gulimall-search的src/main/resources/templates/目录下 1.1.3 调整搜索页面 1.1.4 将静态资源放到linux的nginx相关映射目录下/root/docker/nginx/html/static/ search/ 1.1.5 SwitchHosts配置域…

疲劳驾驶检测和识别3:Android实现疲劳驾驶检测和识别(含源码,可实时检测)

疲劳驾驶检测和识别3&#xff1a;Android实现疲劳驾驶检测和识别(含源码&#xff0c;可实时检测) 目录 疲劳驾驶检测和识别3&#xff1a;Android实现疲劳驾驶检测和识别(含源码&#xff0c;可实时检测) 1.疲劳驾驶检测和识别方法 2.人脸检测方法 3.疲劳驾驶检测和识别模型…

《人工智能安全》课程总体结构

1 课程内容 人工智能安全观&#xff1a;人工智能安全问题、安全属性、技术体系等基本问题进行了归纳整理。人工智能安全的主要数据处理方法&#xff0c;即非平衡数据分类、噪声数据处理和小样本学习。人工智能技术赋能网络空间安全攻击与防御&#xff1a;三个典型实例及攻击图…

Vue异步更新、$nextTick

需求&#xff1a;编辑标题, 编辑框自动聚焦 1. 点击编辑&#xff0c;显示编辑框 2. 让编辑框&#xff0c; 立刻获取焦点 this. isShowEdit true // 显示输入框 this . $refs . inp . focus () // 获取焦点 问题&#xff1a;"显示之后"&#xff0c;立刻获…

24 鼠标常用事件

鼠标进入&#xff1a;enterEvent鼠标离开&#xff1a;leaveEvent鼠标按下&#xff1a;mousePressEvent鼠标释放&#xff1a;mouseRelaseEvent鼠标移动&#xff1a;mouseMoveEvent 提升为自定义控件MyLabel 代码&#xff1a; //mylabel.h #ifndef MYLABEL_H #define MYLABEL_H#…

易班开放应用授权重定向,出现跨域的解决方案

问题描述 今天开发H5网站需要接入易班&#xff0c;经过易班授权然后重定向&#xff08;code: 302&#xff09;&#xff0c;使用axios发请求&#xff0c;但是前后端均配置跨域的情况下&#xff0c;不管怎么弄都是一直跨域 但是我们看network&#xff0c;network中对应请求的res…

微服务初始

今天准备开始学习微服务&#xff0c;使用微服务肯定是因为他有好处。 首先了解到的三种架构&#xff0c;传统单体&#xff0c;集群架构&#xff0c;微服务架构 单体架构 有单点问题&#xff0c;如果宕机所有的服务都不可用所有业务的功能模块都聚集在一起&#xff0c;如果代…

tinkerCAD案例:9. Saw Shaped Wrench 锯形扳手

tinkerCAD案例&#xff1a;9. Saw Shaped Wrench 锯形扳手 ln this lesson you will learn how to create a cool saw shaped wrench. 在本课中&#xff0c;您将学习如何制作一个很酷的锯形扳手。 Start the lesson by dragging a polygon to the workplane. 通过将多边形拖动…

Windows实现端口转发(附配置过程图文详解)

文章目录 1. 前言2. 命令提示符3. 防火墙4. netsh 命令4.1 查看已有的转发规则4.2 新增转发规则4.3 删除转发规则 5. 图解汇总6. 欢迎纠正~ 1. 前言 利用Windows端口转发&#xff0c;实现本地设备 ⬅➡ 公网主机 ⬅➡ 远端服务器 2. 命令提示符 以管理员身份打开“命令提示…

python调用百度ai将图片识别为表格excel

python调用百度ai将图片识别为表格excel ocr ocr 百度ai官方文档&#xff1a;https://ai.baidu.com/ai-doc/OCR/Ik3h7y238 import requests import json import base64 import time文档&#xff1a;https://ai.baidu.com/ai-doc/OCR/Ik3h7y238 # 获取access_token def get_acc…

vue3+ts+element-plus 之使用node.js对接mysql进行表格数据展示

vue3tselement-plus axiosnode.jsmysql开发管理系统之表格展示 ✏️ 1. 新建一个node项目* 初始化node* 安装可能用到的依赖* 配置文件目录* 添加路由router1. 添加router.js文件&#xff0c;添加一个test目录2. 修改app.js ,引入router&#x1f4d2; 3. 启动并在浏览器打开 * …

【1++的C++初阶】之适配器

&#x1f44d;作者主页&#xff1a;进击的1 &#x1f929; 专栏链接&#xff1a;【1的C初阶】 文章目录 一&#xff0c;什么是适配器二&#xff0c;栈与队列模拟实现三&#xff0c;优先级队列四&#xff0c;reverse_iterator 一&#xff0c;什么是适配器 适配器作为STL的六大组…

【爬虫逆向案例】某道翻译js逆向—— sign解密

声明&#xff1a;本文只作学习研究&#xff0c;禁止用于非法用途&#xff0c;否则后果自负&#xff0c;如有侵权&#xff0c;请告知删除&#xff0c;谢谢&#xff01; 【爬虫逆向案例】某道翻译js逆向—— sign解密 1、前言2、步骤3、源码4、号外 1、前言 相信各位小伙伴在写…

SAMStable-Diffusion集成进化!分割、生成一切!AI绘画新玩法

自SAM「分割一切」模型推出之后&#xff0c;二创潮就开始了&#xff0c;有想法有行动&#xff01;飞桨AI Studio开发者会唱歌的炼丹师就创作出SAM进化版&#xff0c;将SAM、Stable Diffusion集成&#xff0c;实现「分割」、「生成」能力二合一&#xff0c;并部署为应用&#xf…

JVM - 运行时数据区域

文章目录 程序计数器栈堆方法区知识延申 -- 字符串常量池 程序计数器 并发情况下&#xff0c;会发生线程之间的上下文切换&#xff0c;当 线程1 的CPU时间片用完后&#xff0c;需要程序计数器记录 线程1 的下一条JVM指令的地址&#xff0c;等下一次 线程1 继续运行的时&#x…

水环境综合治理监测系统:筑牢城市水生态安全屏障

水是生命之源&#xff0c;是人类赖以生存的基础。然而&#xff0c;随着工业化、城市化的快速发展&#xff0c;水污染问题日益凸显&#xff0c;给居民的环境卫生以及用水安全带来了巨大的威胁。因此&#xff0c;加强水环境综合治理&#xff0c;保护水资源和维护生态平衡&#xf…

28.1 kibana

Kibana 是一个免费且开放的用户界面&#xff0c;能够对 Elasticsearch 数据进行可视化操作&#xff0c;从跟踪查询负载&#xff0c;到理解请求如何流经整个应用&#xff0c;都能轻松完成。 1.Kibana安装 注意要与ES版本保持一致 https://www.elastic.co/downloads/past-relea…

看完这篇,别再说不会Spring 分库分表了

多数据源&#xff0c;读写分离&#xff0c;分库分表&#xff0c;基本上已经是现在任何一个项目的基本配置了&#xff0c;在之前的文章Spring多数据源实现https://blog.csdn.net/wangerrong/article/details/131910740 里讲了多数据源的实现&#xff0c;其实已经包含了读写分离…

Linux使用教程

一、Linux命令基础 1、ls、ll命令——展示数据 ①ls命令——平铺展示数据 其中ls命令以平铺的方式展现数据 ②ll命令——列表展示数据 ll命令以列表的方式展现数据 -a选项&#xff0c;表示&#xff1a;all的意思&#xff0c;即列出全部文件&#xff08;包含隐藏的文件/文件夹…