前言
现在直播平台由于弹幕的存在,主播与观众可以更轻松地进行互动,非常受年轻群众的欢迎。斗鱼TV就是一款非常流行的直播平台,弹幕更是非常火爆。看到有不少主播接入 弹幕语音播报器
、 弹幕点歌
等模块,这都需要首先连接斗鱼弹幕。
经常看到其它编程语言的开发者,分享了他们斗鱼弹幕客户端的代码。.NET
当然也能做,还能做得更好(只是不知为何很少见人分享?)。
本文将包含以下内容:
我将使用斗鱼TV官方公开的弹幕PDF文档,使用
Socket
/TcpClient
连续斗鱼弹幕;分析如何利用
.NET
强大的ValueTask
特性,在保持代码简洁的同时,轻松享受高性能异步代码的快乐;然后将使用
ReactiveExtensions
(RX
),演示如何将一系列复杂的弹幕接入操作,就像写HelloWorld
一般容易;用我自制的“准游戏引擎”
FlysEngine
,只需少量代码,即可将斗鱼TV的弹幕显示左右飞过的效果;
本文内容可能比较多,因此分上、下两篇阐述,上篇将具体聊聊第1、2点,第3、4点将在下篇进行,整篇完成后,最终效果如下:
斗鱼直播API
现在网上可以轻松找到 斗鱼弹幕服务器第三方接入协议v1.6.2.pdf
(网上搜索该关键字即可找到)。文档提到,第三方接入弹幕服务的服务器为 openbarrage.douyutv.com:8601
,我们可以使用 TcpClient
来方便连接:
using (var client = new TcpClient())
{ client.ConnectAsync("openbarrage.douyutv.com", 8601).Wait(); Stream stream = client.GetStream(); // do other works
}
该文档中提到所有数据包格式如下:
注意前两个4字节的消息长度是完全一样的,可以使用 Debug.Assert
进行断言。
其中所有数字都为小端整数,刚好 .NET
的 BinaryWriter
类默认都以小端整数进行转换。可以利用起来。
因此,读取一个消息包的完整代码如下:
using (var reader = new BinaryReader(stream, Encoding.UTF8, true))
{ var fullMsgLength = reader.ReadInt32(); var fullMsgLength2 = reader.ReadInt32(); Debug.Assert(fullMsgLength == fullMsgLength2); var length = fullMsgLength - 1 - 4 - 4; var packType = reader.ReadInt16(); Debug.Assert(packType == ServerSendToClient); var encrypted = reader.ReadByte(); Debug.Assert(encrypted == Encrypted); var reserved = reader.ReadByte(); Debug.Assert(reserved == Reserved); var bytes = reader.ReadBytes(length); var zero = reader.ReadByte(); Debug.Assert(zero == ByteZero);
}
其中 bytes
既是数据部分,根据 pdf
文档中的规定,该部分为 UTF-8
编码,在 C#
中使用 Encoding.UTF8.GetString()
即可获取其字符串,该字符串长这样子:
type@=chatmsg/rid@=633019/ct@=1/uid@=124155/nn@=夜科扬羽/txt@=这不压个蜥蜴/cid@=602c7f1becf2419962a6520300000000/ic@=avatar@S000@S12@S41@S55_avatar/level@=21/sahf@=0/cst@=1570891500125/bnn@=賊开心/bl@=8/brid@=5789561/hc@=21ebd5b2c86c01e0565453e45f14ca5b/el@=/lk@=/urlev@=10/
该格式不是 JSON
/ XML
等,但仔细分析又确实有逻辑,有层次感,根据文档,该格式为所谓的 STT
序列化,该格式包含键值对、数组等多种格式。虽然不懂为什么不用 JSON
。还好协议简单,我可以通过寥寥几行代码,即可转换为 Json.NET
的 JToken
格式:
public static JToken DecodeStringToJObject(string str)
{ if (str.Contains("//")) // 数组 { var result = new JArray(); foreach (var field in str.Split(new[] { "//" }, StringSplitOptions.RemoveEmptyEntries)) { result.Add(DecodeStringToJObject(field)); } return result; } if (str.Contains("@=")) // 对象 { var result = new JObject(); foreach (var field in str.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries)) { var tokens = field.Split(new[] { "@=" }, StringSplitOptions.None); var k = tokens[0]; var v = UnscapeSlashAt(tokens[1]); result[k] = DecodeStringToJObject(v); } return result; } else if (str.Contains("@A=")) // 键值对 { return DecodeStringToJObject(UnscapeSlashAt(str)); } else { return UnscapeSlashAt(str); // 值 }
}
static string EscapeSlashAt(string str)
{ return str .Replace("/", "@S") .Replace("@", "@A");
}
static string UnscapeSlashAt(string str)
{ return str .Replace("@S", "/") .Replace("@A", "@");
}
这样一来,即可将 STT
格式转换为 JSON
格式,因此只需像 JSON
格式取出 nn
字段和 txt
字段即可,还有一个 col
字段,可以用来确定弹幕颜色,我可以将其转换为 RGB
的 int32
值:
Color = (x["col"] ?? new JValue(0)).Value<int>() switch
{ 1 => 0xff0000, // 红 2 => 0x1e87f0, // 浅蓝 3 => 0x7ac84b, // 浅绿 4 => 0xff7f00, // 橙色 5 => 0x9b39f4, // 紫色 6 => 0xff69b4, // 洋红 _ => 0xffffff, // 默认,白色
}
该代码使用了 C# 8.0
的 switchexpression
功能,可以一个表达式转成整个颜色转换,比 if/else
和 switch/case
语句都精简不少,可谓一气呵成。
支持异步/ ValueTask
/ Memory<T>
优化
C# 5.0
提供了强大的异步 API
—— async/await
,通过异步API,以前难以用编程实现的操作现在可以像写串行代码一样轻松完成,还能轻松加入取消任务操作。
然后 C# 7.0
发布了 ValueTask
, ValueTask
是值类型,因此在频繁调用异步操作(如使用 Stream
读取字节)时,不会因为创建过多的 Task
而分配没必要的内存。这里,我确实是使用 TCP
连接流读取字节,是使用 ValueTask
的最佳时机。
这里我们将尝试将代码切换为 ValueTask
版本。
首先第一个问题是 BinaryReader
类,该类提供了便利的字节操作方式,且能确保字节端为小端,但该类不提供异步 API
,因此需要作一些特殊处理:
public static async Task<string> RecieveAsync(Stream stream, CancellationToken cancellationToken)
{ int fullMsgLength = await ReadInt32().ConfigureAwait(false); int fullMsgLength2 = await ReadInt32().ConfigureAwait(false); Debug.Assert(fullMsgLength == fullMsgLength2); int length = fullMsgLength - 1 - 4 - 4; short packType = await ReadInt16().ConfigureAwait(false); Debug.Assert(packType == ServerSendToClient); short encrypted = await ReadByte().ConfigureAwait(false); Debug.Assert(encrypted == Encrypted); short reserved = await ReadByte().ConfigureAwait(false); Debug.Assert(reserved == Reserved); Memory<byte> bytes = await ReadBytes(length).ConfigureAwait(false); byte zero = await ReadByte().ConfigureAwait(false); Debug.Assert(zero == ByteZero); return Encoding.UTF8.GetString(bytes.Span);
}
如代码所示,我封装了 ReadInt16()
和 ReadInt32()
两个方法,
var intBuffer = new byte[4];
var int32Buffer = new Memory<byte>(intBuffer, 0, 4);
async ValueTask<int> ReadInt32()
{ var memory = int32Buffer; int read = 0; while (read < 4) { read += await stream.ReadAsync(memory.Slice(read), cancellationToken).ConfigureAwait(false); } Debug.Assert(read == memory.Length); return (intBuffer[0] << 0) + (intBuffer[1] << 8) + (intBuffer[2] << 16) + (intBuffer[3] << 24);
}
如图,我还使用了一个 while
语句,因为不像 BinaryReader
,如果一次无法读取所需的字节数(4个字节), stream.ReadAsync()
并不会堵塞线程。然后需要将 int32Buffer
转换为 int
类型。
注意:此处我没有使用
BitConverter.ToInt32()
,也不能使用该方法,因为该方法不像BinaryReader
,它在大端/小端的CPU
上会有不同的行为。(其中在大端CPU
上将有错误的行为)涉及二进制序列化需要传输的,不能使用BitConverter
类。
同样的,写 TCP
流也需要有相应的变化:
static async Task SendAsync(Stream stream, byte[] body, CancellationToken cancellationToken)
{ var buffer = new byte[4]; await stream.WriteAsync(GetBytesI32(4 + 4 + body.Length + 1), cancellationToken).ConfigureAwait(false); await stream.WriteAsync(GetBytesI32(4 + 4 + body.Length + 1), cancellationToken).ConfigureAwait(false); await stream.WriteAsync(GetBytesI16(ClientSendToServer), cancellationToken).ConfigureAwait(false); await stream.WriteAsync(new byte[] { Encrypted}, cancellationToken).ConfigureAwait(false); await stream.WriteAsync(new byte[] { Reserved}, cancellationToken).ConfigureAwait(false); await stream.WriteAsync(body, cancellationToken).ConfigureAwait(false); await stream.WriteAsync(new byte[] { ByteZero}, cancellationToken).ConfigureAwait(false); Memory<byte> GetBytesI32(int v) { buffer[0] = (byte)v; buffer[1] = (byte)(v >> 8); buffer[2] = (byte)(v >> 16); buffer[3] = (byte)(v >> 24); return new Memory<byte>(buffer, 0, 4); } Memory<byte> GetBytesI16(short v) { buffer[0] = (byte)v; buffer[1] = (byte)(v >> 8);; return new Memory<byte>(buffer, 0, 2); }
}
总结
最终运行效果如下:
这一篇【DotNet骚操作】文章介绍了如何使用斗鱼tv开放弹幕 API
,下篇将会:
共享本文所使用的所有完整的源代码;
介绍如何使用
ReactiveExtensions
(RX
),演示这一系列操作用起来,就像写HelloWorld
一样简单;用我自制的“准游戏引擎”
FlysEngine
,只需少量代码,即可实现桌面弹幕的效果;
敬请期待!“刷一波666???”