从零开始实现分布式服务系统

文章目录

  • 开发前言
  • 分布式模型
  • 系统图解
  • 注册中心模块
  • 基础服务模块
  • 被依赖的服务模块(日志服务)
  • 服务模块(访问服务)
  • 运行效果
  • 开发总结

开发前言

分布式系统具有高可靠性、高性能、可扩展性、灵活性、数据共享、可靠性和地理分布等优点,使得其在各种应用场景下都具有巨大的优势,当然分布式系统实现复杂度要高于单体系统🫠

项目代码使用纯粹的Go语言标准库实现,不借用任何其它第三方库😁

我是醉墨居士,废话不多说,我们现在开始吧🤗

分布式模型

  • Hub & Spoke模型
  • 优点:集中管理,安全性,降低成本
  • 缺点:单点故障,延迟,有限的扩展性

在这里插入图片描述

  • Peer to Peer模型
  • 优点:去中心化,高度可扩展性,资源共享
  • 缺点:管理复杂性,安全性,性能问题

在这里插入图片描述

  • Message Queues模型
  • 优点:解耦合,异步处理,可靠性
  • 缺点:系统复杂度,准确性,消息顺序

在这里插入图片描述

我们将要开发的分布式系统将会取其精华,去其糟粕,使用采用上述模型的混合模式😎

系统图解

在这里插入图片描述

注册中心模块

  • 注册信息
package registryimport ("encoding/json""fmt""io""strings"
)type Registration struct {ServiceName stringServiceAddr stringRequiredServices []string
}func buildRegistration(reader io.ReadCloser) (*Registration, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}registration := new(Registration)err = json.Unmarshal(data, registration)if err != nil {return nil, err}return registration, nil
}func buildServiceInfo(reader io.ReadCloser) ([]string, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}parts := strings.SplitN(string(data), " ", 2)if len(parts) != 2 {return nil, fmt.Errorf("Parse service failed with length %d", len(parts))}return parts, nil
}
  • 注册信息表
package registryimport ("bytes""encoding/json""fmt""io""log""math/rand""net/http""sync"
)type serviceTable struct {serviceInfos map[string][]*Registrationlock *sync.RWMutex
}func newServiceTable() *serviceTable {return &serviceTable{serviceInfos: make(map[string][]*Registration),lock: new(sync.RWMutex),}
}func (t *serviceTable) parseServiceInfos(reader io.ReadCloser) (err error){data, err := io.ReadAll(reader)if err != nil {return err}defer func() {err = reader.Close()}()t.lock.Lock()defer t.lock.Unlock()err = json.Unmarshal(data, &t.serviceInfos)return
}func (t *serviceTable) buildRequiredServiceInfos(registration *Registration) map[string][]*Registration {m := make(map[string][]*Registration, len(registration.RequiredServices))t.lock.RLock()defer t.lock.RUnlock()for _, serviceName := range registration.RequiredServices {m[serviceName] = t.serviceInfos[serviceName]}return m
}func (t *serviceTable) notify(method string, registration *Registration) error {if method != http.MethodPost && method != http.MethodDelete {fmt.Println(method, method == http.MethodPost, method == http.MethodDelete)return fmt.Errorf("Method not allowed with method: %s", method)}t.lock.RLock()defer t.lock.RUnlock()data, err := json.Marshal(registration)if err != nil {return err}for _, registrations := range t.serviceInfos {for _, reg := range registrations {for _, requiredServiceName := range reg.RequiredServices {if requiredServiceName == registration.ServiceName {req, err := http.NewRequest(method, "http://" + reg.ServiceAddr + "/services", bytes.NewReader(data))if err != nil {continue}log.Println("update url: ", reg.ServiceAddr + "/services")http.DefaultClient.Do(req)}}}}return nil
}func (t *serviceTable) add(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table add %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {registrations = append(registrations, registration)} else {t.serviceInfos[registration.ServiceName] = []*Registration{registration}}
}func (t *serviceTable) remove(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table remove %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {for i := len(registrations) - 1; i >= 0; i-- {if registrations[i].ServiceAddr == registration.ServiceAddr {registrations = append(registrations[:i], registrations[i+1:]...)}}}
}func (t *serviceTable) get(serviceName string) *Registration {t.lock.RLock()defer t.lock.RUnlock()regs := t.serviceInfos[serviceName]return regs[rand.Intn(len(regs))]
}
  • 注册服务
