从零开始实现分布式服务系统

文章目录

  • 开发前言
  • 分布式模型
  • 系统图解
  • 注册中心模块
  • 基础服务模块
  • 被依赖的服务模块(日志服务)
  • 服务模块(访问服务)
  • 运行效果
  • 开发总结

开发前言

分布式系统具有高可靠性、高性能、可扩展性、灵活性、数据共享、可靠性和地理分布等优点,使得其在各种应用场景下都具有巨大的优势,当然分布式系统实现复杂度要高于单体系统🫠

项目代码使用纯粹的Go语言标准库实现,不借用任何其它第三方库😁

我是醉墨居士,废话不多说,我们现在开始吧🤗

分布式模型

  • Hub & Spoke模型
  • 优点:集中管理,安全性,降低成本
  • 缺点:单点故障,延迟,有限的扩展性

在这里插入图片描述

  • Peer to Peer模型
  • 优点:去中心化,高度可扩展性,资源共享
  • 缺点:管理复杂性,安全性,性能问题

在这里插入图片描述

  • Message Queues模型
  • 优点:解耦合,异步处理,可靠性
  • 缺点:系统复杂度,准确性,消息顺序

在这里插入图片描述

我们将要开发的分布式系统将会取其精华,去其糟粕,使用采用上述模型的混合模式😎

系统图解

在这里插入图片描述

注册中心模块

  • 注册信息
package registryimport ("encoding/json""fmt""io""strings"
)type Registration struct {ServiceName stringServiceAddr stringRequiredServices []string
}func buildRegistration(reader io.ReadCloser) (*Registration, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}registration := new(Registration)err = json.Unmarshal(data, registration)if err != nil {return nil, err}return registration, nil
}func buildServiceInfo(reader io.ReadCloser) ([]string, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}parts := strings.SplitN(string(data), " ", 2)if len(parts) != 2 {return nil, fmt.Errorf("Parse service failed with length %d", len(parts))}return parts, nil
}
  • 注册信息表
package registryimport ("bytes""encoding/json""fmt""io""log""math/rand""net/http""sync"
)type serviceTable struct {serviceInfos map[string][]*Registrationlock *sync.RWMutex
}func newServiceTable() *serviceTable {return &serviceTable{serviceInfos: make(map[string][]*Registration),lock: new(sync.RWMutex),}
}func (t *serviceTable) parseServiceInfos(reader io.ReadCloser) (err error){data, err := io.ReadAll(reader)if err != nil {return err}defer func() {err = reader.Close()}()t.lock.Lock()defer t.lock.Unlock()err = json.Unmarshal(data, &t.serviceInfos)return
}func (t *serviceTable) buildRequiredServiceInfos(registration *Registration) map[string][]*Registration {m := make(map[string][]*Registration, len(registration.RequiredServices))t.lock.RLock()defer t.lock.RUnlock()for _, serviceName := range registration.RequiredServices {m[serviceName] = t.serviceInfos[serviceName]}return m
}func (t *serviceTable) notify(method string, registration *Registration) error {if method != http.MethodPost && method != http.MethodDelete {fmt.Println(method, method == http.MethodPost, method == http.MethodDelete)return fmt.Errorf("Method not allowed with method: %s", method)}t.lock.RLock()defer t.lock.RUnlock()data, err := json.Marshal(registration)if err != nil {return err}for _, registrations := range t.serviceInfos {for _, reg := range registrations {for _, requiredServiceName := range reg.RequiredServices {if requiredServiceName == registration.ServiceName {req, err := http.NewRequest(method, "http://" + reg.ServiceAddr + "/services", bytes.NewReader(data))if err != nil {continue}log.Println("update url: ", reg.ServiceAddr + "/services")http.DefaultClient.Do(req)}}}}return nil
}func (t *serviceTable) add(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table add %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {registrations = append(registrations, registration)} else {t.serviceInfos[registration.ServiceName] = []*Registration{registration}}
}func (t *serviceTable) remove(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table remove %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {for i := len(registrations) - 1; i >= 0; i-- {if registrations[i].ServiceAddr == registration.ServiceAddr {registrations = append(registrations[:i], registrations[i+1:]...)}}}
}func (t *serviceTable) get(serviceName string) *Registration {t.lock.RLock()defer t.lock.RUnlock()regs := t.serviceInfos[serviceName]return regs[rand.Intn(len(regs))]
}
  • 注册服务
