go动态创建/增加channel并处理数据

背景描述

有一个需求,大概可以描述为:有多个websocket连接,因此消息会并发地发送过来,这些消息中有一个标志可以表明是哪个连接发来的消息,但只有收到消息后才能建立channel或写入已有channel,在收消息前无法预先创建channel

解决过程(可直接阅读最终版)

初版:直接写入

因为对数据量错误预估(以为数据量不大),一开始我是用的mysql直接写入,每次收到ws消息立即处理,可测试中发现因数据量过多且都会操作同一行数据,出现了资源竞争,导致死锁。

第二版:增加锁

在发现出现数据竞争后,我第一反应是增加读写锁。读写锁的代码类似以下示例:

package mainimport ("database/sql""fmt""sync"_ "github.com/go-sql-driver/mysql"
)var (db *sql.DBmu sync.RWMutex
)func init() {var err errordb, err = sql.Open("mysql", "username:password@tcp(localhost:3306)/dbname")if err != nil {panic(err)}
}func main() {defer db.Close()// 读取数据go readData()// 写入数据go writeData()// 保持主线程运行select {}
}func readData() {for {mu.RLock()rows, err := db.Query("SELECT * FROM table_name")mu.RUnlock()if err != nil {fmt.Println("Error reading data:", err)continue}defer rows.Close()// 处理查询结果// ...// 睡眠一段时间,模拟读操作的持续性// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制}
}func writeData() {for {mu.Lock()_, err := db.Exec("INSERT INTO table_name (column1, column2) VALUES (?, ?)", value1, value2)mu.Unlock()if err != nil {fmt.Println("Error writing data:", err)continue}// 睡眠一段时间,模拟写操作的持续性// 请注意,这是一个简单示例,实际应用中可能需要更复杂的逻辑// 或使用定时器进行控制}
}

但是代码里对数据库的操作非常频繁且混乱,加了读写锁后经常出现请求很慢的情况,考虑其他方案

第三版 使用事务

使用事务代码忽略,最终发现,因为事务过长,导致出现了重复写的问题,考虑其他方案

第四版 map

通过一个二维的map来存储数据,每当数据存满10条就处理,当然毫不意外的,出现了map的竞争。map也是可以用锁的,但是这里是二维的map,加上两层锁之后使得效率极低,而且依旧有概率出现map竞争导致报错

此外,还可以考虑使用redis设置锁,直接set就行了,但是因为环境不支持redis,此方案弃用

最终版 动态channel

出现以上问题的根本原因是消费太快,其实完全可以把每个ws连接的数据都写到各自的channel里,同时设置每个channel都累积10条再消费,当然还需要一个处理机制,如果超过10s也消费一次。

启动"生产者"、“消费者”

在当前环境中,生产者就是每次从ws中读到数据往动态channel中写入,消费者就是不断获取有哪些channel,以及从channel中读数据,在ws写入时的处理逻辑大概可以简化为如下demo:

package testimport ("context""encoding/json""github.com/gin-gonic/gin""github.com/gorilla/websocket"log "github.com/sirupsen/logrus""net/http""sync"
)// RequestTemplate 请求模板
type RequestTemplate struct {Op   string               `json:"op"`   // 操作Id   int                  `json:"id"`   // 唯一id标识Time string               `json:"time"` // 时间,用秒级时间戳,字符串包裹Data *RequestTemplateData `json:"data"` // 请求数据Code int                  `json:"code"` // 状态码
}// RequestTemplateData 请求中data包含的部分,实际这里是很复杂的结构,之前超时/死锁也是因为这里处理逻辑比较复杂,但是这篇博客的演示重点不是这个,因此简略为id和请求ip
type RequestTemplateData struct {ConnIp string `json:"conn_ip"` // 请求ipId     int    `json:"id"`      // 唯一id标识
}// ConnInfo 具体的连接信息
type ConnInfo struct {Conn      *websocket.Conn    `json:"conn"`   // websocket连接Ctx       context.Context    `json:"ctx"`    // 连接上下文CtxCancel context.CancelFunc `json:"cancel"` // 连接上下文cancel functionIp        string             `json:"ip"`     // 连接的手机端ipId        int                `json:"id"`     // 唯一id标识
}var AllConns = make(map[string]*ConnInfo) //创建字典集合存储连接信息// Start 启动
func Start() {//处理ws的连接http.HandleFunc("/ws", HandleMsg)// //监听7001端口号,作为websocket连接的服务log.Info("Server started on :7001")log.Fatal(http.ListenAndServe(":7001", nil))
}// ChannelStorage channel数据
type ChannelStorage struct {sync.RWMutexchannels map[string]chan *RequestTemplateData
}var ConnRequestData map[int]*RequestTemplateDatavar upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true},
}// HandleMsg 处理ws连接,每来一个新客户端请求就建立一个新连接
func HandleMsg(w http.ResponseWriter, r *http.Request) {conn, err := upgrader.Upgrade(w, r, nil) // 协议升级,这里也可以直连if err != nil {log.Error(err)return}//获取连接ip,这里是为了区分每个连接connIp := conn.RemoteAddr().String()// 这里是为了后续关闭channelrootCtx := context.Background()ctx, cancel := context.WithCancel(rootCtx)//加入连接AllConns[connIp] = &ConnInfo{Conn:      conn,   // 客户端ws链接对象Ctx:       ctx,    // 连接上下文CtxCancel: cancel, // 取消连接上下文}defer func() {// 如果断开连接,删除数据if AllConns[connIp] != nil {AllConns[connIp].CtxCancel()delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 这里对结束做处理}delete(AllConns, conn.RemoteAddr().String())err = conn.Close()if err != nil {return}log.Error("HandleMsg异常,开始defer处理:", err)if err := recover(); err != nil {log.Error("websocket连接异常,已断开:", err)}}()log.WithFields(log.Fields{"connIp": connIp,}).Info("沙箱已连接")reqCh := &ChannelStorage{}go reqCh.ResultConsumer(ctx) // 这里是消费者//循环读取ws客户端的消息for {// 读取消息_, msg, err := conn.ReadMessage()if err != nil {log.WithFields(log.Fields{"connIp": connIp,}).WithError(err).Error("读取websocket的消息失败")if AllConns[connIp] != nil {delete(ConnRequestData, AllConns[connIp].Id)go SetDoneData(AllConns[connIp].Id, conn) // 连接断开设置状态为结束}// 断开ws连接conn.Close()delete(AllConns, conn.RemoteAddr().String())return}//msg []byte转stringmsgStr := string(msg)log.Info("收到消息为:", msgStr)//反序列化消息为结构体requestData := RequestTemplate{}if err := json.Unmarshal(msg, &requestData); err != nil {conn.WriteJSON(gin.H{"id": "未知", "op": "未知", "error": "cmd通信的请求参数有误,无法json decode"})log.Error("json_decode cmd命令的请求参数时出错:", err)continue}dataInfo := requestData.Data// 这里实际上有很多操作,简写为两种if requestData.Op != "" {switch requestData.Op {// 收到报告case "report":go reqCh.Produce(dataInfo) // "生产者",发送一条消息// 已完成case "done":go CheckDone(dataInfo, conn) // 做完成的处理default:log.Error("未识别的命令:", msgStr)}}}
}

