IPC-CFX有两种主要的通信方式,可以通过RabbitMQ发布和订阅,也可以通过request和response进行点对点的通信,本文主要讲的是点对点的通信方式。
在vscode里建立新的dotnet项目,可以通过终端输入dotnet new console来建立,文件目录为CFXDemo->machine1和CFXDemo->machine2。
通过nuget插件分别为两个项目都安装CFX.CFXSDK、AMQPNetLite.Core和Newtonsoft.Json这几个metapackage。
我们将machine1作为发送端(sendRequest),machine2作为接收端(Receive),则Machine1的代码如下所示:
using System.Threading;
using CFX;
using CFX.Transport;
using System;
using System.Security.Principal;namespace machine1
{class Program{static void Main(string[] args){OpenRequest();Console.ReadLine();for(int i = 0;i<5;i++){SendRequest();Thread.Sleep(2000);}}static string sendCFXHandle = "a.b.001"; static string receiveCFXHandle = "a.b.002";static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");#region send requeststatic AmqpCFXEndpoint endpointSendRequest;static void OpenRequest(){if (endpointSendRequest != null){endpointSendRequest.Close();endpointSendRequest = null;}endpointSendRequest = new AmqpCFXEndpoint();if (!endpointSendRequest.IsOpen){Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());endpointSendRequest.Open(sendCFXHandle); //这一步会绑定endpointSendRequest里的CFXHandle,即sendCFXHandle的值Console.WriteLine("Request.Source is : {0}",endpointSendRequest.CFXHandle);Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());}// Set a timeout of 20 seconds. If the target endpoint does not// respond in this time, the request will time out.AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);}static void SendRequest(){// Build a GetEndpointInfomation RequestCFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest(){CFXHandle = receiveCFXHandle});request.Source = endpointSendRequest.CFXHandle;request.Target = receiveCFXHandle;try{CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);Console.WriteLine($"response:\n{response.ToJson(true)}");}catch (Exception ex){Console.WriteLine(ex.Message);}}#endregion send request#region receive requeststatic AmqpCFXEndpoint endpointReceiveRequest;static void OpenReceive(){if (endpointReceiveRequest != null){endpointReceiveRequest.Close();endpointReceiveRequest = null;}endpointReceiveRequest = new AmqpCFXEndpoint();endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;if (!endpointReceiveRequest.IsOpen){Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());}}static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request){Console.WriteLine($"Endpoint_OnRequestReceived: { request.ToString()}");// Process request. Return Result.if (request.MessageBody is WhoIsThereRequest){CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse(){ CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });result.Source = receiveCFXHandle;result.Target = request.Source;result.TimeStamp = DateTime.Now;return result;}if (request.MessageBody is GetEndpointInformationRequest){CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse(){ CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "...." });result.Source = receiveCFXHandle;result.Target = request.Source;result.TimeStamp = DateTime.Now;return result;}return null;}#endregion receive request}
}
作为接收端,machine2的代码如下所示:
using CFX;
using CFX.Transport;
using System;namespace machine2
{class Program{static void Main(string[] args){Console.WriteLine("ReceivEndPoint is waiting Request......");OpenReceive();Console.WriteLine("Press Enter Key to end the App");Console.ReadKey();}static string sendCFXHandle = "a.b.001";static string receiveCFXHandle = "a.b.002";static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");#region send requeststatic AmqpCFXEndpoint endpointSendRequest;static void OpenRequest(){if (endpointSendRequest != null){endpointSendRequest.Close();endpointSendRequest = null;}endpointSendRequest = new AmqpCFXEndpoint();if (!endpointSendRequest.IsOpen){Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());endpointSendRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());}// Set a timeout of 20 seconds. If the target endpoint does not// respond in this time, the request will time out.AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);}static void SendRequest(){// Build a GetEndpointInfomation RequestCFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest(){CFXHandle = receiveCFXHandle});request.Source = endpointSendRequest.CFXHandle;request.Target = receiveCFXHandle;try{CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);Console.WriteLine($"response:\n{response.ToJson(true)}");}catch (Exception ex){Console.WriteLine(ex.Message);}}#endregion send request#region receive requeststatic AmqpCFXEndpoint endpointReceiveRequest;static void OpenReceive(){if (endpointReceiveRequest != null){endpointReceiveRequest.Close();endpointReceiveRequest = null;}endpointReceiveRequest = new AmqpCFXEndpoint();endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;if (!endpointReceiveRequest.IsOpen){Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));//endpointReceiveRequest.Open(receiveCFXHandle);Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());}}static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request){Console.WriteLine($"Endpoint_OnRequestReceived: { request.ToString()}");Console.WriteLine($"request:\n{request.ToJson(true)}");// Process request. Return Result.if (request.MessageBody is WhoIsThereRequest){CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse(){ CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });result.Source = receiveCFXHandle;result.Target = request.Source;result.TimeStamp = DateTime.Now;return result;}if (request.MessageBody is GetEndpointInformationRequest){CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse(){ CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "..." });result.Source = receiveCFXHandle;result.Target = request.Source;result.TimeStamp = DateTime.Now;return result;}return null;}#endregion receive request}
}
运行结果,可以用json格式对response和request的内容进行解析。