milvus对象存储和消息中间件的工厂设计模式分析

milvus对象存储和消息中间件的工厂设计模式分析

需求

根据参数设置创建mq和storage
mq有kafka,pulsar
storage有local,minio,remote

配置文件

根据配置文件选择初始化mq和存储:

mq:type: pulsarcommon:storageType: minio

对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。

代码框架

在这里插入图片描述

工厂接口

代码路径:internal\util\dependency\factory.go

type Factory interface {msgstream.Factory// Init()给工厂传递参数。Init(p *paramtable.ComponentParam)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}// pkg\mq\msgstream\msgstream.go
// msgstream.Factory的code
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

dependency.Factory是一个工厂接口,里面包含了mq的工厂接口,和创建持久对象的方法。

这个接口创建消息中间件对象和持久存储对象。

这里为什么不这么写:

type Factory interface {Init(p *paramtable.ComponentParam)NewMsgStream(ctx context.Context) (MsgStream, error)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

DefaultFactory

DefaultFactory结构体是dependency.Factory的实现。

// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {standAlone          boolchunkManagerFactory storage.FactorymsgStreamFactory    msgstream.Factory
}// storage.Factory
// internal\storage\factory.go
type Factory interface {NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory实现了dependency.Factory接口的Init()函数。

在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。

func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {// skip if using default factoryif f.msgStreamFactory != nil {return}// 初始化chunkManagerFactoryf.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)// initialize mq client or embedded mq.// 初始化msgStreamFactoryif err := f.initMQ(f.standAlone, params); err != nil {panic(err)}
}

f.chunkManagerFactory:

return &ChunkManagerFactory{persistentStorage: persistentStorage,config:            c,}

f.msgStreamFactory:

func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))switch mqType {case mqTypeNatsmq:f.msgStreamFactory = msgstream.NewNatsmqFactory()case mqTypeRocksmq:f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)case mqTypePulsar:f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)case mqTypeKafka:f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)}if f.msgStreamFactory == nil {return errors.New("failed to create MQ: check the milvus log for initialization failures")}return nil
}

持久存储

storage.Factory是创建持久存储的工厂接口。

storage.ChunkManagerFactory是storage.Factory的实现。

NewPersistentStorageChunkManager()接口的实现:

func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {return f.newChunkManager(ctx, f.persistentStorage)
}func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {switch engine {case "local":return NewLocalChunkManager(RootPath(f.config.rootPath)), nilcase "minio":return newMinioChunkManagerWithConfig(ctx, f.config)case "remote":return NewRemoteChunkManager(ctx, f.config)default:return nil, errors.New("no chunk manager implemented with engine: " + engine)}
}

根据传入的engine新建对应的持久存储对象。

LocalChunkManager、MinioChunkManager、RemoteChunkManager。

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.ClientbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStoragebucketName stringrootPath   string
}

消息中间件

msgstream.Factory是创建mq的工厂接口。

工厂接口:

// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

实现有:

CommonFactory、KmsFactory、PmsFactory

// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {Newer             func(context.Context) (mqwrapper.Client, error) // client constructorDispatcherFactory ProtoUDFactoryReceiveBufSize    int64MQBufSize         int64
}// pkg\mq\msgstream\mq_factory.go
// kafka工厂
type KmsFactory struct {dispatcherFactory ProtoUDFactoryconfig            *paramtable.KafkaConfigReceiveBufSize    int64MQBufSize         int64
}// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// pulsar工厂
type PmsFactory struct {dispatcherFactory ProtoUDFactory// the following members must be public, so that mapstructure.Decode() can access themPulsarAddress    stringPulsarWebAddress stringReceiveBufSize   int64MQBufSize        int64PulsarAuthPlugin stringPulsarAuthParams stringPulsarTenant     stringPulsarNameSpace  stringRequestTimeout   time.DurationmetricRegisterer prometheus.Registerer
}

mq产品

mq的产品接口是msgstream.MsgStream

// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {Close()AsProducer(channels []string)Produce(*MsgPack) errorSetRepackFunc(repackFunc RepackFunc)GetProduceChannels() []stringBroadcast(*MsgPack) (map[string][]MessageID, error)AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) errorChan() <-chan *MsgPackSeek(ctx context.Context, offset []*MsgPosition) errorGetLatestMsgID(channel string) (MessageID, error)CheckTopicValid(channel string) errorEnableProduce(can bool)
}

具体产品实现有:

msgstream.mqMsgStream、msgstream.MqTtMsgStream