package registryimport ("encoding/json""net/http""time"
)const (serviceName = "Registry Service"serviceAddr = "127.0.0.1:20000"
)type RegistryService struct {serviceInfos *serviceTableheartBeatWorkerNumber intheartBeatAttempCount intheartBeatAttempDuration time.DurationheartBeatCheckDuration time.Duration
}func Default() *RegistryService {return New(3, 3, time.Second, 30 * time.Second)
}func New(heartBeatWorkerNumber, heartBeatAttempCount int, heartBeatAttempDuration, heartBeatCheckDuration time.Duration) *RegistryService {return &RegistryService{serviceInfos: newServiceTable(),heartBeatWorkerNumber: heartBeatWorkerNumber,heartBeatAttempCount: heartBeatAttempCount,heartBeatAttempDuration: heartBeatAttempDuration,heartBeatCheckDuration: heartBeatCheckDuration,}
}func (s *RegistryService) Run() error {go s.heartBeat()http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {statusCode := http.StatusOKswitch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}err = s.regist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}serviceInfos := s.serviceInfos.buildRequiredServiceInfos(registration)data, err := json.Marshal(&serviceInfos)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}defer w.Write(data)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}s.unregist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}default:statusCode = http.StatusMethodNotAllowedgoto END}END:w.WriteHeader(statusCode)})return http.ListenAndServe(serviceAddr, nil)
}func (s *RegistryService) heartBeat() {channel := make(chan *Registration, 1)for i := 0; i < s.heartBeatWorkerNumber; i++ {go func() {for reg := range channel {for j := 0; j < s.heartBeatAttempCount; j++ {resp, err := http.Get("http://" + reg.ServiceAddr + "/heart-beat")if err == nil && resp.StatusCode == http.StatusOK {goto NEXT}time.Sleep(s.heartBeatAttempDuration)}s.unregist(reg)NEXT:}}()}for {s.serviceInfos.lock.RLock()for _, registrations := range s.serviceInfos.serviceInfos {for i := len(registrations) - 1; i >= 0; i-- {channel <- registrations[i]}}s.serviceInfos.lock.RUnlock()time.Sleep(s.heartBeatCheckDuration)}
}func (s *RegistryService) regist(registration *Registration) error {s.serviceInfos.add(registration)return s.serviceInfos.notify(http.MethodPost, registration)
}func (s *RegistryService) unregist(registration *Registration) error {s.serviceInfos.remove(registration)return s.serviceInfos.notify(http.MethodDelete, registration)
}
  • 注册服务客户端接口
package registryimport ("bytes""encoding/json""fmt""net/http"
)func registerMonitorHandler() {http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {defer r.Body.Close()switch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.add(registration)fmt.Printf("add service %s\n", registration.ServiceName)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.remove(registration)fmt.Printf("remove service %s\n", registration.ServiceName)default:w.WriteHeader(http.StatusMethodNotAllowed)return}w.WriteHeader(http.StatusOK)})http.HandleFunc("/heart-beat", func(w http.ResponseWriter, r *http.Request) {w.WriteHeader(http.StatusOK)})
}func RegistService(registration *Registration) error {registerMonitorHandler()data, err := json.Marshal(registration)if err != nil {return err}resp, err := http.Post("http://" + serviceAddr + "/services", "application/json", bytes.NewReader(data))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Regist %s error with code %d", registration.ServiceName, resp.StatusCode)}err = provider.parseServiceInfos(resp.Body)if err != nil {return err}return nil
}func UnregistService(registration *Registration) error {data, err := json.Marshal(registration)if err != nil {return err}req, err := http.NewRequest(http.MethodDelete, "http://" + serviceAddr + "/services", bytes.NewReader(data))if err != nil {return err}resp, err := http.DefaultClient.Do(req)if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Unregist %s error with code %d", registration.ServiceName, resp.StatusCode)}return nil
}var provider = newServiceTable()func Get(serviceName string) *Registration {return provider.get(serviceName)
}
  • 服务入口
