Go语言学习13-常见软件架构的实现
架构模式
An architectural pattern is a general, reusable solution to a commonly occurring problem in software architectural within a given context. ——wikipedia
Pipe-Filter 架构
Pipe-Filter 模式
-
非常适合于数据处理及数据分析系统
-
Filter 封装数据处理的功能
-
松耦合: Filter只跟数据(格式) 耦合
-
Pipe用于连接 Filter 传递数据或者在异步处理过程中缓冲数据流
进程内同步调用时, pipe 演变为数据在方法调用间传递
Filter和组合模式
示例
// filter.go
// Package pipefilter is to define the interfaces
// and the structures for pipe-filter style implementation
package pipefilter// Request is the input of the filter
type Request interface{}// Response is the output of the filter
type Response interface{}// Filter interface is the definition of the data processing components
// Pipe-Filter structure
type Filter interface {Process(data Request) (Response, error)
}// split_filter.go
package pipefilterimport ("errors""strings"
)var SplitFilterWrongFormatError = errors.New("input data should be string")type SplitFilter struct {delimiter string
}func NewSplitFilter(delimiter string) *SplitFilter {return &SplitFilter{delimiter}
}func (sf *SplitFilter) Process(data Request) (Response, error) {str, ok := data.(string) // 检查数据格式/类型, 是否可以处理if !ok {return nil, SplitFilterWrongFormatError}parts := strings.Split(str, sf.delimiter)return parts, nil
}// to_int_filter.go
package pipefilterimport ("errors""strconv"
)var ToIntFilterWrongFormatError = errors.New("input data should be []string")type ToIntFilter struct {
}func NewToIntFilter() *ToIntFilter {return &ToIntFilter{}
}func (tif *ToIntFilter) Process(data Request) (Response, error) {parts, ok := data.([]string)if !ok {return nil, ToIntFilterWrongFormatError}ret := []int{}for _, part := range parts {s, err := strconv.Atoi(part)if err != nil {return nil, err}ret = append(ret, s)}return ret, nil
}// sum_filter.go
package pipefilterimport "errors"var SumFilterWrongFormatError = errors.New("input data should be []int")type SumFilter struct {
}func NewSumFilter() *SumFilter {return &SumFilter{}
}func (sf *SumFilter) Process(data Request) (Response, error) {elems, ok := data.([]int)if !ok {return nil, SumFilterWrongFormatError}ret := 0for _, elem := range elems {ret += elem}return ret, nil
}// straight_pipeline.go
package pipefilter// StraightPipeline is composed of the filters, and the filters are piled as a straight line.
type StraightPipeline struct {Name stringFilters *[]Filter
}// NewStraightPipeline create a new StraightPipelineWithWallTime
func NewStraightPipeline(name string, filters ...Filter) *StraightPipeline {return &StraightPipeline{Name: name,Filters: &filters,}
}// Process is to process the coming data by the pipeline
func (f *StraightPipeline) Process(data Request) (Response, error) {var ret interface{}var err errorfor _, filter := range *f.Filters {ret, err = filter.Process(data)if err != nil {return ret, err}data = ret}return ret, err
}
Micro Kernel架构
- 特点
- 易于扩展
- 错误隔离
- 保持架构一致性
- 要点
- 内核包含公共流程或通用逻辑
- 将可变或可扩展部分规划为扩展点
- 抽象扩展点行为, 定义接口
- 利用插件进行扩展
示例
package microkernelimport ("context""errors""fmt""strings""sync"
)const (Waiting = iotaRunning
)var WrongStateError = errors.New("can not take the operation in the current state")type CollectorsError struct {CollectorsErrors []error
}func (ce CollectorsError) Error() string {var strs []stringfor _, err := range ce.CollectorsErrors {strs = append(strs, err.Error())}return strings.Join(strs, ";")
}type Event struct {Source stringContent string
}type EventReceiver interface {OnEvent(evt Event)
}type Collector interface {Init(evtReceiver EventReceiver) errorStart(agtCtx context.Context) errorStop() errorDestroy() error
}type Agent struct {collectors map[string]CollectorevtBuf chan Eventcancel context.CancelFuncctx context.Contextstate int
}func (agt *Agent) EventProcessGroutine() {var evtSeg [10]Eventfor {for i := 0; i < 10; i++ {select {case evtSeg[i] = <-agt.evtBuf:case <-agt.ctx.Done():return}}fmt.Println(evtSeg)}
}func NewAgent(sizeEvtBuf int) *Agent {agt := Agent{collectors: map[string]Collector{},evtBuf: make(chan Event, sizeEvtBuf),state: Waiting,}return &agt
}func (agt *Agent) RegisterCollector(name string, collector Collector) error {if agt.state != Waiting {return WrongStateError}agt.collectors[name] = collectorreturn collector.Init(agt)
}func (agt *Agent) startCollectors() error {var err errorvar errs CollectorsErrorvar mutex sync.Mutexfor name, collector := range agt.collectors {go func(name string, collector Collector, ctx context.Context) {defer func() {mutex.Unlock()}()err = collector.Start(ctx)mutex.Lock()if err != nil {errs.CollectorsErrors = append(errs.CollectorsErrors,errors.New(name+":"+err.Error()))}}(name, collector, agt.ctx)}if len(errs.CollectorsErrors) == 0 {return nil}return errs
}func (agt *Agent) stopCollectors() error {var err errorvar errs CollectorsErrorfor name, collector := range agt.collectors {if err = collector.Stop(); err != nil {errs.CollectorsErrors = append(errs.CollectorsErrors,errors.New(name+":"+err.Error()))}}if len(errs.CollectorsErrors) == 0 {return nil}return errs
}func (agt *Agent) destroyCollectors() error {var err errorvar errs CollectorsErrorfor name, collector := range agt.collectors {if err = collector.Destroy(); err != nil {errs.CollectorsErrors = append(errs.CollectorsErrors,errors.New(name+":"+err.Error()))}}if len(errs.CollectorsErrors) == 0 {return nil}return errs
}func (agt *Agent) Start() error {if agt.state != Waiting {return WrongStateError}agt.state = Runningagt.ctx, agt.cancel = context.WithCancel(context.Background())go agt.EventProcessGroutine()return agt.startCollectors()
}func (agt *Agent) Stop() error {if agt.state != Running {return WrongStateError}agt.state = Waitingagt.cancel()return agt.stopCollectors()
}func (agt *Agent) Destroy() error {if agt.state != Waiting {return WrongStateError}return agt.destroyCollectors()
}func (agt *Agent) OnEvent(evt Event) {agt.evtBuf <- evt
}