milvus datacoord启动源码分析

datacoord启动源码分析

结构体

// components.DataCoord
// DataCoord implements grpc server of DataCoord server
type DataCoord struct {ctx context.Contextsvr *grpcdatacoordclient.Server
}// grpcdatacoord.Server
// Server is the grpc server of datacoord
type Server structgrpcdatacoord.Contextcancel context.CancelFuncserverID atomic.Int64wg        sync.WaitGroupdataCoord types.DataCoordComponentetcdCli *clientv3.ClienttikvCli *txnkv.ClientgrpcErrChan chan errorgrpcServer  *grpc.Server
}

dataCoord是一个接口,实现dataCoord api功能。

func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {wg.Add(1)return runComponent(ctx, localMsg, wg, components.NewDataCoord, metrics.RegisterDataCoord)
}// creator用NewDataCoord替换
role, err = creator(ctx, factory)

components.NewDataCoord是一个函数。

NewDataCoord()用来创建DataCoord结构体。

// NewDataCoord creates a new DataCoord
func NewDataCoord(ctx context.Context, factory dependency.Factory) (*DataCoord, error) {s := grpcdatacoordclient.NewServer(ctx, factory)return &DataCoord{ctx: ctx,svr: s,}, nil
}

grpcdatacoordclient.NewServer()产生的是本结构体Server。

进入NewServer:

// NewServer new data service grpc server
func NewServer(ctx context.Context, factory dependency.Factory, opts ...datacoord.Option) *Server {ctx1, cancel := context.WithCancel(ctx)s := &Server{ctx:         ctx1,cancel:      cancel,grpcErrChan: make(chan error),}s.dataCoord = datacoord.CreateServer(s.ctx, factory, opts...)return s
}

datacoord.CreateServer()返回一个结构体datacoord.Server,是接口types.DataCoordComponent的实现。

执行Run()

Server结构体创建后,调用结构体的Run()方法。

func runComponent[T component](ctx context.Context,localMsg bool,runWg *sync.WaitGroup,creator func(context.Context, dependency.Factory) (T, error),metricRegister func(*prometheus.Registry),
) component {var role Tsign := make(chan struct{})go func() {factory := dependency.NewFactory(localMsg)var err errorrole, err = creator(ctx, factory)if localMsg {paramtable.SetRole(typeutil.StandaloneRole)} else {paramtable.SetRole(role.GetName())}if err != nil {panic(err)}close(sign)// 在这里调用对应组件结构体的Run()方法,这里是components.DataCoord结构体if err := role.Run(); err != nil {panic(err)}runWg.Done()}()......
}

runComponent是一个包裹函数。

// Run starts service
func (s *DataCoord) Run() error {if err := s.svr.Run(); err != nil {log.Error("DataCoord starts error", zap.Error(err))return err}log.Debug("DataCoord successfully started")return nil
}

Run()方法调用s.svr.Run()方法。srv是datacoord.CreateServer()返回的结构体datacoord.Server。

// grpcdatacoord
// Run starts the Server. Need to call inner init and start method.
func (s *Server) Run() error {if err := s.init(); err != nil {return err}log.Debug("DataCoord init done ...")if err := s.start(); err != nil {return err}log.Debug("DataCoord start done ...")return nil
}

接下来分析s.init()和s.start()方法。

s.init()

func (s *Server) init() error {params := paramtable.Get()etcdConfig := &params.EtcdCfgetcdCli, err := etcd.GetEtcdClient(etcdConfig.UseEmbedEtcd.GetAsBool(),etcdConfig.EtcdUseSSL.GetAsBool(),etcdConfig.Endpoints.GetAsStrings(),etcdConfig.EtcdTLSCert.GetValue(),etcdConfig.EtcdTLSKey.GetValue(),etcdConfig.EtcdTLSCACert.GetValue(),etcdConfig.EtcdTLSMinVersion.GetValue())if err != nil {log.Debug("DataCoord connect to etcd failed", zap.Error(err))return err}s.etcdCli = etcdClis.dataCoord.SetEtcdClient(etcdCli)s.dataCoord.SetAddress(params.DataCoordGrpcServerCfg.GetAddress())if params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {log.Info("Connecting to tikv metadata storage.")tikvCli, err := getTiKVClient(&paramtable.Get().TiKVCfg)if err != nil {log.Warn("DataCoord failed to connect to tikv", zap.Error(err))return err}s.dataCoord.SetTiKVClient(tikvCli)log.Info("Connected to tikv. Using tikv as metadata storage.")}// 启动grpc,默认为13333err = s.startGrpc()if err != nil {log.Debug("DataCoord startGrpc failed", zap.Error(err))return err}// 执行真正的初始化if err := s.dataCoord.Init(); err != nil {log.Error("dataCoord init error", zap.Error(err))return err}return nil
}

这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

s.startGrpc()启动grpc端口服务。

最终调用s.dataCoord.Init()进行初始化,代码位置:internal\datacoord\server.go

s.queryCoord是接口类型types.DataCoordComponent,DataCoordComponent继承于Component。

type DataCoordComponent interface {DataCoordSetAddress(address string)SetEtcdClient(etcdClient *clientv3.Client)SetTiKVClient(client *txnkv.Client)SetRootCoordClient(rootCoord RootCoordClient)SetDataNodeCreator(func(context.Context, string, int64) (DataNodeClient, error))SetIndexNodeCreator(func(context.Context, string, int64) (IndexNodeClient, error))
}// DataCoord is the interface `datacoord` package implements
type DataCoord interface {Componentdatapb.DataCoordServer
}// Component is the interface all services implement
type Component interface {Init() errorStart() errorStop() errorRegister() error
}

接口套接口:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component

各组件最终的Init()初始化代码路径:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()

回过头来继续datacoord的init。

// Init change server state to Initializing
func (s *Server) Init() error {var err errors.factory.Init(Params)if err = s.initSession(); err != nil {return err}if s.enableActiveStandBy {......}// 执行真正的初始化return s.initDataCoord()
}

继续进入c.initDataCoord():

func (s *Server) initDataCoord() error {s.stateCode.Store(commonpb.StateCode_Initializing)var err errorif err = s.initRootCoordClient(); err != nil {return err}s.broker = NewCoordinatorBroker(s.rootCoordClient)storageCli, err := s.newChunkManagerFactory()if err != nil {return err}if err = s.initMeta(storageCli); err != nil {return err}s.handler = newServerHandler(s)if err = s.initCluster(); err != nil {return err}s.allocator = newRootCoordAllocator(s.rootCoordClient)s.initIndexNodeManager()if err = s.initServiceDiscovery(); err != nil {return err}if Params.DataCoordCfg.EnableCompaction.GetAsBool() {s.createCompactionHandler()s.createCompactionTrigger()}if err = s.initSegmentManager(); err != nil {return err}s.initGarbageCollection(storageCli)s.initIndexBuilder(storageCli)s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)return nil
}

