从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】【基础篇完结】

从0到1开发go-tcp框架【3-读写协程分离、引入消息队列、进入连接管理器、引入连接属性】

1 读写协程分离[v0.7]

  1. 添加一个Reader和Writer之间通信的channel
  2. 添加一个Writer goroutine
  3. Reader由之前直接发送给客户端改为发送给通信channel
  4. 启动Reader和Writer一起工作

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}go c.MsgHandler.DoMsgHandler(&r)}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()c.ExitChan <- trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

测试

myDemo/ZinxV0.7/client.go

  • client0.go
package mainimport ("fmt""io""myTest/zinx/znet""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8092")if err != nil {fmt.Println("client start err ", err)return}for {//发送封装后的数据包dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))if err != nil {fmt.Println("client pack msg err ", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("client write err ", err)return}//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...//1 先读取流中的head部分,得到Id和dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("client read head err ", err)break}//将二进制的head拆包到msg中msgHead, err := dp.UnPack(binaryHead)if err != nil {fmt.Println("client unpack msgHead err ", err)break}if msgHead.GetMsgLen() > 0 {//2 有数据, 再根据dataLen进行二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error ", err)return}fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))}//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}
  • client1.go

在这里插入图片描述

myDemo/ZinxV0.7/server.go

package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}func main() {s := znet.NewServer("[Zinx v0.7]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)s.Serve()
}

结果:
在这里插入图片描述

  • 接受多个客户端也可以
    在这里插入图片描述
  • 当client0退出时,不会影响client1
    在这里插入图片描述

2 创建消息队列及多任务[v0.8]

  1. 创建一个消息队列,MsgHandler消息管理模块增加:TaskQueue、WorkerPoolSize
  2. 创还能多任务worker的工作池并且启动
  3. 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理

在这里插入图片描述

实现消息队列机制和工作池机制(集成到自定义框架)

  1. 创建一个消息队列:MsgHandler消息管理模块
  2. 创建多任务worker的工作池并启动
  3. 将之前发送的消息,全部改为把消息发送给消息队列和worker工作池来处理
  4. 将消息队列机制集成到Zinx框架中
  • 开启并调用消息队列及worker工作池
  • 将从客户端处理的消息,发送给当前Worker的工作池来处理

zinx/znet/server.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""net"
)type Server struct {Name       stringIPVersion  stringIP         stringPort       intMsgHandler *MsgHandle
}func NewServer(name string) *Server {s := &Server{Name:       name,IPVersion:  "tcp4",IP:         util.GlobalObject.Host,Port:       util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 = 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}dealConn := NewConnection(conn, cid, s.MsgHandler)cid++//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandle
}func NewConnection(conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),}return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()c.ExitChan <- trueclose(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

zinx/znet/msgHandler.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""strconv"
)type MsgHandle struct {//msgId与对应的router对应Api map[uint32]ziface.IRouter//负责worker取任务的消息队列TaskQueue []chan ziface.IRequest//业务工作worker池的goroutine数量WorkerPoolSize uint32
}func NewMsgHandle() *MsgHandle {return &MsgHandle{Api:            make(map[uint32]ziface.IRouter),TaskQueue:      make([]chan ziface.IRequest, util.GlobalObject.WorkerPoolSize),WorkerPoolSize: util.GlobalObject.WorkerPoolSize,}
}func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {//判断是否有对应的routerif _, ok := mh.Api[request.GetMsgID()]; !ok {fmt.Println("msgId ", request.GetMsgID(), "does not exist handler, need to add router")return}//call handlerrouter := mh.Api[request.GetMsgID()]router.PreHandle(request)router.Handler(request)router.PostHandler(request)
}func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {if _, ok := mh.Api[msgId]; ok {//如果已经存在了对应的router,则提示panic("repeat api, msgId = " + strconv.Itoa(int(msgId)))}mh.Api[msgId] = routerfmt.Println("msgId ", msgId, "Add router success ")
}//启动一个worker工作池(开启工作池的动作只能发生一次,一个zinx框架只能有一个worker工作池)
func (mh *MsgHandle) StartWorkerPool() {for i := 0; i < int(mh.WorkerPoolSize); i++ {//开辟任务队列mh.TaskQueue[i] = make(chan ziface.IRequest, util.GlobalObject.MaxWorkerTaskLen)//启动workergo mh.startOneWorker(i, mh.TaskQueue[i])}
}func (mh *MsgHandle) startOneWorker(workerId int, taskQueue chan ziface.IRequest) {fmt.Println("Worker ID=", workerId, " is started...")for {select {//从任务队列中取消息(如果有消息过来,出列的就是request,然后执行该request所绑定的业务)case request := <-taskQueue:mh.DoMsgHandler(request)}}
}//将消息交给taskQueue,由Worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {//通过取余数的方式来达到负载均衡workID := request.GetConnection().GetConnectionID() % util.GlobalObject.WorkerPoolSizefmt.Println("Add ConnID=", request.GetConnection().GetConnectionID()," requestID=", request.GetMsgID()," workID=", workID)//将消息发送给对应worker的任务队列mh.TaskQueue[workID] <- request
}

zinx/ziface/imsgHandler.go

package zifacetype IMsgHandler interface {DoMsgHandler(request IRequest)AddRouter(msgId uint32, router IRouter)StartWorkerPool()SendMsgToTaskQueue(request IRequest)
}

测试

myDemo/ZinxV0.8/Server.go

同myDemo/ZinxV0.7/Server.go,修改一下NewServer时候所传的Zinx的名称即可

myDemo/ZinxV0.8/Client.go

同myDemo/ZinxV0.7/Client.go

myDemo/ZinxV0.8/zinx.json

{"Name": "Zinx Server Application","Version": "V0.8","Host": "0.0.0.0","TcpPort": 8092,"MaxConn": 30,"MaxPackageSize": 1024,"WorkerPoolSize": 10
}

在这里插入图片描述

在这里插入图片描述

3 连接管理器(connManager)[v0.9]

3.1 连接管理器(conn)的定义与实现

创建一个连接管理模块ConnManager

  • 添加连接
  • 删除连接
  • 根据连接ID查找对应的连接
  • 总连接个数
  • 清理全部的连接

3.2 将连接管理模块集成到Zinx框架中

  1. 给server添加一个ConnMgr属性
  2. 修改NewServer方法,加入ConnMgr初始化
  3. 判断当前连接数是否超出最大值MaxConn
  4. 当server停止的时候(调用server.Stop方法),应该加入ConnMgr.ClearConn()

3.3 提供创建连接/销毁连之前所需的Hook函数

给我们自定义框架Zinx提供创建连接之后/销毁连接之前所要处理的一些业务。提供给用户能够注册的Hook函数

  • 添加OnConnStart()
  • 添加OnConnStop()

zinx/ziface/iserver.go

package zifacetype IServer interface {Start()Stop()Serve()AddRouter(msgId uint32, router IRouter)GetConnMgr() IConnManager//注册创OnConnStart钩子函数SetOnConnStart(func(conn IConnection))SetOnConnStop(func(conn IConnection))//调用OnConnStart钩子函数CallOnConnStart(conn IConnection)CallOnConnStop(conn IConnection)
}

zinx/ziface/iconnmanager.go

package zifacetype IConnManager interface {Add(conn IConnection)Remove(conn IConnection)Get(connID uint32) (IConnection, error)Len() intClearConn()
}

zinx/znet/connmanager.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""myTest/zinx/util""myTest/zinx/ziface""sync"
)type ConnManager struct {connections map[uint32]ziface.IConnection //管理的连接集合connLock    sync.RWMutex                  //保护连接集合的读写锁
}func NewConnManager() *ConnManager {return &ConnManager{connections: make(map[uint32]ziface.IConnection, util.GlobalObject.MaxConn),}
}
func (cm *ConnManager) Add(conn ziface.IConnection) {//添加写锁cm.connLock.Lock()defer cm.connLock.Unlock()cm.connections[conn.GetConnectionID()] = connfmt.Println("connectionID=", conn.GetConnectionID(), " add to ConnManager success, conn num=", cm.Len())
}func (cm *ConnManager) Remove(conn ziface.IConnection) {//保护共享资源mapcm.connLock.Lock()defer cm.connLock.Unlock()delete(cm.connections, conn.GetConnectionID())fmt.Println("connectionID=", conn.GetConnectionID(), " remote from ConnManager success, conn num=", cm.Len())
}func (cm *ConnManager) Get(connID uint32) (ziface.IConnection, error) {cm.connLock.RLock()defer cm.connLock.RUnlock()if conn, ok := cm.connections[connID]; ok {return conn, nil} else {return nil, errors.New("connection NOT FOUND")}
}func (cm *ConnManager) Len() int {return len(cm.connections)
}func (cm *ConnManager) ClearConn() {cm.connLock.Lock()defer cm.connLock.Unlock()for connID, conn := range cm.connections {//停止连接conn.Stop()//删除连接delete(cm.connections, connID)}fmt.Println("Clear All connections success! conn num=", cm.Len())
}

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""myTest/zinx/ziface""net"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan   chan boolMsgHandler *MsgHandleTcpServer  ziface.IServer
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),TcpServer:  server,}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan <- true//连接conn关闭时,需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}

zinx/znet/server.go

package znetimport ("fmt""myTest/zinx/util""myTest/zinx/ziface""net"
)type Server struct {Name       stringIPVersion  stringIP         stringPort       intMsgHandler *MsgHandleConnMgr    *ConnManager//创建连接之前的Hook函数OnConnStart func(conn ziface.IConnection)OnConnStop  func(conn ziface.IConnection)
}func NewServer(name string) *Server {s := &Server{Name:       name,IPVersion:  "tcp4",IP:         util.GlobalObject.Host,Port:       util.GlobalObject.TcpPort,MsgHandler: NewMsgHandle(),ConnMgr:    NewConnManager(),}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[Zinx] Server Name :%s , listen IP :%v , Port: %d is starting \n", s.Name, s.IP, s.Port)fmt.Printf("[Zinx] Version :%s , MaxConn:%v , MaxPackageSize: %d \n", util.GlobalObject.Version, util.GlobalObject.MaxConn, util.GlobalObject.MaxPackageSize)var cid uint32 = 0go func() {//0 开启消息队列及Worker工作池s.MsgHandler.StartWorkerPool()addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//判断当前连接数是否超过最大连接数,如果超过则关闭新创建的连接if s.ConnMgr.Len() >= util.GlobalObject.MaxConn {//TODO 给客户端返回一个超出最大连接的错误包fmt.Println("-----------------》 Tcp Conn exceed, conn num=", util.GlobalObject.MaxConn)conn.Close()//关闭当前连接,等待下一次连接【如果当前连接数小于最大连接数】continue}dealConn := NewConnection(s, conn, cid, s.MsgHandler)cid++//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {//释放相关资源fmt.Println("[STOP] Zinx server name ", s.Name)s.ConnMgr.ClearConn()
}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}func (s *Server) GetConnMgr() ziface.IConnManager {return s.ConnMgr
}func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {s.MsgHandler.AddRouter(msgId, router)
}//注册创OnConnStart钩子函数
func (s *Server) SetOnConnStart(hookFunc func(conn ziface.IConnection)) {s.OnConnStart = hookFunc
}func (s *Server) SetOnConnStop(hookFunc func(conn ziface.IConnection)) {s.OnConnStop = hookFunc
}//调用OnConnStart钩子函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {if s.OnConnStart != nil {fmt.Println("---------> call OnConnStart()")s.OnConnStart(conn)}
}func (s *Server) CallOnConnStop(conn ziface.IConnection) {if s.OnConnStop != nil {fmt.Println("----------> call OnConnStop()")s.OnConnStop(conn)}
}

