Istio Pilot Source Code Study (3): Asynchronous Distribution of xDS

This article studies the source code of Istio 1.18.0.

5. Asynchronous Distribution of xDS

The DiscoveryService covers the following main logic:

  • Start the gRPC server and accept connection requests from Envoy
  • Receive xDS requests from Envoy, obtain configuration and service information from the ConfigController and ServiceController, generate response messages, and send them to Envoy
  • Watch for configuration change events from the ConfigController and service change events from the ServiceController, and push the changed configuration and services to Envoy over the xDS interfaces

1) DiscoveryService Initialization

The DiscoveryService initialization flow is as follows:

(1) xds.NewDiscoveryServer

The DiscoveryServer is created by calling xds.NewDiscoveryServer(), which returns a DiscoveryServer instance. DiscoveryServer is defined as follows:

// pilot/pkg/xds/discovery.go
type DiscoveryServer struct {
	// Env is the model environment.
	Env *model.Environment

	// ConfigGenerator is responsible for generating data plane configuration using Istio networking
	// APIs and service registry info
	// Generator for xDS configuration
	ConfigGenerator core.ConfigGenerator

	// Generators allow customizing the generated config, based on the client metadata.
	// Key is the generator type - will match the Generator metadata to set the per-connection
	// default generator, or the combination of Generator metadata and TypeUrl to select a
	// different generator for a type.
	// Normal istio clients use the default generator - will not be impacted by this.
	// Generators for the different config types
	Generators map[string]model.XdsResourceGenerator

	// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators
	// may also choose to not send any updates.
	// Decides whether this push involves the given Envoy proxy
	ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool

	// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
	// Limits the number of concurrent pushes
	concurrentPushLimit chan struct{}

	// RequestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering hurd of incoming requests.
	RequestRateLimit *rate.Limiter

	// InboundUpdates describes the number of configuration updates the discovery server has received
	InboundUpdates *atomic.Int64

	// CommittedUpdates describes the number of configuration updates the discovery server has
	// received, process, and stored in the push context. If this number is less than InboundUpdates,
	// there are updates we have not yet processed.
	// Note: This does not mean that all proxies have received these configurations; it is strictly
	// the push context, which means that the next push to a proxy will receive this configuration.
	CommittedUpdates *atomic.Int64

	// pushChannel is the buffer used for debouncing.
	// after debouncing the pushRequest will be sent to pushQueue
	// Single channel that receives pushRequests from the other components
	pushChannel chan *model.PushRequest

	// mutex used for protecting Environment.PushContext
	updateMutex sync.RWMutex

	// pushQueue is the buffer that used after debounce and before the real xds push.
	// Buffer used after debouncing and right before the real xDS push
	pushQueue *PushQueue

	// debugHandlers is the list of all the supported debug handlers.
	debugHandlers map[string]string

	// adsClients reflect active gRPC channels, for both ADS and EDS.
	// gRPC connections for ADS and EDS
	adsClients      map[string]*Connection
	adsClientsMutex sync.RWMutex

	StatusReporter DistributionStatusCache

	// Authenticators for XDS requests. Should be same/subset of the CA authenticators.
	Authenticators []security.Authenticator

	// StatusGen is notified of connect/disconnect/nack on all connections
	StatusGen               *StatusGen
	WorkloadEntryController *autoregistration.Controller

	// serverReady indicates caches have been synced up and server is ready to process requests.
	serverReady atomic.Bool

	debounceOptions debounceOptions

	instanceID string

	clusterID cluster.ID

	// Cache for XDS resources
	// xDS resource cache
	Cache model.XdsCache

	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *model.JwksResolver

	// ListRemoteClusters collects debug information about other clusters this istiod reads from.
	ListRemoteClusters func() []cluster.DebugInfo

	// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID
	// and if it has a different alias we should use that a cluster ID for proxy.
	ClusterAliases map[cluster.ID]cluster.ID
}

(2) initGrpcServer

When the gRPC server is initialized, XDSServer.Register() is called to register the service with the gRPC server:

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
	// Register v3 server
	discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
}

DiscoveryServer implements the AggregatedDiscoveryServiceServer interface:

// envoy/service/discovery/v3/ads.pb.go
type AggregatedDiscoveryServiceServer interface {
	// This is a gRPC-only API.
	// Full (state-of-the-world) ADS stream interface
	StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
	// Incremental (delta) ADS stream interface
	DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
}

StreamAggregatedResources receives DiscoveryRequest messages and returns a stream of DiscoveryResponse messages that carry the full xDS data.
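
For reference, these are the key fields of the two message types exchanged on the stream, trimmed from the go-control-plane generated types (only the fields discussed in this article are shown; the actual structs contain more):

// envoy/service/discovery/v3/discovery.pb.go (trimmed sketch)
type DiscoveryRequest struct {
	VersionInfo   string         // version of the last config the client ACKed (empty on the first request)
	Node          *corev3.Node   // identity and metadata of the Envoy node
	ResourceNames []string       // resources the client subscribes to (empty means "all" for CDS/LDS)
	TypeUrl       string         // xDS type, e.g. type.googleapis.com/envoy.config.cluster.v3.Cluster
	ResponseNonce string         // nonce of the response being ACKed or NACKed
	ErrorDetail   *status.Status // set only on NACK
}

type DiscoveryResponse struct {
	VersionInfo string       // version of this configuration snapshot
	Resources   []*anypb.Any // the serialized xDS resources
	TypeUrl     string
	Nonce       string       // echoed back by the client in ResponseNonce
}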

(3) initDiscoveryService

initDiscoveryService() adds the DiscoveryServer's start function to the Server's startFuncs queue, which is invoked once initialization completes:

// pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService() {
	log.Infof("starting discovery service")
	// Implement EnvoyXdsServer grace shutdowns.
	s.addStartFunc(func(stop <-chan struct{}) error {
		log.Infof("Starting ADS server")
		s.XDSServer.Start(stop)
		return nil
	})
}

2) DiscoveryService Startup

DiscoveryServer's Start() method launches the following goroutines:

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
	go s.WorkloadEntryController.Run(stopCh)
	go s.handleUpdates(stopCh)
	go s.periodicRefreshMetrics(stopCh)
	go s.sendPushes(stopCh)
	go s.Cache.Run(stopCh)
}

The most important ones are handleUpdates and sendPushes.

The handling of Config, Service, and Endpoint resources ultimately goes through the ConfigUpdate() method, which sends a PushRequest to DiscoveryServer's pushChannel. The processing flow is as follows:

DiscoveryServer first receives and processes update requests in the blocking handleUpdates goroutine, which forwards PushRequests to DiscoveryServer's pushQueue. The sendPushes goroutine then concurrently delivers PushRequests to the pushChannel of every connection, and finally the DiscoveryServer's stream handler carries out the actual distribution.
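
The producer-side entry point is DiscoveryServer.ConfigUpdate(). In Istio 1.18 it is roughly the following (a trimmed sketch; the metric names are incidental):

// pilot/pkg/xds/discovery.go (trimmed sketch)
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
	inboundConfigUpdates.Increment()
	s.InboundUpdates.Inc()
	// Hand the request over to the debouncing goroutine (handleUpdates)
	s.pushChannel <- req
}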

3) handleUpdates

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
	debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
}

func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
	var timeChan <-chan time.Time
	var startDebounce time.Time
	var lastConfigUpdateTime time.Time

	pushCounter := 0
	debouncedEvents := 0

	// Keeps track of the push requests. If updates are debounce they will be merged.
	var req *model.PushRequest

	free := true
	freeCh := make(chan struct{}, 1)

	push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {
		pushFn(req)
		updateSent.Add(int64(debouncedEvents))
		debounceTime.Record(time.Since(startDebounce).Seconds())
		freeCh <- struct{}{}
	}

	pushWorker := func() {
		eventDelay := time.Since(startDebounce)
		quietTime := time.Since(lastConfigUpdateTime)
		// it has been too long or quiet enough
		// Process the pending update when either condition holds:
		// - more than debounceMax (default 10s) has passed since the first update of this round
		// - more than debounceAfter (default 100ms) has passed since the last update
		if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
			if req != nil {
				pushCounter++
				if req.ConfigsUpdated == nil {
					log.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, reasonsUpdated(req),
						quietTime, eventDelay, req.Full)
				} else {
					log.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, configsUpdated(req),
						quietTime, eventDelay, req.Full)
				}
				free = false
				go push(req, debouncedEvents, startDebounce)
				req = nil
				debouncedEvents = 0
			}
		} else {
			timeChan = time.After(opts.debounceAfter - quietTime)
		}
	}

	for {
		select {
		case <-freeCh:
			free = true
			pushWorker()
		case r := <-ch:
			// If reason is not set, record it as an unknown reason
			if len(r.Reason) == 0 {
				r.Reason = []model.TriggerReason{model.UnknownTrigger}
			}
			if !opts.enableEDSDebounce && !r.Full {
				// trigger push now, just for EDS
				// Trigger an EDS push immediately
				go func(req *model.PushRequest) {
					pushFn(req)
					updateSent.Inc()
				}(r)
				continue
			}

			lastConfigUpdateTime = time.Now()
			if debouncedEvents == 0 {
				// Start a new debounce round; the timer length is the minimum quiet time
				timeChan = time.After(opts.debounceAfter)
				// Record the time of the first update event of this round
				startDebounce = lastConfigUpdateTime
			}
			debouncedEvents++

			// Merge the pushRequest
			req = req.Merge(r)
		case <-timeChan:
			if free {
				pushWorker()
			}
		case <-stopCh:
			return
		}
	}
}

The debounce() method merges update events within the minimum quiet period (debounceAfter) and bounds the distribution latency of the xDS configuration with the maximum delay (debounceMax). Because DiscoveryServer.Push has to initialize a PushContext, which consumes a lot of memory, debounce() makes DiscoveryServer.Push run serially to avoid OOM.
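
The debounce parameters come from the debounceOptions struct, which is populated from istiod environment variables. A trimmed sketch of the struct with the corresponding knobs (defaults as described above):

// pilot/pkg/xds/discovery.go (trimmed sketch)
type debounceOptions struct {
	// debounceAfter is the quiet period added after an event before pushing
	// (PILOT_DEBOUNCE_AFTER, default 100ms)
	debounceAfter time.Duration
	// debounceMax is the maximum time to wait before pushing, regardless of new events
	// (PILOT_DEBOUNCE_MAX, default 10s)
	debounceMax time.Duration
	// enableEDSDebounce controls whether EDS-only updates are debounced as well
	// (PILOT_ENABLE_EDS_DEBOUNCE, default true)
	enableEDSDebounce bool
}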

DiscoveryServer.Push keeps calling down the stack until the request is enqueued into DiscoveryServer's pushQueue (roughly Push → AdsPushAll → startPush → PushQueue.Enqueue).

4) PushContext Initialization

PushContext is the most important object in xDS generation and contains almost all of the mesh resource information. The PushContext struct is defined as follows:

// pilot/pkg/model/push_context.go
type PushContext struct {
	proxyStatusMutex sync.RWMutex
	// ProxyStatus is keyed by the error code, and holds a map keyed
	// by the ID.
	ProxyStatus map[string]map[string]ProxyPushStatus

	// Synthesized from env.Mesh
	exportToDefaults exportToDefaults

	// ServiceIndex is the index of services by various fields.
	// Service cache
	ServiceIndex serviceIndex

	// serviceAccounts contains a map of hostname and port to service accounts.
	serviceAccounts map[serviceAccountKey][]string

	// virtualServiceIndex is the index of virtual services by various fields.
	// VirtualService cache
	virtualServiceIndex virtualServiceIndex

	// destinationRuleIndex is the index of destination rules by various fields.
	// DestinationRule cache
	destinationRuleIndex destinationRuleIndex

	// gatewayIndex is the index of gateways.
	gatewayIndex gatewayIndex

	// clusterLocalHosts extracted from the MeshConfig
	clusterLocalHosts ClusterLocalHosts

	// sidecarIndex stores sidecar resources
	// Sidecar cache
	sidecarIndex sidecarIndex

	// envoy filters for each namespace including global config namespace
	// EnvoyFilter cache
	envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper

	// wasm plugins for each namespace including global config namespace
	wasmPluginsByNamespace map[string][]*WasmPluginWrapper

	// AuthnPolicies contains Authn policies by namespace.
	AuthnPolicies *AuthenticationPolicies `json:"-"`

	// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
	// are no authorization policies in the cluster.
	AuthzPolicies *AuthorizationPolicies `json:"-"`

	// Telemetry stores the existing Telemetry resources for the cluster.
	Telemetry *Telemetries `json:"-"`

	// ProxyConfig stores the existing ProxyConfig resources for the cluster.
	ProxyConfigs *ProxyConfigs `json:"-"`

	// The following data is either a global index or used in the inbound path.
	// Namespace specific views do not apply here.

	// Mesh configuration for the mesh.
	Mesh *meshconfig.MeshConfig `json:"-"`

	// PushVersion describes the push version this push context was computed for
	PushVersion string

	// LedgerVersion is the version of the configuration ledger
	LedgerVersion string

	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *JwksResolver

	// GatewayAPIController holds a reference to the gateway API controller.
	GatewayAPIController GatewayController

	// cache gateways addresses for each network
	// this is mainly used for kubernetes multi-cluster scenario
	networkMgr *NetworkManager

	Networks *meshconfig.MeshNetworks

	InitDone        atomic.Bool
	initializeMutex sync.Mutex
	ambientIndex    AmbientIndexes
}

The caches held by PushContext provide fast resource indices for the subsequent generation of xDS configuration.
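
As an illustration of how these indices are consumed, the BuildClusters code shown later in this article resolves the service list straight from the prebuilt caches instead of querying the service registry on every push:

// Excerpt from BuildClusters (shown in full later)
var services []*model.Service
if features.FilterGatewayClusterConfig && proxy.Type == model.Router {
	// gateway: services resolved from the PushContext gateway index
	services = req.Push.GatewayServices(proxy)
} else {
	// sidecar: services precomputed in the proxy's SidecarScope
	services = proxy.SidecarScope.Services()
}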

5) sendPushes

// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
	doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}

func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
	for {
		select {
		case <-stopCh:
			return
		default:
			// We can send to it until it is full, then it will block until a pushes finishes and reads from it.
			// This limits the number of pushes that can happen concurrently
			// The semaphore has a default capacity of 100 and limits push concurrency
			semaphore <- struct{}{}

			// Get the next proxy to push. This will block if there are no updates required.
			// Dequeue the next xDS connection from the pushQueue
			client, push, shuttingdown := queue.Dequeue()
			if shuttingdown {
				return
			}
			recordPushTriggers(push.Reason...)
			// Signals that a push is done by reading from the semaphore, allowing another send on it.
			doneFunc := func() {
				queue.MarkDone(client)
				<-semaphore
			}

			proxiesQueueTime.Record(time.Since(push.Start).Seconds())
			var closed <-chan struct{}
			if client.stream != nil {
				closed = client.stream.Context().Done()
			} else {
				closed = client.deltaStream.Context().Done()
			}
			go func() {
				// Build the push event
				pushEv := &Event{
					pushRequest: push,
					done:        doneFunc,
				}

				select {
				// Send the push event to the xDS connection's pushChannel
				case client.pushChannel <- pushEv:
					return
				case <-closed: // grpc stream was closed
					doneFunc()
					log.Infof("Client closed connection %v", client.conID)
				}
			}()
		}
	}
}

doSendPushes() runs an infinite loop; the main logic sits in the default branch:

  1. A push event targets all xDS clients; the semaphore limits push concurrency and blocks once it is full (see the sketch after this list)
  2. When the semaphore permits, a send goroutine is started for each client, which tries to deliver the pushEvent to that client's pushChannel queue
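
The concurrency control is the classic buffered-channel semaphore pattern in Go. A minimal, self-contained sketch of the same idea (not Istio code; the channel capacity plays the role of PILOT_PUSH_THROTTLE, which sizes concurrentPushLimit):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	semaphore := make(chan struct{}, 3) // at most 3 concurrent "pushes"
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		semaphore <- struct{}{} // blocks while 3 pushes are already in flight
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot, like pushEv.done()
			fmt.Println("pushing to client", id)
			time.Sleep(100 * time.Millisecond) // simulate one xDS push
		}(i)
	}
	wg.Wait()
}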

A pushEvent sent to the pushChannel is handled in the StreamAggregatedResources() method:

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {
	return s.Stream(stream)
}

func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
	if knativeEnv != "" && firstRequest.Load() {
		// How scaling works in knative is the first request is the "loading" request. During
		// loading request, concurrency=1. Once that request is done, concurrency is enabled.
		// However, the XDS stream is long lived, so the first request would block all others. As a
		// result, we should exit the first request immediately; clients will retry.
		firstRequest.Store(false)
		return status.Error(codes.Unavailable, "server warmup not complete; try again")
	}
	// Check if server is ready to accept clients and process new requests.
	// Currently ready means caches have been synced and hence can build
	// clusters correctly. Without this check, InitContext() call below would
	// initialize with empty config, leading to reconnected Envoys loosing
	// configuration. This is an additional safety check inaddition to adding
	// cachesSynced logic to readiness probe to handle cases where kube-proxy
	// ip tables update latencies.
	// See https://github.com/istio/istio/issues/25495.
	if !s.IsServerReady() {
		return status.Error(codes.Unavailable, "server is not ready to serve discovery information")
	}

	ctx := stream.Context()
	peerAddr := "0.0.0.0"
	if peerInfo, ok := peer.FromContext(ctx); ok {
		peerAddr = peerInfo.Addr.String()
	}

	if err := s.WaitForRequestLimit(stream.Context()); err != nil {
		log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err)
		return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
	}

	ids, err := s.authenticate(ctx)
	if err != nil {
		return status.Error(codes.Unauthenticated, err.Error())
	}
	if ids != nil {
		log.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
	} else {
		log.Debugf("Unauthenticated XDS: %s", peerAddr)
	}

	// InitContext returns immediately if the context was already initialized.
	if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
		// Error accessing the data - log and close, maybe a different pilot replica
		// has more luck
		log.Warnf("Error reading config %v", err)
		return status.Error(codes.Unavailable, "error reading config")
	}
	con := newConnection(peerAddr, stream)

	// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
	// when the connection is no longer used. Closing the channel can cause subtle race conditions
	// with push. According to the spec: "It's only necessary to close a channel when it is important
	// to tell the receiving goroutines that all data have been sent."

	// Block until either a request is received or a push is triggered.
	// We need 2 go routines because 'read' blocks in Recv().
	go s.receive(con, ids)

	// Wait for the proxy to be fully initialized before we start serving traffic. Because
	// initialization doesn't have dependencies that will block, there is no need to add any timeout
	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
	// initialization is complete.
	<-con.initialized

	for {
		// Go select{} statements are not ordered; the same channel can be chosen many times.
		// For requests, these are higher priority (client may be blocked on startup until these are done)
		// and often very cheap to handle (simple ACK), so we check it first.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case <-con.stop:
			return nil
		default:
		}
		// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
		// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
		// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
		// cannot completely starve pushes. However, this scenario is unlikely.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case pushEv := <-con.pushChannel:
			// Call pushConnection to push the xDS configuration
			err := s.pushConnection(con, pushEv)
			// Notify the semaphore that this push is complete
			pushEv.done()
			if err != nil {
				return err
			}
		case <-con.stop:
			return nil
		}
	}
}

After a client finishes the current xDS push via pushConnection, it calls pushEv.done() to notify the semaphore.

(Figure omitted: the flow from pushQueue to the final xDS push.)

6) Generating and Distributing xDS Configuration

The pushConnection() method pushes xDS configuration to Envoy:

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
	pushRequest := pushEv.pushRequest

	if pushRequest.Full {
		// Update Proxy with current information.
		s.computeProxyState(con.proxy, pushRequest)
	}

	// Based on which resources changed, decide whether this proxy needs an xDS update
	if !s.ProxyNeedsPush(con.proxy, pushRequest) {
		log.Debugf("Skipping push to %v, no updates required", con.conID)
		if pushRequest.Full {
			// Only report for full versions, incremental pushes do not have a new version.
			reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)
		}
		return nil
	}

	// Send pushes to all generators
	// Each Generator is responsible for determining if the push event requires a push
	wrl, ignoreEvents := con.pushDetails()
	// Iterate over the resource types watched by this proxy
	for _, w := range wrl {
		// Generate the xDS config for the subscribed resource type and send it to the client
		if err := s.pushXds(con, w, pushRequest); err != nil {
			return err
		}
	}
	if pushRequest.Full {
		// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
		reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)
	}

	proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
	return nil
}

The core logic of pushConnection() is:

  1. Based on which resources changed, decide whether this Envoy proxy needs an xDS update; if not, return immediately
  2. Iterate over the resource types watched by this Envoy proxy, generate the xDS configuration for each subscribed type, and send it to the client (the WatchedResource sketch below shows the fields involved)
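
Each entry in wrl is a model.WatchedResource, which records what the connection has subscribed to for one xDS type. A trimmed sketch limited to the fields used here (the real struct also tracks nonces and versions for ACK bookkeeping):

// model.WatchedResource (trimmed sketch)
type WatchedResource struct {
	// TypeUrl is the xDS type being watched, e.g. the CDS or EDS type URL
	TypeUrl string
	// ResourceNames are the specific resources subscribed to (e.g. cluster names for EDS)
	ResourceNames []string
}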

(1) ProxyNeedsPush

The ProxyNeedsPush function decides, based on which resources changed, whether this Envoy proxy needs an xDS update. The default implementation is DefaultProxyNeedsPush, and its key check is whether the proxy's SidecarScope depends on the changed resource. The core method is:

// pilot/pkg/model/sidecar.go
var (
	sidecarScopeKnownConfigTypes = map[kind.Kind]struct{}{
		kind.ServiceEntry:    {},
		kind.VirtualService:  {},
		kind.DestinationRule: {},
		kind.Sidecar:         {},
	}

	// clusterScopedConfigTypes includes configs when they are in root namespace,
	// they will be applied to all namespaces within the cluster.
	clusterScopedConfigTypes = map[kind.Kind]struct{}{
		kind.EnvoyFilter:           {},
		kind.AuthorizationPolicy:   {},
		kind.RequestAuthentication: {},
		kind.WasmPlugin:            {},
	}
)

func (sc *SidecarScope) DependsOnConfig(config ConfigKey) bool {
	if sc == nil {
		return true
	}

	// This kind of config will trigger a change if made in the root namespace or the same namespace
	// For clusterScopedConfigTypes, an xDS push is needed if the changed config is in the root namespace or the same namespace
	if _, f := clusterScopedConfigTypes[config.Kind]; f {
		return config.Namespace == sc.RootNamespace || config.Namespace == sc.Namespace
	}

	// This kind of config is unknown to sidecarScope.
	if _, f := sidecarScopeKnownConfigTypes[config.Kind]; !f {
		return true
	}

	// An xDS push is needed if the sidecarScope depends on this config
	_, exists := sc.configDependencies[config.HashCode()]
	return exists
}

The core logic of DependsOnConfig() is:

  1. For clusterScopedConfigTypes, an xDS push is needed if the changed config is in the root namespace or in the same namespace as the proxy
  2. Otherwise, an xDS push is needed only if the SidecarScope's configDependencies contain the changed config (see the example below)
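
For example, when a VirtualService changes, the update is recorded as a ConfigKey and every connected proxy is checked against it roughly like this (an illustrative fragment; the resource name and namespace are hypothetical):

// Illustrative: checking whether a proxy cares about a changed VirtualService
changed := model.ConfigKey{
	Kind:      kind.VirtualService,
	Name:      "reviews", // hypothetical resource name
	Namespace: "default", // hypothetical namespace
}
if proxy.SidecarScope.DependsOnConfig(changed) {
	// this proxy is affected by the change and will receive a push
}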

(2) pushXds

// pilot/pkg/xds/xdsgen.go
func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
	if w == nil {
		return nil
	}
	// Look up the xDS generator for this resource type
	gen := s.findGenerator(w.TypeUrl, con)
	if gen == nil {
		return nil
	}

	t0 := time.Now()

	// If delta is set, client is requesting new resources or removing old ones. We should just generate the
	// new resources it needs, rather than the entire set of known resources.
	// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;
	// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.
	// This means if there are only removals, we will not respond.
	var logFiltered string
	if !req.Delta.IsEmpty() && features.PartialFullPushes &&
		!con.proxy.IsProxylessGrpc() {
		logFiltered = " filtered:" + strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))
		w = &model.WatchedResource{
			TypeUrl:       w.TypeUrl,
			ResourceNames: req.Delta.Subscribed.UnsortedList(),
		}
	}

	// Have the generator build the xDS configuration
	res, logdata, err := gen.Generate(con.proxy, w, req)
	info := ""
	if len(logdata.AdditionalInfo) > 0 {
		info = " " + logdata.AdditionalInfo
	}
	if len(logFiltered) > 0 {
		info += logFiltered
	}
	if err != nil || res == nil {
		// If we have nothing to send, report that we got an ACK for this version.
		if s.StatusReporter != nil {
			s.StatusReporter.RegisterEvent(con.conID, w.TypeUrl, req.Push.LedgerVersion)
		}
		if log.DebugEnabled() {
			log.Debugf("%s: SKIP%s for node:%s%s", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}

		// If we are sending a request, we must respond or we can get Envoy stuck. Assert we do.
		// One exception is if Envoy is simply unsubscribing from some resources, in which case we can skip.
		isUnsubscribe := features.PartialFullPushes && !req.Delta.IsEmpty() && req.Delta.Subscribed.IsEmpty()
		if features.EnableUnsafeAssertions && err == nil && res == nil && req.IsRequest() && !isUnsubscribe {
			log.Fatalf("%s: SKIPPED%s for node:%s%s but expected a response for request", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}
		return err
	}
	defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()

	resp := &discovery.DiscoveryResponse{
		ControlPlane: ControlPlane(),
		TypeUrl:      w.TypeUrl,
		// TODO: send different version for incremental eds
		VersionInfo: req.Push.PushVersion,
		Nonce:       nonce(req.Push.LedgerVersion),
		Resources:   model.ResourcesToAny(res),
	}

	configSize := ResourceSize(res)
	configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))

	ptype := "PUSH"
	if logdata.Incremental {
		ptype = "PUSH INC"
	}

	// Call send to deliver the discoveryResponse
	if err := con.send(resp); err != nil {
		if recordSendError(w.TypeUrl, err) {
			log.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
				v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)
		}
		return err
	}

	switch {
	case !req.Full:
		if log.DebugEnabled() {
			log.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
				v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)
		}
	default:
		debug := ""
		if log.DebugEnabled() {
			// Add additional information to logs when debug mode enabled.
			debug = " nonce:" + resp.Nonce + " version:" + resp.VersionInfo
		}
		log.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
			util.ByteCount(ResourceSize(res)), info, debug)
	}
	return nil
}

pushXds() first finds the xDS generator that matches the subscribed resource type, then uses that generator to build the corresponding xDS configuration, and finally sends it out through the send interface.
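
The generators are registered per type in the DiscoveryServer.Generators map shown earlier, and they all implement the model.XdsResourceGenerator interface, which looks roughly like this (a trimmed sketch; comments added):

// model.XdsResourceGenerator (trimmed sketch)
type XdsResourceGenerator interface {
	// Generate builds the resources of one xDS type for the given proxy and
	// returns them together with logging details.
	Generate(proxy *Proxy, w *WatchedResource, req *PushRequest) (Resources, XdsLogDetails, error)
}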

(3) XdsResourceGenerator.Generate

Pilot is responsible for generating and distributing six kinds of xDS resources (CDS, EDS, LDS, RDS, ECDS, and NDS). Taking the CDS generator as an example, let's look at how DiscoveryServer builds the raw Cluster configuration from the proxy's attributes and the PushContext caches.
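
The CDS generator registered for the Cluster type URL is a thin wrapper: it skips pushes that the request does not require and otherwise delegates to ConfigGenerator.BuildClusters. Roughly (a trimmed sketch; field and helper names may differ slightly in 1.18):

// pilot/pkg/xds/cds.go (trimmed sketch)
type CdsGenerator struct {
	ConfigGenerator core.ConfigGenerator
}

func (c CdsGenerator) Generate(proxy *model.Proxy, w *model.WatchedResource, req *model.PushRequest) (model.Resources, model.XdsLogDetails, error) {
	if !cdsNeedsPush(req, proxy) {
		return nil, model.DefaultXdsLogDetails, nil
	}
	clusters, logs := c.ConfigGenerator.BuildClusters(proxy, req)
	return clusters, logs, nil
}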

The CDS configuration is generated by the ConfigGenerator.BuildClusters() method:

// pilot/pkg/networking/core/v1alpha3/cluster.go
func (configgen *ConfigGeneratorImpl) BuildClusters(proxy *model.Proxy, req *model.PushRequest) ([]*discovery.Resource, model.XdsLogDetails) {
	// In Sotw, we care about all services.
	var services []*model.Service
	if features.FilterGatewayClusterConfig && proxy.Type == model.Router {
		services = req.Push.GatewayServices(proxy)
	} else {
		services = proxy.SidecarScope.Services()
	}
	return configgen.buildClusters(proxy, req, services)
}

func (configgen *ConfigGeneratorImpl) buildClusters(proxy *model.Proxy, req *model.PushRequest,
	services []*model.Service,
) ([]*discovery.Resource, model.XdsLogDetails) {
	clusters := make([]*cluster.Cluster, 0)
	resources := model.Resources{}
	envoyFilterPatches := req.Push.EnvoyFilters(proxy)
	// Create the cluster builder
	cb := NewClusterBuilder(proxy, req, configgen.Cache)
	instances := proxy.ServiceInstances
	cacheStats := cacheStats{}
	switch proxy.Type {
	// Build clusters for a sidecar proxy
	case model.SidecarProxy:
		// Setup outbound clusters
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		// Build outbound clusters
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Add a blackhole and passthrough cluster for catching traffic to unresolved routes
		// Add the blackhole and passthrough clusters as default targets for unmatched traffic
		clusters = outboundPatcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster(), cb.buildDefaultPassthroughCluster())
		clusters = append(clusters, outboundPatcher.insertedClusters()...)

		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		// Build inbound clusters
		clusters = append(clusters, configgen.buildInboundClusters(cb, proxy, instances, inboundPatcher)...)
		if proxy.EnableHBONE() {
			clusters = append(clusters, configgen.buildInboundHBONEClusters())
		}
		// Pass through clusters for inbound traffic. These cluster bind loopback-ish src address to access node local service.
		// Add the passthrough cluster as the default target for inbound traffic
		clusters = inboundPatcher.conditionallyAppend(clusters, nil, cb.buildInboundPassthroughClusters()...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	case model.Waypoint:
		svcs := findWaypointServices(proxy, req.Push)
		// Waypoint proxies do not need outbound clusters in most cases, unless we have a route pointing to something
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, filterWaypointOutboundServices(req.Push.ServicesAttachedToMesh(), svcs, services))
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		clusters = append(clusters, configgen.buildWaypointInboundClusters(cb, proxy, req.Push, svcs)...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	default: // Gateways
		patcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_GATEWAY}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, patcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Gateways do not require the default passthrough cluster as they do not have original dst listeners.
		clusters = patcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster())
		if proxy.Type == model.Router && proxy.MergedGateway != nil && proxy.MergedGateway.ContainsAutoPassthroughGateways {
			clusters = append(clusters, configgen.buildOutboundSniDnatClusters(proxy, req, patcher)...)
		}
		clusters = append(clusters, patcher.insertedClusters()...)
	}

	// OutboundTunnel cluster is needed for sidecar and gateway.
	if proxy.EnableHBONE() {
		clusters = append(clusters, cb.buildConnectOriginate(proxy, req.Push, nil))
	}

	// if credential socket exists, create a cluster for it
	if proxy.Metadata != nil && proxy.Metadata.Raw[security.CredentialMetaDataName] == "true" {
		clusters = append(clusters, cb.buildExternalSDSCluster(security.CredentialNameSocketPath))
	}
	for _, c := range clusters {
		resources = append(resources, &discovery.Resource{Name: c.Name, Resource: protoconv.MessageToAny(c)})
	}
	resources = cb.normalizeClusters(resources)

	if cacheStats.empty() {
		return resources, model.DefaultXdsLogDetails
	}
	return resources, model.XdsLogDetails{AdditionalInfo: fmt.Sprintf("cached:%v/%v", cacheStats.hits, cacheStats.hits+cacheStats.miss)}
}

7) Responding to xDS Requests Initiated by Envoy

StreamAggregatedResources() starts the receive goroutine to accept Envoy's xDS requests:

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) receive(con *Connection, identities []string) {
	defer func() {
		close(con.errorChan)
		close(con.reqChan)
		// Close the initialized channel, if its not already closed, to prevent blocking the stream.
		select {
		case <-con.initialized:
		default:
			close(con.initialized)
		}
	}()

	firstRequest := true
	for {
		// Receive the discoveryRequest
		req, err := con.stream.Recv()
		if err != nil {
			if istiogrpc.IsExpectedGRPCError(err) {
				log.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
				return
			}
			con.errorChan <- err
			log.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
			totalXDSInternalErrors.Increment()
			return
		}
		// This should be only set for the first request. The node id may not be set - for example malicious clients.
		if firstRequest {
			// probe happens before envoy sends first xDS request
			if req.TypeUrl == v3.HealthInfoType {
				log.Warnf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
				continue
			}
			firstRequest = false
			if req.Node == nil || req.Node.Id == "" {
				con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
				return
			}
			if err := s.initConnection(req.Node, con, identities); err != nil {
				con.errorChan <- err
				return
			}
			defer s.closeConnection(con)
			log.Infof("ADS: new connection for node:%s", con.conID)
		}

		select {
		// Add the discoveryRequest to reqChan
		case con.reqChan <- req:
		case <-con.stream.Context().Done():
			log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
			return
		}
	}
}

receive() reads Envoy's DiscoveryRequests from the stream and adds each one to reqChan.
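
The Connection object ties the pieces above together. A trimmed sketch limited to the fields used in this article:

// pilot/pkg/xds/ads.go (trimmed sketch)
type Connection struct {
	peerAddr string       // peer address, for logging
	conID    string       // unique connection ID
	proxy    *model.Proxy // metadata of the connected Envoy

	stream      DiscoveryStream      // the state-of-the-world ADS gRPC stream
	deltaStream DeltaDiscoveryStream // the delta ADS gRPC stream (if used)

	pushChannel chan *Event                      // push events from sendPushes
	reqChan     chan *discovery.DiscoveryRequest // requests read by receive()
	errorChan   chan error                       // stream errors reported by receive()
	stop        chan struct{}                    // closed when the connection stops
	initialized chan struct{}                    // closed once initConnection finishes
}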

StreamAggregatedResources() takes DiscoveryRequests from reqChan and calls processRequest():

// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
	stype := v3.GetShortType(req.TypeUrl)
	log.Debugf("ADS:%s: REQ %s resources:%d nonce:%s version:%s ", stype,
		con.conID, len(req.ResourceNames), req.ResponseNonce, req.VersionInfo)
	if req.TypeUrl == v3.HealthInfoType {
		s.handleWorkloadHealthcheck(con.proxy, req)
		return nil
	}

	// For now, don't let xDS piggyback debug requests start watchers.
	if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
		return s.pushXds(con,
			&model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames},
			&model.PushRequest{Full: true, Push: con.proxy.LastPushContext})
	}
	if s.StatusReporter != nil {
		s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)
	}
	shouldRespond, delta := s.shouldRespond(con, req)
	if !shouldRespond {
		return nil
	}

	request := &model.PushRequest{
		Full:   true,
		Push:   con.proxy.LastPushContext,
		Reason: []model.TriggerReason{model.ProxyRequest},

		// The usage of LastPushTime (rather than time.Now()), is critical here for correctness; This time
		// is used by the XDS cache to determine if a entry is stale. If we use Now() with an old push context,
		// we may end up overriding active cache entries with stale ones.
		Start: con.proxy.LastPushTime,
		Delta: delta,
	}

	// SidecarScope for the proxy may not have been updated based on this pushContext.
	// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
	// but proxy's SidecarScope has been updated(s.computeProxyState -> SetSidecarScope) due to optimizations that skip sidecar scope
	// computation.
	if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
		s.computeProxyState(con.proxy, request)
	}
	// Push the xDS configuration
	return s.pushXds(con, con.Watched(req.TypeUrl), request)
}

After shouldRespond() determines that the request is not just an ACK, processRequest() pushes the xDS configuration according to the information in the DiscoveryRequest.
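
shouldRespond() distinguishes plain ACKs from requests that actually need a response, based on the nonce and the subscribed resource set. The core idea as a standalone illustration (not the actual implementation):

// Illustrative only: the state-of-the-world ACK check in its simplest form.
// A request that carries no error, repeats the nonce of the last response we sent,
// and does not change the watched resource set is an ACK and needs no response.
func looksLikeAck(req *discovery.DiscoveryRequest, lastNonceSent string, resourcesChanged bool) bool {
	return req.ErrorDetail == nil &&
		req.ResponseNonce == lastNonceSent &&
		!resourcesChanged
}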

8) Summary

Push path after a configuration change: ConfigController/ServiceController → ConfigUpdate() → pushChannel → debounce in handleUpdates → Push → pushQueue → sendPushes → per-connection pushChannel → pushConnection → pushXds → Envoy.

Path for responding to an xDS request initiated by Envoy: DiscoveryRequest → receive goroutine → reqChan → processRequest → pushXds → Envoy.
