golang --gin+websocket实现指定的数据点推送

这里提到的endpointId是一个负载了数据的逻辑点,就像一根水管的出口,有新数据来就会根据后端记录的endpointId推送到用户正在查看的endpointId。用户没有正在查看的endpoint就不会有新数据推送。这里如果如果对endpoint加上权限就相当于实现对实时数据的准确推送。

main包

mian.go

package mainimport ("mcs/backend/core""mcs/backend/global"
)
func main() {global.DS_LOG.Info("Server Starting.......")core.RunWindowsServer()
}

core包

server.go

package coreimport ("context""fmt""mcs/backend/global"
)func RunWindowsServer() {
//初始化global.DS_ROUTER变量InitRouter()address := fmt.Sprintf(":%d", global.DS_CONFIG.System.Addr)global.DS_LOG.Infof("服务端口:%s", address)go global.DS_WS_MANAGER.Start()global.DS_ROUTER.Run(address)
}

router.go

package coreimport ("mcs/backend/controller/register""mcs/backend/global""mcs/backend/middleware""github.com/gin-gonic/gin"
)func InitRouter() {global.DS_ROUTER = gin.Default()rootGroup := global.DS_ROUTER.Group("api")publicGroup := rootGroup.Group("v1"){// 健康监测register.HealthRouter.InitHealthRouter(publicGroup)}privateGroup := rootGroup.Group("v1")privateGroup.Use(middleware.JWTAuth()){register.WebSocketRouter.InitWebsocketRouter(privateGroup)}global.DS_LOG.Info("路由注册完成")
}

api包

webSocket.go

package apiimport ("net/http""mcs/backend/global""github.com/gin-gonic/gin""github.com/gorilla/websocket"
)type webSocketApi struct{}func (wsa *webSocketApi) PingV2(c *gin.Context) {// 升级get请求为websocket协议ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)if err != nil {return}client := global.NewWSClient(ws)go client.Write()go client.Read()
}

global包

ws.go

package globalimport ("encoding/json""github.com/gorilla/websocket"uuid "github.com/satori/go.uuid""github.com/sirupsen/logrus"
)// ws客户端管理
type ClientManager struct {clients    map[string]*Clientbroadcast  chan []byteregister   chan *Clientunregister chan *Client
}// ws客户端
type Client struct {Alive bool   // 是否还存活id    string // 客户端自身的id,多个客户端管理// UserId        uint                   // 唯一标识客户端属于哪个请求的用户EndpointIdMap map[string]interface{} // 放当前该用户正在查看的endpoint,到时就只推送这个几个endpoint的新数据socket        *websocket.Connsend          chan []byte
}func NewWSClient(socket *websocket.Conn) *Client {client := &Client{socket: socket, id: uuid.NewV4().String(), send: make(chan []byte)}DS_LOG.Infof("New user with uuid %s", client.id)// 客户端注册DS_WS_MANAGER.register <- clientreturn client
}// 发送到前端的消息结构体,前端可以根据endpointId选择把数据推送到指定的位置
type WSMessage struct {EndpointId *string `json:"endpointId"`Code       uint    `json:"code"`    // 消息代码Content    []byte  `json:"content"` // 消息内容
}func (manager *ClientManager) Start() {logrus.Info("Websocket manager start")for {select {case client := <-manager.register:client.Alive = truemanager.clients[client.id] = client// go func() {// 	time.Sleep(1 * time.Second)// 	msg := &Message{Code: UserCount, Content: manager.ClientsTotal()}// 	manager.Broadcast(msg)// }()case client := <-manager.unregister:if _, ok := manager.clients[client.id]; ok {DS_LOG.Infof("管道【%s】关闭", client.id)close(client.send)delete(manager.clients, client.id)// msg := &Message{Code: UserCount, Content: manager.ClientsTotal()}// manager.send(msg.JSON(), client)DS_LOG.Infof("管道【%s】已经关闭", client.id)}case message := <-manager.broadcast:msg := jsonUnmarshall(message)if msg.EndpointId != nil {for clientId := range manager.clients {if _, ok := manager.clients[clientId].EndpointIdMap[*msg.EndpointId]; ok {select {case manager.clients[clientId].send <- msg.Content:DS_LOG.Info("数据开始推送")default:/*logrus.Error("broadcast closed")close(conn.send)delete(manager.clients, conn)*/}}}}}}
}func (msg *WSMessage) JSONMarshal() []byte {c, _ := json.Marshal(&msg)return c
}func jsonUnmarshall(b []byte) WSMessage {msg := WSMessage{}json.Unmarshal(b, &msg)return msg
}func (manager *ClientManager) send(message []byte, ignore *Client) {for userId := range manager.clients {if manager.clients[userId] != ignore {manager.clients[userId].send <- message}}
}func (manager *ClientManager) Broadcast(msg *WSMessage) {select {// TODO: 需要为每个websocket管道配置单独的channelcase manager.broadcast <- msg.JSONMarshal():default:DS_LOG.Error("无法立即写入通道,协程结束")}
}func (manager *ClientManager) ClientsTotal() int {return len(manager.clients)
}const (UserOnline  = 101 // 用户上线UserOffline = 102 // 用户下线UserCount   = 103 // 用户总数NewMsg      = 104 // 新消息
)func (c *Client) Write() {defer func() {DS_LOG.Infof("User:%s closed conn", c.id)c.socket.Close()}()for {select {case message, ok := <-c.send:if !ok {c.socket.WriteMessage(websocket.CloseMessage, []byte{})return}c.socket.WriteMessage(websocket.TextMessage, message)DS_LOG.Info("数据已经推送")}}
}func (c *Client) Read() {defer func() {DS_WS_MANAGER.unregister <- cc.socket.Close()}()for {t, b, err := c.socket.ReadMessage()if err != nil {DS_LOG.Error(err)DS_WS_MANAGER.unregister <- cc.socket.Close()break}DS_LOG.Info(t)DS_LOG.Info(string(b))endpointIdArr := EndpointIdList{}err = json.Unmarshal(b, &endpointIdArr)if err != nil {DS_LOG.Error("WS--READ ERR:", err)continue}// DS_LOG.Info("endpointArr=", endpointIdArr)endpointIdMap := make(map[string]interface{})for i := 0; i < len(endpointIdArr.EndpointIds); i++ {endpointIdMap[endpointIdArr.EndpointIds[i]] = nil}c.EndpointIdMap = endpointIdMap}
}type EndpointIdList struct {EndpointIds []string `json:"endpointIds"`
}

在其他包里使用

byteMsgData:=[]byte("ahdaasdsada")msg := global.WSMessage{EndpointId: &endpointId,Content:    byteMsgData,}global.DS_WS_MANAGER.Broadcast(&msg)

这里的代码并不能复制之后直接使用,但是websocket部分已经全部在这

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625556.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

8个 Python 开发者必备的 PyCharm 插件

这8个顶级插件保证了更快、更轻松、更愉悦的开发过程。 在 PyCharm 插件列表中&#xff0c;我们发现了几个瑰宝插件&#xff0c;它们各自以独特的方式帮助开发者快速、简便、愉悦地开发。 今天我就给大家逐个介绍它们。 1. Key Promoter X 【下载链接】&#xff1a;https://…

01-15网络编程-XML

网络编程Web服务器XML 网络编程 项目架构&#xff1a; C[Client] /S[Server- Service] -访问这个程序时用客户端 缺点&#xff1a; 1.用户需要单独安装客户端&#xff0c; 2.客户端升级了用于需要重新更新不能跨平台: 3.不同的操作系统都需要相应版本的程序 优点&#xff1a;…

AtCoder abc336 A~D题解

A. 题目翻译&#xff1a; 对于正整数 X X X 级别的龙串&#xff0c; X X X 是长度为 ( X 3 ) (X3) (X3) 的字符串&#xff0c;由按此顺序排列的 o、n 和 g 的一次L、 X X X次出现形成。 你得到一个正整数 N N N。打印 N N N 级的龙串。 分析 按题目要求做即可……&am…

Openlayer【四】—— 控件

控件 控件是一个可见的小部件&#xff0c;其 DOM 元素位于 屏幕。它们可以涉及用户输入&#xff08;按钮&#xff09;&#xff0c;也可以仅供参考; 位置是使用 CSS 确定的。默认情况下&#xff0c;它们位于 容器&#xff0c;但可以使用 任何外部 DOM 元素。 其中ol/control是…

GO基础进阶篇 (十二)、反射

什么是反射 Go语言中的反射是指在程序运行时检查程序的结构&#xff0c;比如变量的类型、方法的签名等。Go语言的反射包是reflect。通过反射&#xff0c;你可以动态地检查类型信息、获取字段和方法、调用方法等。 反射可以在运行时动态获取变量的各种信息&#xff0c;比如变量…

linux系统nginx工具的一些应用

nginx高级应用 虚拟目录监控模块配置文件创建用户名密码客户端访问 限制传输速度&#xff08;服务层&#xff09; nginx配置文件中的每个语句要以 ; 结尾 虚拟目录 配置文件中的server块中编辑&#xff1a;location /test {alias /usr/share/nginx/html; //映射的是/usr…

定时器中断控制的独立式键盘扫描实验

#include<reg51.h> //包含51单片机寄存器定义的头文件 sbit S1P1^4; //将S1位定义为P1.4引脚 sbit S2P1^5; //将S2位定义为P1.5引脚 sbit S3P1^6; //将S3位定义为P1.6引脚 sbit S4P1^7; //将S4位定义为P1.7引脚 unsigned char keyval; /…

知识笔记(八十)———链式语句中join用法

join通常有下面几种类型&#xff0c;不同类型的join操作会影响返回的数据结果。 INNER JOIN: 等同于 JOIN&#xff08;默认的JOIN类型&#xff09;,如果表中有至少一个匹配&#xff0c;则返回行LEFT JOIN: 即使右表中没有匹配&#xff0c;也从左表返回所有的行RIGHT JOIN: 即使…

50天精通Golang(第18天)

web开发介绍、iris框架安装、HTTP请求和返回、Iris路由处理 一 Web项目开发介绍及实战项目介绍 1.1 引言 本系列课程我们将学些Golang语言中的Web开发框架Iris的相关知识和用法。通过本系列视频课程&#xff0c;大家能够从零到一经历一个完整项目的开发&#xff0c;并在课程…

[树莓派]给树莓派装pyinstaller环境

安装&#xff1a; sudo pip3 install pyinstaller 但是打包后会发现报错&#xff1a; Fatal error: PyInstaller does not include a pre-compiled bootloader for your platform. For more details and instructions how to build the bootloader see https://pyinstaller.…

Golang语言switch case

Golang语言使用switch语句可方便地对大量的值进行条件判断。 练习&#xff1a;判断文件类型,如果后缀名是.html输入text/html, 如果后缀名.css 输出text/css ,如果后缀名是.js 输出text/javascript Go语言规定每个switch只能有一个default分支。 extname : ".a"swit…

【栈】Leetcode 496 下一个更大元素I

【栈】Leetcode 496 下一个更大元素I 解法1 两个单调栈解法2 ---------------&#x1f388;&#x1f388;题目链接&#x1f388;&#x1f388;------------------- 解法1 两个单调栈 两个栈进行操作&#xff0c;一个栈用来遍历寻找&#xff0c;一个栈用来保留 将nums2中的元素…

NLP论文阅读记录 - 2021 | WOS 利用 ParsBERT 和预训练 mT5 进行波斯语抽象文本摘要

文章目录 前言0、论文摘要一、Introduction1.1目标问题1.2相关的尝试1.3本文贡献 二.前提三.本文方法A. 序列到序列 ParsBERTB、mT5 四 实验效果4.1数据集4.2 对比模型4.3实施细节4.4评估指标4.5 实验结果4.6 细粒度分析 五 总结思考 前言 Leveraging ParsBERT and Pretrained …

1.6 面试经典150题 - 买卖股票的最佳时机

买卖股票的最佳时机 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一个不同的日子 卖出该股票。设计一个算法来计算你所能获取的最大利润。 返回你可以从这笔交易…

鸿蒙开发之组合手势

当我们需要支持多个手势的时候&#xff0c;可以通过GestureGroup来实现&#xff0c;如下实现了同时支持Tap和Pan手势 import Prompt from system.prompt Entry Component struct OfficialGestureGroupPage {State message: string Hello Worldbuild() {Column() {Column() {T…

STM32WL用户手册学习

介绍 STM32Cube是意法半导体的原创产品&#xff0c;通过减少开发工作量、时间和成本来显著提高开发人员的生产力。STM32Cube涵盖了整个STM32产品组合。 STM32Cube包括&#xff1a; 一套用户友好的软件开发工具&#xff0c;涵盖项目开发从设计到生产&#xff0c;其中&#xf…

算法第十九天-二叉搜索树节点最小距离

二叉搜索树节点最小距离 题目要求 解题思路 今天题目重点&#xff1a; 1.二叉搜索树&#xff08;BST&#xff09; 2.任意两个不同节点 遇到二叉搜索树&#xff0c;立即想到这句话&#xff1a;[二叉搜索树&#xff08;BST&#xff09;的中序遍历是有序的]。这是解决所有二叉搜…

1.1 面试经典 150 题-合并两个有序数组

合并两个有序数组 方法一&#xff1a;先合并再排序 class Solution:def merge(self, nums1: List[int], m: int, nums2: List[int], n: int) -> None:"""Do not return anything, modify nums1 in-place instead."""# 先合并for i in range…

全网最细RocketMQ源码四:消息存储

看完上一章之后&#xff0c;有没有很好奇&#xff0c;生产者发送完消息之后&#xff0c;server是如何存储&#xff0c;这一章节就来学习 入口 SendMessageProcessor.processRequest private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerCont…

【现代密码学】笔记5--伪随机置换(分组加密)《introduction to modern cryphtography》

【现代密码学】笔记5--伪随机置换&#xff08;分组加密&#xff09;《introduction to modern cryphtography》 写在最前面5 伪随机排列实践构造&#xff08;块密码/分组密码&#xff09; 写在最前面 主要在 哈工大密码学课程 张宇老师课件 的基础上学习记录笔记。 内容补充&…