测试

myDemo/ZinxV0.9/Server.go
package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println("=====>Do Conn Begin...")if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {fmt.Println("err")}
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println("=====>Do Conn Lost...")fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")
}func main() {s := znet.NewServer("[Zinx v0.9]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}

测试代码中的myDemo/ZinxV0.9/Client.go和myDemo/ZinxV0.8/Client.go一样。

  • 为了方便测试超过最大连接数的报错信息,我们可以修改配置文件
    在这里插入图片描述
//将最大连接数设置为2,然后我们复制Client.go,可以多起几个Client来进行测试
{"Name": "Zinx Server Application","Version": "V0.9","Host": "0.0.0.0","TcpPort": 8092,"MaxConn": 2,"MaxPackageSize": 1024,"WorkerPoolSize": 10
}

测试最大连接数与连接管理:
在这里插入图片描述

测试钩子函数:
在这里插入图片描述

4 添加连接属性并测试【v0.10】

通过map[string]interface{}来存储连接的属性值,通过RWLock来保证读写connection属性值安全

  • 设置连接属性
  • 获取连接属性
  • 移除连接属性

zinx/ziface/iconnection.go

package zifaceimport "net"type IConnection interface {//启动连接Start()//停止连接Stop()//获取当前连接的Conn对象GetTCPConnection() *net.TCPConn//获取当前连接模块的idGetConnectionID() uint32//获取远程客户端的TCP状态 IP:PortRemoteAddr() net.Addr//发送数据SendMsg(msgId uint32, data []byte) errorSetProperty(key string, value interface{})GetProperty(key string) (interface{}, error)RemoveProperty(key string)
}//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error

zinx/znet/connection.go

package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""io""myTest/zinx/util""myTest/zinx/ziface""net""sync"
)type Connection struct {Conn       *net.TCPConnConnID     uint32isClosed   boolmsgChannel chan []byte//告知当前的连接已经退出/停止(由Reader告知writer退出)ExitChan     chan boolMsgHandler   *MsgHandleTcpServer    ziface.IServerproperty     map[string]interface{}propertyLock sync.RWMutex
}func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandle *MsgHandle) *Connection {c := &Connection{Conn:       conn,ConnID:     connID,MsgHandler: msgHandle,isClosed:   false,msgChannel: make(chan []byte),ExitChan:   make(chan bool, 1),TcpServer:  server,property: make(map[string]interface{}),}//将conn添加到connMgr中c.TcpServer.GetConnMgr().Add(c)return c
}func (c *Connection) StartWriter() {fmt.Println("[Writer Goroutine is running]")defer fmt.Println("[conn Writer  goroutine exit!]", c.RemoteAddr().String())//不断的阻塞等待channel的消息,然后将channel中的消息写给客户端for {select {case data := <-c.msgChannel://有数据写给客户端if _, err := c.Conn.Write(data); err != nil {fmt.Println("Send data error , ", err)return}case <-c.ExitChan://代表reader已经退出,此时writer也需要退出return}}
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("[Reader goroutine is exit] connID=", c.ConnID, " remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {//创建一个拆包对象dp := NewDataPack()//读取客户端的msg Head 二进制流 8字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head err ", err)break}//拆包,将读取到的headData封装为msgmsg, err := dp.UnPack(headData)if err != nil {fmt.Println("unpack msg err ", err)break}//根据dataLen,再次读取Data,放在msg.Data中,var data []byte//如果数据包中有数据,则读取if msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())//将切片data读满if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data err ", err)break}}msg.SetData(data)//封装请求,改为router处理r := Request{conn: c,msg:  msg,}//判断是否开启workerPool,如果没有开启则直接创建协程处理;如果开启则通过workerPool处理if util.GlobalObject.WorkerPoolSize > 0 {c.MsgHandler.SendMsgToTaskQueue(&r)} else {go c.MsgHandler.DoMsgHandler(&r)}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)//开启读、写go c.StartReader()go c.StartWriter()//执行钩子函数c.TcpServer.CallOnConnStart(c)
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = true//连接关闭之前执行hook关闭的钩子函数c.TcpServer.CallOnConnStop(c)c.Conn.Close()c.ExitChan <- true//连接conn关闭时,需要从连接管理模块中移除c.TcpServer.GetConnMgr().Remove(c)close(c.msgChannel)close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed {return errors.New("connection closed\n")}//将data进行封包dp := NewDataPack()binaryMsg, err := dp.Pack(NewMessage(msgId, data))if err != nil {fmt.Println("Pack error msg id=", msgId)return errors.New("pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("write msg id ", msgId, " error ", err)return errors.New("conn write err ")}return nil
}func (c *Connection) SetProperty(key string, value interface{}) {c.propertyLock.Lock()defer c.propertyLock.Unlock()c.property[key] = value
}func (c *Connection) GetProperty(key string) (interface{}, error) {c.propertyLock.RLock()defer c.propertyLock.RUnlock()if value, ok := c.property[key]; ok {return value, nil} else {return nil, errors.New("no property found")}
}func (c *Connection) RemoveProperty(key string) {c.propertyLock.Lock()defer c.propertyLock.Unlock()delete(c.property, key)
}