package registryimport ("encoding/json""net/http""time"
)const (serviceName = "Registry Service"serviceAddr = "127.0.0.1:20000"
)type RegistryService struct {serviceInfos *serviceTableheartBeatWorkerNumber intheartBeatAttempCount intheartBeatAttempDuration time.DurationheartBeatCheckDuration time.Duration
}func Default() *RegistryService {return New(3, 3, time.Second, 30 * time.Second)
}func New(heartBeatWorkerNumber, heartBeatAttempCount int, heartBeatAttempDuration, heartBeatCheckDuration time.Duration) *RegistryService {return &RegistryService{serviceInfos: newServiceTable(),heartBeatWorkerNumber: heartBeatWorkerNumber,heartBeatAttempCount: heartBeatAttempCount,heartBeatAttempDuration: heartBeatAttempDuration,heartBeatCheckDuration: heartBeatCheckDuration,}
}func (s *RegistryService) Run() error {go s.heartBeat()http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {statusCode := http.StatusOKswitch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}err = s.regist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}serviceInfos := s.serviceInfos.buildRequiredServiceInfos(registration)data, err := json.Marshal(&serviceInfos)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}defer w.Write(data)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}s.unregist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}default:statusCode = http.StatusMethodNotAllowedgoto END}END:w.WriteHeader(statusCode)})return http.ListenAndServe(serviceAddr, nil)
}func (s *RegistryService) heartBeat() {channel := make(chan *Registration, 1)for i := 0; i < s.heartBeatWorkerNumber; i++ {go func() {for reg := range channel {for j := 0; j < s.heartBeatAttempCount; j++ {resp, err := http.Get("http://" + reg.ServiceAddr + "/heart-beat")if err == nil && resp.StatusCode == http.StatusOK {goto NEXT}time.Sleep(s.heartBeatAttempDuration)}s.unregist(reg)NEXT:}}()}for {s.serviceInfos.lock.RLock()for _, registrations := range s.serviceInfos.serviceInfos {for i := len(registrations) - 1; i >= 0; i-- {channel <- registrations[i]}}s.serviceInfos.lock.RUnlock()time.Sleep(s.heartBeatCheckDuration)}
}func (s *RegistryService) regist(registration *Registration) error {s.serviceInfos.add(registration)return s.serviceInfos.notify(http.MethodPost, registration)
}func (s *RegistryService) unregist(registration *Registration) error {s.serviceInfos.remove(registration)return s.serviceInfos.notify(http.MethodDelete, registration)
}
  • 注册服务客户端接口
package registryimport ("bytes""encoding/json""fmt""net/http"
)func registerMonitorHandler() {http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {defer r.Body.Close()switch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.add(registration)fmt.Printf("add service %s\n", registration.ServiceName)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.remove(registration)fmt.Printf("remove service %s\n", registration.ServiceName)default:w.WriteHeader(http.StatusMethodNotAllowed)return}w.WriteHeader(http.StatusOK)})http.HandleFunc("/heart-beat", func(w http.ResponseWriter, r *http.Request) {w.WriteHeader(http.StatusOK)})
}func RegistService(registration *Registration) error {registerMonitorHandler()data, err := json.Marshal(registration)if err != nil {return err}resp, err := http.Post("http://" + serviceAddr + "/services", "application/json", bytes.NewReader(data))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Regist %s error with code %d", registration.ServiceName, resp.StatusCode)}err = provider.parseServiceInfos(resp.Body)if err != nil {return err}return nil
}func UnregistService(registration *Registration) error {data, err := json.Marshal(registration)if err != nil {return err}req, err := http.NewRequest(http.MethodDelete, "http://" + serviceAddr + "/services", bytes.NewReader(data))if err != nil {return err}resp, err := http.DefaultClient.Do(req)if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Unregist %s error with code %d", registration.ServiceName, resp.StatusCode)}return nil
}var provider = newServiceTable()func Get(serviceName string) *Registration {return provider.get(serviceName)
}
  • 服务入口