package mainimport ("log""services/registry"
)func main() {registryService := registry.Default()err := registryService.Run()if err != nil {log.Fatalln(err)}
}

基础服务模块

package serviceimport ("context""fmt""net/http""services/registry"
)type Service interface {Init()
}func Run(registration *registry.Registration) (err error) {err = registry.RegistService(registration)if err != nil {return err}defer func() {err = registry.UnregistService(registration)}()srv := http.Server{Addr: registration.ServiceAddr}go func() {fmt.Println("Press any key to stop.")var s stringfmt.Scan(&s)srv.Shutdown(context.Background())}()err = srv.ListenAndServe()if err != nil {return err}return nil
}

被依赖的服务模块(日志服务)

  • 业务服务
package logserviceimport ("io""log""net/http""os"
)type logService struct {destination stringlogger *log.Logger
}func Init(destination string) {s := &logService{destination: destination,}s.logger = log.New(s, "Go:", log.Ltime | log.Lshortfile)s.register()
}func (s *logService)Write(data []byte) (int, error) {file, err := os.OpenFile(s.destination, os.O_CREATE | os.O_APPEND | os.O_WRONLY, 0600)if err != nil {return 0, err}defer file.Close()return file.Write(data)
}func (s *logService)register() {http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {if r.Method != http.MethodPost {w.WriteHeader(http.StatusMethodNotAllowed)return}data, err := io.ReadAll(r.Body)if err != nil || len(data) == 0 {w.WriteHeader(http.StatusBadRequest)return}s.logger.Println(string(data))})
}
  • 客户端接口
package logserviceimport ("bytes""fmt""net/http""services/registry"
)func Println(registration *registry.Registration, s string) error {resp, err := http.Post("http://"+registration.ServiceAddr+"/log", "text/plain", bytes.NewReader([]byte(s)))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Response Error with code: %d", resp.StatusCode)}return nil
}
  • 服务入口
package mainimport ("log""services/logservice""services/registry""services/service"
)func main() {logservice.Init("./services.log")err := service.Run(&registry.Registration{ServiceName:      "LogService",ServiceAddr:      "127.0.0.1:20002",RequiredServices: make([]string, 0),})if err != nil {log.Fatalln(err)}
}

服务模块(访问服务)

  • 业务服务
package visistserviceimport ("log""net/http""services/logservice""services/registry""strconv""sync/atomic"
)type visistService struct {visistCount atomic.Int32
}func Init() {s := &visistService{visistCount: atomic.Int32{},}s.register()
}func (s *visistService) register() {http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {s.visistCount.Add(1)count := strconv.Itoa(int(s.visistCount.Load()))err := logservice.Println(registry.Get("LogService"), count)if err != nil {w.WriteHeader(http.StatusInternalServerError)log.Printf("Log service println error: %s\n", err)return}w.WriteHeader(http.StatusOK)w.Write([]byte(count))})
}
  • 服务入口
package mainimport ("log""services/registry""services/service""services/visistservice"
)func main() {visistservice.Init()err := service.Run(&registry.Registration{ServiceName:      "VisistService",ServiceAddr:      "127.0.0.1:20003",RequiredServices: []string{"LogService"},})if err != nil {log.Fatalln(err)}
}

运行效果

依次运行注册服务,日志服务,浏览服务
在这里插入图片描述
运行完毕之后,访问http://127.0.0.1:20003,返回访问量
在这里插入图片描述

日志记录对应访问量数据
在这里插入图片描述
这里只是用了一个简单的示例,你可以使用这套基础组件,然后让服务变得更加复杂,更加丰富。

开发总结

恭喜你,我们一起完成了简易分布式系统的开发,麻雀虽小,五脏俱全😉
希望这个项目能让你有所收获😊
如果有什么错误,请你评论区或者私信我指出,让我们一起进步✌️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/231014.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【计算机网络】TCP协议——3. 可靠性策略效率策略

