从0到1开发go-tcp框架【1-搭建server、封装连接与业务绑定、实现基础Router】
本期主要完成对Server的搭建、封装连接与业务绑定、实现基础Router(处理业务的部分)、抽取框架的全局配置文件
- 从配置文件中读取数据(服务器监听端口、监听IP等),通过自定义Router完成具体业务操作
第一版最终项目结构:
1 搭建基础server[V1.0]
1.1 编写server端
- 编写iserver.go,用于定义server的接口
- 编写server.go,定义server结构体,并实现接口
①/zinx/ziface/iserver.go:
package zifacetype IServer interface {Start()Stop()Serve()
}
②/zinx/znet/server.go
package znetimport ("fmt""net"
)type Server struct {Name stringIPVersion stringIP stringPort int
}func NewServer(name string) *Server {s := &Server{Name: name,IPVersion: "tcp4",IP: "0.0.0.0",Port: 8090,}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[start] Server listener at IP:%s, Port %d is starting\n", s.IP, s.Port)go func() {addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}//处理业务:回显消息go func() {for {buf := make([]byte, 512)cnt, err := conn.Read(buf)if err != nil {fmt.Println("read buf err ", err)continue}fmt.Printf("receive client buf %s, cnt %d \n", buf, cnt)//回显读取到的字节数if _, err := conn.Write(buf[:cnt]); err != nil {fmt.Println("write buf err ", err)continue}}}()}}()
}func (s *Server) Stop() {}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}
1.2 测试server端功能
①创建Server.go和Client.go
- 编写myDemo/zinxV1.0/Server.go
package mainimport "myTest/zinx/znet"func main() {s := znet.NewServer("[Zinx v1.0]")s.Serve()
}
- 编写myDemo/zinxV1.0/Client.go
package mainimport ("fmt""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8090")if err != nil {fmt.Println("client start err ", err)return}for {//2 调用连接向服务器发数据_, err := conn.Write([]byte("Hello Zinx v0.1"))if err != nil {fmt.Println("write conn err ", err)return}// 3 读取服务器返回的数据buf := make([]byte, 512)cnt, err := conn.Read(buf)if err != nil {fmt.Println("client read buf err ", err)return}fmt.Printf("server call back:%s, cnt=%d\n", buf, cnt)//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}
②测试结果
可以看到每隔1秒服务器就从客户端接受到数据并回显
2 封装连接conn、业务绑定[V2.0]
V0.1版本我们已经实现了了⼀一个基础的Server框架,现在我们需要对客户端链接和不不同的客户端链接所处 理理的不不同业务再做⼀一层接⼝口封装,当然我们先是把架构搭建起来。
现在在 ziface 下创建⼀一个属于链接的接⼝口⽂文件 iconnection.go ,当然他的实现⽂文件我们放在 znet 下的 connection.go 中。
需要的方法:
- 启动连接
- 停止连接
- 得到连接的conn对象
- 得到连接的id
- 得到客户端连接的地址和端口
- 发送数据的方法
- 连接所绑定的处理业务的函数
2.1 封装Conn
- 定义iconnection接口
- 创建connection结构体并实现iconnection
- 创建/zinx/ziface/iconnection.go:
package zifaceimport "net"type IConnection interface {//启动连接Start()//停止连接Stop()//获取当前连接的Conn对象GetTCPConnection() *net.TCPConn//获取当前连接模块的idGetConnectionID() uint32//获取远程客户端的TCP状态 IP:PortRemoteAddr() net.Addr//发送数据Send()
}//定义一个处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error
- 创建/zinx/znet/connection.go
package znetimport ("fmt""myTest/zinx/ziface""net"
)type Connection struct {Conn *net.TCPConnConnID uint32isClosed boolhandleAPI ziface.HandleFunc//告知当前的连接已经退出ExitChan chan bool
}func NewConnection(conn *net.TCPConn, connID uint32, callback_api ziface.HandleFunc) *Connection {c := &Connection{Conn: conn,ConnID: connID,handleAPI: callback_api,isClosed: false,ExitChan: make(chan bool, 1),}return c
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("connID=", c.ConnID, "Reader is exit, remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {buf := make([]byte, 512)cnt, err := c.Conn.Read(buf)if err != nil {fmt.Printf("connID %d receive buf err %s\n", c.ConnID, err)continue}//调用当前所绑定的处理业务的方法HandleAPIif err := c.handleAPI(c.Conn, buf, cnt); err != nil {fmt.Println("ConnID", c.ConnID, " handle is err ", err)break}}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)go c.StartReader()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) Send() {}
2.2 修改server.go(通过封装的conn实现处理业务)
将修改server.go,添加CallBackToClient方法,用于实现具体业务
将ZinxV1.0版本中的server.go的处理业务逻辑部分更换为封装后的Conn来调用
全部代码:
/zinx/znet/server.go:
package znetimport ("fmt""github.com/kataras/iris/v12/x/errors""net"
)type Server struct {Name stringIPVersion stringIP stringPort int
}func NewServer(name string) *Server {s := &Server{Name: name,IPVersion: "tcp4",IP: "0.0.0.0",Port: 8090,}return s
}//定义当前客户端连接所绑定的handleAPI(暂时写死处理业务逻辑:数据回显)
func CallBackToClient(conn *net.TCPConn, data []byte, cnt int) error {fmt.Println("[Conn handle] CallBackToClient....")if _, err := conn.Write(data[:cnt]); err != nil {fmt.Println("write buf err ", err)return errors.New("CallBackToClient error")}return nil
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[start] Server listener at IP:%s, Port %d is starting\n", s.IP, s.Port)go func() {addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}var cid uint32 = 0dealConn := NewConnection(conn, cid, CallBackToClient)cid++//开启goroutine处理启动当前conngo dealConn.Start()处理业务:回显消息//go func() {// for {// buf := make([]byte, 512)// cnt, err := conn.Read(buf)// if err != nil {// fmt.Println("read buf err ", err)// continue// }// fmt.Printf("receive client buf %s, cnt %d \n", buf, cnt)// //回显读取到的字节数// if _, err := conn.Write(buf[:cnt]); err != nil {// fmt.Println("write buf err ", err)// continue// }// }////}()}}()
}func (s *Server) Stop() {}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}
2.3 测试ZinxV2.0功能
①修改Server.go和Client.go的日志打印
创建/myDemo/ZinxV2.0/Client.go和/myDemo/ZinxV2.0/Server.go,这部分测试代码和V1.0没有区别,将打印日志换成Zinx2.0即可
- Client.go
package mainimport ("fmt""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8090")if err != nil {fmt.Println("client start err ", err)return}for {//2 调用连接向服务器发数据_, err := conn.Write([]byte("Hello Zinx v0.2"))if err != nil {fmt.Println("write conn err ", err)return}// 3 读取服务器返回的数据buf := make([]byte, 512)cnt, err := conn.Read(buf)if err != nil {fmt.Println("client read buf err ", err)return}fmt.Printf("server call back:%s, cnt=%d\n", buf, cnt)//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}
- Server.go
package mainimport "myTest/zinx/znet"func main() {s := znet.NewServer("[Zinx v2.0]")s.Serve()
}
②测试结果
3 实现基础Router[V3.0]
3.1 Request请求封装
将连接和数据绑定在一起
zinx/ziface/irequest.go:
package zifaceimport "net"type IRequest interface {GetConnection() *net.TCPConnGetData() []byte
}
zinx/znet/request.go:
package znetimport "net"type Request struct {conn *net.TCPConndata []byte
}func (r *Request) GetConnection() *net.TCPConn {return r.conn
}func (r *Request) GetData() []byte {return r.data
}
3.2 Router模块
zinx/ziface/irouter.go
package zifacetype IRouter interface {//处理请求之前的方法PreHandle(request IRequest)Handler(request IRequest)//处理请求之后的方法PostHandler(request IRequest)
}
zinx/znet/router.go
package znetimport "myTest/zinx/ziface"type BaseRouter struct {
}//这里做了空实现,直接让后续Router继承BaseRouter,然后根据需要重写对应方法即可
func (br *BaseRouter) PreHandle(request ziface.IRequest) {}
func (br *BaseRouter) Handler(request ziface.IRequest) {}func (br *BaseRouter) PostHandler(request ziface.IRequest) {}
3.3 框架集成router模块
- 取消znet/server.go中的HandlerFunc模块,改为Router。server.go中添加Router属性
- 将znet/connection.go中的callback_api ziface.HandleFunc参数改为Router
zinx/znet/connection.go
package znetimport ("fmt""myTest/zinx/ziface""net"
)type Connection struct {Conn *net.TCPConnConnID uint32isClosed bool//告知当前的连接已经退出ExitChan chan boolRouter ziface.IRouter
}func NewConnection(conn *net.TCPConn, connID uint32, router ziface.IRouter) *Connection {c := &Connection{Conn: conn,ConnID: connID,Router: router,isClosed: false,ExitChan: make(chan bool, 1),}return c
}func (c *Connection) StartReader() {fmt.Println("reader goroutine is running...")defer fmt.Println("connID=", c.ConnID, "Reader is exit, remote addr is ", c.RemoteAddr().String())defer c.Stop()//读取数据for {buf := make([]byte, 512)_, err := c.Conn.Read(buf)if err != nil {fmt.Printf("connID %d receive buf err %s\n", c.ConnID, err)continue}//封装请求,改为router处理r := Request{conn: c.Conn,data: buf,}go func(request ziface.IRequest) {c.Router.PreHandle(request)c.Router.Handler(request)c.Router.PostHandler(request)}(&r)}
}//启动连接
func (c *Connection) Start() {fmt.Printf("ConnID %d is Start...", c.ConnID)go c.StartReader()
}//停止连接
func (c *Connection) Stop() {fmt.Println("Connection Stop()...ConnectionID = ", c.ConnID)if c.isClosed {return}c.isClosed = truec.Conn.Close()close(c.ExitChan)
}//获取当前连接的Conn对象
func (c *Connection) GetTCPConnection() *net.TCPConn {return c.Conn
}//获取当前连接模块的id
func (c *Connection) GetConnectionID() uint32 {return c.ConnID
}//获取远程客户端的TCP状态 IP:Port
func (c *Connection) RemoteAddr() net.Addr {return c.Conn.RemoteAddr()
}//发送数据
func (c *Connection) Send() {}
zinx/znet/server.go
package znetimport ("fmt""myTest/zinx/ziface""net"
)type Server struct {Name stringIPVersion stringIP stringPort intRouter ziface.IRouter
}func NewServer(name string) *Server {s := &Server{Name: name,IPVersion: "tcp4",IP: "0.0.0.0",Port: 8090,Router: nil,}return s
}func (s *Server) Start() {//启动服务监听端口fmt.Printf("[start] Server listener at IP:%s, Port %d is starting\n", s.IP, s.Port)go func() {addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))if err != nil {fmt.Printf("resolve tcp addr error %v\n", err)return}listener, err := net.ListenTCP(s.IPVersion, addr)if err != nil {fmt.Println("listen ", s.IPVersion, " err ", err)return}fmt.Println("[start] Zinx server success ", s.Name, "Listening...")//阻塞连接,处理业务for {conn, err := listener.AcceptTCP()if err != nil {fmt.Println("Accept err ", err)continue}var cid uint32 = 0dealConn := NewConnection(conn, cid, s.Router)cid++//开启goroutine处理启动当前conngo dealConn.Start()}}()
}func (s *Server) Stop() {}func (s *Server) Serve() {s.Start()//阻塞,一直读取客户端所发送过来的消息select {}
}func (s *Server) AddRouter(router ziface.IRouter) {s.Router = router
}
测试框架集成router效果
myDemo/ZinxV3.0/client.go
package mainimport ("fmt""net""time"
)/*
模拟客户端
*/
func main() {fmt.Println("client start...")time.Sleep(time.Second * 1)//1 创建服务器连接conn, err := net.Dial("tcp", "127.0.0.1:8090")if err != nil {fmt.Println("client start err ", err)return}for {//2 调用连接向服务器发数据_, err := conn.Write([]byte("Hello Zinx v0.3"))if err != nil {fmt.Println("write conn err ", err)return}// 3 读取服务器返回的数据buf := make([]byte, 512)cnt, err := conn.Read(buf)if err != nil {fmt.Println("client read buf err ", err)return}fmt.Printf("server call back:%s, cnt=%d\n", buf, cnt)//cpu阻塞,让出cpu时间片,避免无限for循环导致其他程序无法获取cpu时间片time.Sleep(time.Second * 1)}
}
myDemo/ZinxV3.0/server.go
package mainimport ("fmt""myTest/zinx/ziface""myTest/zinx/znet"
)//自定义一个Router,测试路由功能
type PingRouter struct {znet.BaseRouter
}func (pr *PingRouter) PreHandle(request ziface.IRequest) {_, err := request.GetConnection().Write([]byte("pre handle success..."))if err != nil {fmt.Println("server call pre handle err ", err)return}fmt.Println("server call pre handle...")
}func (pr *PingRouter) Handler(request ziface.IRequest) {_, err := request.GetConnection().Write([]byte("handle success..."))if err != nil {fmt.Println("server call handle err ", err)return}fmt.Println("server call handler....")
}func (pr *PingRouter) PostHandler(request ziface.IRequest) {_, err := request.GetConnection().Write([]byte("post handle success..."))if err != nil {fmt.Println("server call post handle err ", err)return}fmt.Println("server call post handler...")
}func main() {s := znet.NewServer("[Zinx v3.0]")//添加自定义路由router := &PingRouter{}s.AddRouter(router)s.Serve()
}
最终效果:
按照模板方法设计模式,完成了调用
4 抽取全局配置文件[V4.0]
4.1 编写/zinx/util/globalobj.go
主要用于读取zinx配置文件的信息
package utilimport ("encoding/json""io/ioutil""myTest/zinx/ziface"
)type GlobalObj struct {TCPServer ziface.IServer //当前全局Zinx的server对象Host string //当前服务器主机监听的ipTcpPort int //当前服务器主机监听的端口号Name string //当前服务器的名称Version string //当前Zinx的版本号MaxConn int //当前服务器所允许的最大连接数MaxPackageSize uint32 //当前Zinx框架数据包的最大值
}var GlobalObject *GlobalObj//从配置文件中重新加载GlobalObject的信息
func (g *GlobalObj) Reload() {data, err := ioutil.ReadFile("conf/zinx.json")if err != nil {panic(err)}//将json文件数据解析到struct中err = json.Unmarshal(data, &GlobalObject)if err != nil {panic(err)}
}//在其他文件导入该util包的时候会加载init
func init() {GlobalObject = &GlobalObj{Name: "ZinxServerApp",Version: "V0.4",TcpPort: 8090,Host: "0.0.0.0",MaxConn: 120,MaxPackageSize: 4096,}//尝试从conf/zinx.json中去加载用户自定义的参数GlobalObject.Reload()
}
4.2 替换之前server.go中的硬编码
包括/zinx/znet/server.go和/zinx/znet/connection.go部分
- server:
- connection:
4.3 测试
编写myDemo/ZinxV4.0
- 并且编写对应的.json配置文件(Client.go与Server.go都与V3.0一样)
zinx.json
{"Name": "Zinx Server Application","Version": "V0.4","Host": "0.0.0.0","TcpPort": 8091,"MaxConn": 30,"MaxPackageSize": 1024
}
最后效果:
参考:https://www.yuque.com/aceld/npyr8s/bgftov