package mainimport ("log""services/registry"
)func main() {registryService := registry.Default()err := registryService.Run()if err != nil {log.Fatalln(err)}
}

基础服务模块

package serviceimport ("context""fmt""net/http""services/registry"
)type Service interface {Init()
}func Run(registration *registry.Registration) (err error) {err = registry.RegistService(registration)if err != nil {return err}defer func() {err = registry.UnregistService(registration)}()srv := http.Server{Addr: registration.ServiceAddr}go func() {fmt.Println("Press any key to stop.")var s stringfmt.Scan(&s)srv.Shutdown(context.Background())}()err = srv.ListenAndServe()if err != nil {return err}return nil
}

被依赖的服务模块(日志服务)

  • 业务服务
package logserviceimport ("io""log""net/http""os"
)type logService struct {destination stringlogger *log.Logger
}func Init(destination string) {s := &logService{destination: destination,}s.logger = log.New(s, "Go:", log.Ltime | log.Lshortfile)s.register()
}func (s *logService)Write(data []byte) (int, error) {file, err := os.OpenFile(s.destination, os.O_CREATE | os.O_APPEND | os.O_WRONLY, 0600)if err != nil {return 0, err}defer file.Close()return file.Write(data)
}func (s *logService)register() {http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {if r.Method != http.MethodPost {w.WriteHeader(http.StatusMethodNotAllowed)return}data, err := io.ReadAll(r.Body)if err != nil || len(data) == 0 {w.WriteHeader(http.StatusBadRequest)return}s.logger.Println(string(data))})
}
  • 客户端接口
package logserviceimport ("bytes""fmt""net/http""services/registry"
)func Println(registration *registry.Registration, s string) error {resp, err := http.Post("http://"+registration.ServiceAddr+"/log", "text/plain", bytes.NewReader([]byte(s)))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Response Error with code: %d", resp.StatusCode)}return nil
}
  • 服务入口
package mainimport ("log""services/logservice""services/registry""services/service"
)func main() {logservice.Init("./services.log")err := service.Run(&registry.Registration{ServiceName:      "LogService",ServiceAddr:      "127.0.0.1:20002",RequiredServices: make([]string, 0),})if err != nil {log.Fatalln(err)}
}

服务模块(访问服务)

  • 业务服务
package visistserviceimport ("log""net/http""services/logservice""services/registry""strconv""sync/atomic"
)type visistService struct {visistCount atomic.Int32
}func Init() {s := &visistService{visistCount: atomic.Int32{},}s.register()
}func (s *visistService) register() {http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {s.visistCount.Add(1)count := strconv.Itoa(int(s.visistCount.Load()))err := logservice.Println(registry.Get("LogService"), count)if err != nil {w.WriteHeader(http.StatusInternalServerError)log.Printf("Log service println error: %s\n", err)return}w.WriteHeader(http.StatusOK)w.Write([]byte(count))})
}
  • 服务入口
package mainimport ("log""services/registry""services/service""services/visistservice"
)func main() {visistservice.Init()err := service.Run(&registry.Registration{ServiceName:      "VisistService",ServiceAddr:      "127.0.0.1:20003",RequiredServices: []string{"LogService"},})if err != nil {log.Fatalln(err)}
}

运行效果

依次运行注册服务,日志服务,浏览服务
在这里插入图片描述
运行完毕之后,访问http://127.0.0.1:20003,返回访问量
在这里插入图片描述

日志记录对应访问量数据
在这里插入图片描述
这里只是用了一个简单的示例,你可以使用这套基础组件,然后让服务变得更加复杂,更加丰富。

开发总结

恭喜你,我们一起完成了简易分布式系统的开发,麻雀虽小,五脏俱全😉
希望这个项目能让你有所收获😊
如果有什么错误,请你评论区或者私信我指出,让我们一起进步✌️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/231014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【.NET Core】特性(Attribute)详解