type mqMsgStream struct {ctx              context.Contextclient           mqwrapper.Clientproducers        map[string]mqwrapper.ProducerproducerChannels []stringconsumers        map[string]mqwrapper.ConsumerconsumerChannels []stringrepackFunc    RepackFuncunmarshal     UnmarshalDispatcherreceiveBuf    chan *MsgPackcloseRWMutex  *sync.RWMutexstreamCancel  func()bufSize       int64producerLock  *sync.RWMutexconsumerLock  *sync.Mutexclosed        int32onceChan      sync.OnceenableProduce atomic.Value
}// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {*mqMsgStreamchanMsgBuf         map[mqwrapper.Consumer][]TsMsgchanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPositionchanStopChan       map[mqwrapper.Consumer]chan boolchanTtMsgTime      map[mqwrapper.Consumer]TimestampchanMsgBufMutex    *sync.MutexchanTtMsgTimeMutex *sync.RWMutexchanWaitGroup      *sync.WaitGrouplastTimeStamp      TimestampsyncConsumer       chan int
}

存储产品

存储的产品接口是storag.ChunkManagere

// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {// RootPath returns current root path.RootPath() string// Path returns path of @filePath.Path(ctx context.Context, filePath string) (string, error)// Size returns path of @filePath.Size(ctx context.Context, filePath string) (int64, error)// Write writes @content to @filePath.Write(ctx context.Context, filePath string, content []byte) error// MultiWrite writes multi @content to @filePath.MultiWrite(ctx context.Context, contents map[string][]byte) error// Exist returns true if @filePath exists.Exist(ctx context.Context, filePath string) (bool, error)// Read reads @filePath and returns content.Read(ctx context.Context, filePath string) ([]byte, error)// Reader return a reader for @filePathReader(ctx context.Context, filePath string) (FileReader, error)// MultiRead reads @filePath and returns content.MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)// ReadWithPrefix reads files with same @prefix and returns contents.ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.// if all bytes are read, @err is io.EOF.// return other error if read failed.ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)// Remove delete @filePath.Remove(ctx context.Context, filePath string) error// MultiRemove delete @filePaths.MultiRemove(ctx context.Context, filePaths []string) error// RemoveWithPrefix remove files with same @prefix.RemoveWithPrefix(ctx context.Context, prefix string) error
}

具体产品实现有:

LocalChunkManager、MinioChunkManager、RemoteChunkManager

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.Client//	ctx        context.ContextbucketName stringrootPath   string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStorage//	ctx        context.ContextbucketName stringrootPath   string
}

总结

从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/3647.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot+vue新疆肉牛智慧牧场养殖系统

系统涉及的对象是奶牛。 系统使用员工有管理员和普通员工。 管理员有修改的权限&#xff0c;普通员工没有。 系统需要包含奶牛的编号&#xff0c;种类&#xff0c;体重&#xff0c;健康情况、生长情况、牛奶产量&#xff0c;以及上次更新数据时间等信息&#xff0c;管理员可以对…

关于权限的设计

首先系统权限&#xff0c;每个账号登录后&#xff0c;都需要知道这个账号允许访问哪些api&#xff0c;哪些数据权限&#xff08;一般是指其他账号的一些数据&#xff09; 这里就需要通过角色来关联。 --1.角色绑定菜单&#xff0c;每个菜单设计的时候包含了这个菜单会用到的所…

HarmonyOS 实战开发-使用canvas实现图表系列之折线图

一、功能结构 实现一个公共组件的时候&#xff0c;首先分析一下大概的实现结构以及开发思路&#xff0c;方便我们少走弯路&#xff0c;也可以使组件更加容易拓展&#xff0c;维护性更强。然后我会把功能逐个拆开来讲&#xff0c;这样大家才能学习到更详细的内容。下面简单阐述…

【Linux系统编程】基础指令(二)

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是大耳朵土土垚~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#x…

架构师系列- 定时任务(二)- Quartz框架

quartz特点 Quartz是一个优秀的任务调度框架&#xff0c; 具有以下特点 强大的调度功能&#xff0c;例如支持丰富多样的调度方法&#xff0c;可以满足各种常规及特殊需求&#xff1b;负载均衡高可用 quartz 架构体系 Quartz 设计有四个核心类&#xff0c;分别是Scheduler(调度…

一个简单的java递归下降语法分析器例子

