golang实现mediasoup的tcp服务及channel通道

tcp模块

定义相关类

  • Client:表示客户端连接,包含网络连接conn、指向服务器的指针Server和Channel指针c。
  • server:表示TCP服务器,包含服务器地址address、TLS配置config以及三个回调函数:
    • onNewClientCallback:新客户端连接时调用。
    • onClientConnectionClosed:客户端连接关闭时调用。
    • onNewMessage:客户端接收新消息时调用。

客户端相关接口

  • Client.listen():客户端监听方法,读取连接数据,调用onNewMessage回调。
  • Client.Send(message string):发送文本消息给客户端。
  • Client.SendBytes(b []byte):发送字节数据给客户端。
  • Client.Conn():获取客户端的网络连接。
  • Client.Close():关闭客户端的网络连接。

服务器相关接口

  • server.OnNewClient(callback func(c *Client)):设置新客户端连接的回调。
  • server.OnClientConnectionClosed(callback func(c *Client, err error)):设置客户端连接关闭的回调。
  • server.OnNewMessage(callback func(c *Client, message []byte, size int)):设置客户端接收新消息的回调。
  • server.Listen():启动网络服务器,监听连接。

服务器初始化

  • NewTcpServer(address string) *server:创建新的TCP服务器实例,不使用TLS。
  • NewTcpServerWithTLS(address, certFile, keyFile string) *server:创建带有TLS功能的TCP服务器实例。

服务器启动流程

  1. 使用NewTcpServer或NewTcpServerWithTLS创建服务器实例。
  2. 设置回调函数,响应新客户端连接、客户端连接关闭和接收新消息事件。
  3. 调用server.Listen()开始监听连接。

TLS支持

  • 如果需要TLS,使用NewTcpServerWithTLS函数,提供证书和密钥文件路径。

参考demo

