这里提到的endpointId是一个负载了数据的逻辑点,就像一根水管的出口,有新数据来就会根据后端记录的endpointId推送到用户正在查看的endpointId。用户没有正在查看的endpoint就不会有新数据推送。这里如果如果对endpoint加上权限就相当于实现对实时数据的准确推送。
main包
mian.go
package mainimport ("mcs/backend/core""mcs/backend/global"
)
func main() {global.DS_LOG.Info("Server Starting.......")core.RunWindowsServer()
}
core包
server.go
package coreimport ("context""fmt""mcs/backend/global"
)func RunWindowsServer() {
//初始化global.DS_ROUTER变量InitRouter()address := fmt.Sprintf(":%d", global.DS_CONFIG.System.Addr)global.DS_LOG.Infof("服务端口:%s", address)go global.DS_WS_MANAGER.Start()global.DS_ROUTER.Run(address)
}
router.go
package coreimport ("mcs/backend/controller/register""mcs/backend/global""mcs/backend/middleware""github.com/gin-gonic/gin"
)func InitRouter() {global.DS_ROUTER = gin.Default()rootGroup := global.DS_ROUTER.Group("api")publicGroup := rootGroup.Group("v1"){// 健康监测register.HealthRouter.InitHealthRouter(publicGroup)}privateGroup := rootGroup.Group("v1")privateGroup.Use(middleware.JWTAuth()){register.WebSocketRouter.InitWebsocketRouter(privateGroup)}global.DS_LOG.Info("路由注册完成")
}
api包
webSocket.go
package apiimport ("net/http""mcs/backend/global""github.com/gin-gonic/gin""github.com/gorilla/websocket"
)type webSocketApi struct{}func (wsa *webSocketApi) PingV2(c *gin.Context) {// 升级get请求为websocket协议ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)if err != nil {return}client := global.NewWSClient(ws)go client.Write()go client.Read()
}
global包
ws.go
package globalimport ("encoding/json""github.com/gorilla/websocket"uuid "github.com/satori/go.uuid""github.com/sirupsen/logrus"
)// ws客户端管理
type ClientManager struct {clients map[string]*Clientbroadcast chan []byteregister chan *Clientunregister chan *Client
}// ws客户端
type Client struct {Alive bool // 是否还存活id string // 客户端自身的id,多个客户端管理// UserId uint // 唯一标识客户端属于哪个请求的用户EndpointIdMap map[string]interface{} // 放当前该用户正在查看的endpoint,到时就只推送这个几个endpoint的新数据socket *websocket.Connsend chan []byte
}func NewWSClient(socket *websocket.Conn) *Client {client := &Client{socket: socket, id: uuid.NewV4().String(), send: make(chan []byte)}DS_LOG.Infof("New user with uuid %s", client.id)// 客户端注册DS_WS_MANAGER.register <- clientreturn client
}// 发送到前端的消息结构体,前端可以根据endpointId选择把数据推送到指定的位置
type WSMessage struct {EndpointId *string `json:"endpointId"`Code uint `json:"code"` // 消息代码Content []byte `json:"content"` // 消息内容
}func (manager *ClientManager) Start() {logrus.Info("Websocket manager start")for {select {case client := <-manager.register:client.Alive = truemanager.clients[client.id] = client// go func() {// time.Sleep(1 * time.Second)// msg := &Message{Code: UserCount, Content: manager.ClientsTotal()}// manager.Broadcast(msg)// }()case client := <-manager.unregister:if _, ok := manager.clients[client.id]; ok {DS_LOG.Infof("管道【%s】关闭", client.id)close(client.send)delete(manager.clients, client.id)// msg := &Message{Code: UserCount, Content: manager.ClientsTotal()}// manager.send(msg.JSON(), client)DS_LOG.Infof("管道【%s】已经关闭", client.id)}case message := <-manager.broadcast:msg := jsonUnmarshall(message)if msg.EndpointId != nil {for clientId := range manager.clients {if _, ok := manager.clients[clientId].EndpointIdMap[*msg.EndpointId]; ok {select {case manager.clients[clientId].send <- msg.Content:DS_LOG.Info("数据开始推送")default:/*logrus.Error("broadcast closed")close(conn.send)delete(manager.clients, conn)*/}}}}}}
}func (msg *WSMessage) JSONMarshal() []byte {c, _ := json.Marshal(&msg)return c
}func jsonUnmarshall(b []byte) WSMessage {msg := WSMessage{}json.Unmarshal(b, &msg)return msg
}func (manager *ClientManager) send(message []byte, ignore *Client) {for userId := range manager.clients {if manager.clients[userId] != ignore {manager.clients[userId].send <- message}}
}func (manager *ClientManager) Broadcast(msg *WSMessage) {select {// TODO: 需要为每个websocket管道配置单独的channelcase manager.broadcast <- msg.JSONMarshal():default:DS_LOG.Error("无法立即写入通道,协程结束")}
}func (manager *ClientManager) ClientsTotal() int {return len(manager.clients)
}const (UserOnline = 101 // 用户上线UserOffline = 102 // 用户下线UserCount = 103 // 用户总数NewMsg = 104 // 新消息
)func (c *Client) Write() {defer func() {DS_LOG.Infof("User:%s closed conn", c.id)c.socket.Close()}()for {select {case message, ok := <-c.send:if !ok {c.socket.WriteMessage(websocket.CloseMessage, []byte{})return}c.socket.WriteMessage(websocket.TextMessage, message)DS_LOG.Info("数据已经推送")}}
}func (c *Client) Read() {defer func() {DS_WS_MANAGER.unregister <- cc.socket.Close()}()for {t, b, err := c.socket.ReadMessage()if err != nil {DS_LOG.Error(err)DS_WS_MANAGER.unregister <- cc.socket.Close()break}DS_LOG.Info(t)DS_LOG.Info(string(b))endpointIdArr := EndpointIdList{}err = json.Unmarshal(b, &endpointIdArr)if err != nil {DS_LOG.Error("WS--READ ERR:", err)continue}// DS_LOG.Info("endpointArr=", endpointIdArr)endpointIdMap := make(map[string]interface{})for i := 0; i < len(endpointIdArr.EndpointIds); i++ {endpointIdMap[endpointIdArr.EndpointIds[i]] = nil}c.EndpointIdMap = endpointIdMap}
}type EndpointIdList struct {EndpointIds []string `json:"endpointIds"`
}
在其他包里使用
byteMsgData:=[]byte("ahdaasdsada")msg := global.WSMessage{EndpointId: &endpointId,Content: byteMsgData,}global.DS_WS_MANAGER.Broadcast(&msg)
这里的代码并不能复制之后直接使用,但是websocket部分已经全部在这