milvus querycoord启动源码分析

querycoord启动源码分析

结构体

// Server is the grpc server of QueryCoord.
type Server struct {wg         sync.WaitGrouploopCtx    context.ContextloopCancel context.CancelFuncgrpcServer *grpc.ServerserverID atomic.Int64grpcErrChan chan error// 是一个接口类型queryCoord types.QueryCoordComponentfactory dependency.FactoryetcdCli *clientv3.ClienttikvCli *txnkv.ClientdataCoord types.DataCoordClientrootCoord types.RootCoordClient
}

分析变量dataCoord、rootCoord是何时赋予的值。

queryCoord是一个接口,实现queryCoord api功能。

func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {wg.Add(1)return runComponent(ctx, localMsg, wg, components.NewQueryCoord, metrics.RegisterQueryCoord)
}// creator用NewQueryCoord替换
role, err = creator(ctx, factory)

components.NewQueryCoord是一个函数。

NewQueryCoord()用来创建QueryCoord结构体。

// NewQueryCoord creates a new QueryCoord
func NewQueryCoord(ctx context.Context, factory dependency.Factory) (*QueryCoord, error) {svr, err := grpcquerycoord.NewServer(ctx, factory)if err != nil {return nil, err}return &QueryCoord{ctx: ctx,svr: svr,}, nil
}

grpcquerycoord.NewServer()产生的是本结构体Server。

进入NewServer:

// NewServer create a new QueryCoord grpc server.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {ctx1, cancel := context.WithCancel(ctx)svr, err := qc.NewQueryCoord(ctx1)if err != nil {cancel()return nil, err}return &Server{queryCoord:  svr,loopCtx:     ctx1,loopCancel:  cancel,factory:     factory,grpcErrChan: make(chan error),}, nil
}

qc.NewQueryCoord()返回一个结构体,是types.QueryCoordComponent接口的实现。

执行Run()

Server结构体创建后,调用结构体的Run()方法。