import parser.Parser; import parser.RecursiveDescentParser;import java.util.ArrayList; import java.util.Arrays; import java.util.List;public class Main {public static void main(String[] args) {// 关键词List<String> keyList new ArrayList<>(Arra…

NXP i.MX8系列平台开发讲解 - 3.10 Linux PCIe资源分配与访问(二)

目录 1. PCIe BFD 2. PCIe 配置空间 2.1 PCIe 配置空间访问 PCIe I/O访问方法 PCIe MMIO访问方法 3. PCIe BAR相关 4. PCIe Capbility 5. PCIe 操作 本文将重点讲解PCIe的资源访问相关内容&#xff0c;对于PCIe资源访问是从Host 端老看可以对PCIe进行配置与访问的资源主…

【opencv 加速推理】如何安装 支持cuda的opencv 包 用于截帧加速

要在支持CUDA的系统上安装OpenCV&#xff0c;您可以使用pip来安装支持CUDA的OpenCV版本。OpenCV支持CUDA加速&#xff0c;但需要安装额外的库&#xff0c;如cuDNN和NVIDIA CUDA Toolkit。以下是一般步骤&#xff1a; 安装NVIDIA CUDA Toolkit: 首先&#xff0c;您需要安装NVID…

深度学习基础之《TensorFlow框架(12)—图片数据》

一、图像基本知识 1、如何转换图片文件 回忆&#xff1a;之前我们在特征抽取中讲过如何将文本处理成数据 思考&#xff1a;如何将图片文件转换成机器学习算法能够处理的数据&#xff1f; 我们经常接触到的图片有两种&#xff0c;一种是黑白图片&#xff08;灰度图&#xff09;…

网站被SmartScreen标记为不安全怎么办?

在互联网时代&#xff0c;网站的安全性和可信度是用户选择是否继续访问的重要因素之一&#xff0c;然而&#xff0c;网站运营者偶尔会发现使用Edge浏览器访问网站时&#xff0c;会出现Microsoft Defender SmartScreen&#xff08;以下简称SmartScreen&#xff09;提示网站不安全…

Windows下搭建Flutter开发环境

IDE:VS code Flutter官网:Flutter: 为所有屏幕创造精彩 - Flutter 中文开发者网站 - Flutter 下载&安装 下载Flutter SDK,如图,建议自行下载安装: SDK还是挺大的,近1G,使用迅雷下载会快不少。 下载完成,解压缩到指定目录即可! 设置Local SDK,按下面步骤操作即…

【数据结构(邓俊辉)学习笔记】绪论05——动态规划

文章目录 0.前言1. Fibonacci数应用1.1 fib&#xff08;&#xff09;&#xff1a;递归1.1.1 问题与代码1.1.2 复杂度分析1.1.3 递归分析 1.2 fib&#xff08;&#xff09;&#xff1a;迭代 0.前言 make it work,make it right,make it fast. 让代码能够不仅正确而且足够高效地…

适合初学者的自然语言处理 (NLP) 综合指南

一、简述 自然语言处理 (NLP) 是人工智能 (AI) 最热门的领域之一&#xff0c;现在主要指大语言模型了。这要归功于人们热衷于能编写故事的文本生成器、欺骗人们的聊天机器人以及产生照片级真实感的文本到图像程序等应用程序。近年来&#xff0c;计算机理解人类语言、编程语言&a…

前端开发攻略---用原生JS在网页中也能实现文本转语音

1、原理 语音合成 (也被称作是文本转为语音&#xff0c;英语简写是 tts) 包括接收 app 中需要语音合成的文本&#xff0c;再在设备麦克风播放出来这两个过程。 Web API中对此有一个主要控制接口 SpeechSynthesis&#xff0c;外加一些处理如何表示要被合成的文本 (也被称为 utte…

Idea:通义千问插件

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 一、通义千问大模型 二、程序编写助手 三、Idea安装通义千问插件 总结 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、通义千问大模型…

数据结构——二叉树的操作 (层序遍历)(C++实现)

数据结构——二叉树的操作&#xff08;2&#xff09;&#xff08;C实现&#xff09; 统计叶子结点个数统计结点个数层序遍历非递归方式递归方式 我们今天接着来看二叉树的操作&#xff0c;如果还没有看过上一篇的可以点击这里&#xff1a; https://blog.csdn.net/qq_67693066/a…

来自异国的客人 - 华为OD统一考试(D卷)

OD统一考试(D卷) 分值: 100分 题解: Java / Python / C++ 题目描述 有位客人来自异国,在该国使用m进制计数。 该客人有个幸运数字n(n<m),每次购物时,其总是喜欢计算本次支付的花费(折算为异国的价格后)中存在多少幸运数字。 问: 当其购买一个在我国价值k的产品时,…

【Leetcode每日一题】 穷举vs暴搜vs深搜vs回溯vs剪枝_全排列 - 子集(难度⭐⭐)(65)

1. 题目解析 题目链接&#xff1a;78. 子集 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 算法思路详解&#xff1a; 为了生成数组 nums 的所有子集&#xff0c;我们需要对数组中的每个元素进行“选择”或“不选择…

JavaScript-Vue入门

本文主要测分享Vue的一些基础 Vue简介 Vue.js 是一个构建数据驱动的 web 界面的渐进式框架。它的主要目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。 下是一些 Vue 的主要特点和概念&#xff1a; 1. 响应式数据绑定&#xff1a;Vue 使用基于 HTML 的模板语法…

Visual Studio安装MFC开发组件

MFC由于比较古老了&#xff0c;Visual Studio默认没有这个开发组件。最近由于一些原因&#xff0c;需要使用这个库&#xff0c;这就需要另外安装。 参考了网上的一些资料&#xff0c;根据实际使用&#xff0c;其实很多步骤不是必须的。 https://zhuanlan.zhihu.com/p/68117276…