milvus对象存储和消息中间件的工厂设计模式分析

milvus对象存储和消息中间件的工厂设计模式分析

需求

根据参数设置创建mq和storage
mq有kafka,pulsar
storage有local,minio,remote

配置文件

根据配置文件选择初始化mq和存储:

mq:type: pulsarcommon:storageType: minio

对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。

代码框架

在这里插入图片描述

工厂接口

代码路径:internal\util\dependency\factory.go

type Factory interface {msgstream.Factory// Init()给工厂传递参数。Init(p *paramtable.ComponentParam)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}// pkg\mq\msgstream\msgstream.go
// msgstream.Factory的code
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

dependency.Factory是一个工厂接口,里面包含了mq的工厂接口,和创建持久对象的方法。

这个接口创建消息中间件对象和持久存储对象。

这里为什么不这么写:

type Factory interface {Init(p *paramtable.ComponentParam)NewMsgStream(ctx context.Context) (MsgStream, error)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

DefaultFactory

DefaultFactory结构体是dependency.Factory的实现。

// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {standAlone          boolchunkManagerFactory storage.FactorymsgStreamFactory    msgstream.Factory
}// storage.Factory
// internal\storage\factory.go
type Factory interface {NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory实现了dependency.Factory接口的Init()函数。

在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。

func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {// skip if using default factoryif f.msgStreamFactory != nil {return}// 初始化chunkManagerFactoryf.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)// initialize mq client or embedded mq.// 初始化msgStreamFactoryif err := f.initMQ(f.standAlone, params); err != nil {panic(err)}
}

f.chunkManagerFactory:

return &ChunkManagerFactory{persistentStorage: persistentStorage,config:            c,}

f.msgStreamFactory:

func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))switch mqType {case mqTypeNatsmq:f.msgStreamFactory = msgstream.NewNatsmqFactory()case mqTypeRocksmq:f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)case mqTypePulsar:f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)case mqTypeKafka:f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)}if f.msgStreamFactory == nil {return errors.New("failed to create MQ: check the milvus log for initialization failures")}return nil
}

持久存储

storage.Factory是创建持久存储的工厂接口。

storage.ChunkManagerFactory是storage.Factory的实现。

NewPersistentStorageChunkManager()接口的实现:

func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {return f.newChunkManager(ctx, f.persistentStorage)
}func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {switch engine {case "local":return NewLocalChunkManager(RootPath(f.config.rootPath)), nilcase "minio":return newMinioChunkManagerWithConfig(ctx, f.config)case "remote":return NewRemoteChunkManager(ctx, f.config)default:return nil, errors.New("no chunk manager implemented with engine: " + engine)}
}

根据传入的engine新建对应的持久存储对象。

LocalChunkManager、MinioChunkManager、RemoteChunkManager。

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.ClientbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStoragebucketName stringrootPath   string
}

消息中间件

msgstream.Factory是创建mq的工厂接口。

工厂接口:

// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

实现有:

CommonFactory、KmsFactory、PmsFactory

// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {Newer             func(context.Context) (mqwrapper.Client, error) // client constructorDispatcherFactory ProtoUDFactoryReceiveBufSize    int64MQBufSize         int64
}// pkg\mq\msgstream\mq_factory.go
// kafka工厂
type KmsFactory struct {dispatcherFactory ProtoUDFactoryconfig            *paramtable.KafkaConfigReceiveBufSize    int64MQBufSize         int64
}// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// pulsar工厂
type PmsFactory struct {dispatcherFactory ProtoUDFactory// the following members must be public, so that mapstructure.Decode() can access themPulsarAddress    stringPulsarWebAddress stringReceiveBufSize   int64MQBufSize        int64PulsarAuthPlugin stringPulsarAuthParams stringPulsarTenant     stringPulsarNameSpace  stringRequestTimeout   time.DurationmetricRegisterer prometheus.Registerer
}

mq产品

mq的产品接口是msgstream.MsgStream

// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {Close()AsProducer(channels []string)Produce(*MsgPack) errorSetRepackFunc(repackFunc RepackFunc)GetProduceChannels() []stringBroadcast(*MsgPack) (map[string][]MessageID, error)AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) errorChan() <-chan *MsgPackSeek(ctx context.Context, offset []*MsgPosition) errorGetLatestMsgID(channel string) (MessageID, error)CheckTopicValid(channel string) errorEnableProduce(can bool)
}

具体产品实现有:

msgstream.mqMsgStream、msgstream.MqTtMsgStream