从代码可以看出初始化是在填充datacoord结构体。

s.start()

启动组件的逻辑。

func (s *Server) start() error {err := s.dataCoord.Register()if err != nil {log.Debug("DataCoord register service failed", zap.Error(err))return err}err = s.dataCoord.Start()if err != nil {log.Error("DataCoord start failed", zap.Error(err))return err}return nil
}

s.dataCoord是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

Register():向元数据etcd注册。

Start():用来启动组件。

进入s.dataCoord.Start():

func (s *Server) Start() error {if !s.enableActiveStandBy {s.startDataCoord()log.Info("DataCoord startup successfully")}return nil
}

真正执行启动逻辑在s.startDataCoord()。

func (s *Server) startDataCoord() {if Params.DataCoordCfg.EnableCompaction.GetAsBool() {s.compactionHandler.start()s.compactionTrigger.start()}s.startServerLoop()s.stateCode.Store(commonpb.StateCode_Healthy)sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID)
}

要详细知道启动querycoord组件做了什么事情,研究这个函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/2284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI写作助手:一键智能改写文章质量高

无论是自媒体人写作文章、还是企业撰写宣传资料,文字都是表达思想和传递信息的重要介质。然而,有时候我们在工作中可能会遇到写作困难,或者想要对文章进行一定程度的改写以增加独特性和质量。而在这样的背景下,智能改写文章成为了…

VS2019编译OSG3.7.0+OSGEarth3.3+OSGQt

原文链接:VS2019编译OSG3.7.0OSGEarth3.3OSGQt-CSDN博客 工具与环境 这里使用的 cmake-3.18.3 、 VS2019 和 Qt5.15.2 进行编译。 一、编译OSG3.7.0 1、下载 下载不多赘述,在github上下master版本就是3.7.0版本。 另外还需要 3rdParty_VS2017_v141_…

新技术实现WEB前端零代码开发,将为0基础的人进入到IT行业创造极大便利!顺势破解低代码难以落地的魔咒:程序员集体排斥不喜欢、无基础的人不会代码用不来!

网站:hhtp://www.uiotos.net IT行业太大,从嵌入式硬件、到工具软件,到互联网、移动互联网,再到当前火爆的AI人工智能,都属于IT行业的阵地!一度风光无两,即便当前大环境低迷,除了垄断…

Maxwell安装使用和简单案例

一、解压 cd /opt/software/ ​ tar -zxvf maxwell-1.29.2.tar.gz -C /opt/module/ ​ cd /opt/module/ 二、MySQL 环境准备 1、修改 mysql 的配置文件 修改 mysql 的配置文件,开启 MySQL Binlog 设置 vi /etc/my.cnf 添加以下内容 server_id1 log-binmysql-…

基于WOA优化的CNN-GRU-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络(CNN)在时间序列中的应用 4.2 GRU网络 4.3 注意力机制(Attention) 4.4 WOA优化算法 5.算法完整程序工程 1.算法运行效果图…

开发语言漫谈-脚本语言

前面讲的都称之为编程语言,就是做系统用的。还有一大类称之为脚本语言的语言,这类语言数量极多,大部分程序员用不上,也不关心,这是系统维护人员专用的邻域。这个定义其实也很不准确,不必较真。更准确的来讲…