前言 TCP是一种可靠的协议&#xff0c;提供了多种策略来确保数据的可靠性传输。 可靠并不是保证每次发送的数据&#xff0c;对方都一定收到&#xff1b;而是尽最大可能让数据送达目的主机&#xff0c;即使丢包也可以知道丢包。 目录 一. 确认应答和捎带应答机制 二. 超时重…

小信砍柴的题解

目录 原题描述&#xff1a; 时间&#xff1a;1s 空间&#xff1a;256M 题目描述&#xff1a; 输入格式&#xff1a; 输出格式&#xff1a; 样例1输入&#xff1a; 题目大意&#xff1a; 主要思路&#xff1a; 注意事项&#xff1a; 总代码&#xff1a; 原题描述&#…

计网01 计算机网络基础

一、计算机网络基本概念 1、什么是计算机网络 网络&#xff1a;由两台或多台计算机通过网络设备串联&#xff08;网络设备通过传输介质串联&#xff09;而形成的网络网络设备&#xff1a;计算机、路由交换、防火墙、上网行为管理等传输介质&#xff1a;双绞线&#xff08;网线…

微服务——服务异步通讯(MQ高级)

MQ的一些常见问题 消息可靠性 生产者消息确认 返回ack&#xff0c;怎么感觉这么像某个tcp的3次握手。 使用资料提供的案例工程. 在图形化界面创建一个simple.queue的队列&#xff0c;虚拟机要和配置文件里面的一样。 SpringAMQP实现生产者确认 AMQP里面支持多种生产者确认的类…

【华为】文档中命令行约定格式规范(命令行格式规范、命令行行为规范、命令行参数格式、命令行规范)

文章目录 命令行约定格式**粗体&#xff1a;命令行关键字***斜体&#xff1a;命令行参数*[ ]&#xff1a;可选配置{ x | y | ... } 和 [ x | y | ... ]&#xff1a;选项{ x | y | ... }* 和 [ x | y | ... ]*&#xff1a;多选项&<1-n>&#xff1a;重复参数#&#xff…

kill编译异常处理

当kill编译时出现如下警告 Build target Target 1 linking... *** WARNING L16: UNCALLED SEGMENT, IGNORED FOR OVERLAY PROCESSSEGMENT: ?PR?_LCD_SHOWCHAR?LCD1602 *** WARNING L16: UNCALLED SEGMENT, IGNORED FOR OVERLAY PROCESSSEGMENT: ?PR?_LCD_SHOWSTRING?LCD…

SpringSecurity 手机号登录

一、工作流程 1.向手机发送验证码&#xff0c;第三方短信发送平台&#xff0c;如阿里云短信。 2.手机获取验证码后&#xff0c;在表单中输入验证码。 3.使用自定义过滤器​SmsCodeValidateFilter​。 4.短信校验通过后&#xff0c;使用自定义手机认证过滤器​SmsCodeAuthentic…

UE4/UE5 日志插件(基于spdlog)

1 解决问题 对于高频日志序列化到本地的需求&#xff0c;spdlog肯定完美满足。 源码地址&#xff1a;https://github.com/gabime/spdlog 博主下载的版本为 spdlog-1.12.0&#xff0c;各位大佬可以根绝自己爱好选择。 2 过程介绍 大概目录&#xff1a; SpdlogLibC目录下是对…

Qt/C++音视频开发60-坐标拾取/按下鼠标获取矩形区域/转换到视频源真实坐标

一、前言 通过在通道画面上拾取鼠标按下的坐标&#xff0c;然后鼠标移动&#xff0c;直到松开&#xff0c;根据松开的坐标和按下的坐标&#xff0c;绘制一个矩形区域&#xff0c;作为热点或者需要电子放大的区域&#xff0c;拿到这个坐标区域&#xff0c;用途非常多&#xff0…

C语言之文件操作(下)

C语言之文件操作&#xff08;下&#xff09; 文章目录 C语言之文件操作&#xff08;下&#xff09;1. 文件的顺序读写1.1 文件的顺序读写函数1.1.1 字符输入/输出函数&#xff08;fgetc/fputc&#xff09;1.1.2 ⽂本⾏输⼊/输出函数&#xff08;fgets/fputs&#xff09;1.1.3 格…

工业应用新典范,飞凌嵌入式FET-D9360-C核心板发布!