package MediasoupLibimport ("bufio""time""crypto/tls""log""net"
)// Client holds info about connectiontype Client struct {conn net.ConnServer *serverc *Channel
}// TCP server
type server struct {address string // Address to open connection: localhost:9999config *tls.ConfigonNewClientCallback func(c *Client)onClientConnectionClosed func(c *Client, err error)onNewMessage func(c *Client, message []byte, size int)
}// Read client data from channel
func (c *Client) listen() {fmt.Printf("tcp client listen() ")c.Server.onNewClientCallback(c)reader := bufio.NewReader(c.conn)for {recv := make([]byte, 1500) //MTU 1500size, err := reader.Read(recv)if err != nil {c.conn.Close()c.Server.onClientConnectionClosed(c, err)fmt.Printf("tcp client close! %s", err.Error())return}if size == 0 {time.Sleep(time.Millisecond * 250)fmt.Printf("tcp client recv size=0")continue}recv = recv[0:size]c.Server.onNewMessage(c, recv, size)}
}// Send text message to client
func (c *Client) Send(message string) error {return c.SendBytes([]byte(message))
}// Send bytes to client
func (c *Client) SendBytes(b []byte) error {_, err := c.conn.Write(b)if err != nil {c.conn.Close()c.Server.onClientConnectionClosed(c, err)}return err
}func (c *Client) Conn() net.Conn {return c.conn
}func (c *Client) Close() error {return c.conn.Close()
}// Called right after server starts listening new client
func (s *server) OnNewClient(callback func(c *Client)) {s.onNewClientCallback = callback
}// Called right after connection closed
func (s *server) OnClientConnectionClosed(callback func(c *Client, err error)) {s.onClientConnectionClosed = callback
}// Called when Client receives new message
func (s *server) OnNewMessage(callback func(c *Client, message []byte, size int)) {s.onNewMessage = callback
}// Listen starts network server
func (s *server) Listen() {var listener net.Listenervar err errorif s.config == nil {listener, err = net.Listen("tcp", s.address)} else {listener, err = tls.Listen("tcp", s.address, s.config)}if err != nil {fmt.Printf("Error starting TCP server.\r\n", err)}defer listener.Close()for {conn, err := listener.Accept()if err != nil {fmt.Printf("tcpserver listner Accept error:%s", err.Error())}client := &Client{conn: conn,Server: s,}go client.listen()}
}// Creates new tcp server instance
func NewTcpServer(address string) *server {fmt.Printf("Creating server with address %s", address)server := &server{address: address,}server.OnNewClient(func(c *Client) {c.c = NewChannel(c)})server.OnNewMessage(func(c *Client, message []byte, size int) {c.c.onData(c, message, size)})server.OnClientConnectionClosed(func(c *Client, err error) {c.c.Close()fmt.Printf("OnClientConnectionClosed   err = %s", err.Error())})return server
}func NewTcpServerWithTLS(address, certFile, keyFile string) *server {cert, err := tls.LoadX509KeyPair(certFile, keyFile)if err != nil {fmt.Printf("Error loading certificate files. Unable to create TCP server with TLS functionality.\r\n", err)}config := &tls.Config{Certificates: []tls.Certificate{cert},}server := NewTcpServer(address)server.config = configreturn server}

channel模块

ChannelListener接口定义:

  • ChannelListener:定义了一个接口,包含两个方法OnChannelEvent和OnChannelStringEvent,用于监听通道事件。

Channel结构体:

  • 包含字段如MediasoupClient(指向Client的指针)、PendingSent(一个同步映射,用于存储待发送的数据)、LastBinaryNotification、ID、Pid、udpAddress、udpPort、queue和messageQueue(两个循环队列)、Num和Listeners(一个映射,存储监听器)。
  • 包含一个互斥锁mutex,用于并发控制。

Channel的接口:

  • NewChannel:构造函数,创建并返回一个新的Channel实例。
  • AddListener和RemoveListener:用于添加和移除监听器。
  • processMessage:处理接收到的消息。
  • onData:处理接收到的数据。
  • handle:一个循环,从队列中取出项目并处理。
  • handleMessage:处理消息队列中的消息。
  • process:根据消息类型进行不同的处理。
  • Close:关闭通道,清理资源。
  • Request:发送请求并返回一个通道用于接收响应。
  • SetUdp:设置UDP地址和端口。

并发处理:

  • 使用sync.Map和sync.RWMutex来处理并发,确保数据的一致性和线程安全。

循环队列:

  • 使用MeetGo.CycleQueue作为循环队列,用于存储消息和数据。

参考demo

import ("encoding/json""fmt""strconv""sync""time""strings"
)//###########SendRequest begin############/
var REQUEST_TIMEOUT = 30000type SendRequest struct {ID       stringMethod   stringInternal map[string]interface{}Data     map[string]interface{}
}//async chan SendReponse
type SendReponse struct {ID       intTargetId intEvent    stringAccepted boolRejected boolInternal map[string]interface{}Data     map[string]interface{}Reason   stringBinary   bool
}type AsyncSingal struct {Async chan SendReponse
}
###########SendRequest End############/////###########CycleQueue begin############//
type CycleQueue struct {data  []interface{} //存储空间front int           //前指针,前指针负责弹出数据移动rear  int           //尾指针,后指针负责添加数据移动cap   int           //设置切片最大容量
}func NewCycleQueue(cap int) *CycleQueue {return &CycleQueue{data:  make([]interface{}, cap),cap:   cap,front: 0,rear:  0,}
}//入队操作
//判断队列是否队满,队满则不允许添加数据
func (q *CycleQueue) Push(data interface{}) bool {//check queue is fullif (q.rear+1)%q.cap == q.front { //队列已满时,不执行入队操作return false}q.data[q.rear] = data         //将元素放入队列尾部q.rear = (q.rear + 1) % q.cap //尾部元素指向下一个空间位置,取模运算保证了索引不越界(余数一定小于除数)return true
}//出队操作
//需要考虑: 队队为空没有数据返回了
func (q *CycleQueue) Pop() interface{} {if q.rear == q.front {return nil}data := q.data[q.front]q.data[q.front] = nilq.front = (q.front + 1) % q.capreturn data
}//因为是循环队列, 后指针减去前指针 加上最大值, 然后与最大值 取余
func (q *CycleQueue) QueueLength() int {return (q.rear - q.front + q.cap) % q.cap
}func (q *CycleQueue) FindDataByRequestId(requestId string) string {for i := 0; i < q.QueueLength(); i++ {if strings.Count(q.data[i].(string), requestId) == 1 {emitData := q.data[i].(string)q.data = append(q.data[:i], q.data[i+1:]...)return emitData}}return ""
}
///###########CycleQueue############import ("encoding/json""fmt""strconv""sync""time"MeetGo "vrv.meeting.server/MeetGo"
)const NS_MAX_SIZE int = 655350var messageBuffer = make([]byte, NS_MAX_SIZE)
var messageIndex = 0type ChannelListener interface {OnChannelEvent(string, map[string]interface{})OnChannelStringEvent(string, string)
}type Channel struct {MediasoupClient        *ClientPendingSent            sync.MapLastBinaryNotification interface{}ID                     intPid                    intudpAddress             stringudpPort                intqueue                  CycleQueuemessageQueue          CycleQueueNum                    intListeners              map[string]ChannelListenermutex                  sync.RWMutex
}func NewChannel(tcpClient *Client) *Channel {channel := new(Channel)channel.MediasoupClient = tcpClientchannel.queue = MeetGo.NewCycleQueue(1000)channel.messageQueue = MeetGo.NewCycleQueue(10000)channel.Num = 0channel.Listeners = make(map[string]ChannelListener, 100)go channel.handle()go channel.handleMessage()return channel
}func (channel *Channel) AddListener(id string, listener ChannelListener) {channel.mutex.Lock()channel.Listeners[id] = listenerchannel.mutex.Unlock()
}func (channel *Channel) RemoveListener(id string) {channel.mutex.Lock()delete(channel.Listeners, id)channel.mutex.Unlock()
}func (channel *Channel) processMessage(message string) {jsonMessage := make(map[string]interface{})err := json.Unmarshal([]byte(message), &jsonMessage)if err != nil {MeetGo.Log.Error("Channel processMessage error:%s", err.Error())return}if jsonMessage["registId"] != nil {MeetGo.Log.Debug("client registId succeeded [id:%s]", jsonMessage["registId"].(string))channel.ID, _ = strconv.Atoi(jsonMessage["registId"].(string))channel.Pid = int(jsonMessage["pid"].(float64))Global_Worker.OnMediasoupWorkerOnline(channel.ID, channel, jsonMessage["registId"].(string))} else if jsonMessage["id"] != nil {idd := int(jsonMessage["id"].(float64))value, ret := channel.PendingSent.Load(idd)if !ret {fmt.Printf("received Response does not match any sent Request")return}channel.PendingSent.Delete(idd)asyncReponse := value.(*MeetGo.AsyncSingal)if jsonMessage["accepted"] != nil && jsonMessage["accepted"].(bool) {MeetGo.Log.Debug("request succeeded [id:%d]", int(jsonMessage["id"].(float64)))sendReponse := MeetGo.SendReponse{ID:       idd,Accepted: jsonMessage["accepted"].(bool),Data:     jsonMessage["data"].(interface{}).(map[string]interface{}),}asyncReponse.Async <- sendReponse} else {MeetGo.Log.Debug("request failed [id:%d, reason: %s]", int(jsonMessage["id"].(float64)), jsonMessage["reason"].(string))sendReponse := MeetGo.SendReponse{ID:     int(jsonMessage["id"].(float64)),Reason: jsonMessage["reason"].(string),}asyncReponse.Async <- sendReponse}} else if jsonMessage["targetId"] != nil && jsonMessage["event"] != nil {if jsonMessage["binary"] != nil {channel.LastBinaryNotification = jsonMessagereturn} else if jsonMessage["data"] != nil {listenerKey := fmt.Sprintf("%d", int(jsonMessage["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelEvent(jsonMessage["event"].(string), jsonMessage["data"].(map[string]interface{}))}} else {data := make(map[string]interface{})listenerKey := fmt.Sprintf("%d", int(jsonMessage["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelEvent(jsonMessage["event"].(string), data)}}} else {fmt.Printf("received message is not a Response nor a Notification")return}
}func (channel *Channel) onData(client *Client, message []byte, size int) {for {ret := channel.messageQueue.Push(message)if ret {break} else {time.Sleep(40 * time.Millisecond)}}
}func (channel *Channel) handle() {for {item := channel.queue.Pop()if item == nil {time.Sleep(40 * time.Millisecond)continue}channel.process(item)time.Sleep(1 * time.Millisecond)}
}func (channel *Channel) handleMessage() {ns := NetString{bufLen: 0, length: 0, state: 0}for {item := channel.messageQueue.Pop()if item == nil {time.Sleep(40 * time.Millisecond)continue}message := item.([]byte)var nsPayloads [][]byteerr := ns.NsUnmarshal(message, &nsPayloads)if err != nil {fmt.Printf("Channel handleMessage nsPayload error %s", err.Error())return}for _, nsPayload := range nsPayloads {channel.queue.Push(nsPayload)}time.Sleep(1 * time.Millisecond)}
}func (channel *Channel) process(data interface{}) {nsPayload := data.([]byte)if channel.LastBinaryNotification == nil {switch nsPayload[0] {// 123 = '{' (a Channel JSON messsage).case 123:channel.processMessage(string(nsPayload))break// 68 = 'D' (a debug log).case 68:fmt.Printf(string(nsPayload))break// 87 = 'W' (a warning log).case 87:fmt.Printf(string(nsPayload))break// 69 = 'E' (an error log).case 69:fmt.Printf(string(nsPayload))breakdefault:fmt.Printf("unexpected data: %s", string(nsPayload))}} else {msg := channel.LastBinaryNotificationchannel.LastBinaryNotification = niljsonMsg := make(map[string]interface{})err := json.Unmarshal([]byte(msg.(string)), &jsonMsg)if err != nil {panic(err)}listenerKey := fmt.Sprintf("%d", int(jsonMsg["targetId"].(float64)))channel.mutex.RLock()listener := channel.Listeners[listenerKey]channel.mutex.RUnlock()if listener != nil {listener.OnChannelStringEvent(jsonMsg["event"].(string), jsonMsg["data"].(string))}}
}func (channel *Channel) Close() {channel.PendingSent.Range(func(k, v interface{}) bool {channel.PendingSent.Delete(k)return true})registId := strconv.Itoa(channel.ID)Global_Worker.OnMediasoupWorkerOffline(registId)time.Sleep(time.Millisecond * 250) //?fmt.Printf("channel.MediasoupClient.Close() ")channel.MediasoupClient.Close()
}func (c *Channel) Request(method string, internal,data map[string]interface{}) (chan MeetGo.SendReponse, int64) {id := RandomNumberGenerator(10000000, 99999999)fmt.Printf("MediasoupLib Channel [method:%s, id:%d]", method, id)request := MeetGo.RequestJson{ID:       id,Method:   method,Internal: internal,Data:     data,}requestJson := request.Encode()requestSend := nsEncode(requestJson)fmt.Printf("___requestSend : %s", requestSend)sendReponse := new(MeetGo.AsyncSingal)sendReponse.Async = make(chan MeetGo.SendReponse)if sendReponse != nil {c.PendingSent.Store(int(id), sendReponse)}defer c.MediasoupClient.Send(requestSend)return sendReponse.Async, id
}func (channel *Channel) SetUdp(udpAddress string, udpPort int) {channel.udpAddress = udpAddresschannel.udpPort = udpPort
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/16671.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最大连续1的个数(滑动窗口)

算法原理&#xff1a; 这道题大眼一看是关于翻转多少个0的问题&#xff0c;但是&#xff0c;如果你按照这种思维去做题&#xff0c;肯定不容易。所以我们要换一种思维去做&#xff0c;这种思维不是一下就能想到的&#xff0c;所以想不到也情有可原。 题目是&#xff1a;给定一…

Vue3:动态路由+子页面(新增、详情页)动态路由配置(代码全注释)

文章目录 实现思路调用后端接口获取用户权限获取页面权限动态绑定到路由对象中动态添加子页面路由 实现思路 emm&#xff0c;项目中使用动态路由实现根据后端返回的用户详情信息&#xff0c;动态将该用户能够访问的页面信息&#xff0c;动态生成并且绑定到路由对象中。但是后…

如何从清空的回收站中恢复已删除的Excel文件?

“嗨&#xff0c;几天前我删除了很多没有备份的Excel文件。回收站已清空。当我意识到我犯了一个大错误时&#xff0c;所有的Excel文件都消失了&#xff0c;回收站里什么都没有。清空回收站后是否可以恢复已删除的 Excel 文件&#xff1f; 回收站是一种工具&#xff0c;可让您在…

LeetCode 343. 整数拆分 (dp动态规划)

343. 整数拆分 力扣题目链接(opens new window) 给定一个正整数 n&#xff0c;将其拆分为至少两个正整数的和&#xff0c;并使这些整数的乘积最大化。 返回你可以获得的最大乘积。 示例 1: 输入: 2输出: 1解释: 2 1 1, 1 1 1。 示例 2: 输入: 10输出: 36解释: 10 3 …

【openlayers系统学习】4.2Mapbox 样式渲染图层

二、Mapbox 样式渲染图层 显然我们目前的地图需要一些样式。 VectorTile​ 图层的样式与 Vector​ 图层的样式工作方式完全相同。那里描述的样式在这里也适用。 对于这样的地图&#xff0c;创建数据驱动的样式&#xff08;对矢量图层操作&#xff09;非常简单。但矢量切片也用…

单兵组网设备+指挥中心:集群系统技术详解

一、单兵设备功能特点 单兵组网设备是现代通信技术的重要成果&#xff0c;旨在为单个作战或工作单元提供高效的通信和数据传输能力。其主要功能特点包括&#xff1a; 1. 便携性&#xff1a;设备轻巧&#xff0c;便于单兵携带和使用&#xff0c;适应各种复杂环境。 2. 通信能…

简述vue-router 组件复用导致路由参数失效怎么办

当使用Vue Router时&#xff0c;组件复用可能会导致路由参数失效的问题。为了解决这个问题&#xff0c;我们可以采取以下策略&#xff1a; 1. 监听路由变化 在Vue组件中&#xff0c;我们可以使用watch属性来监听$route对象的变化。当路由发生变化时&#xff0c;如果目标组件是…

第 8 章 机器人实体导航实现_路径规划(自学二刷笔记)

重要参考&#xff1a; 课程链接:https://www.bilibili.com/video/BV1Ci4y1L7ZZ 讲义链接:Introduction Autolabor-ROS机器人入门课程《ROS理论与实践》零基础教程 9.3.5 导航实现05_路径规划 路径规划仍然使用 navigation 功能包集中的 move_base 功能包。 5.1编写launch文…

PHP之fastadmin系统配置分组增加配置和使用

目录 一、实现功能&#xff1a;fasttadmin实现添加系统配置分组和添加参数、使用 二、添加分组 三、配置分组参数 四、最终存储位置 五、获取配置参数 一、实现功能&#xff1a;fasttadmin实现添加系统配置分组和添加参数、使用 二、添加分组 在字典配置中找到分组对应键值…

linux系统——top资源管理器

在linux系统中&#xff0c;有类似于windows系统中的资源管理器&#xff0c;top用于实时的监控系统的任务执行状态以及硬件配置信息 在linux中&#xff0c;输入top命令&#xff0c;可以进入相应界面&#xff0c;在此界面可以使用一些指令进行操作 如&#xff0c;输入z 可以改变…

终端安全管理系统、天锐DLP(数据泄露防护系统)| 数据透明加密保护,防止外泄!

终端作为企业员工日常办公、数据处理和信息交流的关键工具&#xff0c;承载着企业运营的核心信息资产。一旦终端安全受到威胁&#xff0c;企业的敏感数据将面临泄露风险&#xff0c;业务流程可能遭受中断&#xff0c;甚至整个企业的运营稳定性都会受到严重影响。 因此&#xff…

【EVI】Hume AI 初探

写在前面的话 Hume AI宣布已在B轮融资中筹集5000万美元&#xff0c;由前Google DeepMind研究员Alan Cowen创立并担任CEO。该AI模型专注于理解人类情感&#xff0c;并发布了「共情语音界面」演示&#xff0c;通过语音对话实现互动。从 Hume AI 官网展示的信息&#xff0c;EVI 能…

计算机视觉与深度学习实战:以Python为工具,基于深度学习的汽车目标检测

随着人工智能技术的飞速发展,计算机视觉与深度学习已经成为当今科技领域的热点。其中,汽车目标检测作为自动驾驶、智能交通等系统的核心技术,受到了广泛关注。本文将以Python为工具,探讨基于深度学习的汽车目标检测方法及其实战应用。 一、计算机视觉与深度学习基础 计算机…

力扣刷题--747. 至少是其他数字两倍的最大数【简单】

题目描述 给你一个整数数组 nums &#xff0c;其中总是存在 唯一的 一个最大整数 。 请你找出数组中的最大元素并检查它是否 至少是数组中每个其他数字的两倍 。如果是&#xff0c;则返回 最大元素的下标 &#xff0c;否则返回 -1 。 示例 1&#xff1a; 输入&#xff1a;n…

Python-opencv通过距离变换提取图像骨骼

文章目录 距离变换distanceTransform函数 距离变换 如果把二值图像理解成地形&#xff0c;黑色表示海洋&#xff0c;白色表示陆地&#xff0c;那么陆地上任意一点&#xff0c;到海洋都有一个最近的距离&#xff0c;如下图所示&#xff0c;对于左侧二值图像来说&#xff0c;【d…

Gitee的原理及应用详解(三)

本系列文章简介&#xff1a; Gitee是一款开源的代码托管平台&#xff0c;是国内最大的代码托管平台之一。它基于Git版本控制系统&#xff0c;提供了代码托管、项目管理、协作开发、代码审查等功能&#xff0c;方便团队协作和项目管理。Gitee的出现&#xff0c;在国内的开发者社…

漂流瓶挂机项目,聊天脚本赚钱新玩法,号称单机30-50+ (教程+软件)

一、项目简介&#xff1a; 漂流瓶挂机项目主要是通过使用探遇漂流瓶、音麦漂流瓶等聊天软件&#xff0c;为用户提供一个聊天赚钱的平台。男性用户需要充值后才能发送消息&#xff0c;而女性用户则可以通过接收消息赚取分红。男性用户发送给女性用户的消息费用大约在.1-.2元之间…

VScode中对git的学习笔记

1.git是什么&#xff1f; Git是一个功能强大的分布式版本控制系统&#xff0c;由Linux内核的创始人Linus Torvalds在2005年创建。它以其速度、数据完整性和支持大型项目的能力而闻名&#xff0c;被广泛应用于软件开发中。Git允许开发者在本地机器上拥有完整的代码库副本&#x…

读书笔记分享

1.苏格拉底只在需要的时候才索取&#xff0c;那样便能以最少的物质满足自身的要求。他认为每个人都天生体质脆弱&#xff0c;只有在贫乏的环境中才会锻炼地强壮起来。生活中的大多数人认为&#xff0c;奢华才是幸福的生活。无休止的物质积聚&#xff0c;让人们每天生活在一个内…

2024-05-27 blue-vh-问题点

摘要: 2024-05-27 思考-日记-问题点 问题点: 一. 同步接口的并发问题 接口调用是在客户端的的上下文&#xff0c;无论是线程&#xff0c;协程&#xff0c;是在客户端的执行上下文里面同步的话&#xff0c;是同步客户端的调用接口的上下文&#xff0c;阻塞的是客户端的上下文&a…