测试

myDemo/ZinxV0.10/Server.go
package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) Handler(request ziface.IRequest) {fmt.Println("call router handler...")//先读取客户端数据,再回写ping...ping...ping...fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))//回写pingerr := request.GetConnection().SendMsg(0, []byte("ping...ping...ping..."))if err != nil {fmt.Println(err)}
}//定义第二个Router
type HelloRouter struct {znet.BaseRouter
}func (hr *HelloRouter) Handler(request ziface.IRequest) {fmt.Println("receive from client msgId=", request.GetMsgID(),"data=", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("hello zinx, I'm the other handler"))if err != nil {fmt.Println(err)}
}//连接创建成功之后需要执行的逻辑
func DoConnBegin(conn ziface.IConnection) {fmt.Println("=====>Do Conn Begin...")if err := conn.SendMsg(202, []byte("do connection begin...")); err != nil {fmt.Println("err")}//给conn设置属性conn.SetProperty("Name", "ziyi")conn.SetProperty("士兵突击", "https://www.bilibili.com/video/BV1Lk4y1N7tC/")
}//连接断开之前要执行的逻辑
func DoConnLost(conn ziface.IConnection) {fmt.Println("=====>Do Conn Lost...")fmt.Println("connID=", conn.GetConnectionID(), " is Lost....")//读取属性property, _ := conn.GetProperty("Name")fmt.Println("Get Property Name=", property)property, _ = conn.GetProperty("士兵突击")fmt.Println("Get Property 士兵突击=", property)
}func main() {s := znet.NewServer("[Zinx v0.10]")//添加自定义路由(PingRouter和HelloRouter)router0 := &PingRouter{}s.AddRouter(0, router0)router1 := &HelloRouter{}s.AddRouter(1, router1)//注册hook钩子函数s.SetOnConnStart(DoConnBegin)s.SetOnConnStop(DoConnLost)s.Serve()
}
myDemo/ZinxV0.10/Client.go
package mainimport ("fmt""io""myTest/zinx/znet""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8092")if err != nil {fmt.Println("client start err ", err)return}for {//发送封装后的数据包dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMessage(0, []byte("Zinx client0 test msg")))if err != nil {fmt.Println("client pack msg err ", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("client write err ", err)return}//服务器应该给我们回复一个message数据,msgId为1,内容为ping...ping...//1 先读取流中的head部分,得到Id和dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("client read head err ", err)break}//将二进制的head拆包到msg中msgHead, err := dp.UnPack(binaryHead)if err != nil {fmt.Println("client unpack msgHead err ", err)break}if msgHead.GetMsgLen() > 0 {//2 有数据, 再根据dataLen进行二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error ", err)return}fmt.Println("--------> Receive Server msg , ID=", msg.Id, " ,len=", msg.DataLen, " ,data=", string(msg.Data))}//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/20262.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用火山云搜索ESCloud服务构建图文检索应用(以文搜图/以图搜图)

图文检索在生活中具有广泛的应用&#xff0c;常见的图片检索包括基于文本内容搜索和基于图片内容搜索。用户通过输入文字描述或上传图片就可以在海量的图片库中快速找到同款或者相似图片&#xff0c;这种搜索方式被广泛应用于电商、广告、设计以及搜索引擎等热门领域。 本文基…

在CSDN学Golang场景化解决方案(微服务架构设计)

一&#xff0c;聚合器微服务设计模式 在Golang微服务架构设计中&#xff0c;聚合器&#xff08;Aggregator&#xff09;微服务设计模式是一种常见的应用程序体系结构模式。该模式旨在简化客户端与后端微服务之间的通信&#xff0c;并支持更高级别的操作&#xff0c;例如聚合多…

Vue3文本省略(Ellipsis)

APIs 参数说明类型默认值必传maxWidth文本最大宽度number | string‘100%’falseline最大行数numberundefinedfalsetrigger展开的触发方式‘click’undefinedfalsetooltip是否启用文本提示框booleantruefalsetooltipMaxWidth提示框内容最大宽度&#xff0c;单位px&#xff0c;…

亚马逊云科技发布Amazon HealthScribe,使用生成式AI技术实现临床文档的自动生成

近日&#xff0c;亚马逊云科技在纽约峰会上推出了Amazon HealthScribe&#xff0c;该服务符合HIPAA&#xff08;《健康保险流通与责任法案》&#xff09;的相关要求&#xff0c;可为医疗软件供应商提供一种基于语音和文字识别的生成式AI技术&#xff0c;帮助其创建能够自动生成…

简-理解Python的装饰器、迭代器和生成器底层原理

装饰器、迭代器和生成器是 Python 中的高级功能&#xff0c;它们分别用于扩展函数或方法的功能、遍历容器元素和创建简洁的迭代器。 装饰器 装饰器是一个用于修改或扩展函数或方法的功能的函数。装饰器接受一个函数作为参数&#xff0c;并返回一个新的函数&#xff0c;新函数通…

APUE学习62章终端(二): stty命令特殊字符终端标志

1. stty命令 stty命令的英文解释: 很明显stty有一个-F参数 所以准确的说: stty命令是设置当前终端驱动程序(也有可能直接配置了硬件&#xff0c;这点目前不清楚)的属性&#xff0c;使当前终端的驱动程序能够使能/去使能一些特殊字符的识别与处理等等 2. stty命令的结构 3. 终端…

使用vuex让购物车联动

// 1.vuex点击加减触发函数提交仓库把我们请求的数据存到仓库 2.在仓库定义这个函数和对象 把我们存进去的数据存起来 // 3。在我们需要的页面拿出数据&#xff0c;然后循环就可以 // 4.当我们点击加号就触发函数然后在vuex对这个数据进行处理 // 5.对我们点进来的数据进行一个…

【SLAM】LoFTR知多少

1. LoFTR: Detector-Free Local Feature Matching with Transformers PAPER 论文 | LoFTR: Detector-Free Local Feature Matching with Transformers 代码 | CODE: 关键词 | detector-free, local feature matching LoFTR知多少 1. LoFTR: Detector-Free Local Feature M…

O3DE的Pass

Pass介绍 Pass是具有输入和输出的渲染过程。 在最终渲染帧中看到的每个细节都是通过一系列Pass&#xff08;前一个Pass的输出是下一个Pass的输入&#xff09;计算出来的。Pass可以生成图像&#xff08;作为纹理、缓冲区或渲染目标&#xff09;。每个图像都包含关于场景的特定…

不藏了!极狐GitLab 向你介绍一位研发效能「六边形战士」

怎么用数字说清研发效能&#xff1f; 总是觉得研发资源不够用&#xff1f; 高效工作全靠领导盯&#xff1f; 不问不知道项目推迟发布&#xff1f; 代码泄漏了才紧急采取措施&#xff1f; 是时候甩开这些“研发人的 PTSD”了&#x1f623; 极狐星&#xff0c;给专业的你更…

Unity Shader:常用的C#与shader交互的方法

俗话说久病成医&#xff0c;虽然不是专业技术美术&#xff0c;但代码写久了自然会积累一些常用的shader交互方法。零零散散的&#xff0c;总结如下&#xff1a; 1&#xff0c;改变UGUI的材质球属性 有时候我们需要改变ui的一些属性&#xff0c;从而实现想要的效果。通常UGUI上…

Spring如何通过三级缓存解决循环依赖问题?

目录 一、什么是Spring 二、循环依赖问题 三、三级缓存机制 四、如何通过三级缓存解决循环依赖问题 一、什么是Spring Spring框架是一个开源的Java应用程序开发框架&#xff0c;提供了一种全面的、一致的编程模型&#xff0c;用于构建企业级应用程序和服务。它由Rod Johnso…

多个List 合并变成一个List+一个List 根据某个字段相等的另一个字段相加,并排序变成新的List

List<CurveTimeAndValueDomain> curves new ArrayList<>();for (int i 0; i < columnNames.size(); i){if (columnNames.get(i).equals(PlantConstant.TENDOWNFX) || columnNames.get(i).equals(PlantConstant.TENDOWNQP)) {//10千伏以下 数据 进行缓慢处理cu…

代码随想录算法训练营第五十三天 | 1143.最长公共子序列、1035.不相交的线、53.最大子数组和

文章目录 一、1143.最长公共子序列二、1035.不相交的线三、最大子数组和 一、1143.最长公共子序列 题目链接 代码如下&#xff1a; class Solution { public:int longestCommonSubsequence(string text1, string text2) {vector<vector<int>> dp (text1.size() …

Stable Diffusion中人物生成相关的negative prompts

下面是常用的negative prompt&#xff0c;在使用stable Diffusion webui等工具生成时可以填入。 bad anatomy, bad proportions, blurry, cloned face, deformed, disfigured, duplicate, extra arms, extra fingers, extra limbs, extra legs, fused fingers, gross proporti…

深度学习(32)——CycleGAN(1)

深度学习&#xff08;32&#xff09;——CycleGAN&#xff08;1&#xff09; 文章目录 深度学习&#xff08;32&#xff09;——CycleGAN&#xff08;1&#xff09;1. GAN原理2. CycleGAN&#xff08;1&#xff09;原理&#xff08;2&#xff09;核心思想&#xff08;3&#xf…

git回退到指定版本

#首先使用该方法查看版本号&#xff0c;后面可以接-n,n为数量&#xff0c;指定展示前几个版本记录。 git log &#xff08;或 git reflog&#xff09;#xxx换成你想回退的版本号。 git reset --hard xxx#强制推送到远程仓库&#xff0c;让远程仓库版本号一致。 git push origin…

PtahDAO:全球首个DAO治理资产信托计划的金融平台

金融科技是当今世界最具创新力和影响力的领域之一&#xff0c;区块链技术作为金融科技的核心驱动力&#xff0c;正在颠覆传统的金融模式&#xff0c;为全球用户提供更加普惠、便捷、安全的金融服务。在这个变革的浪潮中&#xff0c;PtahDAO&#xff08;普塔道&#xff09;作为全…

优漫动游|前端程序员容易出错的基础知识

web全栈是目前比较流行的语言&#xff0c;因为前端较其他语言相比&#xff0c;简单好学&#xff0c;而且现在的互联网公司几乎缺不了web前端开发&#xff0c;行业的需求致使大量的人转型前端&#xff0c;对于刚学前端的同学来讲&#xff0c;他们缺乏开发经验&#xff0c;在项目…

【C++】类与对象(2)

文章目录 前言一、类的6个默认成员函数二、构造函数1.概念2.特性3.初始化列表 三、析构函数1.概念2.特性 四、拷贝构造函数1.概念2.特性 五、赋值运算符重载1.运算符重载2.赋值运算符重载3.前置和后置重载 六、取地址及const取地址操作符重载总结 前言 在前面&#xff0c;给大…