来源&#xff1a;飞凌嵌入式官网 当前新一轮科技革命和产业变革突飞猛进&#xff0c;工业领域对高性能、高可靠性、高稳定性的计算需求也在日益增长。为了更好地满足这一需求&#xff0c;飞凌嵌入式与芯驰科技&#xff08;SemiDrive&#xff09;强强联合&#xff0c;基于芯驰D9…

SI24R03国产自主可控RISC-V架构MCU低功耗2.4GHz收发芯片SoC

目录 RISC-V架构的优势SI24R03/04特性射频收发器模块特征MCU 模块特征 其他特征 RISC-V架构的优势 相对于目前主流的英特尔X86架构及ARM等架构来说&#xff0c;RISC-V架构具有指令精简、模块化、可扩展、开源、免费等优点。RISC-V的基础指令集只有40多条&#xff0c;加上其他基…

Kafka--从Zookeeper数据理解Kafka集群工作机制

从Zookeeper数据理解Kafka集群工作机制 这一部分主要是理解Kafka的服务端重要原理。但是Kafka为了保证高吞吐&#xff0c;高性能&#xff0c;高可扩展的三高架构&#xff0c;很多具体设计都是相当复杂的。如果直接跳进去学习研究&#xff0c;很快就会晕头转向。所以&#xff0c…

Echarts相关配置

title&#xff1a;标题组件 tooltip:提示框组件 legend:图例组件 toolbox:工具栏 grid&#xff1a;直角坐标系内绘图网格 xAxis:直角坐标系grid中的x轴 yAxis&#xff1a;直角坐标系grid中的y轴 series:系列列表。每个系列通过type决定自己的图表类型 color&#xff1a;调色…

如何用 Cargo 管理 Rust 工程系列 戊

以下内容为本人的学习笔记&#xff0c;如需要转载&#xff0c;请声明原文链接 微信公众号「ENG八戒」https://mp.weixin.qq.com/s/-OiWtUCUc3FmKIGMBEYfHQ 单元和集成测试 Rust 为单元测试提供了非常好的原生支持。 创建库工程时&#xff0c;cargo 生成的源码文件 lib.rs 自带…

【C语言】自定义类型——枚举、联合体

引言 对枚举、联合体进行介绍&#xff0c;包括枚举的声明、枚举的优点&#xff0c;联合体的声明、联合体的大小。 ✨ 猪巴戒&#xff1a;个人主页✨ 所属专栏&#xff1a;《C语言进阶》 &#x1f388;跟着猪巴戒&#xff0c;一起学习C语言&#x1f388; 目录 引言 枚举 枚举…

06. Python模块

目录 1、前言 2、什么是模块 3、Python标准库模块 3.1、os模块 3.2、datetime 模块 3.3、random模块 4、自定义模块 4.1、创建和使用 4.2、模块命名空间 4.3、作用域 5、安装第三方依赖 5.1、使用 pip 安装单个依赖 5.2、从 requirements.txt 安装依赖 5.3、安装指…

还在为学MyBatis发愁?史上最全,一篇文章带你学习MyBatis

文章目录 前言一、&#x1f4d6;MyBatis简介1.Mybatis历史2.MyBatis特性3.对比&#xff08;其他持久化层技术&#xff09; 二、&#x1f4e3;搭建MyBatis1.开发环境2.创建maven工程3.创建MyBatis核心配置文件4.创建mapper接口5.创建MyBatis的映射文件6.通过junit测试功能7.加入…

OpenCV4工业缺陷检测的六种方法

机器视觉 机器视觉是使用各种工业相机&#xff0c;结合传感器跟电气信号实现替代传统人工&#xff0c;完成对象识别、计数、测量、缺陷检测、引导定位与抓取等任务。其中工业品的缺陷检测极大的依赖人工完成&#xff0c;特别是传统的3C制造环节&#xff0c;产品缺陷检测依赖于人…

python+torch线性回归模型机器学习

程序示例精选 pythontorch线性回归模型机器学习 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对《pythontorch线性回归模型机器学习》编写代码&#xff0c;代码整洁&#xff0c;规则&#xf…