在前两讲《初探.Net Remoting服务端 Loading Remoting配置内容的过程》《初探.Net Remoting客户端 Loading Remoting配置内容的过程》中,我已经分析了Remoting的Sink机制,接下来,就提供一个具体的范例:CompressionSink(原始SourceCode源于《Advanced .NET Remoting》第1版)。CompressionSink通过在客户端和服务端各自插入一个数据压缩-解压缩的Sink,来减少大数据量传递对网络带宽的占用,提高传输效率。下载SourceCode。BTW,这个压缩Sink相对比较稳定,大家可以在各自的项目中放心使用。:-)
详细设计:
提供一个Assembly: CompressionSink.dll
它包括:
客户端:
CompressionSink.CompressionClientSinkProvider类和CompressionSink.CompressionClientSink类
服务端:
CompressionSink.CompressionServerSinkProvider类和CompressionSink.CompressionServerSink类
压缩类:CompressionHelper
压缩内核:NZipLib库。
客户端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<!-- Client-side remoting configuration: registers the compression sink provider on the HTTP channel. -->
<configuration>
<system.runtime.remoting>
<application>
<channels>
<channel ref="http">
<!-- The SOAP formatter is listed before the compression provider; presumably the provider order
     defines the sink order, so the message is serialized first and compressed afterwards
     (confirm against the Remoting configuration documentation). -->
<clientProviders>
<formatter ref="soap" />
<provider type="CompressionSink.CompressionClientSinkProvider, CompressionSink" />
</clientProviders>
</channel>
</channels>
<client>
<!-- Service.SomeSAO is reached over HTTP on localhost port 5555 at the URI SomeSAO.soap. -->
<wellknown type="Service.SomeSAO, Service" url="http://localhost:5555/SomeSAO.soap" />
</client>
</application>
</system.runtime.remoting>
</configuration>
服务端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<!-- Server-side remoting configuration: registers the compression sink provider on the HTTP channel. -->
<configuration>
<system.runtime.remoting>
<application>
<channels>
<channel ref="http" port="5555">
<!-- The compression provider is listed before the SOAP formatter (the reverse of the client file);
     presumably the incoming request is decompressed before it is deserialized
     (confirm against the Remoting configuration documentation). -->
<serverProviders>
<provider type="CompressionSink.CompressionServerSinkProvider, CompressionSink" />
<formatter ref="soap"/>
</serverProviders>
</channel>
</channels>
<service>
<!-- Publishes Service.SomeSAO as a Singleton well-known object under the URI SomeSAO.soap. -->
<wellknown mode="Singleton" type="Service.SomeSAO, Service" objectUri="SomeSAO.soap" />
</service>
</application>
</system.runtime.remoting>
</configuration>
客户端Sink(CompressionClientSink)的代码:

/// <summary>
/// Client-side channel sink that compresses every outgoing request stream and
/// decompresses every incoming response stream by means of
/// <c>CompressionHelper</c>.
/// </summary>
public class CompressionClientSink : BaseChannelSinkWithProperties, IClientChannelSink
{
    // Next sink in the client chain; assigned once in the constructor.
    private readonly IClientChannelSink _nextSink;

    /// <summary>
    /// Creates the sink and remembers the sink that requests are forwarded to.
    /// </summary>
    /// <param name="next">The next sink in the client sink chain.</param>
    public CompressionClientSink(IClientChannelSink next)
    {
        _nextSink = next;
    }

    /// <summary>
    /// Gets the sink that this sink forwards requests to.
    /// </summary>
    public IClientChannelSink NextChannelSink
    {
        get
        {
            return _nextSink;
        }
    }