get_or_insert_with

get_or_insert_with 是 Rust 中集合类型(如 HashMap, HashSet, 或某些自定义类型如 BTreeMap)提供的方法之一,用于处理“如果不存在则插入”的场景。它旨在简化在集合中查找某个键(key),如果该键不存在&…

把 WordPress 变成 BaaS 服务:API 调用指南

有了前面两篇内容的铺垫,我们来聊聊 WordPress 作为 CMS / BaaS 服务使用时绕不开的问题,API 调用。 这篇内容同样的,会尽量少贴代码,简单的讲清楚一件事,降低阅读负担。 写在前面 首先,我们需要进行清晰…

Docker Compose 的安装和使用详解

Docker Compose 是 Docker 官方开源的容器编排(Orchestration)项目之一,用于快速部署分布式应用。本文将介绍 Docker Compose 的基本概念、安装流程及使用方法。 简介 Compose 项目是 Docker 官方的开源项目,负责实现对 Docker 容器集群的快速编排。从功能上看,Docker C…

卡车卫星定位系统 user/create 未授权密码重置漏洞复现

0x01 产品简介 卡车卫星定位系统是一种基于卫星通信和导航技术的系统,用于对卡车的位置进行精确测定。该系统主要由一组卫星、地面控制站和接收器组成。通过测量卫星信号的传播时间,可以确定接收器(即卡车上的定位设备)所在的位置。具有高精度、高可靠性、全球覆盖等特点,…

上位机图像处理和嵌入式模块部署(树莓派4b的一种固件部署方法)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 如果软件开发好了之后,下面就是实施和部署。对于树莓派4b来说,部署其实就是烧录卡和拷贝文件。之前我们烧录卡,…

SpringBoot如何集成MyBatis可以通过几个简单的步骤来实现

在SpringBoot中集成MyBatis可以通过几个简单的步骤来实现。以下是一个基本的步骤指南: 步骤1:添加依赖 首先,你需要在你的pom.xml文件中添加MyBatis和数据库的依赖。例如,如果你使用MySQL数据库,你可以添加以下依赖&…

服务器防入侵的方案浅析

随着物联网技术和互联网技术的日益发展,勒索病毒、工控安全、产线作业都面领着极大的威胁。智慧互联正在成为各个行业未来的发展方向,智慧互联包括物联网、万物互联,机器与机器,工业控制体系,信息化,也就是…

Landsat8-9 C2L2使用注意点(简略版)

Landsat8-9 C2L2使用注意点(简略版) 像元值转化 表面反射率产品时和表面温度时,即对应的SR和ST波段的遥感数据时,需要进行像元值的转换,其转换公式与Collection1不一样(Collection1已经弃用了&#xff0c…

SQL中NULL值比较问题解析与解决方法

在进行 SQL 查询时&#xff0c;经常会遇到处理 NULL 值的情况。然而&#xff0c;在使用 ! 或 <> 来比较 NULL 值时&#xff0c;可能会遇到一些意想不到的问题。本篇博客将介绍在 SQL 中处理 NULL 值比较的问题&#xff0c;并提供解决方法。 问题描述 假设我们有一张名为…

每日新闻掌握【2024年4月22日 星期一】

2024年4月22日 星期一 农历三月十四 大公司/大事件 央视发文谈调休&#xff1a;原则是最小干预 “五一其实只放一天”近日冲上热搜&#xff0c;再度引发“假期该不该调休”的热议。如何调休&#xff0c;也有讲究。一个基本原则是&#xff0c;“最小干预&#xff0c;尽量不打乱…

SharpDevelop插件系统代码阅读笔记

SharpDevelop插件系统代码阅读笔记 1.插件系统实现细节 /// <summary> ///AddIn.cs 通过在插件dll里面查找类名&#xff0c;再创建对象&#xff0c;这个功能MEF已经可以实现了 /// </summary> /// <param name"className"></param> /// <…

FLStudio怎么冻结轨道以及如何批量复制音符

FLStudio是一款功能强大的音乐制作软件&#xff0c;广泛用于音乐制作和打谱当中。我们在制作音乐时&#xff0c;经常会遇到处理大量音频轨道的情况&#xff0c;过多的音频轨道可能会导致电脑性能受限&#xff0c;从而影响工作流程。为了应对这个问题&#xff0c;FLStudio提供了…

【JavaEE多线程】Java 文件操作

目录 Java中操作文件File概述属性构造方法方法 文件内容的读写——文件流 streamInputStreamFileInputStream概述利用 Scanner 进行字符读取OutputStream 概述 练习 Java中操作文件 Java 中通过 java.io.File类来对一个文件&#xff08;包括目录&#xff09;进行抽象的描述。注…

Ruby中Rack中间件的作用是什么?如何应用?

在 Ruby 中&#xff0c;Rack 是一个 Web 服务器接口&#xff0c;它允许开发者使用统一的方式构建 Web 应用程序。Rack 中间件是 Rack 框架的一个核心概念&#xff0c;它可以在请求被传递给应用程序之前或之后对请求和响应进行处理。 Rack 中间件的作用包括但不限于&#xff1a…