有一个for循环在持续监听ws消息,消费者只启动一次,这里重点就是生产和消费如何实现

“生产者”

“生产者”要做的事就是:
1 每当收到ws消息后,解析,拿到唯一id(这个唯一是指这个连接下的所有上报消息的id都是相同的)
2 判断这个“唯一id”是否已经创建了channel,若创建了则不需要创建,直接写入channel,若未创建则新建channel
以下是生产者的demo:

// GetChannel 获取通道
func (cs *ChannelStorage) GetChannel(key string) chan *RequestTemplateData {cs.RLock()defer cs.RUnlock()return cs.channels[key]
}// CreateChannel 创建通道并存储到 map 中
func (cs *ChannelStorage) CreateChannel(key string) chan *RequestTemplateData {cs.Lock()defer cs.Unlock()if cs.channels == nil {cs.channels = make(map[string]chan *RequestTemplateData, 800)}ch := make(chan *RequestTemplateData, 10)cs.channels[key] = chreturn ch
}// Produce 往上报channel中写数据
func (cs *ChannelStorage) Produce(requestData *RequestTemplateData) {defer func() {if err := recover(); err != nil {log.Info("_____________recover CaseResultAdd error________: ", err)}}()// 创建存储通道的结构体实例chanelKey := strconv.Itoa(requestData.Id)channel := cs.GetChannel(chanelKey)if channel == nil {channel = cs.CreateChannel(chanelKey)}// 直接往channel里面塞if channel != nil {channel <- requestData}
}
消费者

消费者由于只启动一次,但后续可能会有新的channel,因此需要增加一个获取所有连接的方法:
消费者demo:

