milvus对象存储和消息中间件的工厂设计模式分析
需求
根据参数设置创建mq和storage
mq有kafka、pulsar(从下文initMQ()的代码可以看到,还支持rocksmq、natsmq)
storage有local,minio,remote
配置文件
根据配置文件选择初始化mq和存储:
mq:
  type: pulsar
common:
  storageType: minio
对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。
代码框架
工厂接口
代码路径:internal\util\dependency\factory.go
type Factory interface {msgstream.Factory// Init()给工厂传递参数。Init(p *paramtable.ComponentParam)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}// pkg\mq\msgstream\msgstream.go
// msgstream.Factory的code
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}
dependency.Factory是一个工厂接口,它内嵌了mq的工厂接口(msgstream.Factory),并声明了创建持久存储对象的方法。
这个接口创建消息中间件对象和持久存储对象。
这里为什么不这么写:
type Factory interface {Init(p *paramtable.ComponentParam)NewMsgStream(ctx context.Context) (MsgStream, error)NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}
DefaultFactory
DefaultFactory结构体是dependency.Factory的实现。
// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {standAlone boolchunkManagerFactory storage.FactorymsgStreamFactory msgstream.Factory
}// storage.Factory
// internal\storage\factory.go
type Factory interface {NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}
DefaultFactory实现了dependency.Factory接口的Init()函数。
在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。
// Init configures the factory from params: it builds the chunk-manager
// factory and then selects and builds the message-queue factory.
//
// It is a no-op when msgStreamFactory is already set, and it panics if the
// message queue cannot be initialized.
func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
	// skip if using default factory
	if f.msgStreamFactory != nil {
		return
	}

	// Initialize chunkManagerFactory.
	f.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)

	// initialize mq client or embedded mq.
	// Initialize msgStreamFactory.
	if err := f.initMQ(f.standAlone, params); err != nil {
		panic(err)
	}
}
f.chunkManagerFactory:
return &ChunkManagerFactory{persistentStorage: persistentStorage,config: c,}
f.msgStreamFactory:
func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))switch mqType {case mqTypeNatsmq:f.msgStreamFactory = msgstream.NewNatsmqFactory()case mqTypeRocksmq:f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), ¶ms.ServiceParam)case mqTypePulsar:f.msgStreamFactory = msgstream.NewPmsFactory(¶ms.ServiceParam)case mqTypeKafka:f.msgStreamFactory = msgstream.NewKmsFactory(¶ms.ServiceParam)}if f.msgStreamFactory == nil {return errors.New("failed to create MQ: check the milvus log for initialization failures")}return nil
}
持久存储
storage.Factory是创建持久存储的工厂接口。
storage.ChunkManagerFactory是storage.Factory的实现。
NewPersistentStorageChunkManager()接口的实现:
func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {return f.newChunkManager(ctx, f.persistentStorage)
}func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {switch engine {case "local":return NewLocalChunkManager(RootPath(f.config.rootPath)), nilcase "minio":return newMinioChunkManagerWithConfig(ctx, f.config)case "remote":return NewRemoteChunkManager(ctx, f.config)default:return nil, errors.New("no chunk manager implemented with engine: " + engine)}
}
根据传入的engine新建对应的持久存储对象。
LocalChunkManager、MinioChunkManager、RemoteChunkManager。
// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {localPath string
}// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {*minio.ClientbucketName stringrootPath string
}// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {client ObjectStoragebucketName stringrootPath string
}
消息中间件
msgstream.Factory是创建mq的工厂接口。
工厂接口:
// pkg\mq\msgstream\msgstream.go
type Factory interface {NewMsgStream(ctx context.Context) (MsgStream, error)NewTtMsgStream(ctx context.Context) (MsgStream, error)NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}
实现有:
CommonFactory、KmsFactory、PmsFactory
// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {Newer func(context.Context) (mqwrapper.Client, error) // client constructorDispatcherFactory ProtoUDFactoryReceiveBufSize int64MQBufSize int64
}// pkg\mq\msgstream\mq_factory.go
// kafka工厂
type KmsFactory struct {dispatcherFactory ProtoUDFactoryconfig *paramtable.KafkaConfigReceiveBufSize int64MQBufSize int64
}// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// pulsar工厂
type PmsFactory struct {dispatcherFactory ProtoUDFactory// the following members must be public, so that mapstructure.Decode() can access themPulsarAddress stringPulsarWebAddress stringReceiveBufSize int64MQBufSize int64PulsarAuthPlugin stringPulsarAuthParams stringPulsarTenant stringPulsarNameSpace stringRequestTimeout time.DurationmetricRegisterer prometheus.Registerer
}
mq产品
mq的产品接口是msgstream.MsgStream
// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {Close()AsProducer(channels []string)Produce(*MsgPack) errorSetRepackFunc(repackFunc RepackFunc)GetProduceChannels() []stringBroadcast(*MsgPack) (map[string][]MessageID, error)AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) errorChan() <-chan *MsgPackSeek(ctx context.Context, offset []*MsgPosition) errorGetLatestMsgID(channel string) (MessageID, error)CheckTopicValid(channel string) errorEnableProduce(can bool)
}
具体产品实现有:
msgstream.mqMsgStream、msgstream.MqTtMsgStream
type mqMsgStream struct {ctx context.Contextclient mqwrapper.Clientproducers map[string]mqwrapper.ProducerproducerChannels []stringconsumers map[string]mqwrapper.ConsumerconsumerChannels []stringrepackFunc RepackFuncunmarshal UnmarshalDispatcherreceiveBuf chan *MsgPackcloseRWMutex *sync.RWMutexstreamCancel func()bufSize int64producerLock *sync.RWMutexconsumerLock *sync.Mutexclosed int32onceChan sync.OnceenableProduce atomic.Value
}// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {*mqMsgStreamchanMsgBuf map[mqwrapper.Consumer][]TsMsgchanMsgPos map[mqwrapper.Consumer]*msgpb.MsgPositionchanStopChan map[mqwrapper.Consumer]chan boolchanTtMsgTime map[mqwrapper.Consumer]TimestampchanMsgBufMutex *sync.MutexchanTtMsgTimeMutex *sync.RWMutexchanWaitGroup *sync.WaitGrouplastTimeStamp TimestampsyncConsumer chan int
}
存储产品
存储的产品接口是storage.ChunkManager
// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {// RootPath returns current root path.RootPath() string// Path returns path of @filePath.Path(ctx context.Context, filePath string) (string, error)// Size returns path of @filePath.Size(ctx context.Context, filePath string) (int64, error)// Write writes @content to @filePath.Write(ctx context.Context, filePath string, content []byte) error// MultiWrite writes multi @content to @filePath.MultiWrite(ctx context.Context, contents map[string][]byte) error// Exist returns true if @filePath exists.Exist(ctx context.Context, filePath string) (bool, error)// Read reads @filePath and returns content.Read(ctx context.Context, filePath string) ([]byte, error)// Reader return a reader for @filePathReader(ctx context.Context, filePath string) (FileReader, error)// MultiRead reads @filePath and returns content.MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)// ReadWithPrefix reads files with same @prefix and returns contents.ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.// if all bytes are read, @err is io.EOF.// return other error if read failed.ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)// Remove delete @filePath.Remove(ctx context.Context, filePath string) error// MultiRemove delete @filePaths.MultiRemove(ctx context.Context, filePaths []string) error// RemoveWithPrefix remove files with same @prefix.RemoveWithPrefix(ctx context.Context, prefix string) error
}
具体产品实现有:
LocalChunkManager、MinioChunkManager、RemoteChunkManager
// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	// Embedded minio client; its methods are promoted onto MinioChunkManager.
	*minio.Client

	// ctx context.Context
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data held behind an
// ObjectStorage client.
type RemoteChunkManager struct {
	client ObjectStorage

	// ctx context.Context
	bucketName string
	rootPath   string
}
总结
从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