type mqMsgStream struct {ctx              context.Contextclient           mqwrapper.Clientproducers        map[string]mqwrapper.ProducerproducerChannels []stringconsumers        map[string]mqwrapper.ConsumerconsumerChannels []stringrepackFunc    RepackFuncunmarshal     UnmarshalDispatcherreceiveBuf    chan *MsgPackcloseRWMutex  *sync.RWMutexstreamCancel  func()bufSize       int64producerLock  *sync.RWMutexconsumerLock  *sync.Mutexclosed        int32onceChan      sync.OnceenableProduce atomic.Value
}// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {*mqMsgStreamchanMsgBuf         map[mqwrapper.Consumer][]TsMsgchanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPositionchanStopChan       map[mqwrapper.Consumer]chan boolchanTtMsgTime      map[mqwrapper.Consumer]TimestampchanMsgBufMutex    *sync.MutexchanTtMsgTimeMutex *sync.RWMutexchanWaitGroup      *sync.WaitGrouplastTimeStamp      TimestampsyncConsumer       chan int
}

存储产品

存储的产品接口是storag.ChunkManagere

// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {// RootPath returns current root path.RootPath() string// Path returns path of @filePath.Path(ctx context.Context, filePath string) (string, error)// Size returns path of @filePath.Size(ctx context.Context, filePath string) (int64, error)// Write writes @content to @filePath.Write(ctx context.Context, filePath string, content []byte) error// MultiWrite writes multi @content to @filePath.MultiWrite(ctx context.Context, contents map[string][]byte) error// Exist returns true if @filePath exists.Exist(ctx context.Context, filePath string) (bool, error)// Read reads @filePath and returns content.Read(ctx context.Context, filePath string) ([]byte, error)// Reader return a reader for @filePathReader(ctx context.Context, filePath string) (FileReader, error)// MultiRead reads @filePath and returns content.MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)// ReadWithPrefix reads files with same @prefix and returns contents.ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.// if all bytes are read, @err is io.EOF.// return other error if read failed.ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)// Remove delete @filePath.Remove(ctx context.Context, filePath string) error// MultiRemove delete @filePaths.MultiRemove(ctx context.Context, filePaths []string) error// RemoveWithPrefix remove files with same @prefix.RemoveWithPrefix(ctx context.Context, prefix string) error
}

具体产品实现有:

LocalChunkManager、MinioChunkManager、RemoteChunkManager

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.Client//	ctx        context.ContextbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStorage//	ctx        context.ContextbucketName stringrootPath   string
}

总结

从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/3647.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

理解与解决BouncyCastle库中“ASN1Primitive overrides final method equals”异常

理解与解决BouncyCastle库中“ASN1Primitive overrides final method equals”异常 引言错误原因分析应用场景及解决方案示例示例一&#xff1a;不同版本间的兼容性问题示例二&#xff1a;库之间的相互影响示例三&#xff1a;JDK版本适配问题 结论 引言 在Java开发中&#xff0…

springboot+vue新疆肉牛智慧牧场养殖系统

系统涉及的对象是奶牛。 系统使用员工有管理员和普通员工。 管理员有修改的权限&#xff0c;普通员工没有。 系统需要包含奶牛的编号&#xff0c;种类&#xff0c;体重&#xff0c;健康情况、生长情况、牛奶产量&#xff0c;以及上次更新数据时间等信息&#xff0c;管理员可以对…

关于权限的设计

首先系统权限&#xff0c;每个账号登录后&#xff0c;都需要知道这个账号允许访问哪些api&#xff0c;哪些数据权限&#xff08;一般是指其他账号的一些数据&#xff09; 这里就需要通过角色来关联。 --1.角色绑定菜单&#xff0c;每个菜单设计的时候包含了这个菜单会用到的所…

设计模式:合成复用原则(Composite Reuse Principle,CRP)介绍

合成复用原则&#xff08;Composite Reuse Principle&#xff0c;CRP&#xff09;是面向对象设计原则之一&#xff0c;它强调通过组合已有的对象来实现新的功能&#xff0c;而不是通过继承已有的类来实现。合成复用原则的核心思想是尽量使用对象组合而不是类继承&#xff0c;从…

Hive官方文档 join table 总结

Hive官方文档 join table 总结 join_table:table_reference [INNER] JOIN table_factor [join_condition]| table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition| table_reference LEFT SEMI JOIN table_reference join_condition| table_referen…

C语言中定义和声明的区别

定义: 编译器在创建一个对象时, 为该对象申请开辟了内存空间, 这个空间的名字就是变量或者对象名, 同一个变量名在某个区域只能定义一次, 重复定义会出现错误 声明有两种作用: 1. 告诉编译器, 这个变量或者函数, 我已经定义了 (开辟了空间了), 但是在别的地方, 我先说明一下…

HarmonyOS 实战开发-使用canvas实现图表系列之折线图

一、功能结构 实现一个公共组件的时候&#xff0c;首先分析一下大概的实现结构以及开发思路&#xff0c;方便我们少走弯路&#xff0c;也可以使组件更加容易拓展&#xff0c;维护性更强。然后我会把功能逐个拆开来讲&#xff0c;这样大家才能学习到更详细的内容。下面简单阐述…

C# 调整图像的亮度简单示例