func (cs *ChannelStorage) ResultConsumer(ctx context.Context) {defer func() {if err := recover(); err != nil {log.Info("_____________recover CaseResultConsumer error________: ", err)}}()for {select {case <-ctx.Done():log.Info("websocket断开连接,消费者协程退出...")returndefault:cs.processAllChannels(ctx)  // 传入 context.Contexttime.Sleep(2 * time.Second) // 控制处理频率}}
}// processAllChannels 获取所有channel
func (cs *ChannelStorage) processAllChannels(ctx context.Context) {cs.RLock()defer cs.RUnlock()var wg sync.WaitGroup // 用于等待所有通道处理完毕for chName, channel := range cs.channels {wg.Add(1)go func(chName string, channel chan *RequestTemplateData) {defer wg.Done()cs.processChannel(chName, channel, ctx)}(chName, channel)}wg.Wait() // 等待所有通道处理完毕
}
func (cs *ChannelStorage) processChannel(chName string, channel chan *RequestTemplateData, ctx context.Context) {const batchSize = 10 // 每次处理的数据量var messages []*RequestTemplateDatatargetMsgOverTime := 10 * time.Second // 超时时间for {select {case caseMsg := <-channel:messages = append(messages, caseMsg) // 将接收到的消息放入 messages 切片中if len(messages) == batchSize {tmpMessages := messagesmessages = nilprocessMessages(tmpMessages)}case <-time.After(targetMsgOverTime):log.Info("Timeout reached. Processing...")if len(messages) > 0 {tmpMessages := messagesmessages = nillog.Info("Processing remaining messages for channel:", chName)processMessages(tmpMessages)}case <-ctx.Done(): // 如果收到上下文取消信号,退出函数log.Info("______________________error__________cancel______")return}}
}func processMessages(messages []*RequestTemplateData) {// 在这里处理消息就是批量的了
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/7596.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【vue-echarts】 报错问题解决 “Error: Component series.pie not exists. Load it first.“

目录 问题描述解决【解决1】【解决2】 问题描述 使用 vue-echarts 时导入的文件 import VChart from vue-echarts/components/ECharts import echarts/lib/chart/line import echarts/lib/chart/bar import echarts/lib/chart/pie import echarts/lib/component/legend impor…

SourceTree打开就闪退

问题背景&#xff1a;Windows 10 系统提示更新&#xff0c;然后SourceTree没有正常关闭&#xff0c;导致系统升级后打开该软件闪退&#xff0c;系统重启也无法解决。 1、打开文件位置 右键“Sourcetree”桌面快捷图标&#xff0c;点击“打开文件所在位置” 2、找到Local目录…

虚拟机装CentOS镜像

起先&#xff0c;是先安装一个VM虚拟机&#xff0c;再去官方网站之类的下载一些镜像&#xff0c;常见镜像有CentOS镜像&#xff0c;ubantu镜像&#xff0c;好像还有一个树莓还是什么的&#xff0c;软件这块&#xff0c;日新月异&#xff0c;更新太快&#xff0c;好久没碰&#…

【大模型】1. 深度学习框架和推理框架概念

深度学习框架&#xff1a;用于构建、训练和部署神经网络模型&#xff0c;包含&#xff1a;TensorFlow、PyTorch、OneFlow等 深度学习模型交换格式ONNX&#xff1a;实现不同深度学习框架之间的模型转换和互操作性 推理框架&#xff1a; NCNN&#xff1a;一个为手机端极致优化…

在Excel中使用正则提取单元格内容

在办公自动化的浪潮中&#xff0c;Excel 作为数据处理的利器&#xff0c;一直在不断进化。最近&#xff0c;我注意到了不坑盒子Office插件一个非常实用的功能更新——bk_regex_string 公式。这个功能对于我们这些日常需要处理大量文本和数据的办公人员来说&#xff0c;无疑是一…

HNU-人工智能-实验4-基于Resnet的分类器

前言 本实验是自选实验&#xff0c;可以在给定范围内选择。 我刚刚提交了实验报告&#xff0c;暂时不准备放出我自己的实验报告&#xff0c;大概在截止提交之后我再放。 之所以这么着急写blog&#xff0c;是想便利还没做实验的同学。 如果选择的也是这个“毒蘑菇识别”的分类器…

c++ 线程交叉场景试验

1.需求 1.处理一个列表的数据&#xff0c;要求按照列表的数据处理10个数据 2.可以使用多线程处理&#xff0c;但是针对每个线程&#xff0c;1~10的处理顺序不能变。 3.每个数据的处理必须原子&#xff0c;即只有一个线程可以针对某个数据进行处理&#xff0c;但是10个数据是可…

资源池管理

资源池相关概念 1、什么是资源池 资源池是灵活管理资源的逻辑抽象。资源池可以分组为层次结构&#xff0c;用于对可用的CPU和内存资源按层次结构进行分区。 2、为什么使用资源池 使用资源池可以委派对主机(或集群)资源的控制权&#xff0c;在使用资源池划分群集内的所有资 源时…

jdk环境安装

jdk安装 创建软件安装的目录 mkdir -p /bigdata/{soft,server} /bigdata/soft 安装文件的存放目录 /bigdata/server 软件安装的目录 把安装的软件上传到/bigdata/soft 目录 解压到指定目录 -C :指定解压到指定目录 tar -zxvf /bigdata/soft/jdk-8u241-linux-x64.tar.gz -C /b…

道可道,非常道,名可名,非常名;学习道德经新解读!打破思想钢印——早读(逆天打工人爬取热门微信文章解读)

你读过道德经吗? 引言Python 代码第一篇 洞见 原来这就是&#xff1a;穷人的思想钢印第二篇 人民日报 来了&#xff01;新闻早班车要闻社会政策 结尾 知识始于好奇 终于智慧 好奇心驱使我们探索 而智慧则是自由思想的结晶 引言 玄之又玄 众妙之门 今天真的是大开我的眼界 我之…

2024 cleanmymac有没有必要买呢,全反面分析

在使用mac时&#xff0c;小编遇到了运行内存不足、硬盘空间不足的情况。遇到这种情况&#xff0c;我们可以借助经典的电脑深度清理软件——CleanMyMac X&#xff0c;清理不常用的软件和系统垃圾&#xff0c;非常好用&#xff01;不过&#xff0c;有许多网友发现CleanMyMac X有免…

掌握 nmcli 命令,轻松管理网络连接

课程目标 本课程将指导你如何在 RHEL8 系统中使用 nmcli 命令配置和管理网络连接。通过本课程的学习&#xff0c;你将掌握以下技能&#xff1a; 查看现有网络连接和设备状态添加新的以太网和 Wi-Fi 连接为连接配置静态 IP 地址、子网掩码、默认网关和 DNS 服务器启用、禁用和…

mvc 异步请求、异步连接、异步表单

》》》 利用Jquery ajax 》》》 mvc 异步表单 c# MVC 添加异步 jquery.unobtrusive-ajax.min.js 方法 具–>Nuget程序包管理器–>程序包管理器控制台 在控制台输入&#xff1a;PM>Install-Package Microsoft.jQuery.Unobtrusive.Ajax –version 3.0.0 回车执行即可在…

PM说|还有不会DISC的项目经理?

DISC行为模型是一种常用于职场中的人际交往工具&#xff0c;它通过对个体的行为特点进行分类和分析&#xff0c;帮助人们更好地理解自己和他人的行为方式&#xff0c;从而更加高效地进行沟通和合作。在项目管理过程中&#xff0c;多方沟通是必备工作技能&#xff0c;如何利用DI…

四级英语翻译随堂笔记

降维表达&#xff1a;中译英&#xff0c;英译英 没有强调主语&#xff0c;没有说明主语&#xff1a;用被动 但如果实在不行&#xff0c;再增添主语 不会就不翻译&#xff0c;不要乱翻译 以xxx为背景&#xff1a;against the backdrop of the xxx eg:against the backdrop of…

c++ 唤醒指定线程

在C中&#xff0c;直接唤醒一个特定的线程并不像在Java的Thread类中有interrupt()方法或者某些操作系统特定的API&#xff08;如POSIX的pthread_cond_signal或Windows的SetEvent&#xff09;那样简单。C标准库没有提供一个直接的方法来"唤醒"一个正在等待的线程。然而…

录屏软件哪个好用?这4款不容错过!

在现代社会中&#xff0c;信息的传递和分享变得越来越重要。一个好的录屏软件能够帮助我们将想要分享的信息快速直观地展示给他人。 通过下文推荐的4款录屏软件&#xff0c;我们可以轻松地分享自己的知识、经验和见解&#xff0c;让更多的人受益。 方法一&#xff1a;QQ软件进…

目前最便宜的VPS多少钱一个月?

目前最便宜的VPS一个月的价格在5美元左右&#xff0c;换算成人民币约为35元。 VPS服务器的配置、性能、所在地区都是影响其价格的因素&#xff0c;价格与性能呈正相关&#xff0c;也有的廉价VPS的服务商会提供性能低的配置&#xff0c;让用户可以进行简单的网站托管或开发环境…

ast-hook的使用

官方githubGitHub - JSREI/ast-hook-for-js-RE: 浏览器内存漫游解决方案&#xff08;探索中...&#xff09; 首先下载项目以及nodejs(不要低于14版本) 下载地址Node.js — Node v16.13.0 (LTS) (nodejs.org) ast-hook建议下载这个版本 新版本会出现hook未定义 进入项目目录…

Flume+Hadoop:打造你的大数据处理流水线

引言 在大数据处理中&#xff0c;日志数据的采集是数据分析的第一步。Apache Flume是一个分布式、可靠且可用的系统&#xff0c;用于有效地收集、聚合和移动大量日志数据到集中式数据存储。本文将详细介绍如何使用Flume采集日志数据&#xff0c;并将其上传到Hadoop分布式文件系…