目录
1、Overview
2、监听服务接口
3、easy/servers package
3.1、基础类Server
3.2、WWServer 服务
3.3、TcpServer
3.4、KCPServer
4、hookAgent链接钩子
5、创建一个WebScoket监听服务
1、Overview
本节主要介绍,easy的监听服务。例如websocket 监听,tcp监听,kcp监听服务等。每个类型的监听可以多次实例,用以监听多个端口;每个实例可以绑定自己的路由器,编码解码器等。
2、监听服务接口
import ("github.com/slclub/easy/route""github.com/slclub/easy/vendors/option"
)type ListenServer interface {Init(assignment option.Assignment)OnInit()Router() route.RouterStart()Hook() *hookAgent // agent 链接 回调//OnClose(func())Close()
}
3、easy/servers package
3.1、基础类Server
WSServer(WebSocket) 和 TCPServer(TcpSocket)等监听服务都是基于Server实现的。
- Import
import ("crypto/tls""errors""github.com/slclub/easy/log""github.com/slclub/easy/nets/agent""github.com/slclub/easy/nets/conns""github.com/slclub/easy/route""github.com/slclub/easy/route/bind""github.com/slclub/easy/route/encode""github.com/slclub/easy/typehandle""github.com/slclub/easy/vendors/option""io""net""sync""time"
)
- Class
/*** 基类 监听服务*/
type Server struct {agent.Gateln net.Listenerbox *ConnBoxrouter route.Routerhook *hookAgentconnOption *conns.Option
}
成员:
box (*ConnBox)链接存储盒子;
router(route.Router)路由器;
hook (*hookAgent)链接阶段的事件;
connOption(*conns.Option)conns选项;
- Methods
func (self *Server) Init(assignment option.Assignment) {//self.Gate.Init(gate)assignment.Target(&self.Gate)assignment.Default(option.OptionFunc(func() (string, any) {return "Protocol", typehandle.EnCriPT_DATA_PROTOBUF}),option.OptionFunc(func() (string, any) {return "MaxConnNum", 100}),option.OptionFunc(func() (string, any) {return "PendingWriteNum", 100}),option.OptionFunc(func() (string, any) {return "MaxMsgLen", 4096}),option.OptionFunc(func() (string, any) {return "HTTPTimeout", 10 * time.Second}),option.OptionFunc(func() (string, any) {return "MsgDigit", conns.CONST_MSG_DIGIT}),)//if self.Protocol == "" {// self.Protocol = typehandle.EnCriPT_DATA_PROTOBUF//}assignment.Apply()self.defaultRouteEncripty()self.Router().Encoder().LittleEndian(self.LittleEndian)
}func (self *Server) startBefore() {self.connOption = &conns.Option{Encrypt: self.Router().Encoder(),MaxMsgLen: self.MaxMsgLen,MinMsgLen: 2,MsgDigit: self.MsgDigit,}ln, err := net.Listen("tcp", self.Addr)if err != nil {log.Fatal("Listen error: %v", err)}if self.CertFile != "" || self.KeyFile != "" {config := &tls.Config{}config.NextProtos = []string{"http/1.1"}var err errorconfig.Certificates = make([]tls.Certificate, 1)config.Certificates[0], err = tls.LoadX509KeyPair(self.CertFile, self.KeyFile)if err != nil {log.Fatal("%v", err)}ln = tls.NewListener(ln, config)}self.ln = ln// 链接容器self.box = &ConnBox{server: self,conns: make(ConnSet),mutex: sync.Mutex{},}
}func (self *Server) defaultRouteEncripty() {switch self.Protocol {case typehandle.ENCRIPT_DATA_JSON:self.router.Binding(bind.NewBindJson(self.router.PathMap()), encode.NewJson(self.router.PathMap()))default:self.router.Binding(bind.NewBindProto(self.router.PathMap()), encode.NewProtobuf(self.router.PathMap()))}
}func (self *Server) Close() {// 利用钩子优雅关闭self.hook.EmitWithKey(CONST_SERVER_CLOSE, nil)// 关闭监听self.ln.Close()// 关闭哦链接池self.box.Close()}func (self *Server) Router() route.Router {return self.router
}func (self *Server) Hook() *hookAgent {return self.hook
}
3.2、WWServer 服务
WebSocket Server 服务源码;
WebSocket是用http升级转换而来;链接的关键是使用http.Handler接口;
- http.Handler接口
type Handler interface {ServeHTTP(ResponseWriter, *Request)
}
- WebSocket Server 服务源码
package serversimport ("github.com/gorilla/websocket""github.com/slclub/easy/log""github.com/slclub/easy/nets/agent""github.com/slclub/easy/nets/conns""github.com/slclub/easy/route""net/http"
)/*** web socket 监听服务*/
type WSServer struct {Serverupgrader websocket.Upgrader
}func NewWSServer() *WSServer {ser := Server{router: route.NewRouter(),hook: newHookAgent(),}return &WSServer{Server: ser,}
}func (self *WSServer) Start() {self.startBefore()handleHttp := &WebSocketHandle{server: self,handle: dealHandle(&self.Server),}httpServer := &http.Server{Addr: self.Addr,Handler: handleHttp,ReadTimeout: self.HTTPTimeout,WriteTimeout: self.HTTPTimeout,MaxHeaderBytes: 1024,}go httpServer.Serve(self.ln)
}var _ ListenServer = &WSServer{}// ------------------------------------------------------------
// serverHttpHandle
type WebSocketHandle struct {server *WSServerhandle agent.AgentHandle
}func (self *WebSocketHandle) ServeHTTP(w http.ResponseWriter, r *http.Request) {if r.Method != "GET" {http.Error(w, "Method not allowed", 405)return}conn, err := self.server.upgrader.Upgrade(w, r, nil)if err != nil {log.Debug("upgrade error: %v", err)return}conn.SetReadLimit(int64(self.server.MaxMsgLen))if err = self.server.box.Add(conn); err != nil {conn.Close()log.Error("", err)return}wsConn := conns.NewWSConn(conn, self.server.connOption, self.server.PendingWriteNum, self.server.MaxMsgLen)ag := agent.NewAgent(wsConn)self.server.hook.EmitWithKey(CONST_AGENT_NEW, ag)ag.LoopRecv(self.handle)ag.Close()//ag.OnClose()self.server.hook.EmitWithKey(CONST_AGENT_CLOSE, ag)self.server.box.Del(conn)
}// function handle 路由分发
func dealHandle(serv *Server) agent.AgentHandle {return func(data []byte, ag agent.Agent) {msg, err := serv.Router().Encoder().Unmarshal(data)if err != nil {log.Debug("unmarshal message error: %v", err)return}err = serv.Router().Route(msg, ag)if err != nil {log.Debug("route message error: %v", err)return}}
}
3.3、TcpServer
源码
package serversimport ("github.com/slclub/easy/log""github.com/slclub/easy/nets/agent""github.com/slclub/easy/nets/conns""github.com/slclub/easy/route""net""time"
)type TCPServer struct {Server
}func NewTCPServer() *TCPServer {ser := Server{router: route.NewRouter(),hook: newHookAgent(),}return &TCPServer{Server: ser,}
}func (self *TCPServer) Start() {self.startBefore()self.run()
}func (self *TCPServer) run() {var tempDelay time.Durationfor {conn, err := self.ln.Accept()if err != nil {if ne, ok := err.(net.Error); ok && ne.Temporary() {if tempDelay == 0 {tempDelay = 5 * time.Millisecond} else {tempDelay *= 2}if max := 1 * time.Second; tempDelay > max {tempDelay = max}log.Release("accept error: %v; retrying in %v", err, tempDelay)time.Sleep(tempDelay)continue}return}tempDelay = 0if self.box.Len() >= self.MaxConnNum {conn.Close()log.Debug("EASY.TCP too many connections")continue}self.box.Add(conn)tcpConn := conns.NewTCPConn(conn, self.PendingWriteNum, self.connOption)ag := agent.NewAgent(tcpConn)go func() {self.hook.EmitWithKey(CONST_AGENT_NEW, ag)ag.LoopRecv(dealHandle(&self.Server))ag.Close()//ag.OnClose()self.hook.EmitWithKey(CONST_AGENT_CLOSE, ag)self.box.Del(conn)// cleanup}()}
}
3.4、KCPServer
待完成
4、hookAgent链接钩子
- 钩子
const (CONST_AGENT_CLOSE = "CloseAgent"CONST_AGENT_NEW = "NewAgent"CONST_SERVER_CLOSE = "CloseServer" // running your defined functions where you closed the listening server
)
- 源码
type hookAgent struct {hook map[string][]typehandle.AgentHandle
}func newHookAgent() *hookAgent {return &hookAgent{hook: make(map[string][]typehandle.AgentHandle),}
}func (self *hookAgent) Append(hkey string, handle typehandle.AgentHandle) {if self.hook[hkey] == nil {self.hook[hkey] = []typehandle.AgentHandle{}}self.hook[hkey] = append(self.hook[hkey], handle)
}func (self *hookAgent) EmitWithKey(hkey string, ag agent.Agent) {if nil == self.hook[hkey] || len(self.hook[hkey]) == 0 {return}for _, fn := range self.hook[hkey] {fn(ag)}
}
5、创建一个WebScoket监听服务
server1 = servers.NewWSServer()server1.Init(option.OptionWith(&agent.Gate{Addr: ":" + strconv.Itoa(ListenPort),Protocol: typehandle.ENCRIPT_DATA_JSON,PendingWriteNum: 2000,LittleEndian: true,MaxConnNum: 20000,}).Default(option.DEFAULT_IGNORE_ZERO))