1. mqant——入门篇

0. 介绍

mqant技术架构和开发流程的学习笔记。

https://github.com/liangdas/mqant
Introduction · mqant 的文档库

mqant是一个微服务框架。目标是简化分布式系统开发。 mqant的核心是简单易用,关注业务场景,因此会针对特定场景研究一些特定组件和解决方案,方便开发者使用。

0.1 mqant核心组件组成

  • 核心RPC组件 - 它提供了用于服务发现,客户端负载平衡,编码,同步通信库。

  • http网关 - 提供将HTTP请求路由到相应微服务的API网关。它充当单个入口点,可以用作反向代理或将HTTP请求转换为RPC。

  • tcp/websocket网关 - 它提供了tcp和websocket两种客户端连接方式。并且默认内置了一个简单的mqtt协议,可以非常便捷的 提供长连接服务,快速搭建iot和网络游戏通信业务,同时也支持自定义通信协议插件。

0.2 名称定义

  • 模块 一类功能的统称,在mqant中以模块划分代码结构。
  • 服务 服务即模块,在微服务中服务比模块更容易让人理解。
  • 节点 即模块(服务)部署后的实例,一个模块(服务)可以在多台服务器启动多个节点。

0.3 架构

mqant的设计思想是在能用单台服务器时能让充分挖掘服务器的性能,而在需要多进程时再通过简单的配置就可以实现分布式部署。

0.3.1 mqant框架架构图

0.3.2 mqant模块化运行架构图

0.4 处理器(handler)

每一个模块可以注册多个处理器(handler),例如用户模块可以提供用户注册、用户登录、用户注销、用户信息查询等一系列的处理器。handler就是一个可供远程RPC调用的函数。

0.5 模块间通信RPC

mqant模块间的通信方式应该使用RPC,而非本地代码调用。 mqant的RPC也非采取grpc等消息投递工具,而是选用了nats消息总线。

0.6 为什么选择消息队列进行RPC通信?

选择消息队列而不选择传统的tcp/socket rpc的主要考虑是传统的基于点对点service/client模式的连接比较难于维护和统计,假如服务器存在100个模块,一个进程所需要维护的client连接就>100个(计算可能不太准确(^—^)).

而选择消息队列的话每一个进程对每一个模块只需要维护一条连接即可,nats有完善的监控,报警工具,可以随时监控模块的处理性能和实时性。

1. 起步

1.1 应用

应用(app)是mqant的最基本单位,通常一个进程中只需要实例化一个应用(app). 应用负责维护整个框架的基本服务

  • 服务注册与发现
  • RPC通信
  • 模块依赖

1.1.1 应用生命周期

1.1.2 流程

1.2 模块

mqant以模块化来组织代码模块,模块概念在框架中非常重要。

1.2.1 模块定义

结构体只要实现了以下几个函数就被认为是一个模块

//指定一个模块的名称,非常重要,在配置文件和RPC路由中会频繁使用
func GetType() string   //指定模块的版本
func Version() string   //模块的初始化函数,当框架初始化模块是会主动调用该方法
func OnInit(app module.App, settings *conf.ModuleSettings) //当App解析配置后调用,这个接口不管这个模块是否在这个进程的模块分组中都会调用
func OnAppConfigurationLoaded(app module.App)//模块独立运行函数,框架初始化模块以后会以单独goroutine运行该函数,并且提供一个关闭信号,以再框架要停止模块时通知
func Run(closeSig chan bool) //当模块停止运行后框架会调用该方法,让模块做一些回收操作
func OnDestroy()

1.2.2 模块声明周期

 1.2.3 模块使用

通常我们不止是实现一个简单模块,还需要利用框架的其他高级特性,因此我们通常会继承框架封装好的一些基础模块

  • RPCModule
    继承 basemodule.BaseModule该模块封装了mqant的RPC通信相关方法
  • GateModule
    继承 basegate.Gate该模块封装了tcp/websocket+mqtt协议的长连接网关

1.2.4 不在进程分组中的模块如何初始化

func (self *HellWorld) OnAppConfigurationLoaded(app module.App) {//当App初始化时调用,这个接口不管这个模块是否在这个进程运行都会调用self.BaseModule.OnAppConfigurationLoaded(app)
}

1.3 编写第一个模块

1.3.1 代码组织结构

新增了一个helloworld目录用来存放模块代码

    工程目录|-bin|-conf|-server.conf|-helloworld    //新建模块|-module.go|-xxx.go|-main.go

1.3.2 编写第一个模块

实现module.Module定义的所有方法接口