    /// <summary>
    /// Compresses the request stream and forwards the asynchronous request to
    /// the next sink.
    /// </summary>
    /// <param name="sinkStack">Sink stack this sink registers itself on.</param>
    /// <param name="msg">The message being sent.</param>
    /// <param name="headers">Transport headers of the request.</param>
    /// <param name="stream">Serialized request; replaced by a compressed copy.</param>
    public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                    IMessage msg,
                                    ITransportHeaders headers,
                                    Stream stream)
    {
        // Generate a compressed copy of the request using NZipLib.
        stream = CompressionHelper.getCompressedStreamCopy(stream);

        // Register this sink (no per-call state is kept) and forward the
        // request; presumably this is what routes the reply back through
        // AsyncProcessResponse.
        sinkStack.Push(this, null);
        _nextSink.AsyncProcessRequest(sinkStack, msg, headers, stream);
    }

    /// <summary>
    /// Decompresses an asynchronous response and hands it back to the sink stack.
    /// </summary>
    /// <param name="sinkStack">Sink stack the response is passed on to.</param>
    /// <param name="state">State pushed with this sink; unused here.</param>
    /// <param name="headers">Transport headers of the response.</param>
    /// <param name="stream">Compressed response; replaced by a decompressed copy.</param>
    public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                     object state,
                                     ITransportHeaders headers,
                                     Stream stream)
    {
        // Inflate the response before the remaining sinks see it.
        stream = CompressionHelper.getUncompressedStreamCopy(stream);

        // Forward the decompressed response.
        sinkStack.AsyncProcessResponse(headers, stream);
    }

    /// <summary>
    /// Delegates the request-stream lookup to the next sink.
    /// </summary>
    /// <param name="msg">The message being sent.</param>
    /// <param name="headers">Transport headers of the request.</param>
    /// <returns>Whatever the next sink returns.</returns>
    public Stream GetRequestStream(IMessage msg,
                                   ITransportHeaders headers)
    {
        return _nextSink.GetRequestStream(msg, headers);
    }

    /// <summary>
    /// Synchronous path: compresses the request, forwards it, and decompresses
    /// the response returned by the next sink.
    /// </summary>
    /// <param name="msg">The message being sent.</param>
    /// <param name="requestHeaders">Transport headers of the request.</param>
    /// <param name="requestStream">Serialized, uncompressed request.</param>
    /// <param name="responseHeaders">Receives the response headers from the next sink.</param>
    /// <param name="responseStream">Receives the decompressed response.</param>
    public void ProcessMessage(IMessage msg,
                               ITransportHeaders requestHeaders,
                               Stream requestStream,
                               out ITransportHeaders responseHeaders,
                               out Stream responseStream)
    {
        // Generate a compressed copy of the request using NZipLib.
        Stream localrequestStream =
            CompressionHelper.getCompressedStreamCopy(requestStream);

        // Forward the call to the next sink.
        Stream localresponseStream;
        _nextSink.ProcessMessage(msg,
                                 requestHeaders,
                                 localrequestStream,
                                 out responseHeaders,
                                 out localresponseStream);

        // Inflate the response for the caller.
        responseStream =
            CompressionHelper.getUncompressedStreamCopy(localresponseStream);
    }
}

using System;
using System.IO;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;

namespace CompressionSink
{
    /// <summary>
    /// Server-side channel sink that decompresses every incoming request stream
    /// and compresses every outgoing response stream by means of
    /// <c>CompressionHelper</c>.
    /// </summary>
    public class CompressionServerSink : BaseChannelSinkWithProperties, IServerChannelSink
    {
        // Next sink in the server chain; assigned once in the constructor.
        private readonly IServerChannelSink _nextSink;

        /// <summary>
        /// Creates the sink and remembers the sink that requests are forwarded to.
        /// </summary>
        /// <param name="next">The next sink in the server sink chain.</param>
        public CompressionServerSink(IServerChannelSink next)
        {
            _nextSink = next;
        }

        /// <summary>
        /// Gets the sink that this sink forwards requests to.
        /// </summary>
        public IServerChannelSink NextChannelSink
        {
            get
            {
                return _nextSink;
            }
        }

        /// <summary>
        /// Compresses an asynchronously produced response and passes it on to
        /// the sink stack.
        /// </summary>
        /// <param name="sinkStack">Sink stack the response is passed on to.</param>
        /// <param name="state">State pushed with this sink; unused here.</param>
        /// <param name="msg">The response message.</param>
        /// <param name="headers">Transport headers of the response.</param>
        /// <param name="stream">Serialized response; replaced by a compressed copy.</param>
        public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
                                         object state,
                                         IMessage msg,
                                         ITransportHeaders headers,
                                         Stream stream)
        {
            // Compress the response.
            stream = CompressionHelper.getCompressedStreamCopy(stream);

            // Forward to the stack for further processing.
            sinkStack.AsyncProcessResponse(msg, headers, stream);
        }

        /// <summary>
        /// This sink does not supply a response stream of its own.
        /// </summary>
        /// <param name="sinkStack">Unused.</param>
        /// <param name="state">Unused.</param>
        /// <param name="msg">Unused.</param>
        /// <param name="headers">Unused.</param>
        /// <returns>Always <c>null</c>.</returns>
        public Stream GetResponseStream(IServerResponseChannelSinkStack sinkStack,
                                        object state,
                                        IMessage msg,
                                        ITransportHeaders headers)
        {
            return null;
        }