【.NET Core】特性&#xff08;Attribute&#xff09;详解 文章目录 【.NET Core】特性&#xff08;Attribute&#xff09;详解一、概述二、编写自定义属性2.1 自定义特性的主要步骤2.2 应用AttributeUsageAttributeAttributeTargets 成员Inherited属性AllowMultiple属性 三、声…

【计算机网络】TCP协议——3. 可靠性策略效率策略

前言 TCP是一种可靠的协议&#xff0c;提供了多种策略来确保数据的可靠性传输。 可靠并不是保证每次发送的数据&#xff0c;对方都一定收到&#xff1b;而是尽最大可能让数据送达目的主机&#xff0c;即使丢包也可以知道丢包。 目录 一. 确认应答和捎带应答机制 二. 超时重…

小信砍柴的题解

目录 原题描述&#xff1a; 时间&#xff1a;1s 空间&#xff1a;256M 题目描述&#xff1a; 输入格式&#xff1a; 输出格式&#xff1a; 样例1输入&#xff1a; 题目大意&#xff1a; 主要思路&#xff1a; 注意事项&#xff1a; 总代码&#xff1a; 原题描述&#…

计网01 计算机网络基础

一、计算机网络基本概念 1、什么是计算机网络 网络&#xff1a;由两台或多台计算机通过网络设备串联&#xff08;网络设备通过传输介质串联&#xff09;而形成的网络网络设备&#xff1a;计算机、路由交换、防火墙、上网行为管理等传输介质&#xff1a;双绞线&#xff08;网线…

汇编语言学习(2)

更好的阅读体验&#xff0c;请点击 YinKai’s Blog。 基本语法 汇编程序可以分为三个部分&#xff1a; 数据部分&#xff08;data section&#xff09;未初始化数据部分&#xff08;bss section&#xff09;文本部分&#xff08;text section&#xff09; data 部分 ​ 数据…

微服务——服务异步通讯(MQ高级)

MQ的一些常见问题 消息可靠性 生产者消息确认 返回ack&#xff0c;怎么感觉这么像某个tcp的3次握手。 使用资料提供的案例工程. 在图形化界面创建一个simple.queue的队列&#xff0c;虚拟机要和配置文件里面的一样。 SpringAMQP实现生产者确认 AMQP里面支持多种生产者确认的类…

【华为】文档中命令行约定格式规范(命令行格式规范、命令行行为规范、命令行参数格式、命令行规范)

文章目录 命令行约定格式**粗体&#xff1a;命令行关键字***斜体&#xff1a;命令行参数*[ ]&#xff1a;可选配置{ x | y | ... } 和 [ x | y | ... ]&#xff1a;选项{ x | y | ... }* 和 [ x | y | ... ]*&#xff1a;多选项&<1-n>&#xff1a;重复参数#&#xff…

xtu oj 1375 Fabonacci

题目描述 小明非常喜欢Fibonacci数列&#xff0c;数列为 f11,f22,fnfn−1fn−2。 小明想知道对于一个整数n&#xff0c;使得nfifjfk的组合有多少种&#xff1f; 比如5113 或者 5122,有2种。注意 122 和 212 被认为是同一种。 输入 第一行是一个整数T(1≤T≤1000)&#xff0c…

kill编译异常处理

当kill编译时出现如下警告 Build target Target 1 linking... *** WARNING L16: UNCALLED SEGMENT, IGNORED FOR OVERLAY PROCESSSEGMENT: ?PR?_LCD_SHOWCHAR?LCD1602 *** WARNING L16: UNCALLED SEGMENT, IGNORED FOR OVERLAY PROCESSSEGMENT: ?PR?_LCD_SHOWSTRING?LCD…

SpringSecurity 手机号登录

一、工作流程 1.向手机发送验证码&#xff0c;第三方短信发送平台&#xff0c;如阿里云短信。 2.手机获取验证码后&#xff0c;在表单中输入验证码。 3.使用自定义过滤器​SmsCodeValidateFilter​。 4.短信校验通过后&#xff0c;使用自定义手机认证过滤器​SmsCodeAuthentic…

【并发编程】线程基础

