文章目录
- 开发前言
- 分布式模型
- 系统图解
- 注册中心模块
- 基础服务模块
- 被依赖的服务模块(日志服务)
- 服务模块(访问服务)
- 运行效果
- 开发总结
开发前言
分布式系统具有高可靠性、高性能、可扩展性、灵活性、数据共享、可靠性和地理分布等优点,使得其在各种应用场景下都具有巨大的优势,当然分布式系统实现复杂度要高于单体系统🫠
项目代码使用纯粹的Go语言标准库实现,不借用任何其它第三方库😁
我是醉墨居士,废话不多说,我们现在开始吧🤗
分布式模型
- Hub & Spoke模型
- 优点:集中管理,安全性,降低成本
- 缺点:单点故障,延迟,有限的扩展性
- Peer to Peer模型
- 优点:去中心化,高度可扩展性,资源共享
- 缺点:管理复杂性,安全性,性能问题
- Message Queues模型
- 优点:解耦合,异步处理,可靠性
- 缺点:系统复杂度,准确性,消息顺序
我们将要开发的分布式系统将会取其精华,去其糟粕,使用采用上述模型的混合模式😎
系统图解
注册中心模块
- 注册信息
package registryimport ("encoding/json""fmt""io""strings"
)type Registration struct {ServiceName stringServiceAddr stringRequiredServices []string
}func buildRegistration(reader io.ReadCloser) (*Registration, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}registration := new(Registration)err = json.Unmarshal(data, registration)if err != nil {return nil, err}return registration, nil
}func buildServiceInfo(reader io.ReadCloser) ([]string, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}parts := strings.SplitN(string(data), " ", 2)if len(parts) != 2 {return nil, fmt.Errorf("Parse service failed with length %d", len(parts))}return parts, nil
}
- 注册信息表
package registryimport ("bytes""encoding/json""fmt""io""log""math/rand""net/http""sync"
)type serviceTable struct {serviceInfos map[string][]*Registrationlock *sync.RWMutex
}func newServiceTable() *serviceTable {return &serviceTable{serviceInfos: make(map[string][]*Registration),lock: new(sync.RWMutex),}
}func (t *serviceTable) parseServiceInfos(reader io.ReadCloser) (err error){data, err := io.ReadAll(reader)if err != nil {return err}defer func() {err = reader.Close()}()t.lock.Lock()defer t.lock.Unlock()err = json.Unmarshal(data, &t.serviceInfos)return
}func (t *serviceTable) buildRequiredServiceInfos(registration *Registration) map[string][]*Registration {m := make(map[string][]*Registration, len(registration.RequiredServices))t.lock.RLock()defer t.lock.RUnlock()for _, serviceName := range registration.RequiredServices {m[serviceName] = t.serviceInfos[serviceName]}return m
}func (t *serviceTable) notify(method string, registration *Registration) error {if method != http.MethodPost && method != http.MethodDelete {fmt.Println(method, method == http.MethodPost, method == http.MethodDelete)return fmt.Errorf("Method not allowed with method: %s", method)}t.lock.RLock()defer t.lock.RUnlock()data, err := json.Marshal(registration)if err != nil {return err}for _, registrations := range t.serviceInfos {for _, reg := range registrations {for _, requiredServiceName := range reg.RequiredServices {if requiredServiceName == registration.ServiceName {req, err := http.NewRequest(method, "http://" + reg.ServiceAddr + "/services", bytes.NewReader(data))if err != nil {continue}log.Println("update url: ", reg.ServiceAddr + "/services")http.DefaultClient.Do(req)}}}}return nil
}func (t *serviceTable) add(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table add %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {registrations = append(registrations, registration)} else {t.serviceInfos[registration.ServiceName] = []*Registration{registration}}
}func (t *serviceTable) remove(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table remove %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {for i := len(registrations) - 1; i >= 0; i-- {if registrations[i].ServiceAddr == registration.ServiceAddr {registrations = append(registrations[:i], registrations[i+1:]...)}}}
}func (t *serviceTable) get(serviceName string) *Registration {t.lock.RLock()defer t.lock.RUnlock()regs := t.serviceInfos[serviceName]return regs[rand.Intn(len(regs))]
}
- 注册服务
package registryimport ("encoding/json""net/http""time"
)const (serviceName = "Registry Service"serviceAddr = "127.0.0.1:20000"
)type RegistryService struct {serviceInfos *serviceTableheartBeatWorkerNumber intheartBeatAttempCount intheartBeatAttempDuration time.DurationheartBeatCheckDuration time.Duration
}func Default() *RegistryService {return New(3, 3, time.Second, 30 * time.Second)
}func New(heartBeatWorkerNumber, heartBeatAttempCount int, heartBeatAttempDuration, heartBeatCheckDuration time.Duration) *RegistryService {return &RegistryService{serviceInfos: newServiceTable(),heartBeatWorkerNumber: heartBeatWorkerNumber,heartBeatAttempCount: heartBeatAttempCount,heartBeatAttempDuration: heartBeatAttempDuration,heartBeatCheckDuration: heartBeatCheckDuration,}
}func (s *RegistryService) Run() error {go s.heartBeat()http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {statusCode := http.StatusOKswitch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}err = s.regist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}serviceInfos := s.serviceInfos.buildRequiredServiceInfos(registration)data, err := json.Marshal(&serviceInfos)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}defer w.Write(data)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}s.unregist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}default:statusCode = http.StatusMethodNotAllowedgoto END}END:w.WriteHeader(statusCode)})return http.ListenAndServe(serviceAddr, nil)
}func (s *RegistryService) heartBeat() {channel := make(chan *Registration, 1)for i := 0; i < s.heartBeatWorkerNumber; i++ {go func() {for reg := range channel {for j := 0; j < s.heartBeatAttempCount; j++ {resp, err := http.Get("http://" + reg.ServiceAddr + "/heart-beat")if err == nil && resp.StatusCode == http.StatusOK {goto NEXT}time.Sleep(s.heartBeatAttempDuration)}s.unregist(reg)NEXT:}}()}for {s.serviceInfos.lock.RLock()for _, registrations := range s.serviceInfos.serviceInfos {for i := len(registrations) - 1; i >= 0; i-- {channel <- registrations[i]}}s.serviceInfos.lock.RUnlock()time.Sleep(s.heartBeatCheckDuration)}
}func (s *RegistryService) regist(registration *Registration) error {s.serviceInfos.add(registration)return s.serviceInfos.notify(http.MethodPost, registration)
}func (s *RegistryService) unregist(registration *Registration) error {s.serviceInfos.remove(registration)return s.serviceInfos.notify(http.MethodDelete, registration)
}
- 注册服务客户端接口
package registryimport ("bytes""encoding/json""fmt""net/http"
)func registerMonitorHandler() {http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {defer r.Body.Close()switch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.add(registration)fmt.Printf("add service %s\n", registration.ServiceName)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.remove(registration)fmt.Printf("remove service %s\n", registration.ServiceName)default:w.WriteHeader(http.StatusMethodNotAllowed)return}w.WriteHeader(http.StatusOK)})http.HandleFunc("/heart-beat", func(w http.ResponseWriter, r *http.Request) {w.WriteHeader(http.StatusOK)})
}func RegistService(registration *Registration) error {registerMonitorHandler()data, err := json.Marshal(registration)if err != nil {return err}resp, err := http.Post("http://" + serviceAddr + "/services", "application/json", bytes.NewReader(data))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Regist %s error with code %d", registration.ServiceName, resp.StatusCode)}err = provider.parseServiceInfos(resp.Body)if err != nil {return err}return nil
}func UnregistService(registration *Registration) error {data, err := json.Marshal(registration)if err != nil {return err}req, err := http.NewRequest(http.MethodDelete, "http://" + serviceAddr + "/services", bytes.NewReader(data))if err != nil {return err}resp, err := http.DefaultClient.Do(req)if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Unregist %s error with code %d", registration.ServiceName, resp.StatusCode)}return nil
}var provider = newServiceTable()func Get(serviceName string) *Registration {return provider.get(serviceName)
}
- 服务入口
package mainimport ("log""services/registry"
)func main() {registryService := registry.Default()err := registryService.Run()if err != nil {log.Fatalln(err)}
}
基础服务模块
package serviceimport ("context""fmt""net/http""services/registry"
)type Service interface {Init()
}func Run(registration *registry.Registration) (err error) {err = registry.RegistService(registration)if err != nil {return err}defer func() {err = registry.UnregistService(registration)}()srv := http.Server{Addr: registration.ServiceAddr}go func() {fmt.Println("Press any key to stop.")var s stringfmt.Scan(&s)srv.Shutdown(context.Background())}()err = srv.ListenAndServe()if err != nil {return err}return nil
}
被依赖的服务模块(日志服务)
- 业务服务
package logserviceimport ("io""log""net/http""os"
)type logService struct {destination stringlogger *log.Logger
}func Init(destination string) {s := &logService{destination: destination,}s.logger = log.New(s, "Go:", log.Ltime | log.Lshortfile)s.register()
}func (s *logService)Write(data []byte) (int, error) {file, err := os.OpenFile(s.destination, os.O_CREATE | os.O_APPEND | os.O_WRONLY, 0600)if err != nil {return 0, err}defer file.Close()return file.Write(data)
}func (s *logService)register() {http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {if r.Method != http.MethodPost {w.WriteHeader(http.StatusMethodNotAllowed)return}data, err := io.ReadAll(r.Body)if err != nil || len(data) == 0 {w.WriteHeader(http.StatusBadRequest)return}s.logger.Println(string(data))})
}
- 客户端接口
package logserviceimport ("bytes""fmt""net/http""services/registry"
)func Println(registration *registry.Registration, s string) error {resp, err := http.Post("http://"+registration.ServiceAddr+"/log", "text/plain", bytes.NewReader([]byte(s)))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Response Error with code: %d", resp.StatusCode)}return nil
}
- 服务入口
package mainimport ("log""services/logservice""services/registry""services/service"
)func main() {logservice.Init("./services.log")err := service.Run(®istry.Registration{ServiceName: "LogService",ServiceAddr: "127.0.0.1:20002",RequiredServices: make([]string, 0),})if err != nil {log.Fatalln(err)}
}
服务模块(访问服务)
- 业务服务
package visistserviceimport ("log""net/http""services/logservice""services/registry""strconv""sync/atomic"
)type visistService struct {visistCount atomic.Int32
}func Init() {s := &visistService{visistCount: atomic.Int32{},}s.register()
}func (s *visistService) register() {http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {s.visistCount.Add(1)count := strconv.Itoa(int(s.visistCount.Load()))err := logservice.Println(registry.Get("LogService"), count)if err != nil {w.WriteHeader(http.StatusInternalServerError)log.Printf("Log service println error: %s\n", err)return}w.WriteHeader(http.StatusOK)w.Write([]byte(count))})
}
- 服务入口
package mainimport ("log""services/registry""services/service""services/visistservice"
)func main() {visistservice.Init()err := service.Run(®istry.Registration{ServiceName: "VisistService",ServiceAddr: "127.0.0.1:20003",RequiredServices: []string{"LogService"},})if err != nil {log.Fatalln(err)}
}
运行效果
依次运行注册服务,日志服务,浏览服务
运行完毕之后,访问http://127.0.0.1:20003,返回访问量
日志记录对应访问量数据
这里只是用了一个简单的示例,你可以使用这套基础组件,然后让服务变得更加复杂,更加丰富。
开发总结
恭喜你,我们一起完成了简易分布式系统的开发,麻雀虽小,五脏俱全😉
希望这个项目能让你有所收获😊
如果有什么错误,请你评论区或者私信我指出,让我们一起进步✌️