func runComponent[T component](ctx context.Context,localMsg bool,runWg *sync.WaitGroup,creator func(context.Context, dependency.Factory) (T, error),metricRegister func(*prometheus.Registry),
) component {var role Tsign := make(chan struct{})go func() {factory := dependency.NewFactory(localMsg)var err errorrole, err = creator(ctx, factory)if localMsg {paramtable.SetRole(typeutil.StandaloneRole)} else {paramtable.SetRole(role.GetName())}if err != nil {panic(err)}close(sign)// 在这里调用对应组件结构体的Run()方法,这里是QueryCoord结构体if err := role.Run(); err != nil {panic(err)}runWg.Done()}()......
}

runComponent是一个包裹函数。

// Run starts service
func (qs *QueryCoord) Run() error {if err := qs.svr.Run(); err != nil {log.Error("QueryCoord starts error", zap.Error(err))return err}log.Debug("QueryCoord successfully started")return nil
}

Run()方法调用qs.svr.Run()方法。srv是qc.NewQueryCoord()返回的结构体。

// Run initializes and starts QueryCoord's grpc service.
func (s *Server) Run() error {if err := s.init(); err != nil {return err}log.Debug("QueryCoord init done ...")if err := s.start(); err != nil {return err}log.Debug("QueryCoord start done ...")return nil
}

接下来分析s.init()和s.start()方法。

s.init()

// init initializes QueryCoord's grpc service.
func (s *Server) init() error {params := paramtable.Get()etcdConfig := &params.EtcdCfgrpcParams := &params.QueryCoordGrpcServerCfgetcdCli, err := etcd.GetEtcdClient(etcdConfig.UseEmbedEtcd.GetAsBool(),etcdConfig.EtcdUseSSL.GetAsBool(),etcdConfig.Endpoints.GetAsStrings(),etcdConfig.EtcdTLSCert.GetValue(),etcdConfig.EtcdTLSKey.GetValue(),etcdConfig.EtcdTLSCACert.GetValue(),etcdConfig.EtcdTLSMinVersion.GetValue())if err != nil {log.Debug("QueryCoord connect to etcd failed", zap.Error(err))return err}s.etcdCli = etcdClis.SetEtcdClient(etcdCli)s.queryCoord.SetAddress(rpcParams.GetAddress())if params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {......}s.wg.Add(1)// 启动grpc,默认为19531go s.startGrpcLoop(rpcParams.Port.GetAsInt())// wait for grpc server loop starterr = <-s.grpcErrChanif err != nil {return err}// --- Master Server Client ---// 创建rootCoord客户端if s.rootCoord == nil {s.rootCoord, err = rcc.NewClient(s.loopCtx, qc.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)if err != nil {log.Error("QueryCoord try to new RootCoord client failed", zap.Error(err))panic(err)}}// wait for master init or healthy// 等待rootcoord服务正常log.Debug("QueryCoord try to wait for RootCoord ready")err = componentutil.WaitForComponentHealthy(s.loopCtx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)if err != nil {log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err))panic(err)}if err := s.SetRootCoord(s.rootCoord); err != nil {panic(err)}log.Debug("QueryCoord report RootCoord ready")// --- Data service client ---// 创建dataCoord客户端if s.dataCoord == nil {s.dataCoord, err = dcc.NewClient(s.loopCtx, qc.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)if err != nil {log.Error("QueryCoord try to new DataCoord client failed", zap.Error(err))panic(err)}}// 等待datacoord服务正常log.Debug("QueryCoord try to wait for DataCoord ready")err = componentutil.WaitForComponentHealthy(s.loopCtx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)if err != nil {log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err))panic(err)}if err := s.SetDataCoord(s.dataCoord); err != nil {panic(err)}log.Debug("QueryCoord report DataCoord ready")// 执行真正的初始化if err := s.queryCoord.Init(); err != nil {return err}return nil
}

这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

s.startGrpcLoop()启动grpc端口服务。

最终调用s.queryCoord.Init()进行初始化,代码位置:internal\querycoordv2\server.go

s.queryCoord是接口类型types.QueryCoordComponent ,QueryCoordComponent 继承于Component。

// QueryCoord is the interface `querycoord` package implements
type QueryCoord interface {Componentquerypb.QueryCoordServer
}// Component is the interface all services implement
type Component interface {Init() errorStart() errorStop() errorRegister() error
}

接口套接口:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component

各组件最终的Init()初始化代码路径:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()

回过头来继续querycoord的init。

func (s *Server) Init() error {log.Info("QueryCoord start init",zap.String("meta-root-path", Params.EtcdCfg.MetaRootPath.GetValue()),zap.String("address", s.address))if err := s.initSession(); err != nil {return err}if s.enableActiveStandBy {......}// 真正执行初始化return s.initQueryCoord()
}

继续进入c.initQueryCoord():

func (s *Server) initQueryCoord() error {s.UpdateStateCode(commonpb.StateCode_Initializing)log.Info("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing))// Init KV and ID allocatormetaType := Params.MetaStoreCfg.MetaStoreType.GetValue()var idAllocatorKV kv.TxnKVlog.Info(fmt.Sprintf("query coordinator connecting to %s.", metaType))if metaType == util.MetaStoreTypeTiKV {s.kv = tikv.NewTiKV(s.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue())idAllocatorKV = tsoutil.NewTSOTiKVBase(s.tikvCli, Params.TiKVCfg.KvRootPath.GetValue(), "querycoord-id-allocator")} else if metaType == util.MetaStoreTypeEtcd {s.kv = etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())idAllocatorKV = tsoutil.NewTSOKVBase(s.etcdCli, Params.EtcdCfg.KvRootPath.GetValue(), "querycoord-id-allocator")} else {return fmt.Errorf("not supported meta store: %s", metaType)}log.Info(fmt.Sprintf("query coordinator successfully connected to %s.", metaType))idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV)err := idAllocator.Initialize()if err != nil {log.Error("query coordinator id allocator initialize failed", zap.Error(err))return err}s.idAllocator = func() (int64, error) {return idAllocator.AllocOne()}// Init metrics cache managers.metricsCacheManager = metricsinfo.NewMetricsCacheManager()// Init metas.nodeMgr = session.NewNodeManager()err = s.initMeta()if err != nil {return err}// Init sessionlog.Info("init session")s.cluster = session.NewCluster(s.nodeMgr, s.queryNodeCreator)// Init schedulerslog.Info("init schedulers")s.jobScheduler = job.NewScheduler()s.taskScheduler = task.NewScheduler(s.ctx,s.meta,s.dist,s.targetMgr,s.broker,s.cluster,s.nodeMgr,)// Init heartbeatlog.Info("init dist controller")s.distController = dist.NewDistController(s.cluster,s.nodeMgr,s.dist,s.targetMgr,s.taskScheduler,)// Init balancer map and balancerlog.Info("init all available balancer")s.balancerMap = make(map[string]balance.Balance)s.balancerMap[balance.RoundRobinBalancerName] = balance.NewRoundRobinBalancer(s.taskScheduler, s.nodeMgr)s.balancerMap[balance.RowCountBasedBalancerName] = balance.NewRowCountBasedBalancer(s.taskScheduler,s.nodeMgr, s.dist, s.meta, s.targetMgr)s.balancerMap[balance.ScoreBasedBalancerName] = balance.NewScoreBasedBalancer(s.taskScheduler,s.nodeMgr, s.dist, s.meta, s.targetMgr)if balancer, ok := s.balancerMap[params.Params.QueryCoordCfg.Balancer.GetValue()]; ok {s.balancer = balancerlog.Info("use config balancer", zap.String("balancer", params.Params.QueryCoordCfg.Balancer.GetValue()))} else {s.balancer = s.balancerMap[balance.RowCountBasedBalancerName]log.Info("use rowCountBased auto balancer")}// Init checker controllerlog.Info("init checker controller")s.checkerController = checkers.NewCheckerController(s.meta,s.dist,s.targetMgr,s.balancer,s.nodeMgr,s.taskScheduler,s.broker,)// Init observerss.initObserver()// Init load status cachemeta.GlobalFailedLoadCache = meta.NewFailedLoadCache()log.Info("QueryCoord init success")return err
}

从代码可以看出初始化是在填充querycoord结构体。

s.start()

启动组件的逻辑。

// start starts QueryCoord's grpc service.
func (s *Server) start() error {err := s.queryCoord.Register()if err != nil {return err}return s.queryCoord.Start()
}

s.queryCoord是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

Register():向元数据etcd注册。

Start():用来启动组件。

进入s.queryCoord.Start():

func (s *Server) Start() error {if !s.enableActiveStandBy {if err := s.startQueryCoord(); err != nil {return err}log.Info("QueryCoord started")}return nil
}

真正执行启动逻辑在s.startQueryCoord()。

func (s *Server) startQueryCoord() error {log.Info("start watcher...")sessions, revision, err := s.session.GetSessions(typeutil.QueryNodeRole)if err != nil {return err}for _, node := range sessions {s.nodeMgr.Add(session.NewNodeInfo(node.ServerID, node.Address))s.taskScheduler.AddExecutor(node.ServerID)if node.Stopping {s.nodeMgr.Stopping(node.ServerID)}}s.checkReplicas()for _, node := range sessions {s.handleNodeUp(node.ServerID)}s.wg.Add(2)go s.handleNodeUpLoop()go s.watchNodes(revision)// Recover dist, to avoid generate too much task when dist not ready after restarts.distController.SyncAll(s.ctx)s.startServerLoop()s.afterStart()s.UpdateStateCode(commonpb.StateCode_Healthy)sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.ServerID)return nil
}

要详细知道启动querycoord组件做了什么事情,研究这个函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/823163.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DDD领域设计基础

1概述 作为架构师&#xff0c;我们在业务建模的时候不能完全凭经验、感觉&#xff0c;我们还得有一套方法论&#xff0c;DDD领域驱动恰巧可以作为业务建模的方法论来使用。 2 为什么要使用DDD 2.1 为什么需要DDD 复杂系统设计&#xff1a;系统多&#xff0c;业务逻辑复杂&a…

ABeam德硕|旗下艾宾信息技术开发上海、西安、大连三地校招信息公开,期待您的加入!

寻人启事 想要找到你&#xff01; 关于我们 ABeam Consulting全球 ABeam Consulting集团成立于1981年&#xff0c;总部位于日本东京&#xff0c;历经40余年的发展&#xff0c;先后在中国、韩国、泰国、新加坡、英国、德国、美国等全球多个国家和地区设立了服务网点&#xff0…

Python和R概率统计算法建模评估气象和运动

&#x1f3af;要点 概率统计数学&#xff1a;&#x1f3af;Python和R计算和算法实现气象学&#xff1a; 计算和可视化&#xff1a;&#x1f3af;全球陆地-海洋平均年平均表面温度&#xff1a;&#x1f58a;直方图温度异常&#xff0c;&#x1f58a;显示分位数-分位数&#xff…

使用Socket实现局域网内聊天室

需要提前了解的Socket知识点&#xff1a; Client端输入的IP都是Server所在电脑的IPServer最好设置0.0.0.0这样无论迁移到哪个电脑上&#xff0c;都是那台电脑的IPClient和Server必须在同一个局域网之下&#xff0c;否则不能通信&#xff1b;如果要实现跨局域网通信&#xff0c…

幻兽帕鲁老板公开发声:腾讯正在制作幻兽帕鲁克隆版

昨天&#xff0c;Pocketpair的老板出来指责中国游戏公司抄袭了他们的游戏Palworld&#xff0c;说这简直是太不可思议了。 Pocketpair的CEO Takuro Mizobe发布了一个叫Auroria的游戏的截图&#xff0c;然后说&#xff1a;“腾讯正在制作Palworld的克隆游戏&#xff01;在中国&a…

Python根据主播直播时间段判定订单销售额归属

写在前面&#xff1a;最近在群里看到一个这样的直播电商的场景觉得还是挺有趣的&#xff0c;于是就想用Python来实现。 需求描述&#xff1a;根据主播直播时间段结合销售订单的付款时间判断所属销售的归属 生成主播在线直播时间段数据 from datetime import datetime, time…

zabbix监控配置(添加主机、主机组和添加监控项等)

zabbix监控配置 文章目录 zabbix监控配置1.添加主机组2.添加主机&#xff08;linux&#xff09;3.添加主机&#xff08;windows&#xff09;4.监控项配置&#xff08;通过模板添加&#xff09;5.监控项配置&#xff08;手动添加&#xff09; 1.添加主机组 2.添加主机&#xff0…

学习Rust的第5天:控制流

Control flow, as the name suggests controls the flow of the program, based on a condition. 控制流&#xff0c;顾名思义&#xff0c;根据条件控制程序的流。 If expression If表达式 An if expression is used when you want to execute a block of code if a condition …

自定义vue-cli 实现预设模板项目

模板结构 主要包括四个部分&#xff1a; preset.jsonprompts.jsgenerator/index.jstemplate/ 项目最终结构 preset.json preset.json 中是一个包含创建新项目所需预定义选项和插件的 JSON 对象&#xff0c;让用户无需在命令提示中选择它们&#xff0c;简称预设&#xff1b;…

openGauss学习笔记-265 openGauss性能调优-TPCC性能调优测试指导-操作系统配置

文章目录 openGauss学习笔记-265 openGauss性能调优-TPCC性能调优测试指导-操作系统配置265.1安装openEuler操作系统265.2 修改操作系统内核PAGESIZE为64KB。265.3 关闭CPU中断的服务irqbalance openGauss学习笔记-265 openGauss性能调优-TPCC性能调优测试指导-操作系统配置 本…

绩效考核:关键成功因素法(CSF)

绩效考核是企业管理的核心环节&#xff0c;其目的是为了确保员工的工作表现符合组织的目标和期望。然而&#xff0c;传统的绩效考核方法往往只关注员工的业绩和产出&#xff0c;而忽略了员工的能力和潜力。关键成功因素法&#xff08;CSF&#xff09;作为一种新型的绩效考核方法…

Scala详解(5)

Scala 集合 概述 集合本质上就是一个用于存储1个到多个数据的容器。在Scala中&#xff0c;集合可以分为三大类&#xff1a;Seq(序列)&#xff0c;Set(集合)和Map(映射)。基于这三大类&#xff0c;衍生出来众多的子类 序列&#xff1a;元素有序可重复 集合&#xff1a;元素无…

通过控制台获取iptv直播地址

控制台代码1: // 获取所有包含频道名称和URL的<div>和<td>元素 const divElements = document.querySelectorAll(div[style="float: left;"]); const tdElements = document.querySelectorAll(td[style="padding-left: 6px;"]);// 创建空数组…

2011年认证杯SPSSPRO杯数学建模C题(第二阶段)你的爱车入保险了吗全过程文档及程序

2011年认证杯SPSSPRO杯数学建模 C题 你的爱车入保险了吗 原题再现&#xff1a; 近几年&#xff0c;国内汽车销售市场异常火爆&#xff0c;销售量屡创新高。车轮上的世界&#xff0c;保险已经与我们如影随形。汽车保险&#xff0c;简称车险&#xff0c;是指对机动车辆由于自然…

计算机考研都将采用408!?

这个根本不可能&#xff0c;高考还没做到全国统一考试呢 每个学校对于计算机招生的需求是不一样的&#xff0c;比如清华大学&#xff0c;专业课912&#xff0c;算的上是最难的计算机专业课了&#xff0c;那他为什么搞这么难啊&#xff0c;还不是因为那群敢考清华的卷王们太变态…

Python数据结构【二】查找

前言 可私聊进一千多人Python全栈交流群&#xff08;手把手教学&#xff0c;问题解答&#xff09; 进群可领取Python全栈教程视频 多得数不过来的计算机书籍&#xff1a;基础、Web、爬虫、数据分析、可视化、机器学习、深度学习、人工智能、算法、面试题等。 &#x1f680;&a…

C++奇迹之旅:构造函数

文章目录 &#x1f4dd;类的6个默认成员函数&#x1f320; 构造函数&#x1f309; 概念&#x1f309;特性&#x1f309;三种默认构造函数 &#x1f6a9;总结 &#x1f4dd;类的6个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类。 空类中真的什么都没有吗&am…

【重磅开源】一款可以生成SpringBoot+Vue代码的轻量级项目

基于SpringBootVue3开发的轻量级快速开发脚手架 &#x1f341;项目简介 一款通用的前、后端项目模板 一款快速开发管理系统的项目 一款可以生成SpringBootVue代码的项目 一款持续迭代的开源项目 一个程序员的心血合集 度过严寒&#xff0c;终有春日&#xff…

FastJson转化时BigDecimal与Double问题

一、场景 在使用FastJson将json字符串转化为jsonObject时&#xff0c;FastJson默认会将小数转为BigDecimal类型&#xff0c;但有时候我们想要的是double类型。 二、解决方案 int disableDecimal JSON.DEFAULT_PARSER_FEATURE & ~Feature.UseBigDecimal.getMask(); Stri…

Nginx内存池相关源码剖析(一)总览

剖析nginx的内存池源码&#xff0c;讲解原理实现以及该内存池设计的应用场景 介绍 Nginx内存池是Nginx为了优化内存管理而引入的一种机制。在Nginx中&#xff0c;每个层级&#xff08;如模板、TCP连接、HTTP请求等&#xff09;都会创建一个内存池进行内存管理。当这些层级的…