目录 1、线程基础 1.1基本概念 1.1.1 进程与线程 1.1.1.2 什么是线程 1.1.1.3 两者间的联系和区别 1.1.2 多线程 1.1.2.1 什么是多线程 1.1.2.2 多线程的局限 1.1.3串行,并行,并发 1.1.3.1 什么是串行 1.1.3.2 什么是并行 1.1.3.3 什么是并发 1.1.3.4 区别和联系…

UE4/UE5 日志插件(基于spdlog)

1 解决问题 对于高频日志序列化到本地的需求&#xff0c;spdlog肯定完美满足。 源码地址&#xff1a;https://github.com/gabime/spdlog 博主下载的版本为 spdlog-1.12.0&#xff0c;各位大佬可以根绝自己爱好选择。 2 过程介绍 大概目录&#xff1a; SpdlogLibC目录下是对…

Qt/C++音视频开发60-坐标拾取/按下鼠标获取矩形区域/转换到视频源真实坐标

一、前言 通过在通道画面上拾取鼠标按下的坐标&#xff0c;然后鼠标移动&#xff0c;直到松开&#xff0c;根据松开的坐标和按下的坐标&#xff0c;绘制一个矩形区域&#xff0c;作为热点或者需要电子放大的区域&#xff0c;拿到这个坐标区域&#xff0c;用途非常多&#xff0…

C语言之文件操作(下)

C语言之文件操作&#xff08;下&#xff09; 文章目录 C语言之文件操作&#xff08;下&#xff09;1. 文件的顺序读写1.1 文件的顺序读写函数1.1.1 字符输入/输出函数&#xff08;fgetc/fputc&#xff09;1.1.2 ⽂本⾏输⼊/输出函数&#xff08;fgets/fputs&#xff09;1.1.3 格…

工业应用新典范,飞凌嵌入式FET-D9360-C核心板发布!

来源&#xff1a;飞凌嵌入式官网 当前新一轮科技革命和产业变革突飞猛进&#xff0c;工业领域对高性能、高可靠性、高稳定性的计算需求也在日益增长。为了更好地满足这一需求&#xff0c;飞凌嵌入式与芯驰科技&#xff08;SemiDrive&#xff09;强强联合&#xff0c;基于芯驰D9…

vue3.0基础

1. setup函数 vue单页面使用到的变量和方法都定义在setup函数中,return后才能被页面引用 export default {setup(){const name 张三const person {name,age:30}function goWork(){consle.log(工作)}return {name,person,goWork}} } 注意&#xff1a;直接定义的变量修改不会…

SI24R03国产自主可控RISC-V架构MCU低功耗2.4GHz收发芯片SoC

目录 RISC-V架构的优势SI24R03/04特性射频收发器模块特征MCU 模块特征 其他特征 RISC-V架构的优势 相对于目前主流的英特尔X86架构及ARM等架构来说&#xff0c;RISC-V架构具有指令精简、模块化、可扩展、开源、免费等优点。RISC-V的基础指令集只有40多条&#xff0c;加上其他基…

Kafka--从Zookeeper数据理解Kafka集群工作机制

从Zookeeper数据理解Kafka集群工作机制 这一部分主要是理解Kafka的服务端重要原理。但是Kafka为了保证高吞吐&#xff0c;高性能&#xff0c;高可扩展的三高架构&#xff0c;很多具体设计都是相当复杂的。如果直接跳进去学习研究&#xff0c;很快就会晕头转向。所以&#xff0c…

Echarts相关配置

title&#xff1a;标题组件 tooltip:提示框组件 legend:图例组件 toolbox:工具栏 grid&#xff1a;直角坐标系内绘图网格 xAxis:直角坐标系grid中的x轴 yAxis&#xff1a;直角坐标系grid中的y轴 series:系列列表。每个系列通过type决定自己的图表类型 color&#xff1a;调色…

如何用 Cargo 管理 Rust 工程系列 戊

以下内容为本人的学习笔记&#xff0c;如需要转载&#xff0c;请声明原文链接 微信公众号「ENG八戒」https://mp.weixin.qq.com/s/-OiWtUCUc3FmKIGMBEYfHQ 单元和集成测试 Rust 为单元测试提供了非常好的原生支持。 创建库工程时&#xff0c;cargo 生成的源码文件 lib.rs 自带…