milvus querynode启动源码分析

querynode启动源码分析

结构体

// QueryNode implements QueryNode grpc server
// cmd\components\query_node.go
type QueryNode struct {ctx context.Contextsvr *grpcquerynode.Server
}// Server is the grpc server of QueryNode.
type Server struct {querynode   types.QueryNodeComponentwg          sync.WaitGroupctx         context.Contextcancel      context.CancelFuncgrpcErrChan chan errorserverID atomic.Int64grpcServer *grpc.ServeretcdCli *clientv3.Client
}

querynode是一个接口,实现querynode api功能。

func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {wg.Add(1)// clear local storagerootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole)cleanLocalDir(queryDataLocalPath)// clear mmap dirmmapDir := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()if len(mmapDir) > 0 {cleanLocalDir(mmapDir)}return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
}// creator用NewQueryNode替换
role, err = creator(ctx, factory)

components.NewQueryNode是一个函数。

NewQueryNode()用来创建QueryNode结构体。

// NewQueryNode creates a new QueryNode
func NewQueryNode(ctx context.Context, factory dependency.Factory) (*QueryNode, error) {svr, err := grpcquerynode.NewServer(ctx, factory)if err != nil {return nil, err}return &QueryNode{ctx: ctx,svr: svr,}, nil
}

grpcquerynode.NewServer()产生的是本结构体Server。

// NewServer create a new QueryNode grpc server.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {ctx1, cancel := context.WithCancel(ctx)s := &Server{ctx:         ctx1,cancel:      cancel,querynode:   qn.NewQueryNode(ctx, factory),grpcErrChan: make(chan error),}return s, nil
}

qn.NewQueryNode()返回一个结构体,是 types.QueryNodeComponen接口的一个实现

执行Run()

Server结构体创建后,调用结构体的Run()方法。

func runComponent[T component](ctx context.Context,localMsg bool,runWg *sync.WaitGroup,creator func(context.Context, dependency.Factory) (T, error),metricRegister func(*prometheus.Registry),
) component {var role Tsign := make(chan struct{})go func() {factory := dependency.NewFactory(localMsg)var err errorrole, err = creator(ctx, factory)if localMsg {paramtable.SetRole(typeutil.StandaloneRole)} else {paramtable.SetRole(role.GetName())}if err != nil {panic(err)}close(sign)// 在这里调用对应组件结构体的Run()方法,这里是QueryNode结构体if err := role.Run(); err != nil {panic(err)}runWg.Done()}()......
}

runComponent是一个包裹函数。

// Run starts service
func (q *QueryNode) Run() error {if err := q.svr.Run(); err != nil {log.Error("QueryNode starts error", zap.Error(err))return err}log.Debug("QueryNode successfully started")return nil
}

Run()方法调用q.svr.Run()方法。srv是grpcquerynode.NewServer()返回的结构体。

进入Run()方法:

// Run initializes and starts QueryNode's grpc service.
func (s *Server) Run() error {if err := s.init(); err != nil {return err}log.Debug("QueryNode init done ...")if err := s.start(); err != nil {return err}log.Debug("QueryNode start done ...")return nil
}

接下来分析s.init()和s.start()方法。

s.init()

// init initializes QueryNode's grpc service.
func (s *Server) init() error {etcdConfig := &paramtable.Get().EtcdCfgParams := &paramtable.Get().QueryNodeGrpcServerCfgif !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) {paramtable.Get().Save(Params.Port.Key, fmt.Sprintf("%d", funcutil.GetAvailablePort()))log.Warn("QueryNode get available port when init", zap.Int("Port", Params.Port.GetAsInt()))}log.Debug("QueryNode", zap.Int("port", Params.Port.GetAsInt()))etcdCli, err := etcd.GetEtcdClient(etcdConfig.UseEmbedEtcd.GetAsBool(),etcdConfig.EtcdUseSSL.GetAsBool(),etcdConfig.Endpoints.GetAsStrings(),etcdConfig.EtcdTLSCert.GetValue(),etcdConfig.EtcdTLSKey.GetValue(),etcdConfig.EtcdTLSCACert.GetValue(),etcdConfig.EtcdTLSMinVersion.GetValue())if err != nil {log.Debug("QueryNode connect to etcd failed", zap.Error(err))return err}s.etcdCli = etcdClis.SetEtcdClient(etcdCli)s.querynode.SetAddress(Params.GetAddress())log.Debug("QueryNode connect to etcd successfully")s.wg.Add(1)// 启动grpc,默认端口为21123go s.startGrpcLoop(Params.Port.GetAsInt())// wait for grpc server loop starterr = <-s.grpcErrChanif err != nil {return err}s.querynode.UpdateStateCode(commonpb.StateCode_Initializing)log.Debug("QueryNode", zap.Any("State", commonpb.StateCode_Initializing))// 调用querynode的初始化方法if err := s.querynode.Init(); err != nil {log.Error("QueryNode init error: ", zap.Error(err))return err}return nil
}