1.3.2.1 helloworld/module.go 
package helloworldimport ("github.com/liangdas/mqant/conf""github.com/liangdas/mqant/log""github.com/liangdas/mqant/module"basemodule "github.com/liangdas/mqant/module/base"
)var Module = func() module.Module {this := new(HelloWorld)return this
}type HelloWorld struct {basemodule.BaseModule
}func (self *HelloWorld) GetType() string {//	很关键,需要与配置文件中的Module配置对应return "helloworld"
}
func (self *HelloWorld) Version() string {//	可以在监控时了解代码版本return "1.0.0"
}
func (self *HelloWorld) OnInit(app module.App, settings *conf.ModuleSettings) {self.BaseModule.OnInit(self, app, settings)log.Info("%v模块初始化完成...", self.GetType())
}
func (self *HelloWorld) OnDestroy() {//	不能忘记继承self.BaseModule.OnDestroy()log.Info("%v模块已回收...", self.GetType())
}
func (self *HelloWorld) GetApp() module.App {return self.BaseModule.GetApp()
}
func (self *HelloWorld) Run(closeSig chan bool) {}
1.3.2.2 将模块加入main.go入口函数
import ("github.com/liangdas/mqant/helloworld""github.com/liangdas/mqant/log""github.com/liangdas/mqant/module""net/http"
)func main() {go func() {http.ListenAndServe("o.o.o.o:6060", nil)}()app := CreateApp(module.Debug(true), //只有是在调试模式下才会在控制台打印日志,非调试模式下只在日志文件中输出日志)err := app.Run(helloworld.Module(),)if err !=  nil {log.Error(err.Error())}
}
1.3.2.3 在配置文件中加入模块配置 bin/conf/server.conf
配置说明
Module|-moduleType  与func GetType() string 值保持一致|-  ProcessID  模块分组,在今后分布式部署时非常有用,默认分组为development
{"Settings":{},"Module":{"greeter":[{//Id在整个Module中必须唯一,不能重复"Id":"greeter",//这个模块所属进程,非常重要,进程会根据该参数来判断是否需要运行该模块 [development]为默认值代表开发环境"ProcessID":"development","Settings":{"Port": 7370}}],"helloword": [{//Id在整个Module中必须唯一,不能重复"Id": "helloworld",//这个模块所属进程,非常重要,进程会根据该参数来判断是否需要运行该模块 [development]为默认值代表开发环境"ProcessID": "development"}],"client":[{//Id在整个Module中必须唯一,不能重复"Id":"client",//这个模块所属进程,非常重要,进程会根据该参数来判断是否需要运行该模块 [development]为默认值代表开发环境"ProcessID":"development","Settings":{}}]},"Log": {"contenttype":"application/json","multifile": {"maxLines": 0,"maxsize": 0,"daily": true,"rotate": true,"perm": "0600","prefix":"a","separate": ["error"]},"file": {"maxLines": 0,"maxsize": 0,"daily": true,"prefix":"n","rotate": true,"perm": "0600"}},"Mqtt":{// 最大写入包队列缓存"WirteLoopChanNum": 10,// 最大读取包队列缓存"ReadPackLoop": 1,// 读取超时"ReadTimeout": 10,// 写入超时"WriteTimeout": 10},"Rpc":{"MaxCoroutine":100,// 远程访问最后期限值 单位秒 这个值指定了在客户端可以等待服务端多长时间来应答"RpcExpired": 3,//默认是 false 不打印"LogSuccess":true}
}

1.4 编写第一个handler

1.3章节实现了一个helloworld模块,但是模块并没有真正的功能,仅仅进行声明周期的日志输出。实际上真正的模块应该有自身功能的核心实现。

  • 作为一个网关模块
  • 作为一个后端模块提供核心功能的handler给其他模块调用

此章节介绍一个后端模块如何实现一个handler并且能被其他模块调用。

1.4.1 代码组织结构

重新组织了一下代码目录结构,新增了一个web目录用来存放http网关模块代码

    工程目录|-bin|-conf|-server.conf|-helloworld|-module.go|-web|-module.go|-main.go

1.4.2 依赖

1.4.2.1 服务发现

需要服务发现,所以启动Consul(默认),或者通过go-plugins替换。

mqant的服务发现模块是从go-mirco移植而来的,因此基本可以完全复用go-mirco服务发现相关插件和功能

启动consul

consul agent --dev

1.4.2.2 RPC调用 

nats作为RPC的消息投递通道,mqant默认只内置了nats一种通道。

1.4.3 mqant加入consul和nats

import ("github.com/liangdas/mqant""github.com/liangdas/mqant/log""github.com/liangdas/mqant/module""github.com/liangdas/mqant/registry""github.com/liangdas/mqant/registry/consul""github.com/nats-io/nats.go""mqant-helloworld/helloworld"
)func main() {rs := consul.NewRegistry(func(options *registry.Options) {options.Addrs = []string{"127.0.0.1:8500"}})nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(10000))if err != nil {log.Error("nats error %v", err)return}app := mqant.CreateApp(module.Debug(true),//只有是在调试模式下才会在控制台打印日志, 非调试模式下只在日志文件中输出日志module.Nats(nc),     //指定nats rpcmodule.Registry(rs), //指定服务发现)err= app.Run( //模块都需要加到入口列表中传入框架helloworld.Module(),)if err!=nil{log.Error(err.Error())}
}

1.4.4 编写第一个handler

package helloworldimport ("fmt""github.com/liangdas/mqant/conf""github.com/liangdas/mqant/log""github.com/liangdas/mqant/module"basemodule "github.com/liangdas/mqant/module/base"
)// Module 是一个工厂函数,返回HelloWorld模块的实例
var Module = func() module.Module {this := new(HelloWorld)return this
}// HelloWorld 结构体,继承自basemodule.BaseModule
type HelloWorld struct {basemodule.BaseModule
}// GetType 返回模块类型,用于与配置文件中的Module配置对应
func (self *HelloWorld) GetType() string {return "helloworld"
}// Version 返回模块版本,用于监控时了解代码版本
func (self *HelloWorld) Version() string {return "1.0.0"
}// OnInit 模块初始化函数
func (self *HelloWorld) OnInit(app module.App, settings *conf.ModuleSettings) {// 调用基础模块的初始化方法self.BaseModule.OnInit(self, app, settings)// 注册RPC处理函数self.GetServer().RegisterGO("/say/hi", self.say)log.Info("%v模块初始化完成...", self.GetType())
}// OnDestroy 模块销毁函数
func (self *HelloWorld) OnDestroy() {// 调用基础模块的销毁方法self.BaseModule.OnDestroy()log.Info("%v模块已回收...", self.GetType())
}// GetApp 返回应用实例
func (self *HelloWorld) GetApp() module.App {return self.BaseModule.GetApp()
}// Run 模块运行函数
func (self *HelloWorld) Run(closeSig chan bool) {log.Info("%v模块运行中...", self.GetType())log.Info("%v say hello world...", self.GetType())<-closeSig // 等待关闭信号log.Info("%v模块已停止...", self.GetType())
}// say 是一个RPC处理函数
func (self *HelloWorld) say(name string) (r string, err error) {return fmt.Sprintf("hi %v", name), nil
}
  • 新增handler函数 func say(name string) (r string, err error)
    • name:传入值
    • r:函数正常处理err情况下的返回内容
    • err:函数一场处理情况下的返回内容
  • 将handler注册到模块中    self.GetServer().RegisterGO("/say/hi", self.say)
    • /say/hi :访问地址

 1.4.5 创建一个web模块

在helloworld模块中实现了第一个handler,但是没有地方在使用它,因此编写一个web模块尝试通过http能调用这个handler

package webimport ("context""github.com/liangdas/mqant/conf""github.com/liangdas/mqant/log""github.com/liangdas/mqant/module"basemodule "github.com/liangdas/mqant/module/base"mqrpc "github.com/liangdas/mqant/rpc""io""net/http"
)// Module 函数返回一个新的 Web 模块实例
var Module = func() module.Module {this := new(Web)return this
}// Web 结构体嵌入了 BaseModule
type Web struct {basemodule.BaseModule
}// GetType 返回模块类型
func (self *Web) GetType() string {return "Web"
}// Version 返回模块版本
func (self *Web) Version() string {return "1.0.0"
}// OnInit 初始化模块
func (self *Web) OnInit(app module.App, settings *conf.ModuleSettings) {self.BaseModule.OnInit(self, app, settings)
}// startHttpServer 启动 HTTP 服务器
func (self *Web) startHttpServer() *http.Server {// 创建一个新的 HTTP 服务器,监听 8080 端口srv := &http.Server{Addr: "8080"}// 设置根路由处理函数http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {// 解析表单数据_ = r.ParseForm()// 调用 RPC 方法 "helloword" 的 "/say/hi" 函数,传入 name 参数rstr, err := mqrpc.String(self.Call(context.Background(),"helloword","/say/hi",mqrpc.Param(r.Form.Get("name")),),)// 记录 RPC 调用结果log.Info("RpcCall %v, err %v", rstr, err)// 将结果写入 HTTP 响应_, _ = io.WriteString(w, rstr)})// 在新的 goroutine 中启动 HTTP 服务器go func() {if err := srv.ListenAndServe(); err != nil {log.Info("Httpserver: ListenAndServer() error: %s", err)}}()return srv
}// Run 运行 Web 模块
func (self *Web) Run(closeSig chan bool) {log.Info("web: starting http server: 8080")srv := self.startHttpServer()<-closeSig // 等待关闭信号log.Info("web: stroppting http server")if err := srv.Shutdown(nil); err != nil {panic(err)}log.Info("web: done.exiting")
}// OnDestroy 销毁模块
func (self *Web) OnDestroy() {self.BaseModule.OnDestroy()
}
1.4.5.1 特性

1. web模块对外监听8080端口

http://127.0.0.1:8080/say?name=mqant

2. web模块收到请求后,通过rpc调用helloworld模块提供的handler,并将结果返回客户端。

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        _ = r.ParseForm()
        rstr,err:=mqrpc.String(
            self.Call(
            context.Background(),
            "helloworld",   
            "/say/hi",
            mqrpc.Param(r.Form.Get("name")),
            ),
        )
        log.Info("RpcCall %v , err %v",rstr,err)
        _, _ = io.WriteString(w, rstr)
    })

1.4.5.2 注意事项

1. web模块需要加入main.go 函数入口中

 err= app.Run( //模块都需要加到入口列表中传入框架
         helloworld.Module(),
         web.Module(),
     )

2. web模块需要在mqant配置文件中添加配置

 "Module":{"helloworld":[{"Id":"helloworld","ProcessID":"development"}],"Web":[{"Id":"Web001","ProcessID":"development"}]},

2. 模块间通信

2.1 RPC介绍

mqant RPC本身是一个相对独立的功能,RPC有以下的几个特点:

1. 目前支持nats作为服务发现通道,理论上可以扩展其他通信方式

2.支持服务之策发现,是一个相对完善的微服务框架

2.1.1 在模块中使用RPC

module.BaseModule中一种继承了RPC,使用方法如下

2.1.1.1 服务提供者

注册handler

//注册服务函数
module.GetServer().RegisterGO(_func string, fn interface{})
//注册服务函数
module.GetServer().Register(_func string, fn interface{})

RegisterGO与Register的区别是前者为每一条消息创建一个单独的协程来处理,后者注册的函数共用一个协程来处理所有消息,具体使用哪一种方式可以根据实际情况来定,但Register方式的函数请一定注意不要执行耗时功能,以免引起消息阻塞。

Register 和 RegisterGO 都用于注册RPC处理函数,但它们在处理请求的方式上有重要的区别:
Register(id string, f interface{})
功能:注册一个同步的RPC处理函数。
实现:当收到RPC请求时,直接在当前goroutine中执行注册的函数。
特点:
按顺序处理请求,一次只处理一个请求。
适合短时间内能完成的操作。
如果处理时间较长,可能会阻塞其他请求。
不需要考虑并发安全问题,因为请求是顺序处理的。
RegisterGO(id string, f interface{})
功能:注册一个异步的RPC处理函数。
实现:当收到RPC请求时,会创建一个新的goroutine来执行注册的函数。
特点:
并发处理请求,可以同时处理多个请求。
适合可能需要长时间处理的操作。
不会阻塞其他请求的处理。
需要考虑并发安全问题,因为多个goroutine可能同时访问共享资源。
选择使用哪个方法取决于你的具体需求:
如果你的RPC处理函数是快速的、无状态的,或者需要严格的顺序执行,使用 Register。
如果你的RPC处理函数可能耗时较长,或者你希望能并发处理多个请求,使用 RegisterGO。
在实际应用中,RegisterGO 通常更常用,因为它能更好地利用Go语言的并发特性,提高服务器的吞吐量。但是使用 RegisterGO 时,你需要确保你的处理函数是并发安全的。
2.1.1.2 服务调用者

在开发过程中,模块A可能需要用到模块B的服务,这是模块A就成了服务调用方。mqant提供了很多RPC调用方法,也支持高级扩展(服务发现)

2.2 RPC调用方式

2.2.1 RPC路由规则

mqant每一类模块可以部署到多台服务器中,因此需要一个nodeId对同一类模块进行区分。在框架中加入服务注册和发现功能后,nodeId通过服务发现模块在服务启动时自动生成,无法提前编码指定。

2.2.2 RPC调用方法1——通过Call函数调度(推荐)

/*
通用RPC调度函数
ctx         context.Context             上下文,可以设置这次请求的超时时间
moduleType    string                         服务名称 serverId 或 serverId@nodeId
_func        string                        需要调度的服务方法
param         mqrpc.ParamOption            方法传参
opts ...selector.SelectOption            服务发现模块过滤,可以用来选择调用哪个服务节点*/
Call(ctx context.Context, moduleType, _func string, param mqrpc.ParamOption, opts ...selector.SelectOption) (interface{}, string)
2.2.2.1 特点
  • 支持设置调用超时时间
  • 支持自定义的服务节点选择过滤器
2.2.2.2 超时时间设置
ctx, _ := context.WithTimeout(context.TODO(), time.Second*3) //3s后超时
rstr,err:=mqrpc.String(self.Call(ctx,"helloworld",    //要访问的moduleType"/say/hi",            //访问模块中handler路径mqrpc.Param(r.Form.Get("name")),),
)

超时时间仅是调用方有效,超时后无法取消被调用方正在执行的任务。

2.2.2.3 服务节点选择过滤器
ctx, _ := context.WithTimeout(context.TODO(), time.Second*3)
rstr,err:=mqrpc.String(self.Call(ctx,"helloworld",    //要访问的moduleType"/say/hi",            //访问模块中handler路径mqrpc.Param(r.Form.Get("name")),selector.WithStrategy(func(services []*registry.Service) selector.Next {var nodes []*registry.Node// Filter the nodes for datacenterfor _, service := range services {for _, node := range service.Nodes {if node.Metadata["version"] == "1.0.0" {nodes = append(nodes, node)}}}var mtx sync.Mutex//log.Info("services[0] $v",services[0].Nodes[0])return func() (*registry.Node, error) {mtx.Lock()defer mtx.Unlock()if len(nodes) == 0 {return nil, fmt.Errorf("no node")}index := rand.Intn(int(len(nodes)))return nodes[index], nil}}),),
)
2.2.2.4 module Type格式
  • 指定到模块级别

当module Type为模块名时func GetType()值一样,rpc将查找模块已启用的所有节点,然后根据【节点选择过滤器】选择一个节点发起调用。

  • 指定到节点级别

格式为module Type@moduleID 例如

helloworld@1b0073cbbab33247,rpc将直接选择节点1b0073cbbab33247发起调用。

2.2.3 RPC调用方法2——通过RpcInvoke函数调度

module.Invoke(moduleType string, _func string, params ...interface{})
2.2.3.1 特点
  • 不支持设置调用超时时间(只能通过配置文件设置全局RPC超时时间)
  • 不支持自定义的服务节点选择过滤器
  • 支持module Type过滤

2.2.4 RPC调用方法3——InvokeNR函数调度

module.InvokeNR(moduleType string, _func string, params ...interface{})
2.2.4.1 特点
  • 包含Invoke所有特点
  • 本函数无需等待返回结果(不会阻塞),仅投递RPC消息

2.2.5 RPC调用方法4——指定节点调用

查找到节点(module.ServerSession),通过节点结构体提供的方法调用

moduleType 模块名称(类型)
opts        服务节点选择过滤器
func GetRouteServer(moduleType string, opts ...selector.SelectOption) (s module.ServerSession, err error)
// DefaultRoute 默认路由规则
var DefaultRoute = func(app module.App, r *http.Request) (*Service, error) {// 检查URL路径是否为空if r.URL.Path == "" {return nil, errors.New("path is nil")}// 将URL路径按"/"分割handers := strings.Split(r.URL.Path, "/")// 检查路径段数是否至少为2if len(handers) < 2 {return nil, errors.New("path is not /[server]/path")}// 获取服务器名称(路径的第二段)server := handers[1]// 检查服务器名称是否为空if server == "" {return nil, errors.New("server is nil")}// 获取路由服务器会话session, err := app.GetRouteServer(server,// 使用自定义的选择策略selector.WithStrategy(func(services []*registry.Service) selector.Next {var nodes []*registry.Node// 从所有服务中收集节点for _, service := range services {for _, node := range service.Nodes {nodes = append(nodes, node)}}var mtx sync.Mutex// 返回一个函数,该函数在每次调用时随机选择一个节点return func() (*registry.Node, error) {mtx.Lock()defer mtx.Unlock()if len(nodes) == 0 {return nil, fmt.Errorf("no node")}// 随机选择一个节点index := rand.Intn(int(len(nodes)))return nodes[index], nil}}),)

 以上的调用方法在module级别和app级别都有对应实现,可灵活选择。

2.3 RPC传参数据结构

2.3.1 RPC可传参数据类型

1-9为基础数据类型,可直接使用。10、11为自定义结构体,需要单独定义(章节后续会单独讲解)

  1. bool
  2. int32
  3. int64
  4. long64
  5. float32
  6. float64
  7. []byte
  8. string
  9. map[string]interface{}
  10. protocol buffer结构体
  11. 自定义结构体

注意调用参数不能为nil 如: result,err:=module.Invoke(“user”,"login","mqant",nil) 会出现异常无法调用

2.3.2 返回值可使用的参数类型

hander的返回值固定为两个,其中result表示正常业务返回值,err表示异常业务返回值

2.3.2.1 result:
  1. bool
  2. int32
  3. int64
  4. long64
  5. float32
  6. float64
  7. []byte
  8. string
  9. map[string]interface{}
  10. protocol buffer结构体
  11. 自定义结构体
2.3.2.2 err:
  1. string
  2. error
func (self *HellWorld)say(name string) (result string,err error) {return fmt.Sprintf("hi %v",name), nil
}
result,err:=mqrpc.String(self.Call(ctx,"helloworld",    //要访问的moduleType"/say/hi",            //访问模块中handler路径mqrpc.Param(r.Form.Get("name")),))

2.4 protocolbuffer

2.4.1 注意事项

1. proto.Message是protocol buffer约定的数据结构,因此需要双方都能够明确数据结构的类型(可以直接断言的)

2. 服务函数返回结构一定要用指针(例如 *rpcpb.ResultInfo),否则mqant无法识别。

2.4.2 代码组织结构

新增了一个rpctest目录用来存放rpctest模块代码

    工程目录
 

        |-bin|-conf|-server.conf|-helloworld|-module.go|-web|-module.go|-rpctest|-module.go|-main.go

2.4.3 编写支持pb传参的handler

 为了简化操作,我们直接使用mqant内部的protocolbuffer结构体rpcpb.ResultInfo

var Module = func() module.Module {this := new(rpctest)return this
}type rpctest struct {basemodule.BaseModule
}func (self *rpctest) GetType() string {//很关键,需要与配置文件中的Module配置对应return "rpctest"
}
func (self *rpctest) Version() string {//可以在监控时了解代码版本return "1.0.0"
}
func (self *rpctest) OnInit(app module.App, settings *conf.ModuleSettings) {self.BaseModule.OnInit(self, app, settings)self.GetServer().RegisterGO("/test/proto", self.testProto)
}func (self *rpctest) Run(closeSig chan bool) {log.Info("%v模块运行中...",self.GetType())<-closeSig
}func (self *rpctest) OnDestroy() {//一定别忘了继承self.BaseModule.OnDestroy()
}
func (self *rpctest) testProto(req *rpcpb.ResultInfo) (*rpcpb.ResultInfo, error) {r := &rpcpb.ResultInfo{Error: *proto.String(fmt.Sprintf("你说: %v",req.Error))}return r, nil
}

2.4.4 调用pb的hander

http.HandleFunc("/test/proto", func(w http.ResponseWriter, r *http.Request) {_ = r.ParseForm()ctx, _ := context.WithTimeout(context.TODO(), time.Second*3)protobean := new(rpcpb.ResultInfo)err:=mqrpc.Proto(protobean,func() (reply interface{}, errstr interface{}) {return self.RpcCall(ctx,"rpctest",     //要访问的moduleType"/test/proto", //访问模块中handler路径mqrpc.Param(&rpcpb.ResultInfo{Error: *proto.String(r.Form.Get("message"))}),)})log.Info("RpcCall %v , err %v",protobean,err)if err!=nil{_, _ = io.WriteString(w, err.Error())}_, _ = io.WriteString(w, protobean.Error)})

2.5 自定义数据结构

和protocolbuffer类似,mqant也识别目前mqrpc.Marshaler接口实现的数据结构,开发者只需要自己实现序列化和反序列化即可。

2.5.1 Marshaler接口定义

序列化
func (this *mystruct)Marshal() ([]byte, error)
反序列化
func (this *mystruct)Unmarshal(data []byte) error
数据结构名称
func (this *mystruct)String() string

1. mqrpc.Marshaler是请求方和服务方约定的数据结构,因此需要双方都能够明确数据结构的类型(可以直接断言的)
2. 服务函数返回结构一定要用指针(例如*rsp)否则mqant无法识别 (见下文)

2.5.2 编写支持Marshaler传参的handler

重新组织了一下代码目录结构,新增了一个marshaler.go用来存放自定义数据结构代码

    工程目录|-bin|-conf|-server.conf|-helloworld|-module.go|-web|-module.go|-rpctest|-module.go|-marshaler.go|-main.go

 2.5.3 定义数据结构

package rpctesttype Req struct {Id string
}func (this *Req) Marshal() ([]byte, error) {return []byte(this.Id), nill
}
func (this *Req) Unmarshal(data []byte) error {this.Id = string(data)return nil
}
func (this *Req) String() string {return "req"
}type Rsp struct {Msg string
}func (this *Rsp) Marshal() ([]byte, error) {return []byte(this.Msg), nil
}
func (this *Rsp) Unmarshal(data []byte) error {this.Msg = string(data)return nil
}
func (this *Rsp) String() string {return "rsp"
}

2.5.4 在rpctest/marshaler.go中新增handler函数testMarshal

func (self *rpctest) testMarshal(req Req) (*Rsp, error) {r := &Rsp{Msg: fmt.Sprintf("%v", req.Id)}return r, nil
}

2.5.5 在rpctest/module.go中将testMarshal注册到模块中

func (self *rpctest) OnInit(app module.App, settings *conf.ModuleSettings) {self.BaseModule.OnInit(self, app, settings)self.GetServer().RegisterGO("/test/proto", self.testProto)self.GetServer().RegisterGO("/test/marshal", self.testMarshal)  //注册testMarshal方法
}

2.5.6 在web/moduler.go中新增api进行测试

// startHttpServer 启动 HTTP 服务器
func (self *Web) startHttpServer() *http.Server {// 创建一个新的 HTTP 服务器,监听 8080 端口srv := &http.Server{Addr: ":8080"}// 设置根路由("/")的处理函数http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {// 解析表单数据_ = r.ParseForm()// 调用 RPC 方法 "helloword" 的 "/say/hi" 函数,传入 name 参数rstr, err := mqrpc.String(self.Call(context.Background(),"helloword",                     // 模块名"/say/hi",                       // 方法名mqrpc.Param(r.Form.Get("name")), // 参数),)// 记录 RPC 调用结果log.Info("RpcCall %v, err %v", rstr, err)// 将结果写入 HTTP 响应_, _ = io.WriteString(w, rstr)})// 设置 "/test/proto" 路由的处理函数http.HandleFunc("/test/proto", func(w http.ResponseWriter, r *http.Request) {// 解析表单数据_ = r.ParseForm()// 创建一个带3秒超时的上下文ctx, _ := context.WithTimeout(context.TODO(), time.Second*3)// 创建一个新的 rpcpb.ResultInfo 对象protobean := new(rpcpb.ResultInfo)// 调用 RPC 方法并将结果解析到 protobeanerr := mqrpc.Proto(protobean, func() (reply interface{}, errstr interface{}) {return self.Call(ctx,"rpctest",     // 模块名"/test/proto", // 方法名mqrpc.Param(&rpcpb.ResultInfo{Error: *proto.String(r.Form.Get("message"))}), // 参数)})// 记录 RPC 调用结果log.Info("RpcCall %v , err %v", protobean, err)// 如果有错误,将错误信息写入响应if err != nil {_, _ = io.WriteString(w, err.Error())}// 将 protobean 的 Error 字段写入响应_, _ = io.WriteString(w, protobean.Error)})http.HandleFunc("/test/marshal", func(w http.ResponseWriter, r *http.Request) {// 解析表单数据_ = r.ParseForm()// 创建一个带3秒超时的上下文ctx, _ := context.WithTimeout(context.TODO(), time.Second*3)// 创建一个新的 rpctest.Rsp 对象rspbean := new(rpctest.Rsp)// 调用 RPC 方法并将结果解析到 rspbeanerr := mqrpc.Proto(rspbean, func() (reply interface{}, errstr interface{}) {return self.Call(ctx,"rpctest",       // 模块名"/test/marshal", // 方法名mqrpc.Param(&rpctest.Req{Id: r.Form.Get("mid")}), // 参数)})// 记录 RPC 调用结果log.Info("RpcCall %v , err %v", rspbean, err)// 如果有错误,将错误信息写入响应if err != nil {_, _ = io.WriteString(w, err.Error())}// 将 rspbean 的 Error 字段写入响应_, _ = io.WriteString(w, rspbean.Msg)})// 在新的 goroutine 中启动 HTTP 服务器go func() {if err := srv.ListenAndServe(); err != nil {log.Info("Httpserver: ListenAndServer() error: %s", err)}}()return srv
}

2.6 RPC返回结果断言

由于RpcCall是一个通用函数,无法对其返回值指定类型,为简化代码,mqant参考了redis封装了RPC返回类型断言函数,方便使用。

2.6.1 protocolbuffer断言

protobean := new(rpcpb.ResultInfo)
err:=mqrpc.Proto(protobean,func() (reply interface{}, errstr interface{}) {return self.Call(ctx,"rpctest",     //要访问的moduleType"/test/proto", //访问模块中handler路径mqrpc.Param(&rpcpb.ResultInfo{Error: *proto.String(r.Form.Get("message"))}),)
})
log.Info("RpcCall %v , err %v",protobean,err)

2.6.2 自定义结构断言

rspbean := new(rpctest.Rsp)
err:=mqrpc.Marshal(rspbean,func() (reply interface{}, errstr interface{}) {return self.Call(ctx,"rpctest",     //要访问的moduleType"/test/marshal", //访问模块中handler路径mqrpc.Param(&rpctest.Req{Id: "hello 我是RpcInvoke"}),)
})
log.Info("RpcCall %v , err %v",rspbean,err)

这段代码中确实使用了类型断言,但它是隐式的,发生在 mqrpc.Proto 函数内部。让我详细解释一下这个过程:

  • rspbean := new(rpctest.Rsp)

创建了一个新的 rpctest.Rsp 类型的指针。

  • err := mqrpc.Proto(protobean, func() (reply interface{}, errstr interface{}) { ... })

这里调用了 mqrpc.Proto 函数,它接受两个参数:

  • 第一个参数 rspbean是用来存储结果的。
  • 第二个参数是一个匿名函数,这个函数执行实际的 RPC 调用。
  • 在 mqrpc.Proto 函数内部(虽然我们看不到其实现),很可能发生了以下过程:
  • 调用传入的匿名函数获取 RPC 调用的结果。
  • 使用类型断言将返回的 reply interface{} 转换为具体的 Protocol Buffers 类型。
  • 将转换后的结果复制或解析到传入的rspbean中。
  • 类型断言的过程大概是这样的:

       if protoReply, ok := reply.(* rpctest.Rsp); ok {

           // 复制 protoReply 到 rspbean

       } else {

           // 处理类型不匹配的情况

       }

5. self.Call(...) 函数返回的结果被 mqrpc.Proto 函数处理,然后填充到 rspbean 中。

2.6.3 字符串断言

rstr,err:=mqrpc.String(self.Call(context.Background(),"helloworld","/say/hi",mqrpc.Param(r.Form.Get("name")),),
)
log.Info("RpcCall %v , err %v",rstr,err)

3. 服务发现

3.1 服务续约

3.1.1 基本原理

服务在启动时注册服务发现,并在关闭时取消注册。有时这些服务可能会意外死亡,被强行杀死或面临临时的网络问题。在这些情况下,遗留的节点将存在服务发现中。

3.1.2 解决方法

服务发现支持注册的TTL选项和间隔。TTL指定在发现之后注册的信息存在多长时间,然后国企呗删除。时间间隔是服务应该重新注册的时间,以保留其在服务发现中的注册信息。

3.1.3 设置

mqant默认的ttl=20,重新注册间隔为10秒

func (self *rpctest) OnInit(app module.App, settings *conf.ModuleSettings) {self.BaseModule.OnInit(self, app, settings,server.RegisterInterval(15*time.Second),server.RegisterTTL(30*time.Second),)
}设置了一个30秒的ttl,重新注册间隔为15秒。如果应用进程被强杀,服务未取消注册,则30秒内其他服务节点无法感知改节点已失效。

3.2 服务元数据

服务还支持元数据,即服务的自身属性,通过这些属性我们可以定制服务发现策略。

3.2.1 节点ID

一般情况下节点ID在模块初始化时由系统自动生成一个不重复的随机数,但也可以指定节点ID

func (self *rpctest) OnInit(app module.App, settings *conf.ModuleSettings) {self.BaseModule.OnInit(self, app, settings,server.RegisterInterval(15*time.Second),server.RegisterTTL(30*time.Second),server.Id("mynode001"),)
}

3.2.2 调用

如果明确知道节点ID,那你可以直接找到,虽然通常不这样用

err:=mqrpc.Marshal(rspbean,func() (reply interface{}, errstr interface{}) {return self.Call(ctx,"rpctest@mynode001",     //要访问的moduleType"/test/marshal", //访问模块中handler路径mqrpc.Param(&rpctest.Req{Id: r.Form.Get("mid")}),)
})

3.2.3 服务版本(Version)

模块(服务)启动时,会自动注册模块func Version() string 的返回值作为服务的版本。

可以利用服务版本过滤节点

rstr,err:=mqrpc.String(self.Call(ctx,"helloworld",    //要访问的moduleType"/say/hi",            //访问模块中handler路径mqrpc.Param(r.Form.Get("name")),selector.WithStrategy(func(services []*registry.Service) selector.Next {var nodes []*registry.Node// Filter the nodes for datacenterfor _, service := range services {if service.Version!= "1.0.0"{continue}for _, node := range service.Nodes {nodes = append(nodes, node)}}var mtx sync.Mutex//log.Info("services[0] $v",services[0].Nodes[0])return func() (*registry.Node, error) {mtx.Lock()defer mtx.Unlock()if len(nodes) == 0 {return nil, fmt.Errorf("no node")}index := rand.Intn(int(len(nodes)))return nodes[index], nil}}),),)

3.2.4 元数据(Metadata)

可以为服务节点指定设置元数据,元数据时节点级别,且可以随时修改,利用好它可以灵活的实现定制化的服务发现,比如实现灰度发布,熔断策略等等

3.2.4.1 设置元数据
self.GetServer().Options().Metadata["state"]="alive"
3.2.4.2 立即刷新

设置好的元数据会等到下一次重新注册时更新配置中心并同步至其他节点,想立即生效的话可以这样做

_ := self.GetServer().ServiceRegister()
rstr,err:=mqrpc.String(self.Call(ctx,"helloworld",    //要访问的moduleType"/say/hi",            //访问模块中handler路径mqrpc.Param(r.Form.Get("name")),selector.WithStrategy(func(services []*registry.Service) selector.Next {var nodes []*registry.Node// Filter the nodes for datacenterfor _, service := range services {if service.Version!= "1.0.0"{continue}for _, node := range service.Nodes {nodes = append(nodes, node)if node.Metadata["state"] == "alive" || node.Metadata["state"] == "" {nodes = append(nodes, node)}}}var mtx sync.Mutex//log.Info("services[0] $v",services[0].Nodes[0])return func() (*registry.Node, error) {mtx.Lock()defer mtx.Unlock()if len(nodes) == 0 {return nil, fmt.Errorf("no node")}index := rand.Intn(int(len(nodes)))return nodes[index], nil}}),),
)

3.3 服务选择

微服务中每一个服务都会部署多个节点,并且根据实际情况可能面临新增或摘除节点,通常选择节点是结合业务而定的,因此灵活的节点选择器是框架必备的功能。

3.3.1 用法

mqant的节点选择器(selector)是从go-mirco移植来的,其使用规则可参考go-mirco实现

3.3.2 默认选择

mqant默认的选择器是一个随即负责均衡选择器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/51649.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二分类、多分类、多标签分类的评价指标

前言 在机器学习和深度学习中&#xff0c;常见的分类任务可以分为&#xff1a;二分类&#xff08;Binary Classification); 多分类&#xff08;Multi-class Classification); 多标签分类&#xff08;Multi-label Classification); 序列分类 (Sequence Classification); 图分类…

Notcoin 即将空投:你需要知道什么

Notcoin 于 2024 年 1 月推出&#xff0c;是 Telegram 上的一款边玩边赚游戏&#xff0c;用户可以通过点击硬币图标获得 Notcoin 代币 (NOT) 形式的奖励。NOT 建立在开放网络区块链&#xff08;称为“TON 区块链”&#xff09;上&#xff0c;由 Open Builders 创始人 Sasha Plo…

IOS-05 Swift循环控制语句

在 Swift 编程语言中&#xff0c;控制语句用于决定程序的执行流程&#xff0c;使我们能够根据不同的条件和情况来控制代码的执行顺序。下面我们将详细介绍几种常见的控制语句 一、for 循环 let names ["zhangsan","lisi"] for name in names{print(name…

DNS服务器搭建练习

练习要求&#xff1a; 3、搭建一个dns服务器&#xff0c;客户端可以使用该服务器解析域名www.haha.com为web服务器的 4、将客户端的ip地址中的域名解析服务器地址修改为第3题的dnt服务器的p&#xff0c;使用ping命令ping www.haha.com看能否ping通&#xff0c;用curl命令访问c…

FPGA开发——LED流水灯实现先从左往右流水,再从右往左流水

一、概述 我们在设计完一个方向的流水灯的设计时&#xff0c;总是会想实现让流水灯倒着流水回去的设计&#xff0c;这里我也是一样&#xff0c;实现这种设计的方法有很多种&#xff0c;其中就有直接使用case语句将所有可能包含进去编写&#xff0c;这种设计方法是最简单的&…

STM32通信协议 总集篇 (速记版)

名称引脚常用在双工时钟电平设备USARTTX、RX单片机和pc,单片机和单片机全双工异步单端点对点I2CSCL、SDA单片机和单片机半双工同步单端多设备SPISCLK、MOSI、MISO、CS单片机和单片机全双工同步单端多设备CANCAN_H、CAN_L智能汽车半双工异步差分多设备USBDP、DM半双工异步差分点…

[php7系列]--php7里的返回类型声明和标量类型声明及不要用isset判断数组是否定义某个KEY-最好使用array_key_exists

一、[php7系列]--php7里的返回类型声明和标量类型声明 php7里增加了返回类型声明和标题类型声明&#xff0c;可以理解为对一个方法的输入输出进行了类型验证&#xff0c;在PHP7之前&#xff0c;方法里的数组、对象参数是有类型声明的&#xff0c;但其它的整数、字符串等类型声明…

【海贼王航海日志:前端技术探索】HTML你学会了吗?(一)

目录 1 -> HTML概念 2 -> HTML结构 2.1 -> 认识HTML标签 2.2 -> HTML文件基本结构 2.3 -> 标签层次结构 3 -> 快速生成代码框架 4 -> HTML常见标签 4.1 -> 注释标签 4.2 -> 标题标签 4.3 -> 段落标签 4.4 -> 换行标签 4.5 ->…

ES(ElasticSearch)倒排索引

目录 正排与倒排索引 1.正排索引 作用&#xff1a; 优点&#xff1a; 缺点&#xff1a; 2.倒排索引 原理&#xff1a; 倒排索引的构建流程&#xff1a; 倒排索引的搜索流程&#xff1a; 优点&#xff1a; 缺点&#xff1a; 3. 应用场景 倒排索引中有几个非常重要的概念…

【二叉树的锯齿形层序遍历】python刷题记录

R2-树与二叉树篇 层序遍历双端队列deque # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self.right right class Solution:def zigzagLevelOr…

【读代码】高斯掩模

目录 问题&#xff1a; 主要功能&#xff1a; 问题&#xff1a; 看不懂实现的功能 主要功能&#xff1a; 从输出张量中提取与边界框对应的区域&#xff0c;并计算该区域与高斯核之间的均方误差&#xff08;MSE&#xff09;损失 例子 假设我们有以下输入&#xff1a; boxe…

我的创作纪念日(一)——Giser?Noder?不如“Computer”

目录 Giser&#xff1f;Noder&#xff1f;不如“Computer” 一、根源&#xff1a;保持学习习惯的刚需 二、机缘&#xff1a;processOn的另类替代 三、日常&#xff1a;对技术栈丰富的思考 四、成就&#xff1a;保持心态健康的活着 五、憧憬&#xff1a;能一直心态健康的活…

前端实现【 批量任务调度管理器 】demo优化

一、前提介绍 我在前文实现过一个【批量任务调度管理器】的 demo&#xff0c;能实现简单的任务批量并发分组&#xff0c;过滤等操作。但是还有很多优化空间&#xff0c;所以查找一些优化的库&#xff0c; 主要想优化两个方面&#xff0c; 上篇提到的&#xff1a; 针对 3&…

CSS技巧专栏:一日一例 14-纯CSS实现模拟水波波动填充按钮特效

CSS技巧专栏:一日一例 14-纯CSS实现模拟水波波动填充按钮特效 大家好,今天介绍一个在网上很常见的模拟水波波动要灌满按钮的动画效果,效果下面图所示。 本例图片 案例分析 我们沿着Z轴从上到下数一下一共有几个层: 文字层:白色文字阴影的黑色文字,当鼠标移动上来时候…

黑马点评--给店铺类型查询添加缓存

controller/ShopTypeController.java /*** 店铺分类查询&#xff0c;用于展示首页头部店铺分类* return*/GetMapping("list")public Result queryTypeList() {return typeService.queryList();} service/IShopTypeService.java Result queryList(); service/impl/S…

fatal: Could not read from remote repository. 解决方法

问题描述&#xff1a; Git : fatal: Could not read from remote repository. Please make sure you have the correct access rights and the repository exists。 解决方法&#xff1a; 当在网上尝试大量方法仍然失败的时候&#xff0c;不妨试试这个方法。 在 github 上&…

探索 Redis 不同集群架构的性能与应用

1. 引言 Redis的集群配置成为了提高数据可靠性和服务可用性的关键。本文将带领大家了解Redis的四种主要集群架构&#xff0c;并重点分析哨兵模式和Redis Cluster架构和优势。 2. Redis的四种集群架构 2.1 单实例Redis 使用单个 Redis 实例提供服务。适用于小规模应用&#…

论文阅读:Deformable DETR: Deformable Transformers for End-to-End Object Detection

论文阅读&#xff1a;Deformable DETR: Deformable Transformers for End-to-End Object Detection Deformable DETR: 基于稀疏空间采样的注意力机制&#xff0c;让DCN与Transformer一起玩&#xff01; - 知乎 (zhihu.com) 【Deformable DETR 论文源码解读】Deformable Trans…

The Llama 3 Herd of Models.Llama 3 模型第1,2,3部分全文

现代人工智能(AI)系统是由基础模型驱动的。本文提出了一套新的基础模型,称为Llama 3。它是一组语言模型,支持多语言、编码、推理和工具使用。我们最大的模型是一个密集的Transformer,具有405B个参数和多达128K个tokens的上下文窗口。本文对Llama 3进行了广泛的实证评价。我们…

【error】AttributeError: module ‘cv2.dnn‘ has no attribute ‘DictValue‘(库冲突)

conda list conda remove opencv pip uninstall opencv-python conda list pip 同时卸载两个库 pip uninstall opencv-contrib-python opencv-python 没有and 直接写库名 module ‘cv2.dnn‘ has no attribute ‘DictValue‘解决办法_module cv2.dnn has no attribute d…