操作顺序&#xff1a; 首先加载要调整亮度的图像。 然后通过循环遍历图像的像素&#xff0c;并根据需要增加像素的亮度值。 最后&#xff0c;将调整后的图像保存回原始文件。 代码如下&#xff1a; using System; using System.Drawing.Imaging;class Program {static void…

【Linux系统编程】基础指令(二)

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是大耳朵土土垚~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#x…

架构师系列- 定时任务(二)- Quartz框架

quartz特点 Quartz是一个优秀的任务调度框架&#xff0c; 具有以下特点 强大的调度功能&#xff0c;例如支持丰富多样的调度方法&#xff0c;可以满足各种常规及特殊需求&#xff1b;负载均衡高可用 quartz 架构体系 Quartz 设计有四个核心类&#xff0c;分别是Scheduler(调度…

一个简单的java递归下降语法分析器例子

import parser.Parser; import parser.RecursiveDescentParser;import java.util.ArrayList; import java.util.Arrays; import java.util.List;public class Main {public static void main(String[] args) {// 关键词List<String> keyList new ArrayList<>(Arra…

NXP i.MX8系列平台开发讲解 - 3.10 Linux PCIe资源分配与访问(二)

目录 1. PCIe BFD 2. PCIe 配置空间 2.1 PCIe 配置空间访问 PCIe I/O访问方法 PCIe MMIO访问方法 3. PCIe BAR相关 4. PCIe Capbility 5. PCIe 操作 本文将重点讲解PCIe的资源访问相关内容&#xff0c;对于PCIe资源访问是从Host 端老看可以对PCIe进行配置与访问的资源主…

【opencv 加速推理】如何安装 支持cuda的opencv 包 用于截帧加速

要在支持CUDA的系统上安装OpenCV&#xff0c;您可以使用pip来安装支持CUDA的OpenCV版本。OpenCV支持CUDA加速&#xff0c;但需要安装额外的库&#xff0c;如cuDNN和NVIDIA CUDA Toolkit。以下是一般步骤&#xff1a; 安装NVIDIA CUDA Toolkit: 首先&#xff0c;您需要安装NVID…

深度学习基础之《TensorFlow框架(12)—图片数据》

一、图像基本知识 1、如何转换图片文件 回忆&#xff1a;之前我们在特征抽取中讲过如何将文本处理成数据 思考&#xff1a;如何将图片文件转换成机器学习算法能够处理的数据&#xff1f; 我们经常接触到的图片有两种&#xff0c;一种是黑白图片&#xff08;灰度图&#xff09;…

网站被SmartScreen标记为不安全怎么办?

在互联网时代&#xff0c;网站的安全性和可信度是用户选择是否继续访问的重要因素之一&#xff0c;然而&#xff0c;网站运营者偶尔会发现使用Edge浏览器访问网站时&#xff0c;会出现Microsoft Defender SmartScreen&#xff08;以下简称SmartScreen&#xff09;提示网站不安全…

Windows下搭建Flutter开发环境

IDE:VS code Flutter官网:Flutter: 为所有屏幕创造精彩 - Flutter 中文开发者网站 - Flutter 下载&安装 下载Flutter SDK,如图,建议自行下载安装: SDK还是挺大的,近1G,使用迅雷下载会快不少。 下载完成,解压缩到指定目录即可! 设置Local SDK,按下面步骤操作即…

持续集成和持续部署(CI/CD)

持续集成&#xff08;Continuous Integration&#xff0c;简称CI&#xff09;和持续部署&#xff08;Continuous Deployment&#xff0c;简称CD&#xff09;是现代软件开发中的重要实践&#xff0c;旨在提高开发团队的效率和软件交付的质量。 持续集成是指开发人员将代码频繁地…

【数据结构(邓俊辉)学习笔记】绪论05——动态规划

文章目录 0.前言1. Fibonacci数应用1.1 fib&#xff08;&#xff09;&#xff1a;递归1.1.1 问题与代码1.1.2 复杂度分析1.1.3 递归分析 1.2 fib&#xff08;&#xff09;&#xff1a;迭代 0.前言 make it work,make it right,make it fast. 让代码能够不仅正确而且足够高效地…

适合初学者的自然语言处理 (NLP) 综合指南

一、简述 自然语言处理 (NLP) 是人工智能 (AI) 最热门的领域之一&#xff0c;现在主要指大语言模型了。这要归功于人们热衷于能编写故事的文本生成器、欺骗人们的聊天机器人以及产生照片级真实感的文本到图像程序等应用程序。近年来&#xff0c;计算机理解人类语言、编程语言&a…

前端开发攻略---用原生JS在网页中也能实现文本转语音

1、原理 语音合成 (也被称作是文本转为语音&#xff0c;英语简写是 tts) 包括接收 app 中需要语音合成的文本&#xff0c;再在设备麦克风播放出来这两个过程。 Web API中对此有一个主要控制接口 SpeechSynthesis&#xff0c;外加一些处理如何表示要被合成的文本 (也被称为 utte…