本文主要分析 mediasoup 一对一 WebRTC 通信 demo 的协议交互,从协议层面了解 mediasoup 的设计与实现,这是深入阅读 mediasoup 源码的重要基础。
1. 时序图
下图是 mediasoup 客户端-服务器协议交互的总体架构,服务器是一个 Node.js 进程加一个 Worker 服务进程,客户端是一个 React 应用加一个支持 WebRTC 的浏览器,mediasoup 为客户端和服务器都提供了 SDK,服务器 SDK 封装了与 Worker 之间管道通信的协议细节,客户端 SDK 封装了 WebRTC API 接口,对外提供 ORTC 接口。
下图是 mediasoup 建立一对一 WebRTC 通信所涉及协议交互的时序图,展示了协议交互的几个重要阶段:初始化、准备、发送媒体和接受媒体,接下来的内容就来详细分析整个协议交互过程和实现细节。
2. 初始化阶段(Initialize)
Node.js 进程启动时会创建 Worker 子进程,并为每个 Worker 创建一个 WebRtcServer 对象用来承载媒体传输(在《通信框架》篇讲过,WebRtcServer 用来做端口汇聚)。同时,还会创建 WebSocket 服务,开始监听客户端连接。
客户端启动时会进行本地初始化,构造 URL 携带 roomId 和 peerId,开始连接 WebSocket 服务。服务器收到客户端连接会尝试创建房间(如果已经存在则不需创建),并在 Worker 上创建对应的 Router。
2.1. createWorker
可以在服务器的配置文件中配置启动的 Worker 数量,默认等于 cpu 核心数量。
module.exports =
{...mediasoup :{numWorkers : Object.keys(os.cpus()).length,...}...
}
服务器启动后,根据配置文件调用 createWorker 创建指定数量的 Worker。
async function runMediasoupWorkers()
{// 从配置文件中获取启动的worker数量const { numWorkers } = config.mediasoup;for (let i = 0; i < numWorkers; ++i){// 创建Workerconst worker = await mediasoup.createWorker({dtlsCertificateFile : config.mediasoup.workerSettings.dtlsCertificateFile,dtlsPrivateKeyFile : config.mediasoup.workerSettings.dtlsPrivateKeyFile,logLevel : config.mediasoup.workerSettings.logLevel,logTags : config.mediasoup.workerSettings.logTags,rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)});...// 保存到worker集合mediasoupWorkers.push(worker);...}...
}
createWorker 接口参数如下:
export async function createWorker<WorkerAppData extends types.AppData = types.AppData,
>({// 打印的日志级别logLevel = 'error',// 打印的日志标签logTags,// RTC 通信端口范围rtcMinPort = 10000,rtcMaxPort = 59999,// DTLS证书,所有DtlsTransport需要dtlsCertificateFile,// DTLS私钥,所有DtlsTransport需要dtlsPrivateKeyFile,// 可以配置 WebRTC 实验特性libwebrtcFieldTrials,// 自定义数据appData,
}: WorkerSettings<WorkerAppData> = {}): Promise<Worker<WorkerAppData>> {
createWorker 通过spawn 创建 Worker 子进程,并建立 pipe 通信。
constructor({logLevel,logTags,rtcMinPort,rtcMaxPort,dtlsCertificateFile,dtlsPrivateKeyFile,libwebrtcFieldTrials,appData,}: WorkerSettings<WorkerAppData>) {...// 创建Worker子进程this.#child = spawn(// worker可执行程序路径spawnBin,// worker启动参数spawnArgs,// options{// 子进程环境变量env: {MEDIASOUP_VERSION: version,...process.env, // 继承父进程环境变量},// 子进程跟随父进程一起退出detached: false,// 忽略子进程的标准输入(fd 0)// 创建管道关联子进程的标准输出(fd 1)和标准错误(fd 2),允许Node.js代码读取// 创建管道关联子进程的fd 3和fd 4,允许Node.js代码进行读写,用来传输自定义协议stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'],// 隐藏子进程控制台windowsHide: true,}// 保存子进程PIDthis.#pid = this.#child.pid!;// 基于子进程的fd 3和fd 4创建管道this.#channel = new Channel({// 主进程通过fd 3向子进程写入消息producerSocket: this.#child.stdio[3],// 主进程通过fd 4从子进程接收消息consumerSocket: this.#child.stdio[4],pid: this.#pid,});...// 监听子进程Worker的stdout,在主进程通过日志器输出this.#child.stdout!.on('data', buffer => {for (const line of buffer.toString('utf8').split('\n')) {if (line) {workerLogger.debug(`(stdout) ${line}`);}}});// 监听子进程Worker的stderr,在主进程通过日志器输出this.#child.stderr!.on('data', buffer => {for (const line of buffer.toString('utf8').split('\n')) {if (line) {workerLogger.error(`(stderr) ${line}`);}}}););...
}
2.2. createWebRtcServer
WebRtcServer 用来实现媒体通信端口聚合,可以配置其监听地址和端口,mediasoup 支持使用 TCP 或 UDP 传输媒体。
【注意】这里有两个地址,“ip”是进程监听的地址;“announceAddress”是公告给客户端来连接的地址,如果监听的是私网地址或 0.0.0.0,则一定要配置 announceAddress,否则客户端会连接失败。
module.exports =
{...mediasoup :{...webRtcServerOptions :{listenInfos :[{protocol : 'udp',ip : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',announcedAddress : process.env.MEDIASOUP_ANNOUNCED_IP,port : 44444},{protocol : 'tcp',ip : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',announcedAddress : process.env.MEDIASOUP_ANNOUNCED_IP,port : 44444}]},...}...
}
每个 Worker 对应创建一个 WebRtcServer,需要防止 WebRtcServer 监听端口冲突。
async function runMediasoupWorkers()
{...if (process.env.MEDIASOUP_USE_WEBRTC_SERVER !== 'false'){// 从配置文件获取选项信息const webRtcServerOptions = utils.clone(config.mediasoup.webRtcServerOptions);const portIncrement = mediasoupWorkers.length - 1;// 同一个主机上,不同Worker监听端口不能冲突for (const listenInfo of webRtcServerOptions.listenInfos){listenInfo.port += portIncrement;}// 创建WebRtcServerconst webRtcServer = await worker.createWebRtcServer(webRtcServerOptions);// 设置webrtcServerworker.appData.webRtcServer = webRtcServer;}...
}
createWebRtcServer 实现如下,在这里分配 WebRtcServer ID,向 Worker 进程发送 pipe 消息。
async createWebRtcServer<WebRtcServerAppData extends AppData = AppData>({listenInfos,appData,
}: WebRtcServerOptions<WebRtcServerAppData>): Promise<WebRtcServer<WebRtcServerAppData>
> {...// Build the request.const fbsListenInfos: FbsTransport.ListenInfoT[] = [];for (const listenInfo of listenInfos) {fbsListenInfos.push(new FbsTransport.ListenInfoT(listenInfo.protocol === 'udp'? FbsTransportProtocol.UDP: FbsTransportProtocol.TCP,listenInfo.ip,listenInfo.announcedAddress ?? listenInfo.announcedIp,listenInfo.port,portRangeToFbs(listenInfo.portRange),socketFlagsToFbs(listenInfo.flags),listenInfo.sendBufferSize,listenInfo.recvBufferSize));}// 创建UUID作为WebRtcServer的IDconst webRtcServerId = utils.generateUUIDv4();const createWebRtcServerRequestOffset =new FbsWorker.CreateWebRtcServerRequestT(webRtcServerId,fbsListenInfos).pack(this.#channel.bufferBuilder);// 向Worker进程发送pipe消息await this.#channel.request(FbsRequest.Method.WORKER_CREATE_WEBRTCSERVER,FbsRequest.Body.Worker_CreateWebRtcServerRequest,createWebRtcServerRequestOffset);const webRtcServer = new WebRtcServer<WebRtcServerAppData>({internal: { webRtcServerId },channel: this.#channel,appData,});this.#webRtcServers.add(webRtcServer);webRtcServer.on('@close', () => this.#webRtcServers.delete(webRtcServer));// Emit observer event.this.#observer.safeEmit('newwebrtcserver', webRtcServer);return webRtcServer;
}
2.3. connect
客户端启动后会立即连接 WebSocket 服务,建立双向信令传输信道。mediasoup 使用 protoo 库实现WebSocket 通信。连接 WebSocket 服务的逻辑流程描述如下:
1)app 打包时,指定 index.jsx 为入口
{..."main": "lib/index.jsx",...
}
...
const PKG = require('./package');
...function bundle(options)
{...let bundler = browserify({entries : PKG.main, // 入口定义...})...
}
...
2)加载 index.jsx 会执行 run 函数,并加载 Room 组件
// 生命周期钩子
domready(async () =>
{logger.debug('DOM ready');await utils.initialize();run();
});...// 渲染函数
render(<Provider store={store}><RoomContext.Provider value={roomClient}><Room /></RoomContext.Provider></Provider>,document.getElementById('mediasoup-demo-app-container')
);...
3)在 Room 组件加载过程中,会调用 RoomClient 的 join 方法
// 生命周期钩子
componentDidMount()
{const { roomClient } = this.props;roomClient.join();
}
4)在 RoomClient::join 方法中创建 websocket 连接
async join()
{store.dispatch(stateActions.setMediasoupClientVersion(mediasoupClient.version));const protooTransport = new protooClient.WebSocketTransport(this._protooUrl);this._protoo = new protooClient.Peer(protooTransport);...
}
_protooUrl 根据页面 URL 进行构造,如果页面 URL 中没有指定,程序会自动生成 roomId 和 peerId 参数:
wss://192.168.28.164:4443/?roomId=cgxbcht8&peerId=5pwtlewx
2.4. mediasoup-version
Websocket 连接创建成功后,服务器会立即向客户端发送版本信息,版本号是从服务端 SDK 获取。
handleProtooConnection({ peerId, consume, protooWebSocketTransport })
{...// Notify mediasoup version to the peer.peer.notify('mediasoup-version', { version: mediasoup.version }).catch(() => {});...
}
服务端 SDK 中导出了 version,version 信息存储在 package.json 中。
...
export const version: string = require('../../package.json').version;
...
package.json 中 version 配置如下。
{"name": "mediasoup","version": "3.14.6",...
}
2.5. createRouter
客户端 WebSocket 连接请求会携带 roomId,服务器会检查 roomId 是否已经存在,如果不存在则创建房间。
async function runProtooWebSocketServer()
{...protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>{...queue.push(async () =>{const room = await getOrCreateRoom({ roomId, consumerReplicas }); ...})...});...
}
async function getOrCreateRoom({ roomId, consumerReplicas })
{// 从map中寻找roomlet room = rooms.get(roomId);// 不存在则创建if (!room){logger.info('creating a new Room [roomId:%s]', roomId);// 获取一个可用的worker(程序启动时已创建)const mediasoupWorker = getMediasoupWorker();// 创建Roomroom = await Room.create({ mediasoupWorker, roomId, consumerReplicas });// 保存Roomrooms.set(roomId, room);// 监听room的close事件,从rooms移除关闭的房间room.on('close', () => rooms.delete(roomId));}return room;
}
Room::create 方法中,会调用接口 createRouter 创建 router。可以看出,一个 room 对应一个 router。
static async create({ mediasoupWorker, roomId, consumerReplicas }){logger.info('create() [roomId:%s]', roomId);// Create a protoo Room instance.const protooRoom = new protoo.Room();// Router media codecs.const { mediaCodecs } = config.mediasoup.routerOptions;// Create a mediasoup Router.const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });...}
createRouter 的参数在 config.js 中配置,用来描述 router 的媒体能力。
module.exports =
{...routerOptions :{mediaCodecs :[{kind : 'audio',mimeType : 'audio/opus',clockRate : 48000,channels : 2},{kind : 'video',mimeType : 'video/VP8',clockRate : 90000,parameters :{'x-google-start-bitrate' : 1000}},{kind : 'video',mimeType : 'video/VP9',clockRate : 90000,parameters :{'profile-id' : 2,'x-google-start-bitrate' : 1000}},{kind : 'video',mimeType : 'video/h264',clockRate : 90000,parameters :{'packetization-mode' : 1,'profile-level-id' : '4d0032','level-asymmetry-allowed' : 1,'x-google-start-bitrate' : 1000}},{kind : 'video',mimeType : 'video/h264',clockRate : 90000,parameters :{'packetization-mode' : 1,'profile-level-id' : '42e01f','level-asymmetry-allowed' : 1,'x-google-start-bitrate' : 1000}}]},...
}
createRouter 定义如下。生成 routeId,并生成创建 router 所需的 rtpCapabilities,然后向 Worker 发送管道消息创建 router。服务器支持的能力集在supportedRtpCapabilities.ts 文件中定义。
async createRouter<RouterAppData extends AppData = AppData>({mediaCodecs,appData,
}: RouterOptions<RouterAppData> = {}): Promise<Router<RouterAppData>> {...// Clone given media codecs to not modify input data.const clonedMediaCodecs = utils.clone<RtpCodecCapability[] | undefined>(mediaCodecs);// 生成RtpCapabilities,传入的codecs匹配服务器支持codecsconst rtpCapabilities =ortc.generateRouterRtpCapabilities(clonedMediaCodecs);// 生成UUID作为Router IDconst routerId = utils.generateUUIDv4();// 构建请求,请求参数只有routeIdconst createRouterRequestOffset = new FbsWorker.CreateRouterRequestT(routerId).pack(this.#channel.bufferBuilder);// 发送pipe消息并等待响应await this.#channel.request(FbsRequest.Method.WORKER_CREATE_ROUTER,FbsRequest.Body.Worker_CreateRouterRequest,createRouterRequestOffset);// rtpCapabilities保存在SDK创建的Route对象中const data = { rtpCapabilities };const router = new Router<RouterAppData>({internal: {routerId,},data,channel: this.#channel,appData,});this.#routers.add(router);router.on('@close', () => this.#routers.delete(router));// Emit observer event.this.#observer.safeEmit('newrouter', router);return router;
}
3. 准备阶段(Prepare)
准备阶段主要是获取服务器能力集,提前在服务器上创建一个发送transport和接收transport,为协商完成后的媒体传输做准备。
3.1. getRouterRtpCapabilities
客户端在 WebSocket 连接创建成功后会调用 _joinRoom() 方法,获取服务器的能力集。
async join()
{...this._protoo.on('open', () => this._joinRoom());...
}
_joinRoom 方法中会发送 getRouterRtpCapabilities 消息并同步等待服务器响应。
async _joinRoom()
{try{...const routerRtpCapabilities =await this._protoo.request('getRouterRtpCapabilities');...}...
}
服务器收到请求,直接返回在 createRouter 阶段生成 rtpCapabilities。
async _handleProtooRequest(peer, request, accept, reject)
{switch (request.method){case 'getRouterRtpCapabilities':{accept(this._mediasoupRouter.rtpCapabilities);break;}...}...
}
3.2. load
Device 是 mediasoup 客户端 SDK 的主要导出类,代表一个通信设备。在获取服务器的能力集后,客户端应立即使用服务器能力集作为参数初始化 Device。
async _joinRoom()
{...const routerRtpCapabilities =await this._protoo.request('getRouterRtpCapabilities');await this._mediasoupDevice.load({ routerRtpCapabilities });...
}
Device 会基于本地和服务器能力集,计算本端媒体接收能力集和 SCTP 能力集。在计算本端接收能力集时,首先匹配 primary codec,再匹配远端支持的 rtx codec。客户端媒体接收能力集,rtx codec由服务器决定。
async load({routerRtpCapabilities,}: {routerRtpCapabilities: RtpCapabilities;}): Promise<void> {...// 拷贝本地和服务器能力集const clonedRouterRtpCapabilities = utils.clone<RtpCapabilities>(routerRtpCapabilities);const nativeRtpCapabilities = await handler.getNativeRtpCapabilities();const clonedNativeRtpCapabilities = utils.clone<RtpCapabilities>(nativeRtpCapabilities);// 对两个能力集取交集this._extendedRtpCapabilities = ortc.getExtendedRtpCapabilities(clonedNativeRtpCapabilities,clonedRouterRtpCapabilities);// 生成媒体接收能力集this._recvRtpCapabilities = ortc.getRecvRtpCapabilities(this._extendedRtpCapabilities);// 生成SCTP能力集this._sctpCapabilities = await handler.getNativeSctpCapabilities();...
}
3.3. createWebRtcTransport
客户端可以通过在 URL 中添加“produce=false”来指示本端不发送媒体,如下所示:
https://192.168.28.164:3000/?roomId=6i7vwgur&produce=false
如果没有设置“produce=false”,则会请求服务器创建一个用来发送媒体的 WebRtcTransport。
还可以在 URL 中添加“forceTcp=true”来强制使用 TCP 来传输媒体,如下所示:
https://192.168.28.164:3000/?roomId=6i7vwgur&forceTcp=true
async _joinRoom()
{...if (this._produce){// 请求创建WebRtcTransportconst transportInfo = await this._protoo.request('createWebRtcTransport',{forceTcp : this._forceTcp, // 是否使用TCP传输媒体producing : true, // 支持发送媒体consuming : false, // 不支持接收媒体sctpCapabilities : this._useDataChannel? this._mediasoupDevice.sctpCapabilities: undefined});...}...
}
服务器收到 createWebRtcTransport 消息处理逻辑如下:
async _handleProtooRequest(peer, request, accept, reject)
{switch (request.method){...case 'createWebRtcTransport':{// 构造createWebRtcTransport参数const {forceTcp,producing,consuming,sctpCapabilities} = request.data;const webRtcTransportOptions ={// WebRtcServer配置参数(监听地址和端口等信息)...utils.clone(config.mediasoup.webRtcTransportOptions),// 基于WebRtcServer创建TransportwebRtcServer : this._webRtcServer,// ICE应答超时时间???iceConsentTimeout : 20,enableSctp : Boolean(sctpCapabilities),numSctpStreams : (sctpCapabilities || {}).numStreams,appData : { producing, consuming }};// 如果强制使用TCP传输媒体,则过滤非TCP地址if (forceTcp){webRtcTransportOptions.listenInfos = webRtcTransportOptions.listenInfos.filter((listenInfo) => listenInfo.protocol === 'tcp');webRtcTransportOptions.enableUdp = false;webRtcTransportOptions.enableTcp = true;}// 调用Route接口创建Transportconst transport =await this._mediasoupRouter.createWebRtcTransport(webRtcTransportOptions);...// Store the WebRtcTransport into the protoo Peer data Object.peer.data.transports.set(transport.id, transport);// 返回给客户端的信息accept({id : transport.id,iceParameters : transport.iceParameters,iceCandidates : transport.iceCandidates,dtlsParameters : transport.dtlsParameters,sctpParameters : transport.sctpParameters});...break;}
在 createWebRtcTransport 方法中会生成 transportId,然后向 Worker 进程发送管道消息,如果 transport 是建立在 webRtcServer 之上,则发送 ROUTER_CREATE_WEBRTCTRANSPORT_WITH_SERVER 消息,否则发送 ROUTER_CREATE_WEBRTCTRANSPORT 消息。
async createWebRtcTransport<WebRtcTransportAppData extends AppData = AppData,
>({webRtcServer,listenInfos,listenIps,port,enableUdp,enableTcp,preferUdp = false,preferTcp = false,initialAvailableOutgoingBitrate = 600000,enableSctp = false,numSctpStreams = { OS: 1024, MIS: 1024 },maxSctpMessageSize = 262144,sctpSendBufferSize = 262144,iceConsentTimeout = 30,appData,
}: WebRtcTransportOptions<WebRtcTransportAppData>): Promise<WebRtcTransport<WebRtcTransportAppData>
> {...// 生成UUID作为transportIdconst transportId = generateUUIDv4();...// 向Worker进程发送pipe消息const response = await this.#channel.request(webRtcServer? FbsRequest.Method.ROUTER_CREATE_WEBRTCTRANSPORT_WITH_SERVER: FbsRequest.Method.ROUTER_CREATE_WEBRTCTRANSPORT,FbsRequest.Body.Router_CreateWebRtcTransportRequest,requestOffset,this.#internal.routerId);...
}
3.4. createSendTransport
客户端收到 createWebRtcTransport 响应后,会调用 Device 接口创建 SendTransport,与服务器上创建的WebRtcTransport 遥相呼应。音视频复用同一个SendTransport。
async _joinRoom()
{...// 从服务器获取的一系列参数,用来创建本地Transportconst {id,iceParameters,iceCandidates,dtlsParameters,sctpParameters} = transportInfo;// 本地创建SendTransportthis._sendTransport = this._mediasoupDevice.createSendTransport({id,iceParameters,iceCandidates,dtlsParameters :{...dtlsParameters,role : 'auto'},sctpParameters,iceServers : [],proprietaryConstraints : PC_PROPRIETARY_CONSTRAINTS,additionalSettings :{ encodedInsertableStreams: this._e2eKey && e2e.isSupported() }});...
}
createSendTransport 最终会调用 Chrome111::run(以 Chrome111 作为 HandlerInterface 实例进行说明,后同),内部会创建 PeerConnection 对象。
run({direction,iceParameters,iceCandidates,dtlsParameters,sctpParameters,iceServers,iceTransportPolicy,additionalSettings,proprietaryConstraints,extendedRtpCapabilities,
}: HandlerRunOptions): void {logger.debug('run()');// "send" or "recv"this._direction = direction;// 生成远端SDP,暂时没有媒体描述this._remoteSdp = new RemoteSdp({iceParameters,iceCandidates,dtlsParameters,sctpParameters,});// extendedRtpCapabilities是调用Device.load方法时生成的// 可以认为协商后的能力集// 本端能力集this._sendingRtpParametersByKind = {audio: ortc.getSendingRtpParameters('audio', extendedRtpCapabilities),video: ortc.getSendingRtpParameters('video', extendedRtpCapabilities),};// 远端能力集this._sendingRemoteRtpParametersByKind = {audio: ortc.getSendingRemoteRtpParameters('audio',extendedRtpCapabilities),video: ortc.getSendingRemoteRtpParameters('video',extendedRtpCapabilities),};if (dtlsParameters.role && dtlsParameters.role !== 'auto') {this._forcedLocalDtlsRole =dtlsParameters.role === 'server' ? 'client' : 'server';}// 创建PeerConnectionthis._pc = new (RTCPeerConnection as any)({iceServers: iceServers || [],iceTransportPolicy: iceTransportPolicy || 'all',bundlePolicy: 'max-bundle',rtcpMuxPolicy: 'require',sdpSemantics: 'unified-plan',...additionalSettings,},proprietaryConstraints);...
}
3.5. createWebRtcTransport
客户端端可以通过在 URL 中添加“consume=false”来指示本端不接收媒体,如下所示:
https://192.168.28.164:3000/?roomId=6i7vwgur&consume=false
如果没有设置“consume=false”,则会请求服务器创建另一个用来接收媒体的 WebRtcTransport。
async _joinRoom()
{...if (this._consume){// 请求创建WebRtcTransportconst transportInfo = await this._protoo.request('createWebRtcTransport',{forceTcp : this._forceTcp, // 是否使用TCP传输媒体producing : false, // 不支持发送媒体consuming : true, // 支持接收媒体sctpCapabilities : this._useDataChannel? this._mediasoupDevice.sctpCapabilities: undefined});...}...
}
服务器收到createWebRtcTransport请求后,处理逻辑与1.2.3节一致,不再赘述。
3.6. createRecvTransport
客户端收到服务器响应后,会调用 Device 接口创建 RecvTransport。音视频复用同一个RecvTransport。
async _joinRoom()
{...// 从服务器获取的一系列参数,用来创建本地Transportconst {id, // 服务器生成的transport IDiceParameters,iceCandidates,dtlsParameters,sctpParameters} = transportInfo;// 创建本地RecvTransportthis._recvTransport = this._mediasoupDevice.createRecvTransport({id,iceParameters,iceCandidates,dtlsParameters :{...dtlsParameters,role : 'auto'},sctpParameters,iceServers : [],additionalSettings :{ encodedInsertableStreams: this._e2eKey && e2e.isSupported() }});...
}
createRecvTransport 最终会调用 Chrome111::run 方法,创建 PeerConnection 对象,相关处理逻辑与 createSendTransport 基本一致,不再赘述。
4. 加入房间(Join)
准备工作完成后,就可以加入房间了。加入房间的目的,一是与服务器交换能力集,这样客户端和服务器才能够完成能力协商;二是交换 peer 信息,这样才能够知道其他 peer 的存在并从其他 peer 接收媒体。加入房间时,如果本地要接收媒体,则需要携带本地能力集(房间中可能有其他 peer 正在发送媒体,节省一次协议交互)。
async _joinRoom()
{...const { peers } = await this._protoo.request('join',{displayName : this._displayName,device : this._device,rtpCapabilities : this._consume? this._mediasoupDevice.rtpCapabilities: undefined,sctpCapabilities : this._useDataChannel && this._consume? this._mediasoupDevice.sctpCapabilities: undefined});...
}
服务器会返回房间中其他 peer,如果房间中其他用户正在发送媒体,要通知当前加入房间的 peer 接收媒体,接收媒体的逻辑放在后面再说,这里先按下不表。
async _handleProtooRequest(peer, request, accept, reject)
{...case 'join':{// Ensure the Peer is not already joined.if (peer.data.joined)throw new Error('Peer already joined');const {displayName,device,rtpCapabilities,sctpCapabilities} = request.data;// Store client data into the protoo Peer data object.peer.data.joined = true;peer.data.displayName = displayName;peer.data.device = device;peer.data.rtpCapabilities = rtpCapabilities;peer.data.sctpCapabilities = sctpCapabilities;// Tell the new Peer about already joined Peers.// And also create Consumers for existing Producers.const joinedPeers =[...this._getJoinedPeers(),...this._broadcasters.values()];// 返回除自己之外的Peer列表const peerInfos = joinedPeers.filter((joinedPeer) => joinedPeer.id !== peer.id).map((joinedPeer) => ({id : joinedPeer.id,displayName : joinedPeer.data.displayName,device : joinedPeer.data.device}));accept({ peers: peerInfos });...}...
}
5. Send media
加入房间后,如果本地存在音视频设备,并且策略允许发送,则立即开始发送本地的音频和视频。但发送媒体的前提是完成 SDP 协商,并在服务器创建对应的 producer。
5.1. Client::produce
加入房间成功后,如果允许发送媒体,则打开麦克风和摄像头。由于已经获取到服务器的能力集,并且创建了SendTransport,已经有充足的信息完成发送媒体的协商。
async _joinRoom()
{...if (this._produce){...// 打开麦克风this.enableMic();const devicesCookie = cookiesManager.getDevices();// 打开摄像头if (!devicesCookie || devicesCookie.webcamEnabled || this._externalVideo)this.enableWebcam();...}...
}
以 enableMic 为例,其内部会调用本地 SendTransport::produce 方法。
async enableMic()
{...this._micProducer = await this._sendTransport.produce({track,codecOptions :{opusStereo : true,opusDtx : true,opusFec : true,opusNack : true}});...
}
SentTransport::produce 方法内部先调用 Chrome111::send 方法。
async produce<ProducerAppData extends AppData = AppData>({track,encodings,codecOptions,codec,stopTracks = true,disableTrackOnPause = true,zeroRtpOnPause = false,onRtpSender,appData = {} as ProducerAppData,
}: ProducerOptions<ProducerAppData> = {}): Promise<Producer<ProducerAppData>
> {...const { localId, rtpParameters, rtpSender } =await this._handler.send({track,encodings: normalizedEncodings,codecOptions,codec,onRtpSender,});...
}
Chrome111::send 方法内部调用 setupTransport 准备 transport,然后完成 PeerConnection SDP 协商。
async send({track, encodings, codecOptions, codec,}: HandlerSendOptions): Promise<HandlerSendResult> {...// 调用PeerConnection接口添加transceiver,相当于添加SDP中的media section// 方向为sendonlyconst transceiver = this._pc.addTransceiver(track, {direction: 'sendonly',streams: [this._sendStream],});// 发送媒体需要客户端创建Offerlet offer = await this._pc.createOffer();// 同步等待设置服务器WebRtcTransport参数if (!this._transportReady) {await this.setupTransport({localDtlsRole: this._forcedLocalDtlsRole ?? 'client',localSdpObject,});}...await this._pc.setLocalDescription(offer);...await this._pc.setRemoteDescription(answer);...
}
Chrome111::setupTransport 方法触发 @connect 事件,携带本端 DTLS 参数。
private async setupTransport({localDtlsRole,localSdpObject,
}: {localDtlsRole: DtlsRole;localSdpObject?: any;
}): Promise<void> {if (!localSdpObject) {localSdpObject = sdpTransform.parse(this._pc.localDescription.sdp);}// Get our local DTLS parameters.const dtlsParameters = sdpCommonUtils.extractDtlsParameters({sdpObject: localSdpObject,});// Set our DTLS role.dtlsParameters.role = localDtlsRole;// Update the remote DTLS role in the SDP.this._remoteSdp!.updateDtlsRole(localDtlsRole === 'client' ? 'server' : 'client');// Need to tell the remote transport about our parameters.// 触发@connect事件,通知服务器await new Promise<void>((resolve, reject) => {this.safeEmit('@connect', { dtlsParameters }, resolve, reject);});this._transportReady = true;
}
Transport 监听 @connect 事件进一步触发 connect 事件。
handler.on('@connect',({ dtlsParameters }: { dtlsParameters: DtlsParameters },callback: () => void,errback: (error: Error) => void) => {...this.safeEmit('connect', { dtlsParameters }, callback, errback);}
);
5.2. connectWebRtcTransport
RoomClient 监听 connect 事件,向服务器发送 connectWebRtcTransport 请求,携带本端 DTLS 参数。
this._sendTransport.on('connect', ({ iceParameters, dtlsParameters }, callback, errback) =>{this._protoo.request('connectWebRtcTransport',{transportId : this._sendTransport.id,iceParameters,dtlsParameters}).then(callback).catch(errback);});
服务器收到 connectWebRtcTransport 请求,调用 WebRtcTransport::connect 接口。
async _handleProtooRequest(peer, request, accept, reject)
{switch (request.method){...case 'connectWebRtcTransport':{const { transportId, dtlsParameters } = request.data;const transport = peer.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" not found`);await transport.connect({ dtlsParameters });accept();break;}...}...
}
WebRtcTransport::connect 方法会向 Worker 进程发送 WEBRTCTRANSPORT_CONNECT 消息,Worker 会设置 WebRTCTransport 的 DTLS 参数,并确定 DTLS 角色。
async connect({dtlsParameters,}: {dtlsParameters: DtlsParameters;}): Promise<void> {logger.debug('connect()');// 携带DTLS参数const requestOffset = createConnectRequest({builder: this.channel.bufferBuilder,dtlsParameters,});// 同步发送pipe消息const response = await this.channel.request(FbsRequest.Method.WEBRTCTRANSPORT_CONNECT,FbsRequest.Body.WebRtcTransport_ConnectRequest,requestOffset,this.internal.transportId);/* Decode Response. */const data = new FbsWebRtcTransport.ConnectResponse();response.body(data);// 设置DTLS角色:'auto' | 'client' | 'server'this.#data.dtlsParameters.role = dtlsRoleFromFbs(data.dtlsLocalRole());
}
5.3. Server::produce
WebRtcTransport 创建成功后,服务器回复 connectWebRtcTransport 响应,一路返回到调用 Chrome111::setupTransport 处,调用 setLocalDescription 和 setRemoteDescription 完成 PeerConnection SDP 协商,并触发“produce”事件。
async produce<ProducerAppData extends AppData = AppData>({track,encodings,codecOptions,codec,stopTracks = true,disableTrackOnPause = true,zeroRtpOnPause = false,onRtpSender,appData = {} as ProducerAppData,}: ProducerOptions<ProducerAppData> = {}): Promise<Producer<ProducerAppData>> {...const { localId, rtpParameters, rtpSender } =await this._handler.send({track,encodings: normalizedEncodings,codecOptions,codec,onRtpSender,});...const { id } = await new Promise<{ id: string }>((resolve, reject) => {this.safeEmit('produce',{kind: track.kind as MediaKind,rtpParameters,appData,},resolve,reject);});
RoomClient 在创建 SendTransport 时,已经监听了“produce”事件,客户端 SDP 协商已经完成,需要向服务器发送 produce 请求,在服务器上创建 producer。
async _joinRoom()
{...this._sendTransport.on('produce', async ({ kind, rtpParameters, appData }, callback, errback) =>{try{// eslint-disable-next-line no-shadowconst { id } = await this._protoo.request('produce',{transportId : this._sendTransport.id,kind,rtpParameters,appData});callback({ id });}catch (error){errback(error);}});...
}
服务器收到 produce 请求,调用 Transport::produce 方法。
async _handleProtooRequest(peer, request, accept, reject)
{switch (request.method){case 'produce':{...const producer = await transport.produce({kind,rtpParameters,appData// keyFrameRequestDelay: 5000});// Store the Producer into the protoo Peer data Object.peer.data.producers.set(producer.id, producer);...accept({ id: producer.id });// Optimization: Create a server-side Consumer for each Peer.for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })){this._createConsumer({consumerPeer : otherPeer,producerPeer : peer,producer});}...break;}
Transport::produce 方法向 Worker 发送 TRANSPORT_PRODUCE 消息,Worker 创建 producer 对象。至此,媒体发送的前期工作都已完成,浏览器开始连接 Worker 服务端口开始媒体传输。
async produce<ProducerAppData extends AppData = AppData>({id = undefined,kind,rtpParameters,paused = false,keyFrameRequestDelay,appData,
}: ProducerOptions<ProducerAppData>): Promise<Producer<ProducerAppData>> {logger.debug('produce()');if (id && this.#producers.has(id)) {throw new TypeError(`a Producer with same id "${id}" already exists`);} else if (!['audio', 'video'].includes(kind)) {throw new TypeError(`invalid kind "${kind}"`);} else if (appData && typeof appData !== 'object') {throw new TypeError('if given, appData must be an object');}// Clone given RTP parameters to not modify input data.const clonedRtpParameters = utils.clone<RtpParameters>(rtpParameters);// This may throw.ortc.validateRtpParameters(clonedRtpParameters);// If missing or empty encodings, add one.if (!clonedRtpParameters.encodings ||!Array.isArray(clonedRtpParameters.encodings) ||clonedRtpParameters.encodings.length === 0) {clonedRtpParameters.encodings = [{}];}// Don't do this in PipeTransports since there we must keep CNAME value in// each Producer.if (this.constructor.name !== 'PipeTransport') {// If CNAME is given and we don't have yet a CNAME for Producers in this// Transport, take it.if (!this.#cnameForProducers &&clonedRtpParameters.rtcp &&clonedRtpParameters.rtcp.cname) {this.#cnameForProducers = clonedRtpParameters.rtcp.cname;}// Otherwise if we don't have yet a CNAME for Producers and the RTP// parameters do not include CNAME, create a random one.else if (!this.#cnameForProducers) {this.#cnameForProducers = utils.generateUUIDv4().substr(0, 8);}// Override Producer's CNAME.clonedRtpParameters.rtcp = clonedRtpParameters.rtcp ?? {};clonedRtpParameters.rtcp.cname = this.#cnameForProducers;}const routerRtpCapabilities = this.#getRouterRtpCapabilities();// This may throw.const rtpMapping = ortc.getProducerRtpParametersMapping(clonedRtpParameters,routerRtpCapabilities);// This may throw.const consumableRtpParameters = ortc.getConsumableRtpParameters(kind,clonedRtpParameters,routerRtpCapabilities,rtpMapping);const producerId = id || utils.generateUUIDv4();const requestOffset = createProduceRequest({builder: this.channel.bufferBuilder,producerId,kind,rtpParameters: clonedRtpParameters,rtpMapping,keyFrameRequestDelay,paused,});// 发送pipe消息const response = await this.channel.request(FbsRequest.Method.TRANSPORT_PRODUCE,FbsRequest.Body.Transport_ProduceRequest,requestOffset,this.internal.transportId);/* Decode Response. */const produceResponse = new FbsTransport.ProduceResponse();response.body(produceResponse);const status = produceResponse.unpack();const data = {kind,rtpParameters: clonedRtpParameters,type: producerTypeFromFbs(status.type),consumableRtpParameters,};const producer = new Producer<ProducerAppData>({internal: {...this.internal,producerId,},data,channel: this.channel,appData,paused,});this.#producers.set(producer.id, producer);producer.on('@close', () => {this.#producers.delete(producer.id);this.emit('@producerclose', producer);});// 通知transport新增了一个producerthis.emit('@newproducer', producer);// Emit observer event.this.#observer.safeEmit('newproducer', producer);return producer;
}
6. Receive Media
客户端从服务器接收媒体,有两个触发场景,处理逻辑是一样的。
1)Peer 加入房间时,如果房间中已经有其他 peeer 在发送媒体,则需要通知刚加入房间的 peer 接收其他 peer 发送的媒体。
2)Peer 加入房间后,开始发送媒体,服务器需要通知其他 peer 接收此 peer 发送的媒体。
6.1. newConsumer
当需要通知客户端接收媒体时,服务器会调用 _createConsumer 方法。_createConsumer 会先调用 consume 方法在 Worker 上创建 Consumer,然后向客户端发送 newConsumer 反向请求(带响应的通知)。
async _createConsumer({ consumerPeer, producerPeer, producer })
{...for (let i=0; i<consumerCount; i++){promises.push((async () =>{// Create the Consumer in paused mode.let consumer;try{consumer = await transport.consume({producerId : producer.id,rtpCapabilities : consumerPeer.data.rtpCapabilities,// Enable NACK for OPUS.enableRtx : true,paused : true});}...// Send a protoo request to the remote Peer with Consumer parameters.try{await consumerPeer.request('newConsumer',{peerId : producerPeer.id,producerId : producer.id,id : consumer.id,kind : consumer.kind,rtpParameters : consumer.rtpParameters,type : consumer.type,appData : producer.appData,producerPaused : consumer.producerPaused});...})());}...}
Transport::consume 方法向 Worker 发送 TRANSPORT_CONSUME 消息在 Worker 上创建 Consumer。创建成功后,触发“newConsumer”事件。
async consume<ConsumerAppData extends AppData = AppData>({producerId,rtpCapabilities,paused = false,mid,preferredLayers,ignoreDtx = false,enableRtx,pipe = false,appData,
}: ConsumerOptions<ConsumerAppData>): Promise<Consumer<ConsumerAppData>> {...// 构造请求const consumerId = utils.generateUUIDv4();const requestOffset = createConsumeRequest({builder: this.channel.bufferBuilder,producer,consumerId,rtpParameters,paused,preferredLayers,ignoreDtx,pipe,});// 向 Worker 发送消息创建 Consumer 并同步等待const response = await this.channel.request(FbsRequest.Method.TRANSPORT_CONSUME,FbsRequest.Body.Transport_ConsumeRequest,requestOffset,this.internal.transportId);...// 创建并保存 consumerconst consumer = new Consumer<ConsumerAppData>({});this.consumers.set(consumer.id, consumer);consumer.on('@close', () => this.consumers.delete(consumer.id));consumer.on('@producerclose', () => this.consumers.delete(consumer.id));// 触发 newConsumer 事件this.#observer.safeEmit('newconsumer', consumer);return consumer;
}
6.2. consume
RoomClient 监听 newConsumer 事件,调用 RecvTransport 的 consume 方法。
this._protoo.on('request', async (request, accept, reject) =>
{switch (request.method){case 'newConsumer':{...const {peerId,producerId,id,kind,rtpParameters,type,appData,producerPaused} = request.data;const consumer = await this._recvTransport.consume({id,producerId,kind,rtpParameters,streamId : `${peerId}-${appData.share ? 'share' : 'mic-webcam'}`,appData : { ...appData, peerId } // Trick.});...}...}...
}
consume 方法最终会调用 chrome111::receive 方法。
async consume<ConsumerAppData extends AppData = AppData>({id,producerId,kind,rtpParameters,streamId,onRtpReceiver,appData = {} as ConsumerAppData,
}: ConsumerOptions<ConsumerAppData>): Promise<Consumer<ConsumerAppData>> {...const consumerCreationTask = new ConsumerCreationTask({id,producerId,kind,rtpParameters: clonedRtpParameters,streamId,onRtpReceiver,appData,});// Store the Consumer creation task.this._pendingConsumerTasks.push(consumerCreationTask);// 这里使用微任务进行调度queueMicrotask(() => {if (this._closed) {return;}if (this._consumerCreationInProgress === false) {this.createPendingConsumers<ConsumerAppData>();}});return consumerCreationTask.promise as Promise<Consumer<ConsumerAppData>>;
}
private async createPendingConsumers<ConsumerAppData extends AppData,>(): Promise<void> {...const results = await this._handler.receive(optionsList);...
}
由于客户端在连接服务器的时候已经拿到了服务器的能力集,可以依次执行以下动作:
1)receive 方法构造 offer 并调用 setRemoteDescription。
2)创建 answer 成功后,立即调用 connectWebRtcTransport 通知服务器准备好 transport(具体参考produce,不再赘述)。
3)同步等待服务器响应,使用创建的 answer 调用 setLocalDescription,完成 PeerConnection SDP 协商。
至此,接收通道已经准备好,接下来客户端可以连接 Worker 服务开始接受媒体。
async receive(optionsList: HandlerReceiveOptions[]): Promise<HandlerReceiveResult[]> {...await this._pc.setRemoteDescription(offer);let answer = await this._pc.createAnswer();...if (!this._transportReady) {await this.setupTransport({localDtlsRole: this._forcedLocalDtlsRole ?? 'client',localSdpObject,});}await this._pc.setLocalDescription(answer);...
}
7. 总结
如果只是完成基本的媒体通信,相比基于 SDP 的 WebRTC 协商,使用 mediasoup的 WebRTC 协商看起来更加复杂,这种感觉是正常的。这是因为 mediasoup 使用 ORTC 接口规范,ORTC 旨在简化 WebRTC 的 API,提供更加模块化和面向对象的接口,它把 WebRTC 的一整坨 API 进行了面向对象的细粒度模块划分,能够实现更灵活的控制。但这种模块化的拆分也带来了复杂性,以前的协商只需要处理 SDP 即可(包罗万象),现在要分别处理 Transport、DTLS、能力集等,再加上 mediasoup 服务器上 Worker/Producer/Consumer/Transport 等概念,如下图所示,使得 mediasoup 的交互流程理解起来更加困难。
如果你深入阅读 mediasoup client SDK 源码,会发现 SDP 与 ORTC 之间的转换逻辑更加复杂,客户端的实现其实也不简单,要真正吃透 mediaosoup client SDK 实现,需要对 WebRTC、SDP、ORTC 都有相当的理解才行。这超出了本文所讨论的范围。