这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

s.startGrpcLoop()启动grpc端口服务。

最终调用s.querynode.Init()进行初始化,代码位置:internal\querynodev2\server.go

s.querynode是接口类型types.QueryNodeComponent,QueryNodeComponent继承于Component。

type QueryNodeComponent interface {QueryNodeUpdateStateCode(stateCode commonpb.StateCode)SetAddress(address string)GetAddress() stringSetEtcdClient(etcdClient *clientv3.Client)
}// QueryNode is the interface `querynode` package implements
type QueryNode interface {Componentquerypb.QueryNodeServer
}// Component is the interface all services implement
type Component interface {Init() errorStart() errorStop() errorRegister() error
}

接口套接口:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component

各组件最终的Init()初始化代码路径:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()

回过头来继续querynode的init。

// Init function init historical and streaming module to manage segments
func (node *QueryNode) Init() error {var initError errornode.initOnce.Do(func() {// ctx := context.Background()log.Info("QueryNode session info", zap.String("metaPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))err := node.initSession()if err != nil {log.Error("QueryNode init session failed", zap.Error(err))initError = errreturn}err = node.initHook()if err != nil {// auto index cannot work if hook init failedif paramtable.Get().AutoIndexConfig.Enable.GetAsBool() {log.Error("QueryNode init hook failed", zap.Error(err))initError = errreturn}}node.factory.Init(paramtable.Get())localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()localChunkManager := storage.NewLocalChunkManager(storage.RootPath(localRootPath))localUsedSize, err := segments.GetLocalUsedSize(localRootPath)if err != nil {log.Warn("get local used size failed", zap.Error(err))initError = errreturn}metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localUsedSize / 1024 / 1024))remoteChunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)if err != nil {log.Warn("failed to init remote chunk manager", zap.Error(err))initError = errreturn}node.cacheChunkManager, err = storage.NewVectorChunkManager(node.ctx,localChunkManager,remoteChunkManager,paramtable.Get().QueryNodeCfg.CacheMemoryLimit.GetAsInt64(),paramtable.Get().QueryNodeCfg.CacheEnabled.GetAsBool(),)if err != nil {log.Error("failed to init cache chunk manager", zap.Error(err))initError = errreturn}node.vectorStorage, err = node.factory.NewPersistentStorageChunkManager(node.ctx)if err != nil {log.Error("QueryNode init vector storage failed", zap.Error(err))initError = errreturn}log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue()node.scheduler = tasks.NewScheduler(schedulePolicy,)log.Info("queryNode init scheduler", zap.String("policy", schedulePolicy))node.clusterManager = cluster.NewWorkerManager(func(ctx context.Context, nodeID int64) (cluster.Worker, error) {if nodeID == paramtable.GetNodeID() {return NewLocalWorker(node), nil}sessions, _, err := node.session.GetSessions(typeutil.QueryNodeRole)if err != nil {return nil, err}addr := ""for _, session := range sessions {if session.ServerID == nodeID {addr = session.Addressbreak}}client, err := grpcquerynodeclient.NewClient(ctx, addr, nodeID)if err != nil {return nil, err}return cluster.NewRemoteWorker(client), nil})node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()node.subscribingChannels = typeutil.NewConcurrentSet[string]()node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()node.manager = segments.NewManager()node.loader = segments.NewLoader(node.manager, node.vectorStorage)node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID())// init pipeline managernode.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)err = node.InitSegcore()if err != nil {log.Error("QueryNode init segcore failed", zap.Error(err))initError = errreturn}if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() {if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() {action := func(GOGC uint32) {debug.SetGCPercent(int(GOGC))}gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)} else {action := func(uint32) {}gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)}}log.Info("query node init successfully",zap.Int64("queryNodeID", paramtable.GetNodeID()),zap.String("Address", node.address),)})return initError
}

从代码可以看出初始化是在填充QueryNode结构体。

s.start()

启动组件的逻辑。

// start starts QueryNode's grpc service.
func (s *Server) start() error {if err := s.querynode.Start(); err != nil {log.Error("QueryNode start failed", zap.Error(err))return err}if err := s.querynode.Register(); err != nil {log.Error("QueryNode register service failed", zap.Error(err))return err}return nil
}

s.querynode是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

Register():向元数据etcd注册。

Start():用来启动组件。

// Start mainly start QueryNode's query service.
func (node *QueryNode) Start() error {node.startOnce.Do(func() {node.scheduler.Start()paramtable.SetCreateTime(time.Now())paramtable.SetUpdateTime(time.Now())mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()mmapEnabled := len(mmapDirPath) > 0node.UpdateStateCode(commonpb.StateCode_Healthy)registry.GetInMemoryResolver().RegisterQueryNode(paramtable.GetNodeID(), node)log.Info("query node start successfully",zap.Int64("queryNodeID", paramtable.GetNodeID()),zap.String("Address", node.address),zap.Bool("mmapEnabled", mmapEnabled),)})return nil
}

node节点都没有standby,coord节点有standby。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/824343.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android笔记: mkdirs不生效失败

Manifest已经配置权限,代码中也动态获取权限,mkdirs一直返回false File.mkdirs()方法创建文件夹失败 1、动态申请读写权限 <!--SDCard写权限--> <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> <!--SDCard读权…

Linux安装和使用Android Debug Bridge(ADB)

目录 1、开发环境和工具 2、ADB是什么&#xff1f; 3、安装ADB 3.1、使用包管理器安装 ADB 3.2、手动安装 ADB 4、使用ADB 4.1、连接设备 4.2、执行shell命令 4.3、安装应用程序 4.4、截取屏幕截图 4.5、模拟按键和手势 4.6、上传文件到Android设备 4.7、从Android设备下载文件…

常见的并发编程问题,如死锁、竞态条件、线程不安全、内存可见性问题等,如何在Java中避免这些问题?

死锁&#xff1a;发生在两个或更多线程互相等待对方持有的资源&#xff0c;导致所有的线程都无法进行下去。避免死锁的一个常见方法是遵循资源顺序访问&#xff0c;将系统中的资源排序&#xff0c;并约定每个线程都按序请求资源。 竞态条件&#xff1a;两个或更多线程同时访问…

BGP边界网关路由实验(华为)

一&#xff0c;技术简介 BGP&#xff08;边界网关路由协议&#xff09;是一种自治系统&#xff08;AS&#xff09;间的协议&#xff0c;主要用于在不同的AS之间交换路由信息。AS是一个由一组网络设备和路由器组成的网络集合&#xff0c;这些设备可以在一个共同的管理域中协同工…

IaC:实现持续交付和 DevOps 自动化的关键

基础架构即代码&#xff08;IaC&#xff09;和 CI/CD 流水线最初似乎并不匹配。因为它们代表了两种不同的流程。IaC 主要关注基础设施的配置和开发&#xff0c;而 CI/CD 则围绕软件开发、测试和部署。 然而&#xff0c;将 IaC 集成到 CI/CD 流水线中具有多种优势。首先&#xf…

C++从入门到精通——static成员

static成员 前言一、static成员概念例题 二、 static成员的特性特性例题静态成员函数可以调用非静态成员函数吗非静态成员函数可以调用类的静态成员函数吗 前言 一、static成员 概念 声明为static的类成员称为类的静态成员&#xff0c;用static修饰的成员变量&#xff0c;称之…

Logback:SpringBoot 2.0 整合 Logback (kafaka es)

1. 规范了日志的打印格式 2. 增加了彩色日志输出 3. 支持异步推送kafka 4. 日志文件压缩功能 我们无需关心 Logback 版本&#xff0c;只需关注 Boot 版本即可&#xff0c;Parent 工程自动集成了 Logback。Springboot 本身就可以打印日志&#xff0c;为什么还需要规范…

1 回归:锂电池温度预测top2 代码部分(一) Tabnet

2024 iFLYTEK A.I.开发者大赛-讯飞开放平台 TabNet&#xff1a; 模型也是我在这个比赛一个意外收获&#xff0c;这个模型在比赛之中可用。但是需要GPU资源&#xff0c;否则运行真的是太慢了。后面针对这个模型我会写出如何使用的方法策略。 比赛结束后有与其他两位选手聊天&am…

.net core8 自定义一个中间件

在.NET Core 8中自定义一个中间件&#xff0c;基本步骤与之前的.NET Core版本相似。中间件是ASP.NET Core请求处理管道的一个组件&#xff0c;它们可以在请求处理过程中被调用。下面是如何创建和使用一个自定义中间件的步骤&#xff1a; 第一步&#xff1a;创建中间件类 创建…

win2022服务器apache配置https(ssl)真实环境实验(避坑之作)不依赖宝塔小皮等集成环境

本次实验背景&#xff1a; 完全参考官方 https://cloud.tencent.com/document/product/400/4143 文档流程&#xff0c;没有搞定&#xff0c;于是写下避坑之作。 服务器&#xff1a;腾讯云轻量应用服务器 操作系统&#xff1a; Windows Server 2022 DataCenter 64bit CN apache…

李沐45_SSD实现——自学笔记

主体思路&#xff1a; 1.生成一堆锚框 2.根据真实标签为每个锚框打标(类别、偏移、mask) 3.模型为每个锚框做一个预测(类别、偏移) 4.计算上述二者的差异损失&#xff0c;以更新模型weights 先读取一张图像。 它的高度和宽度分别为561和728像素。 %matplotlib inline import …

Photoshop 2024 (ps) v25.6中文 强大的图像处理软件 mac/win

Photoshop 2024 for Mac是一款强大的图像处理软件&#xff0c;专为Mac用户设计。它继承了Adobe Photoshop一贯的优秀功能&#xff0c;并进一步提升了性能和稳定性。 Mac版Photoshop 2024 (ps)v25.6中文激活版下载 win版Photoshop 2024 (ps)v25.6直装版下载 无论是专业的设计师还…

EI Scopus双检索 | 2024年清洁能源与智能电网国际会议(CCESG 2024)

会议简介 Brief Introduction 2024年清洁能源与智能电网国际会议(CCESG 2024) 会议时间&#xff1a;2024年 11月27-29日 召开地点&#xff1a;澳大利亚悉尼 大会官网&#xff1a;CCESG 2024-2024 International Joint Conference on Clean Energy and Smart Grid 由CoreShare科…

m4p转换mp3格式怎么转?3个Mac端应用~

M4P文件格式的诞生伴随着苹果公司引入FairPlay版权管理系统&#xff0c;该系统旨在保护音频的内容。M4P因此而生&#xff0c;成为受到FairPlay系统保护的音频格式&#xff0c;常见于苹果设备的iTunes等平台。 MP3文件格式的多个优点 MP3格式的优点显而易见。首先&#xff0c;其…

【flink报错】flink cdc无主键时的操作

文章目录 一. 报错二. 解决 一. 报错 “org.apache.flink.table.api.validationexception: ‘scan.incremental.snapshot.chunk.key-column’ must be set when the table doesn’t have primary keys” 报错提示当表没有主键时&#xff0c;必须设置 ‘scan.incremental.snapsh…

WPF: XAML语法规范详解

WPF&#xff08;Windows Presentation Foundation&#xff09;是.NET框架的一个组成部分&#xff0c;用于构建桌面应用程序的用户界面。XAML&#xff08;Extensible Application Markup Language&#xff09;是一种基于XML的标记语言&#xff0c;用于定义WPF应用程序的界面和逻…

【vim】折叠代码

目录 简介操作创建折叠删除折叠打开或关闭折叠在折叠间移动简介 Vim编辑器中可以使用 foldmethod 选项设置折叠方法。 将 foldmethod 设置为 manual 以外的值时,将删除所有折叠并创建新折叠。切换到 manual 方法不会删除现有的折叠。由此可以先用自动定义折叠,然后手动更改它…

k8s之etcd

1.特点&#xff1a; etcd 是云原生架构中重要的基础组件。有如下特点&#xff1a; 简单&#xff1a;安装配置简单&#xff0c;而且提供了 HTTP API 进行交互&#xff0c;使用也很简单键值对存储&#xff1a;将数据存储在分层组织的目录中&#xff0c;如同在标准文件系统中监…

vscode msvc qt环境搭建

自己整了好久都没把环境搞好&#xff0c;后来发现已经有大佬搞好了插件&#xff0c;完全不需要自己整理。 下载如下插件&#xff1a; 第二个qt插件就可以自动帮我们生成工程了。 可惜目前似乎支持win&#xff0c;另外就是debug模式运行后会报qwindowsd.dll插件找不到的错误&a…

打造一套属于自己的php开发框架(一)封装Db类

一直使用thinkphp或者laravel框架&#xff0c;越到后面越发现&#xff0c;这些框架占用太大了&#xff0c;最主要的是很多东西完全用不到&#xff0c;我就想为啥不能自己封装一个&#xff1f;想到就搞&#xff0c;这个是一个Db类&#xff0c;主要封装了MySQL的增删改查方法&…