This article studies the source code of Istio 1.18.0.
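5. Asynchronous xDS Distribution
DiscoveryService is mainly responsible for the following:
- Starting the gRPC server and accepting connection requests from Envoy
- Receiving xDS requests from Envoy, fetching configuration and service information from the ConfigController and ServiceController, and sending the generated responses back to Envoy
- Watching for configuration changes from the ConfigController and service changes from the ServiceController, and pushing those changes to Envoy over the xDS interface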
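1) DiscoveryService Initialization
The DiscoveryService initialization flow is shown in the figure below:
(1) xds.NewDiscoveryServer
DiscoveryServer is initialized by calling xds.NewDiscoveryServer(), which returns a DiscoveryServer instance. DiscoveryServer is defined as follows: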
// pilot/pkg/xds/discovery.go
type DiscoveryServer struct {
	// Env is the model environment.
	Env *model.Environment

	// ConfigGenerator is responsible for generating data plane configuration using Istio networking
	// APIs and service registry info
	ConfigGenerator core.ConfigGenerator

	// Generators allow customizing the generated config, based on the client metadata.
	// Key is the generator type - will match the Generator metadata to set the per-connection
	// default generator, or the combination of Generator metadata and TypeUrl to select a
	// different generator for a type.
	// Normal istio clients use the default generator - will not be impacted by this.
	// (One generator per configuration type)
	Generators map[string]model.XdsResourceGenerator

	// ProxyNeedsPush is a function that determines whether a push can be completely skipped. Individual generators
	// may also choose to not send any updates.
	// (Decides whether this push concerns the given Envoy proxy)
	ProxyNeedsPush func(proxy *model.Proxy, req *model.PushRequest) bool

	// concurrentPushLimit is a semaphore that limits the amount of concurrent XDS pushes.
	concurrentPushLimit chan struct{}
	// RequestRateLimit limits the number of new XDS requests allowed. This helps prevent thundering herd of incoming requests.
	RequestRateLimit *rate.Limiter

	// InboundUpdates describes the number of configuration updates the discovery server has received
	InboundUpdates *atomic.Int64
	// CommittedUpdates describes the number of configuration updates the discovery server has
	// received, processed, and stored in the push context. If this number is less than InboundUpdates,
	// there are updates we have not yet processed.
	// Note: This does not mean that all proxies have received these configurations; it is strictly
	// the push context, which means that the next push to a proxy will receive this configuration.
	CommittedUpdates *atomic.Int64

	// pushChannel is the buffer used for debouncing.
	// after debouncing the pushRequest will be sent to pushQueue
	// (Single channel that receives the PushRequests sent by other components)
	pushChannel chan *model.PushRequest

	// mutex used for protecting Environment.PushContext
	updateMutex sync.RWMutex

	// pushQueue is the buffer that used after debounce and before the real xds push.
	pushQueue *PushQueue

	// debugHandlers is the list of all the supported debug handlers.
	debugHandlers map[string]string

	// adsClients reflect active gRPC channels, for both ADS and EDS.
	adsClients      map[string]*Connection
	adsClientsMutex sync.RWMutex

	StatusReporter DistributionStatusCache

	// Authenticators for XDS requests. Should be same/subset of the CA authenticators.
	Authenticators []security.Authenticator

	// StatusGen is notified of connect/disconnect/nack on all connections
	StatusGen               *StatusGen
	WorkloadEntryController *autoregistration.Controller

	// serverReady indicates caches have been synced up and server is ready to process requests.
	serverReady atomic.Bool

	debounceOptions debounceOptions

	instanceID string

	clusterID cluster.ID

	// Cache for XDS resources
	Cache model.XdsCache

	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *model.JwksResolver

	// ListRemoteClusters collects debug information about other clusters this istiod reads from.
	ListRemoteClusters func() []cluster.DebugInfo

	// ClusterAliases are alias names for cluster. When a proxy connects with a cluster ID
	// and if it has a different alias we should use that a cluster ID for proxy.
	ClusterAliases map[cluster.ID]cluster.ID
}
(2) initGrpcServer
While initializing the gRPC server, XDSServer.Register() is called to register the service with the gRPC server:
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
	// Register v3 server
	discovery.RegisterAggregatedDiscoveryServiceServer(rpcs, s)
}
DiscoveryServer implements the AggregatedDiscoveryServiceServer interface:
// envoy/service/discovery/v3/ads.pb.go
type AggregatedDiscoveryServiceServer interface {
	// This is a gRPC-only API.
	// State-of-the-world (full) ADS stream
	StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
	// Incremental (delta) ADS stream
	DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
}
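StreamAggregatedResources receives DiscoveryRequests and returns a stream of DiscoveryResponses containing the full (state-of-the-world) xDS data.
The call flow is shown in the figure below: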
(3) initDiscoveryService
initDiscoveryService() adds the DiscoveryServer's start function to the Server's startFuncs queue; it is called once initialization has finished:
// pilot/pkg/bootstrap/server.go
func (s *Server) initDiscoveryService() {
	log.Infof("starting discovery service")
	// Implement EnvoyXdsServer grace shutdown
	s.addStartFunc(func(stop <-chan struct{}) error {
		log.Infof("Starting ADS server")
		s.XDSServer.Start(stop)
		return nil
	})
}
2) DiscoveryService Startup
DiscoveryServer.Start() launches five goroutines:
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
	go s.WorkloadEntryController.Run(stopCh)
	go s.handleUpdates(stopCh)
	go s.periodicRefreshMetrics(stopCh)
	go s.sendPushes(stopCh)
	go s.Cache.Run(stopCh)
}
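The most important of these are handleUpdates and sendPushes.
Config, Service, and Endpoint resource changes are all eventually handled by calling ConfigUpdate(), which sends a PushRequest to the DiscoveryServer's pushChannel. The processing flow is as follows:
The handleUpdates goroutine first receives and processes update requests in a blocking loop and, after debouncing, places them in the DiscoveryServer's pushQueue. The sendPushes goroutine then concurrently delivers each queued PushRequest to the pushChannel of its connection, and finally the DiscoveryServer's stream handler processes and distributes the push.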
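A rough sketch of the per-connection coalescing that pushQueue performs between debouncing and sending, under simplifying assumptions: pushRequest, its merge rules, and the string connection IDs are hypothetical stand-ins, and the MarkDone bookkeeping that doSendPushes relies on (tracking connections that are mid-push) is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// pushRequest is a hypothetical, much-reduced stand-in for model.PushRequest.
type pushRequest struct {
	Full    bool
	Reasons []string
}

// merge returns a request that is full if either input is full and keeps all reasons.
func (r *pushRequest) merge(other *pushRequest) *pushRequest {
	return &pushRequest{
		Full:    r.Full || other.Full,
		Reasons: append(append([]string{}, r.Reasons...), other.Reasons...),
	}
}

// pushQueue holds at most one pending request per connection, in FIFO order.
type pushQueue struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending map[string]*pushRequest
	order   []string
}

func newPushQueue() *pushQueue {
	q := &pushQueue{pending: map[string]*pushRequest{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Enqueue merges req into the connection's pending request, or queues it if none is pending.
func (q *pushQueue) Enqueue(conID string, req *pushRequest) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if prev, ok := q.pending[conID]; ok {
		q.pending[conID] = prev.merge(req)
		return
	}
	q.pending[conID] = req
	q.order = append(q.order, conID)
	q.cond.Signal()
}

// Dequeue blocks until some connection has a pending request, then removes and returns it.
func (q *pushQueue) Dequeue() (string, *pushRequest) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.order) == 0 {
		q.cond.Wait()
	}
	conID := q.order[0]
	q.order = q.order[1:]
	req := q.pending[conID]
	delete(q.pending, conID)
	return conID, req
}

func main() {
	q := newPushQueue()
	q.Enqueue("sidecar-a", &pushRequest{Reasons: []string{"endpoint"}})
	q.Enqueue("sidecar-b", &pushRequest{Full: true, Reasons: []string{"config"}})
	q.Enqueue("sidecar-a", &pushRequest{Full: true, Reasons: []string{"service"}})

	con, req := q.Dequeue()
	fmt.Println(con, req.Full, req.Reasons) // sidecar-a true [endpoint service]
}
```

Merging on enqueue means a slow connection accumulates one combined request instead of a backlog of separate pushes.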
3) handleUpdates
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
	debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
}

func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, opts debounceOptions, pushFn func(req *model.PushRequest), updateSent *atomic.Int64) {
	var timeChan <-chan time.Time
	var startDebounce time.Time
	var lastConfigUpdateTime time.Time

	pushCounter := 0
	debouncedEvents := 0

	// Keeps track of the push requests. If updates are debounce they will be merged.
	var req *model.PushRequest

	free := true
	freeCh := make(chan struct{}, 1)

	push := func(req *model.PushRequest, debouncedEvents int, startDebounce time.Time) {
		pushFn(req)
		updateSent.Add(int64(debouncedEvents))
		debounceTime.Record(time.Since(startDebounce).Seconds())
		freeCh <- struct{}{}
	}

	pushWorker := func() {
		eventDelay := time.Since(startDebounce)
		quietTime := time.Since(lastConfigUpdateTime)
		// it has been too long or quiet enough
		// Process the pending events as soon as either condition holds:
		// - the time since the first event of this round exceeds the maximum delay (debounceMax, 10s by default)
		// - the time since the last event exceeds the quiet period (debounceAfter, 100ms by default)
		if eventDelay >= opts.debounceMax || quietTime >= opts.debounceAfter {
			if req != nil {
				pushCounter++
				if req.ConfigsUpdated == nil {
					log.Infof("Push debounce stable[%d] %d for reason %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, reasonsUpdated(req),
						quietTime, eventDelay, req.Full)
				} else {
					log.Infof("Push debounce stable[%d] %d for config %s: %v since last change, %v since last push, full=%v",
						pushCounter, debouncedEvents, configsUpdated(req),
						quietTime, eventDelay, req.Full)
				}
				free = false
				go push(req, debouncedEvents, startDebounce)
				req = nil
				debouncedEvents = 0
			}
		} else {
			timeChan = time.After(opts.debounceAfter - quietTime)
		}
	}

	for {
		select {
		case <-freeCh:
			free = true
			pushWorker()
		case r := <-ch:
			// If reason is not set, record it as an unknown reason
			if len(r.Reason) == 0 {
				r.Reason = []model.TriggerReason{model.UnknownTrigger}
			}
			if !opts.enableEDSDebounce && !r.Full {
				// trigger push now, just for EDS
				go func(req *model.PushRequest) {
					pushFn(req)
					updateSent.Inc()
				}(r)
				continue
			}

			lastConfigUpdateTime = time.Now()
			if debouncedEvents == 0 {
				// Start a new debounce round: arm the timer for the minimum quiet period
				timeChan = time.After(opts.debounceAfter)
				// Record the time of the first event in this round
				startDebounce = lastConfigUpdateTime
			}
			debouncedEvents++

			// Merge the PushRequests
			req = req.Merge(r)
		case <-timeChan:
			if free {
				pushWorker()
			}
		case <-stopCh:
			return
		}
	}
}
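debounce() merges update events using the minimum quiet period (debounceAfter) and bounds the latency of xDS delivery with the maximum delay (debounceMax). Because DiscoveryServer.Push has to initialize a PushContext, which consumes a lot of memory, debounce() runs DiscoveryServer.Push serially to avoid OOM: free is cleared before each push and only set again when the push signals completion on freeCh, and a timer that fires while a push is in flight is ignored.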
DiscoveryServer.Push calls down the chain until the request is placed in the DiscoveryServer's pushQueue. The call chain is as follows:
4) PushContext Initialization
PushContext is the most important structure in xDS generation and contains almost all mesh resource information. It is defined as follows:
// pilot/pkg/model/push_context.go
type PushContext struct {
	proxyStatusMutex sync.RWMutex
	// ProxyStatus is keyed by the error code, and holds a map keyed
	// by the ID.
	ProxyStatus map[string]map[string]ProxyPushStatus

	// Synthesized from env.Mesh
	exportToDefaults exportToDefaults

	// ServiceIndex is the index of services by various fields.
	ServiceIndex serviceIndex

	// serviceAccounts contains a map of hostname and port to service accounts.
	serviceAccounts map[serviceAccountKey][]string

	// virtualServiceIndex is the index of virtual services by various fields.
	virtualServiceIndex virtualServiceIndex

	// destinationRuleIndex is the index of destination rules by various fields.
	destinationRuleIndex destinationRuleIndex

	// gatewayIndex is the index of gateways.
	gatewayIndex gatewayIndex

	// clusterLocalHosts extracted from the MeshConfig
	clusterLocalHosts ClusterLocalHosts

	// sidecarIndex stores sidecar resources
	sidecarIndex sidecarIndex

	// envoy filters for each namespace including global config namespace
	envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper

	// wasm plugins for each namespace including global config namespace
	wasmPluginsByNamespace map[string][]*WasmPluginWrapper

	// AuthnPolicies contains Authn policies by namespace.
	AuthnPolicies *AuthenticationPolicies `json:"-"`

	// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
	// are no authorization policies in the cluster.
	AuthzPolicies *AuthorizationPolicies `json:"-"`

	// Telemetry stores the existing Telemetry resources for the cluster.
	Telemetry *Telemetries `json:"-"`

	// ProxyConfig stores the existing ProxyConfig resources for the cluster.
	ProxyConfigs *ProxyConfigs `json:"-"`

	// The following data is either a global index or used in the inbound path.
	// Namespace specific views do not apply here.

	// Mesh configuration for the mesh.
	Mesh *meshconfig.MeshConfig `json:"-"`

	// PushVersion describes the push version this push context was computed for
	PushVersion string

	// LedgerVersion is the version of the configuration ledger
	LedgerVersion string

	// JwtKeyResolver holds a reference to the JWT key resolver instance.
	JwtKeyResolver *JwksResolver

	// GatewayAPIController holds a reference to the gateway API controller.
	GatewayAPIController GatewayController

	// cache gateways addresses for each network
	// this is mainly used for kubernetes multi-cluster scenario
	networkMgr *NetworkManager

	Networks *meshconfig.MeshNetworks

	InitDone        atomic.Bool
	initializeMutex sync.Mutex
	ambientIndex    AmbientIndexes
}
The caches in PushContext give subsequent xDS generation fast indexes into the mesh resources.
5) sendPushes
// pilot/pkg/xds/discovery.go
func (s *DiscoveryServer) sendPushes(stopCh <-chan struct{}) {
	doSendPushes(stopCh, s.concurrentPushLimit, s.pushQueue)
}

func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQueue) {
	for {
		select {
		case <-stopCh:
			return
		default:
			// We can send to it until it is full, then it will block until a push finishes and reads from it.
			// This limits the number of pushes that can happen concurrently
			// (The semaphore's default capacity is 100)
			semaphore <- struct{}{}

			// Get the next proxy to push. This will block if there are no updates required.
			// (Dequeue one xDS connection from pushQueue)
			client, push, shuttingdown := queue.Dequeue()
			if shuttingdown {
				return
			}
			recordPushTriggers(push.Reason...)
			// Signals that a push is done by reading from the semaphore, allowing another send on it.
			doneFunc := func() {
				queue.MarkDone(client)
				<-semaphore
			}

			proxiesQueueTime.Record(time.Since(push.Start).Seconds())
			var closed <-chan struct{}
			if client.stream != nil {
				closed = client.stream.Context().Done()
			} else {
				closed = client.deltaStream.Context().Done()
			}
			go func() {
				// Build the pushEvent
				pushEv := &Event{
					pushRequest: push,
					done:        doneFunc,
				}

				select {
				// Put the pushEvent on the connection's pushChannel
				case client.pushChannel <- pushEv:
					return
				case <-closed: // grpc stream was closed
					doneFunc()
					log.Infof("Client closed connection %v", client.conID)
				}
			}()
		}
	}
}
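doSendPushes() runs an infinite loop whose default branch carries the main logic:
- Push events target all xDS clients, and a semaphore limits the number of concurrent pushes; once the semaphore is full, the loop blocks
- When the semaphore admits another push, a connection is dequeued from pushQueue and a goroutine is started that tries to deliver the pushEvent to that client's pushChannel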
The pushEvents sent to pushChannel are handled in StreamAggregatedResources():
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) StreamAggregatedResources(stream DiscoveryStream) error {
	return s.Stream(stream)
}

func (s *DiscoveryServer) Stream(stream DiscoveryStream) error {
	if knativeEnv != "" && firstRequest.Load() {
		// How scaling works in knative is the first request is the "loading" request. During
		// loading request, concurrency=1. Once that request is done, concurrency is enabled.
		// However, the XDS stream is long lived, so the first request would block all others. As a
		// result, we should exit the first request immediately; clients will retry.
		firstRequest.Store(false)
		return status.Error(codes.Unavailable, "server warmup not complete; try again")
	}
	// Check if server is ready to accept clients and process new requests.
	// Currently ready means caches have been synced and hence can build
	// clusters correctly. Without this check, InitContext() call below would
	// initialize with empty config, leading to reconnected Envoys losing
	// configuration. This is an additional safety check in addition to adding
	// cachesSynced logic to readiness probe to handle cases where kube-proxy
	// ip tables update latencies.
	// See https://github.com/istio/istio/issues/25495.
	if !s.IsServerReady() {
		return status.Error(codes.Unavailable, "server is not ready to serve discovery information")
	}

	ctx := stream.Context()
	peerAddr := "0.0.0.0"
	if peerInfo, ok := peer.FromContext(ctx); ok {
		peerAddr = peerInfo.Addr.String()
	}

	if err := s.WaitForRequestLimit(stream.Context()); err != nil {
		log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err)
		return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err)
	}

	ids, err := s.authenticate(ctx)
	if err != nil {
		return status.Error(codes.Unauthenticated, err.Error())
	}
	if ids != nil {
		log.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
	} else {
		log.Debugf("Unauthenticated XDS: %s", peerAddr)
	}

	// InitContext returns immediately if the context was already initialized.
	if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
		// Error accessing the data - log and close, maybe a different pilot replica
		// has more luck
		log.Warnf("Error reading config %v", err)
		return status.Error(codes.Unavailable, "error reading config")
	}
	con := newConnection(peerAddr, stream)

	// Do not call: defer close(con.pushChannel). The push channel will be garbage collected
	// when the connection is no longer used. Closing the channel can cause subtle race conditions
	// with push. According to the spec: "It's only necessary to close a channel when it is important
	// to tell the receiving goroutines that all data have been sent."

	// Block until either a request is received or a push is triggered.
	// We need 2 go routines because 'read' blocks in Recv().
	go s.receive(con, ids)

	// Wait for the proxy to be fully initialized before we start serving traffic. Because
	// initialization doesn't have dependencies that will block, there is no need to add any timeout
	// here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to
	// reqChannel and the connection not being enqueued for pushes to pushChannel until the
	// initialization is complete.
	<-con.initialized

	for {
		// Go select{} statements are not ordered; the same channel can be chosen many times.
		// For requests, these are higher priority (client may be blocked on startup until these are done)
		// and often very cheap to handle (simple ACK), so we check it first.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case <-con.stop:
			return nil
		default:
		}
		// If there wasn't already a request, poll for requests and pushes. Note: if we have a huge
		// amount of incoming requests, we may still send some pushes, as we do not `continue` above;
		// however, requests will be handled ~2x as much as pushes. This ensures a wave of requests
		// cannot completely starve pushes. However, this scenario is unlikely.
		select {
		case req, ok := <-con.reqChan:
			if ok {
				if err := s.processRequest(req, con); err != nil {
					return err
				}
			} else {
				// Remote side closed connection or error processing the request.
				return <-con.errorChan
			}
		case pushEv := <-con.pushChannel:
			// Call pushConnection to push the xDS configuration
			err := s.pushConnection(con, pushEv)
			// Notify the semaphore that this push has completed
			pushEv.done()
			if err != nil {
				return err
			}
		case <-con.stop:
			return nil
		}
	}
}
After pushConnection has finished the xDS push for a client, pushEv.done() is called, which marks the connection done in pushQueue and releases a semaphore slot.
The flow from pushQueue to the final xDS push is shown in the figure below:
6) Generating and Distributing xDS Configuration
pushConnection() sends xDS configuration to Envoy:
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
	pushRequest := pushEv.pushRequest

	if pushRequest.Full {
		// Update Proxy with current information.
		s.computeProxyState(con.proxy, pushRequest)
	}

	// Based on what changed, decide whether this proxy needs an xDS update
	if !s.ProxyNeedsPush(con.proxy, pushRequest) {
		log.Debugf("Skipping push to %v, no updates required", con.conID)
		if pushRequest.Full {
			// Only report for full versions, incremental pushes do not have a new version.
			reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, nil)
		}
		return nil
	}

	// Send pushes to all generators
	// Each Generator is responsible for determining if the push event requires a push
	wrl, ignoreEvents := con.pushDetails()
	// Iterate over the resource types the proxy watches
	for _, w := range wrl {
		// Generate the xDS config for this subscribed type and send it to the client
		if err := s.pushXds(con, w, pushRequest); err != nil {
			return err
		}
	}
	if pushRequest.Full {
		// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
		reportAllEvents(s.StatusReporter, con.conID, pushRequest.Push.LedgerVersion, ignoreEvents)
	}

	proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
	return nil
}
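The core logic of pushConnection() is:
- Based on what changed, decide whether this Envoy proxy needs an xDS update, and return immediately if it does not
- Iterate over the resource types the proxy watches, generating the xDS configuration for each subscribed type and sending it to the client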
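A hedged sketch of that control flow with hypothetical stand-ins: pushEvent, connection, and the needsPush and pushXds callbacks replace Istio's PushRequest, Connection, ProxyNeedsPush, and pushXds, and short names such as CDS replace real type URLs.

```go
package main

import (
	"errors"
	"fmt"
)

// pushEvent is a hypothetical reduction of the push request.
type pushEvent struct {
	Full bool
}

// connection records the xDS types the proxy watches, in subscription order.
type connection struct {
	watched []string
}

// pushConnection returns early when the proxy does not need this push; otherwise it
// pushes each watched type in turn, stopping at the first error. It returns the types sent.
func pushConnection(con *connection, ev pushEvent, needsPush func(pushEvent) bool, pushXds func(typ string) error) ([]string, error) {
	if !needsPush(ev) {
		return nil, nil // nothing this proxy depends on has changed
	}
	var sent []string
	for _, typ := range con.watched {
		if err := pushXds(typ); err != nil {
			return sent, err // a send error aborts the whole push for this connection
		}
		sent = append(sent, typ)
	}
	return sent, nil
}

func main() {
	con := &connection{watched: []string{"CDS", "EDS", "LDS", "RDS"}}
	always := func(pushEvent) bool { return true }

	sent, err := pushConnection(con, pushEvent{Full: true}, always, func(string) error { return nil })
	fmt.Println(sent, err) // [CDS EDS LDS RDS] <nil>

	failOnLDS := func(typ string) error {
		if typ == "LDS" {
			return errors.New("stream closed")
		}
		return nil
	}
	sent, err = pushConnection(con, pushEvent{Full: true}, always, failOnLDS)
	fmt.Println(sent, err) // [CDS EDS] stream closed
}
```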
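(1) ProxyNeedsPush
ProxyNeedsPush decides, based on which resources changed, whether this Envoy proxy needs an xDS update. Its default implementation is DefaultProxyNeedsPush, which mainly uses the proxy's SidecarScope to check whether the proxy depends on the changed resources. The core method is: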
// pilot/pkg/model/sidecar.go
var (
	sidecarScopeKnownConfigTypes = map[kind.Kind]struct{}{
		kind.ServiceEntry:    {},
		kind.VirtualService:  {},
		kind.DestinationRule: {},
		kind.Sidecar:         {},
	}

	// clusterScopedConfigTypes includes configs when they are in root namespace,
	// they will be applied to all namespaces within the cluster.
	clusterScopedConfigTypes = map[kind.Kind]struct{}{
		kind.EnvoyFilter:           {},
		kind.AuthorizationPolicy:   {},
		kind.RequestAuthentication: {},
		kind.WasmPlugin:            {},
	}
)

func (sc *SidecarScope) DependsOnConfig(config ConfigKey) bool {
	if sc == nil {
		return true
	}

	// This kind of config will trigger a change if made in the root namespace or the same namespace
	// (For clusterScopedConfigTypes, push if the change is in the root namespace or the proxy's namespace)
	if _, f := clusterScopedConfigTypes[config.Kind]; f {
		return config.Namespace == sc.RootNamespace || config.Namespace == sc.Namespace
	}

	// This kind of config is unknown to sidecarScope.
	if _, f := sidecarScopeKnownConfigTypes[config.Kind]; !f {
		return true
	}

	// Push only if the SidecarScope depends on this config
	_, exists := sc.configDependencies[config.HashCode()]
	return exists
}
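The core logic of DependsOnConfig() is:
- For clusterScopedConfigTypes kinds, an xDS push is needed if the changed config is in the root namespace or in the proxy's own namespace
- Kinds that SidecarScope does not track (anything outside sidecarScopeKnownConfigTypes) always trigger a push, as does a nil SidecarScope
- Otherwise, an xDS push is needed only if the SidecarScope depends on that config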
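The decision table can be checked with a self-contained sketch. Kinds are plain strings and configDependencies is keyed by the whole configKey rather than by HashCode(); both are simplifications, and the namespace and resource names are made up for the example.

```go
package main

import "fmt"

// configKey is a hypothetical reduction of model.ConfigKey.
type configKey struct {
	Kind      string
	Namespace string
	Name      string
}

var (
	// Kinds whose dependencies a SidecarScope tracks individually.
	sidecarScopeKnownKinds = map[string]bool{"ServiceEntry": true, "VirtualService": true, "DestinationRule": true, "Sidecar": true}
	// Kinds that apply to every namespace when placed in the root namespace.
	clusterScopedKinds = map[string]bool{"EnvoyFilter": true, "AuthorizationPolicy": true, "RequestAuthentication": true, "WasmPlugin": true}
)

// sidecarScope keeps only the fields dependsOnConfig reads.
type sidecarScope struct {
	Namespace          string
	RootNamespace      string
	configDependencies map[configKey]struct{}
}

func (sc *sidecarScope) dependsOnConfig(c configKey) bool {
	if sc == nil {
		return true // no scope computed yet: be conservative and push
	}
	if clusterScopedKinds[c.Kind] {
		return c.Namespace == sc.RootNamespace || c.Namespace == sc.Namespace
	}
	if !sidecarScopeKnownKinds[c.Kind] {
		return true // untracked kind: cannot prove the change is irrelevant
	}
	_, ok := sc.configDependencies[c]
	return ok
}

func main() {
	sc := &sidecarScope{
		Namespace:     "bookinfo",
		RootNamespace: "istio-system",
		configDependencies: map[configKey]struct{}{
			{Kind: "VirtualService", Namespace: "bookinfo", Name: "reviews"}: {},
		},
	}
	fmt.Println(sc.dependsOnConfig(configKey{"VirtualService", "bookinfo", "reviews"})) // true
	fmt.Println(sc.dependsOnConfig(configKey{"VirtualService", "other", "ratings"}))    // false
	fmt.Println(sc.dependsOnConfig(configKey{"EnvoyFilter", "istio-system", "global"})) // true
	fmt.Println(sc.dependsOnConfig(configKey{"EnvoyFilter", "other", "local"}))         // false
	fmt.Println(sc.dependsOnConfig(configKey{"Gateway", "other", "gw"}))                // true
}
```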
(2) pushXds
// pilot/pkg/xds/xdsgen.go
func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req *model.PushRequest) error {
	if w == nil {
		return nil
	}
	// Look up the xDS generator
	gen := s.findGenerator(w.TypeUrl, con)
	if gen == nil {
		return nil
	}

	t0 := time.Now()

	// If delta is set, client is requesting new resources or removing old ones. We should just generate the
	// new resources it needs, rather than the entire set of known resources.
	// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;
	// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.
	// This means if there are only removals, we will not respond.
	var logFiltered string
	if !req.Delta.IsEmpty() && features.PartialFullPushes &&
		!con.proxy.IsProxylessGrpc() {
		logFiltered = " filtered:" + strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))
		w = &model.WatchedResource{
			TypeUrl:       w.TypeUrl,
			ResourceNames: req.Delta.Subscribed.UnsortedList(),
		}
	}
	// Generate the xDS configuration with the generator
	res, logdata, err := gen.Generate(con.proxy, w, req)
	info := ""
	if len(logdata.AdditionalInfo) > 0 {
		info = " " + logdata.AdditionalInfo
	}
	if len(logFiltered) > 0 {
		info += logFiltered
	}
	if err != nil || res == nil {
		// If we have nothing to send, report that we got an ACK for this version.
		if s.StatusReporter != nil {
			s.StatusReporter.RegisterEvent(con.conID, w.TypeUrl, req.Push.LedgerVersion)
		}
		if log.DebugEnabled() {
			log.Debugf("%s: SKIP%s for node:%s%s", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}

		// If we are sending a request, we must respond or we can get Envoy stuck. Assert we do.
		// One exception is if Envoy is simply unsubscribing from some resources, in which case we can skip.
		isUnsubscribe := features.PartialFullPushes && !req.Delta.IsEmpty() && req.Delta.Subscribed.IsEmpty()
		if features.EnableUnsafeAssertions && err == nil && res == nil && req.IsRequest() && !isUnsubscribe {
			log.Fatalf("%s: SKIPPED%s for node:%s%s but expected a response for request", v3.GetShortType(w.TypeUrl), req.PushReason(), con.proxy.ID, info)
		}
		return err
	}
	defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()

	resp := &discovery.DiscoveryResponse{
		ControlPlane: ControlPlane(),
		TypeUrl:      w.TypeUrl,
		// TODO: send different version for incremental eds
		VersionInfo: req.Push.PushVersion,
		Nonce:       nonce(req.Push.LedgerVersion),
		Resources:   model.ResourcesToAny(res),
	}

	configSize := ResourceSize(res)
	configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))

	ptype := "PUSH"
	if logdata.Incremental {
		ptype = "PUSH INC"
	}

	// Send the DiscoveryResponse
	if err := con.send(resp); err != nil {
		if recordSendError(w.TypeUrl, err) {
			log.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
				v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)
		}
		return err
	}

	switch {
	case !req.Full:
		if log.DebugEnabled() {
			log.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
				v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)
		}
	default:
		debug := ""
		if log.DebugEnabled() {
			// Add additional information to logs when debug mode enabled.
			debug = " nonce:" + resp.Nonce + " version:" + resp.VersionInfo
		}
		log.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
			util.ByteCount(ResourceSize(res)), info, debug)
	}

	return nil
}
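pushXds() first finds the xDS generator for the subscribed resource type, then uses it to generate the corresponding xDS configuration, and finally sends the result with con.send().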
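A simplified sketch of the lookup, generate, and wrap sequence. The generator type, the response struct, and the plain map lookup are assumptions: Istio's findGenerator also takes per-proxy generator metadata into account (see the comment on the Generators field), which is ignored here. The cluster type URL is Envoy's v3 Cluster type; the version and nonce strings are placeholders.

```go
package main

import "fmt"

// generator is a hypothetical stand-in for model.XdsResourceGenerator.
type generator func(proxyID string, names []string) []string

// response keeps a few of the DiscoveryResponse fields that pushXds fills in.
type response struct {
	TypeURL     string
	VersionInfo string
	Nonce       string
	Resources   []string
}

// pushXds returns nil for "nothing to send": no generator, or no resources produced.
func pushXds(gens map[string]generator, typeURL, proxyID string, names []string, version, nonce string) *response {
	gen, ok := gens[typeURL]
	if !ok {
		return nil
	}
	res := gen(proxyID, names)
	if res == nil {
		return nil
	}
	return &response{TypeURL: typeURL, VersionInfo: version, Nonce: nonce, Resources: res}
}

const clusterType = "type.googleapis.com/envoy.config.cluster.v3.Cluster"

func main() {
	gens := map[string]generator{
		clusterType: func(proxyID string, _ []string) []string {
			return []string{"outbound|9080||reviews.bookinfo.svc.cluster.local"}
		},
	}
	resp := pushXds(gens, clusterType, "sidecar-a", nil, "v1", "nonce-1")
	fmt.Println(resp.TypeURL == clusterType, len(resp.Resources))               // true 1
	fmt.Println(pushXds(gens, "unknown-type", "sidecar-a", nil, "v1", "n") == nil) // true
}
```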
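(3) XdsResourceGenerator.Generate
Pilot is mainly responsible for generating and distributing six kinds of xDS resources (CDS, EDS, LDS, RDS, ECDS, NDS). Taking the CDS generator as an example, let's look at how DiscoveryServer builds the raw Cluster configuration from the proxy's attributes and the PushContext cache.
CDS configuration is generated by ConfigGenerator.BuildClusters():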
// pilot/pkg/networking/core/v1alpha3/cluster.go
func (configgen *ConfigGeneratorImpl) BuildClusters(proxy *model.Proxy, req *model.PushRequest) ([]*discovery.Resource, model.XdsLogDetails) {
	// In Sotw, we care about all services.
	var services []*model.Service
	if features.FilterGatewayClusterConfig && proxy.Type == model.Router {
		services = req.Push.GatewayServices(proxy)
	} else {
		services = proxy.SidecarScope.Services()
	}
	return configgen.buildClusters(proxy, req, services)
}

func (configgen *ConfigGeneratorImpl) buildClusters(proxy *model.Proxy, req *model.PushRequest,
	services []*model.Service,
) ([]*discovery.Resource, model.XdsLogDetails) {
	clusters := make([]*cluster.Cluster, 0)
	resources := model.Resources{}
	envoyFilterPatches := req.Push.EnvoyFilters(proxy)
	// Create the cluster builder
	cb := NewClusterBuilder(proxy, req, configgen.Cache)
	instances := proxy.ServiceInstances
	cacheStats := cacheStats{}
	switch proxy.Type {
	// Build clusters for a sidecar proxy
	case model.SidecarProxy:
		// Setup outbound clusters
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Add a blackhole and passthrough cluster for catching traffic to unresolved routes
		clusters = outboundPatcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster(), cb.buildDefaultPassthroughCluster())
		clusters = append(clusters, outboundPatcher.insertedClusters()...)

		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		clusters = append(clusters, configgen.buildInboundClusters(cb, proxy, instances, inboundPatcher)...)
		if proxy.EnableHBONE() {
			clusters = append(clusters, configgen.buildInboundHBONEClusters())
		}
		// Pass through clusters for inbound traffic. These cluster bind loopback-ish src address to access node local service.
		// (These serve as the default destination for unmatched inbound traffic)
		clusters = inboundPatcher.conditionallyAppend(clusters, nil, cb.buildInboundPassthroughClusters()...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	case model.Waypoint:
		svcs := findWaypointServices(proxy, req.Push)
		// Waypoint proxies do not need outbound clusters in most cases, unless we have a route pointing to something
		outboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_OUTBOUND}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, outboundPatcher, filterWaypointOutboundServices(req.Push.ServicesAttachedToMesh(), svcs, services))
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Setup inbound clusters
		inboundPatcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_SIDECAR_INBOUND}
		clusters = append(clusters, configgen.buildWaypointInboundClusters(cb, proxy, req.Push, svcs)...)
		clusters = append(clusters, inboundPatcher.insertedClusters()...)
	default: // Gateways
		patcher := clusterPatcher{efw: envoyFilterPatches, pctx: networking.EnvoyFilter_GATEWAY}
		ob, cs := configgen.buildOutboundClusters(cb, proxy, patcher, services)
		cacheStats = cacheStats.merge(cs)
		resources = append(resources, ob...)
		// Gateways do not require the default passthrough cluster as they do not have original dst listeners.
		clusters = patcher.conditionallyAppend(clusters, nil, cb.buildBlackHoleCluster())
		if proxy.Type == model.Router && proxy.MergedGateway != nil && proxy.MergedGateway.ContainsAutoPassthroughGateways {
			clusters = append(clusters, configgen.buildOutboundSniDnatClusters(proxy, req, patcher)...)
		}
		clusters = append(clusters, patcher.insertedClusters()...)
	}

	// OutboundTunnel cluster is needed for sidecar and gateway.
	if proxy.EnableHBONE() {
		clusters = append(clusters, cb.buildConnectOriginate(proxy, req.Push, nil))
	}

	// if credential socket exists, create a cluster for it
	if proxy.Metadata != nil && proxy.Metadata.Raw[security.CredentialMetaDataName] == "true" {
		clusters = append(clusters, cb.buildExternalSDSCluster(security.CredentialNameSocketPath))
	}
	for _, c := range clusters {
		resources = append(resources, &discovery.Resource{Name: c.Name, Resource: protoconv.MessageToAny(c)})
	}
	resources = cb.normalizeClusters(resources)

	if cacheStats.empty() {
		return resources, model.DefaultXdsLogDetails
	}
	return resources, model.XdsLogDetails{AdditionalInfo: fmt.Sprintf("cached:%v/%v", cacheStats.hits, cacheStats.hits+cacheStats.miss)}
}
7) Responding to xDS Requests Initiated by Envoy
StreamAggregatedResources() (through Stream()) starts a receive goroutine that receives Envoy's xDS requests:
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) receive(con *Connection, identities []string) {
	defer func() {
		close(con.errorChan)
		close(con.reqChan)
		// Close the initialized channel, if it's not already closed, to prevent blocking the stream.
		select {
		case <-con.initialized:
		default:
			close(con.initialized)
		}
	}()

	firstRequest := true
	for {
		// Receive a DiscoveryRequest
		req, err := con.stream.Recv()
		if err != nil {
			if istiogrpc.IsExpectedGRPCError(err) {
				log.Infof("ADS: %q %s terminated", con.peerAddr, con.conID)
				return
			}
			con.errorChan <- err
			log.Errorf("ADS: %q %s terminated with error: %v", con.peerAddr, con.conID, err)
			totalXDSInternalErrors.Increment()
			return
		}
		// This should be only set for the first request. The node id may not be set - for example malicious clients.
		if firstRequest {
			// probe happens before envoy sends first xDS request
			if req.TypeUrl == v3.HealthInfoType {
				log.Warnf("ADS: %q %s send health check probe before normal xDS request", con.peerAddr, con.conID)
				continue
			}
			firstRequest = false
			if req.Node == nil || req.Node.Id == "" {
				con.errorChan <- status.New(codes.InvalidArgument, "missing node information").Err()
				return
			}
			if err := s.initConnection(req.Node, con, identities); err != nil {
				con.errorChan <- err
				return
			}
			defer s.closeConnection(con)
			log.Infof("ADS: new connection for node:%s", con.conID)
		}

		select {
		// Put the DiscoveryRequest on reqChan
		case con.reqChan <- req:
		case <-con.stream.Context().Done():
			log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID)
			return
		}
	}
}
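receive() reads Envoy's DiscoveryRequests and puts them on reqChan. Before that, it validates the first request: a health check probe that arrives before the first xDS request is skipped, and the first real request must carry node information, which is used to initialize the connection.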
StreamAggregatedResources() takes DiscoveryRequests from reqChan and calls processRequest():
// pilot/pkg/xds/ads.go
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
	stype := v3.GetShortType(req.TypeUrl)
	log.Debugf("ADS:%s: REQ %s resources:%d nonce:%s version:%s ", stype,
		con.conID, len(req.ResourceNames), req.ResponseNonce, req.VersionInfo)
	if req.TypeUrl == v3.HealthInfoType {
		s.handleWorkloadHealthcheck(con.proxy, req)
		return nil
	}

	// For now, don't let xDS piggyback debug requests start watchers.
	if strings.HasPrefix(req.TypeUrl, v3.DebugType) {
		return s.pushXds(con,
			&model.WatchedResource{TypeUrl: req.TypeUrl, ResourceNames: req.ResourceNames},
			&model.PushRequest{Full: true, Push: con.proxy.LastPushContext})
	}
	if s.StatusReporter != nil {
		s.StatusReporter.RegisterEvent(con.conID, req.TypeUrl, req.ResponseNonce)
	}
	shouldRespond, delta := s.shouldRespond(con, req)
	if !shouldRespond {
		return nil
	}

	request := &model.PushRequest{
		Full:   true,
		Push:   con.proxy.LastPushContext,
		Reason: []model.TriggerReason{model.ProxyRequest},

		// The usage of LastPushTime (rather than time.Now()), is critical here for correctness; This time
		// is used by the XDS cache to determine if an entry is stale. If we use Now() with an old push context,
		// we may end up overriding active cache entries with stale ones.
		Start: con.proxy.LastPushTime,
		Delta: delta,
	}

	// SidecarScope for the proxy may not have been updated based on this pushContext.
	// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
	// but proxy's SidecarScope has been updated(s.computeProxyState -> SetSidecarScope) due to optimizations that skip sidecar scope
	// computation.
	if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
		s.computeProxyState(con.proxy, request)
	}
	// Push the xDS configuration
	return s.pushXds(con, con.Watched(req.TypeUrl), request)
}
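processRequest() pushes xDS configuration in response to the DiscoveryRequest, but only after shouldRespond() has decided that the request actually needs a response.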
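shouldRespond() itself is not quoted above. As an assumption-labeled sketch, the following applies the ACK/NACK rules of the xDS protocol (a request carrying error_detail is a NACK, an empty response_nonce marks an initial request, and a nonce that does not match the last one sent is stale) to decide whether a response is needed. The types are hypothetical, and this is not Istio's exact implementation, which also computes the resource delta.

```go
package main

import (
	"fmt"
	"sort"
)

// discoveryRequest keeps the DiscoveryRequest fields the decision reads.
type discoveryRequest struct {
	ResponseNonce string
	ResourceNames []string
	ErrorDetail   string // non-empty: the client rejected (NACKed) the last response
}

// watchState is what the server remembers about one type URL on one connection.
type watchState struct {
	NonceSent     string
	ResourceNames []string
}

func shouldRespond(prev *watchState, req discoveryRequest) bool {
	switch {
	case req.ErrorDetail != "":
		return false // NACK: the client keeps its previous config
	case req.ResponseNonce == "" || prev == nil:
		return true // initial request, or a watch the server has no record of
	case req.ResponseNonce != prev.NonceSent:
		return false // stale nonce: refers to an older response, ignore it
	case sameNames(prev.ResourceNames, req.ResourceNames):
		return false // plain ACK: subscription unchanged
	default:
		return true // ACK that also changes the subscribed resource names
	}
}

// sameNames reports whether a and b hold the same names, ignoring order.
func sameNames(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	x := append([]string(nil), a...)
	y := append([]string(nil), b...)
	sort.Strings(x)
	sort.Strings(y)
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}

func main() {
	prev := &watchState{NonceSent: "n2", ResourceNames: []string{"a", "b"}}
	fmt.Println(shouldRespond(nil, discoveryRequest{}))                                                 // true
	fmt.Println(shouldRespond(prev, discoveryRequest{ResponseNonce: "n2", ResourceNames: []string{"b", "a"}})) // false
	fmt.Println(shouldRespond(prev, discoveryRequest{ResponseNonce: "n2", ResourceNames: []string{"a", "c"}})) // true
	fmt.Println(shouldRespond(prev, discoveryRequest{ResponseNonce: "n1", ResourceNames: []string{"a", "c"}})) // false
	fmt.Println(shouldRespond(prev, discoveryRequest{ResponseNonce: "n2", ErrorDetail: "bad cluster"}))       // false
}
```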
8) Summary
Sequence of pushing xDS to Envoy after a configuration change:
Sequence of responding to xDS requests initiated by Envoy: