在前两讲《初探.Net Remoting服务端 Loading Remoting配置内容的过程》《初探.Net Remoting客户端 Loading Remoting配置内容的过程》中,我已经分析了Remoting的Sink机制。接下来,就提供一个具体的范例:CompressionSink(原始SourceCode源于Advanced .Net Remoting 1st Ed.)。CompressionSink通过在客户端和服务端各自插入一个数据压缩-解压缩的Sink,来减少大数据量传递对网络带宽的占用,提高传输效率。下载SourceCode。BTW,这个压缩Sink相对比较稳定,大家可以在各自的项目中放心使用。:-)
详细设计:
提供一个Assembly: CompressionSink.dll
它包括:
客户端:
CompressionSink.CompressionClientSinkProvider类和CompressionSink.CompressionClientSink类
服务端:
CompressionSink.CompressionServerSinkProvider类和CompressionSink.CompressionServerSink类
压缩类:CompressionHelper
压缩内核:NZipLib库。
客户端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<!-- Client-side Remoting configuration: registers the compression sink provider on the HTTP channel. -->
<configuration>
<system.runtime.remoting>
<application>
<channels>
<channel ref="http">
<clientProviders>
<!-- Provider order: the SOAP formatter is listed first, followed by the compression sink provider. -->
<formatter ref="soap" />
<provider type="CompressionSink.CompressionClientSinkProvider, CompressionSink" />
</clientProviders>
</channel>
</channels>
<client>
<!-- Well-known remote object used by this client; the URL points at port 5555 and the SomeSAO.soap object URI. -->
<wellknown type="Service.SomeSAO, Service" url="http://localhost:5555/SomeSAO.soap" />
</client>
</application>
</system.runtime.remoting>
</configuration>
服务端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<!-- Server-side Remoting configuration: registers the compression sink provider on the HTTP channel (port 5555). -->
<configuration>
<system.runtime.remoting>
<application>
<channels>
<channel ref="http" port="5555">
<serverProviders>
<!-- Provider order: the compression sink provider is listed first, followed by the SOAP formatter. -->
<provider type="CompressionSink.CompressionServerSinkProvider, CompressionSink" />
<formatter ref="soap"/>
</serverProviders>
</channel>
</channels>
<service>
<!-- Publishes Service.SomeSAO as a Singleton well-known object under the URI SomeSAO.soap. -->
<wellknown mode="Singleton" type="Service.SomeSAO, Service" objectUri="SomeSAO.soap" />
</service>
</application>
</system.runtime.remoting>
</configuration>
{
private IClientChannelSinkProvider _nextProvider;
/// <summary>
/// Creates the client-side compression sink provider.
/// </summary>
/// <param name="properties">Provider properties; not read by this constructor.</param>
/// <param name="providerData">Provider data; not read by this constructor.</param>
public CompressionClientSinkProvider(IDictionary properties, ICollection providerData)
{
// not yet needed
}
/// <summary>
/// Gets or sets the next provider in the client sink provider chain.
/// </summary>
public IClientChannelSinkProvider Next
{
    get { return this._nextProvider; }
    set { this._nextProvider = value; }
}
/// <summary>
/// Builds the client sink chain with a <c>CompressionClientSink</c> placed in front of
/// the sinks created by the next provider.
/// </summary>
/// <param name="channel">Channel passed through to the next provider.</param>
/// <param name="url">URL passed through to the next provider.</param>
/// <param name="remoteChannelData">Channel data passed through to the next provider.</param>
/// <returns>A new compression sink wrapping the rest of the chain.</returns>
public IClientChannelSink CreateSink(IChannelSender channel, string url, object remoteChannelData)
{
    // The rest of the chain is created first; the compression sink is then put on top of it.
    return new CompressionClientSink(
        this._nextProvider.CreateSink(channel, url, remoteChannelData));
}
}
2 IClientChannelSink
3 {
4 private IClientChannelSink _nextSink;
5
/// <summary>
/// Creates the client compression sink and stores the next sink of the chain.
/// </summary>
/// <param name="next">The sink that calls are forwarded to.</param>
public CompressionClientSink(IClientChannelSink next)
{
    this._nextSink = next;
}
10
/// <summary>
/// Gets the next sink in the client sink chain.
/// </summary>
public IClientChannelSink NextChannelSink
{
    get { return this._nextSink; }
}
17
18
/// <summary>
/// Compresses the outgoing request stream and forwards the asynchronous call to the next sink.
/// </summary>
/// <param name="sinkStack">Sink stack this sink pushes itself onto before forwarding.</param>
/// <param name="msg">The message, forwarded unchanged.</param>
/// <param name="headers">The transport headers, forwarded unchanged.</param>
/// <param name="stream">The request stream; a compressed copy of it is forwarded.</param>
public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                IMessage msg,
                                ITransportHeaders headers,
                                Stream stream)
{
    // Produce the compressed copy via CompressionHelper (NZipLib based).
    Stream compressedRequest = CompressionHelper.getCompressedStreamCopy(stream);

    // Push this sink (with no state) onto the stack, then hand the request on.
    sinkStack.Push(this, null);
    this._nextSink.AsyncProcessRequest(sinkStack, msg, headers, compressedRequest);
}
33
34
/// <summary>
/// Decompresses the asynchronous response stream and passes it on through the sink stack.
/// </summary>
/// <param name="sinkStack">Sink stack the decompressed response is forwarded to.</param>
/// <param name="state">State stored when this sink was pushed; not used here.</param>
/// <param name="headers">The response headers, forwarded unchanged.</param>
/// <param name="stream">The compressed response stream.</param>
public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                 object state,
                                 ITransportHeaders headers,
                                 Stream stream)
{
    // Inflate the response before anyone further up the stack sees it.
    Stream inflatedResponse = CompressionHelper.getUncompressedStreamCopy(stream);

    sinkStack.AsyncProcessResponse(headers, inflatedResponse);
}
48
49
/// <summary>
/// Returns whatever request stream the next sink provides for the given message.
/// </summary>
/// <param name="msg">The message, passed through to the next sink.</param>
/// <param name="headers">The transport headers, passed through to the next sink.</param>
/// <returns>The value returned by the next sink's <c>GetRequestStream</c>.</returns>
public Stream GetRequestStream(IMessage msg,
                               ITransportHeaders headers)
{
    Stream streamFromNextSink = this._nextSink.GetRequestStream(msg, headers);
    return streamFromNextSink;
}
55
56
/// <summary>
/// Synchronous call path: compresses the request, forwards it to the next sink and
/// decompresses the response that comes back.
/// </summary>
/// <param name="msg">The message, forwarded unchanged.</param>
/// <param name="requestHeaders">The request headers, forwarded unchanged.</param>
/// <param name="requestStream">The uncompressed request stream.</param>
/// <param name="responseHeaders">Receives the response headers set by the next sink.</param>
/// <param name="responseStream">Receives the decompressed copy of the response stream.</param>
public void ProcessMessage(IMessage msg,
                           ITransportHeaders requestHeaders,
                           Stream requestStream,
                           out ITransportHeaders responseHeaders,
                           out Stream responseStream)
{
    // Compressed copy of the request, produced via CompressionHelper (NZipLib based).
    Stream compressedRequest = CompressionHelper.getCompressedStreamCopy(requestStream);

    // Forward the call; the next sink fills in the headers and the (compressed) response.
    Stream compressedResponse;
    this._nextSink.ProcessMessage(msg,
                                  requestHeaders,
                                  compressedRequest,
                                  out responseHeaders,
                                  out compressedResponse);

    // Inflate the response for the caller.
    responseStream = CompressionHelper.getUncompressedStreamCopy(compressedResponse);
}
81 }
2 {
3 private IServerChannelSinkProvider _nextProvider;
4
/// <summary>
/// Creates the server-side compression sink provider.
/// </summary>
/// <param name="properties">Provider properties; not read by this constructor.</param>
/// <param name="providerData">Provider data; not read by this constructor.</param>
public CompressionServerSinkProvider(IDictionary properties, ICollection providerData)
{
    // Nothing to initialize yet.
}
9
/// <summary>
/// Gets or sets the next provider in the server sink provider chain.
/// </summary>
public IServerChannelSinkProvider Next
{
    get { return this._nextProvider; }
    set { this._nextProvider = value; }
}
19
/// <summary>
/// Builds the server sink chain with a <c>CompressionServerSink</c> placed in front of
/// the sinks created by the next provider.
/// </summary>
/// <param name="channel">Channel passed through to the next provider.</param>
/// <returns>A new compression sink wrapping the rest of the chain.</returns>
public IServerChannelSink CreateSink(IChannelReceiver channel)
{
    // The rest of the chain is created first; the compression sink is then put on top of it.
    return new CompressionServerSink(this._nextProvider.CreateSink(channel));
}
28
/// <summary>
/// Intentionally does nothing: this provider adds no data to the channel data store.
/// </summary>
/// <param name="channelData">The channel data store; left untouched.</param>
public void GetChannelData(IChannelDataStore channelData)
{
    // Nothing to contribute yet.
}
33
34 }
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Messaging;
using System.IO;

namespace CompressionSink
{
    /// <summary>
    /// Server-side channel sink that decompresses incoming request streams and
    /// compresses outgoing response streams using <c>CompressionHelper</c>.
    /// </summary>
    public class CompressionServerSink : BaseChannelSinkWithProperties,
                                         IServerChannelSink
    {
        private IServerChannelSink _nextSink;

        /// <summary>
        /// Creates the sink and stores the next sink of the server chain.
        /// </summary>
        /// <param name="next">The sink that requests are forwarded to.</param>
        public CompressionServerSink(IServerChannelSink next)
        {
            _nextSink = next;
        }

        /// <summary>
        /// Gets the next sink in the server sink chain.
        /// </summary>
        public IServerChannelSink NextChannelSink
        {
            get
            {
                return _nextSink;
            }
        }

        /// <summary>
        /// Compresses an asynchronously produced response and forwards it through the sink stack.
        /// </summary>
        /// <param name="sinkStack">Sink stack the compressed response is forwarded to.</param>
        /// <param name="state">State stored with this sink; not used here.</param>
        /// <param name="msg">The response message, forwarded unchanged.</param>
        /// <param name="headers">The response headers, forwarded unchanged.</param>
        /// <param name="stream">The uncompressed response stream.</param>
        public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
                                         object state,
                                         IMessage msg,
                                         ITransportHeaders headers,
                                         Stream stream)
        {
            // compressing the response
            stream = CompressionHelper.getCompressedStreamCopy(stream);

            // forwarding to the stack for further processing
            sinkStack.AsyncProcessResponse(msg, headers, stream);
        }

        /// <summary>
        /// Always returns <c>null</c>; this sink does not supply a response stream of its own.
        /// </summary>
        public Stream GetResponseStream(IServerResponseChannelSinkStack sinkStack,
                                        object state,
                                        IMessage msg,
                                        ITransportHeaders headers)
        {
            return null;
        }

        /// <summary>
        /// Decompresses the request, forwards it to the next sink and, when the call
        /// completed synchronously, compresses the response.
        /// </summary>
        /// <param name="sinkStack">Sink stack this sink registers itself on.</param>
        /// <param name="requestMsg">The request message, forwarded unchanged.</param>
        /// <param name="requestHeaders">The request headers, forwarded unchanged.</param>
        /// <param name="requestStream">The compressed request stream.</param>
        /// <param name="responseMsg">Receives the response message set by the next sink.</param>
        /// <param name="responseHeaders">Receives the response headers set by the next sink.</param>
        /// <param name="responseStream">
        /// Receives the compressed response for a completed call; otherwise whatever
        /// stream (possibly <c>null</c>) the next sink produced.
        /// </param>
        /// <returns>The processing status reported by the next sink.</returns>
        public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
                                               IMessage requestMsg,
                                               ITransportHeaders requestHeaders,
                                               Stream requestStream,
                                               out IMessage responseMsg,
                                               out ITransportHeaders responseHeaders,
                                               out Stream responseStream)
        {
            // uncompressing the request
            Stream localrequestStream =
                CompressionHelper.getUncompressedStreamCopy(requestStream);

            // pushing onto stack and forwarding the call
            sinkStack.Push(this, null);

            Stream localresponseStream;
            ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
                                                                requestMsg,
                                                                requestHeaders,
                                                                localrequestStream,
                                                                out responseMsg,
                                                                out responseHeaders,
                                                                out localresponseStream);

            switch (srvProc)
            {
                case ServerProcessing.Complete:
                    // The response is available now: take this sink off the stack and
                    // compress the response (guarding against a missing stream).
                    sinkStack.Pop(this);
                    responseStream = (localresponseStream != null)
                        ? CompressionHelper.getCompressedStreamCopy(localresponseStream)
                        : null;
                    break;

                case ServerProcessing.Async:
                    // The response arrives later through AsyncProcessResponse; store this
                    // sink so it takes part in that call. There is nothing to compress yet,
                    // and compressing a null stream here would throw.
                    sinkStack.Store(this, null);
                    responseStream = localresponseStream;
                    break;

                default:
                    // OneWay: no response is sent back, so there is nothing to compress.
                    sinkStack.Pop(this);
                    responseStream = localresponseStream;
                    break;
            }

            // returning status information
            return srvProc;
        }
    }
}
2 {
3
/// <summary>
/// Reads <paramref name="inStream"/> to its end and returns a deflate-compressed copy
/// of the data in a new in-memory stream (refactored by zendy).
/// </summary>
/// <param name="inStream">Stream supplying the uncompressed data; it is not closed here.</param>
/// <returns>A <c>MemoryStream</c> holding the compressed data, positioned at its beginning.</returns>
public static Stream getCompressedStreamCopy(Stream inStream)
{
    MemoryStream compressedData = new MemoryStream();
    DeflaterOutputStream deflaterStream =
        new DeflaterOutputStream(compressedData, new Deflater(Deflater.BEST_COMPRESSION));

    // Copy the input through the deflater in 4 KB chunks until Read reports no more data.
    byte[] chunk = new byte[4096];
    int bytesRead;
    while ((bytesRead = inStream.Read(chunk, 0, 4096)) > 0)
    {
        deflaterStream.Write(chunk, 0, bytesRead);
    }
    deflaterStream.Finish();

    // Modified by zendy: this rewind is essential. Without it the following sinks fail
    // when they process the stream; the original source failed at runtime because it
    // was missing.
    compressedData.Position = 0;
    return compressedData;
}
26
/// <summary>
/// Inflates the deflate-compressed data in <paramref name="inStream"/> and returns the
/// result in a new in-memory stream (refactored by zendy).
/// </summary>
/// <param name="inStream">Stream supplying the compressed data.</param>
/// <returns>A <c>MemoryStream</c> holding the uncompressed data, positioned at its beginning.</returns>
public static Stream getUncompressedStreamCopy(Stream inStream)
{
    InflaterInputStream inflaterStream = new InflaterInputStream(inStream);
    MemoryStream plainData = new MemoryStream();

    // Pull inflated data in 4 KB chunks until Read reports no more data.
    byte[] chunk = new byte[4096];
    int bytesRead;
    while ((bytesRead = inflaterStream.Read(chunk, 0, chunk.Length)) > 0)
    {
        plainData.Write(chunk, 0, bytesRead);
    }
    inflaterStream.Close();

    // Modified by zendy: this rewind is essential. Without it the following sinks fail
    // when they process the stream; the original source failed at runtime because it
    // was missing.
    plainData.Position = 0;
    return plainData;
}
55 }
BTW,这个Sink还可以扩展:比如判断需要压缩的Stream的大小,如果很大就压缩,否则不压缩(可以在responseHeaders和requestHeaders中添加是否已经压缩的标记)。