        /// <summary>
        /// Decompresses the request, forwards it to the next sink, and
        /// compresses the response when the next sink produced one.
        /// </summary>
        /// <param name="sinkStack">Sink stack this sink registers itself on.</param>
        /// <param name="requestMsg">The request message.</param>
        /// <param name="requestHeaders">Transport headers of the request.</param>
        /// <param name="requestStream">Compressed request stream.</param>
        /// <param name="responseMsg">Receives the response message from the next sink.</param>
        /// <param name="responseHeaders">Receives the response headers from the next sink.</param>
        /// <param name="responseStream">
        /// Receives the compressed response, or <c>null</c> when the next sink
        /// returned no response stream.
        /// </param>
        /// <returns>The processing status reported by the next sink.</returns>
        public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
                                               IMessage requestMsg,
                                               ITransportHeaders requestHeaders,
                                               Stream requestStream,
                                               out IMessage responseMsg,
                                               out ITransportHeaders responseHeaders,
                                               out Stream responseStream)
        {
            // Uncompress the request.
            Stream localrequestStream =
                CompressionHelper.getUncompressedStreamCopy(requestStream);

            // Push onto the stack (no per-call state) and forward the call.
            sinkStack.Push(this, null);

            Stream localresponseStream;
            ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
                                                                requestMsg,
                                                                requestHeaders,
                                                                localrequestStream,
                                                                out responseMsg,
                                                                out responseHeaders,
                                                                out localresponseStream);

            // Compress the response only when there is one. A one-way or
            // asynchronously dispatched call presumably comes back without a
            // response stream, and compressing null would throw inside
            // CompressionHelper.
            if (localresponseStream != null)
            {
                responseStream =
                    CompressionHelper.getCompressedStreamCopy(localresponseStream);
            }
            else
            {
                responseStream = null;
            }

            // Return the status information unchanged.
            return srvProc;
        }
    }
}
压缩辅助类(CompressionHelper)的代码:

/// <summary>
/// Stream helpers used by the compression sinks: each method reads its input
/// stream to the end and returns an in-memory copy that is either deflated or
/// inflated with NZipLib.
/// </summary>
public class CompressionHelper
{
    // Size in bytes of the scratch buffer used when copying between streams.
    private const int BufferSize = 4096;

    /// <summary>
    /// Reads <paramref name="inStream"/> to the end and returns a deflated copy.
    /// (Refactored by zendy.)
    /// </summary>
    /// <param name="inStream">Stream holding the uncompressed data; it is read but not closed.</param>
    /// <returns>A <see cref="MemoryStream"/> with the compressed data, positioned at its start.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="inStream"/> is null.</exception>
    public static Stream getCompressedStreamCopy(Stream inStream)
    {
        if (inStream == null)
        {
            throw new ArgumentNullException("inStream");
        }

        MemoryStream outStream = new MemoryStream();
        Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
        DeflaterOutputStream compressStream = new DeflaterOutputStream(outStream, deflater);

        byte[] buffer = new byte[BufferSize];
        int count;
        while ((count = inStream.Read(buffer, 0, buffer.Length)) > 0)
        {
            compressStream.Write(buffer, 0, count);
        }

        // Finish() is used instead of closing compressStream; closing would
        // presumably also close outStream, which is handed to the caller.
        compressStream.Finish();

        // Added by zendy: this rewind is essential, otherwise the following
        // sinks fail when they process the stream. The original source code
        // failed at run time precisely because this step was missing.
        outStream.Seek(0, SeekOrigin.Begin);
        return outStream;
    }

    /// <summary>
    /// Inflates <paramref name="inStream"/> to the end and returns the
    /// decompressed copy. (Refactored by zendy.)
    /// </summary>
    /// <param name="inStream">Stream holding deflated data; the inflater wrapping it is closed afterwards.</param>
    /// <returns>A <see cref="MemoryStream"/> with the decompressed data, positioned at its start.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="inStream"/> is null.</exception>
    public static Stream getUncompressedStreamCopy(Stream inStream)
    {
        if (inStream == null)
        {
            throw new ArgumentNullException("inStream");
        }

        InflaterInputStream unCompressStream = new InflaterInputStream(inStream);
        MemoryStream outStream = new MemoryStream();
        try
        {
            byte[] buffer = new byte[BufferSize];
            int count;
            while ((count = unCompressStream.Read(buffer, 0, buffer.Length)) > 0)
            {
                outStream.Write(buffer, 0, count);
            }
        }
        finally
        {
            // Close the inflater even when reading fails (for example on
            // corrupt input), so the wrapped stream is not left open.
            unCompressStream.Close();
        }

        // Added by zendy: this rewind is essential, otherwise the following
        // sinks fail when they process the stream. The original source code
        // failed at run time precisely because this step was missing.
        outStream.Seek(0, SeekOrigin.Begin);
        return outStream;
    }
}
BTW,这个Sink还可以扩展,比如判断需要压缩的Stream的大小,如果很大,就压缩,否则不压缩(可以在responseHeaders和requestHeaders中添加是否已经压缩的标记)。