.NET Standard支持一组新的API,System.Span, System.Memory,还有System.IO.Pipelines。这几个新的API极大了提升了.NET程序的效能,将来.NET很多基础API都会使用它们进行重写。
Pipelines旨在解决.NET编写Socket通信程序时的很多困难,相信读者也对此不胜其烦,使用stream模型进行编程,就算能够解决,也是实在麻烦。
System.IO.Pipelines使用简单的内存片段来管理数据,可以极大的简化编写程序的过程。关于Pipelines的详细介绍,可以看看这里。现在ASP.NET Core中使用的Kestrel已经在使用这个API。(话说这个东西貌似就是Kestrel团队搞出来的。)
可能是直接需要用Socket场景有限(物联网用的还挺多的),Pipelines相关的资料感觉不是很多。官方给出的示例是基于ASCII协议的,有固定结尾的协议,这里我以物联网设备常用的BINARY二进制自定义协议为例,讲解基于Pipelines的程序套路。
与基于Stream的方式不同,pipelines提供一个pipe,用于存储数据,pipe中间存储的数据有点链表的感觉,可以基于SequencePosition
进行slice操作,这样就能得到一个ReadOnlySequence<T>
对象。reader可以进行自定义操作,并在操作完成之后告诉pipe已经处理了多少数据,整个过程是不需要进行内存复制操作的,因此性能得到了提升,还少了很多麻烦。可以简单理解作为服务器端,流程:
接受数据循环:接到数据->放pipe里面->告诉pipe放了多少数据
处理数据循环:在pipe里面找一条完整数据->交给处理流程->告诉pipe处理了多少数据
有一款设备,binary协议,数据包开头0x75, 0xbd, 0x7e, 0x97一共4个字节,随后跟数据包长度2个字节(固定2400字节,不固定长度也可以参照),随后是数据区。在设备连接成功之后,数据主动从设备发送到PC。
虽然是.NET Core平台的,但是.NET FRAMEWORK 4.6.1上面也可以nuget安装,直接
install-package system.io.pipelines
进行安装就可以了。Socket相关处理的代码不再写了,只列关键的。
代码第一步是声明pipe。
private async void InitPipe(Socket socket)
{
Pipe pipe = new Pipe();
Task writing = FillPipeAsync(socket, pipe.Writer);
Task reading = ReadPipeAsync(socket, pipe.Reader);
await Task.WhenAll(reading, writing);
}
pipe有reader还有一个writer,reader负责读取pipe数据,主要用在数据处理循环,writer负责将数据写入pipe,主要用在数据接受循环。
private async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
const int minimumBufferSize = 1024 * 1024;
while (running)
{
try
{
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}
int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
writer.Advance(bytesRead);
}
catch
{
break;
}
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
writer.Complete();
}
private async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
while (running)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position = null;
do
{
position = buffer.PositionOf((byte)0x75);
if (position != null)
{
var headtoCheck = buffer.Slice(position.Value, 4).ToArray();
if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))
{
if (buffer.Slice(position.Value).Length >= 2400)
{
var mes = buffer.Slice(position.Value, 2400);
await ProcessMessage(mes.ToArray());
var next = buffer.GetPosition(2400, position.Value);
buffer = buffer.Slice(next);
}
else
{
break;
}
}
else
{
var next = buffer.GetPosition(1, position.Value);
buffer = buffer.Slice(next);
}
}
}
while (position != null);
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
reader.Complete();
}
以上代码基本解决了以下问题:
数据接收不完整,找不到开头结尾,导致数据大量丢弃,或者自己维护一个queue的代码复杂性
数据接收与处理的同步问题
一次性收到多条数据的情况
本文只是解释了pipeline处理的模式,对于茫茫多的ToArray方法,可以使用基于Span的操作进行优化(有时间就来填坑)。另外,如果在await ProcessMessage(mes.ToArray());
这里,直接使用Task.Run(()=>ProcessMessage(mes);
代替的话,实测会出现莫名其妙的问题,很有可能是pipe运行快,在系统调度Task之前,已经将内存释放导致的,如果需要优化这一块的话,需要格外注意。
原文地址:https://www.cnblogs.com/podolski/p/10807204.html
.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com