[译]RabbitMQ教程C#版 - 远程过程调用(RPC)

先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost标准端口(5672)。如果你使用不同的主机、端口或证书,则需要调整连接设置。

从哪里获得帮助
如果您在阅读本教程时遇到困难,可以通过邮件列表 联系我们。

在第 教程[2] 中,我们学习了如何使用工作队列在多个工作单元之间分配耗时任务。

但是如果我们想要运行一个在远程计算机上的函数并等待其结果呢?这将是另外一回事了。这种模式通常被称为 远程过程调用 或 RPC 。

在本篇教程中,我们将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可扩展的 RPC 服务器。由于我们没有什么耗时任务值得分发,那干脆就创建一个返回斐波那契数列的虚拟 RPC 服务吧。

客户端接口

为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。该类将暴露一个名为Call的方法,用来发送 RPC 请求并且保持阻塞状态,直到接收到应答为止。

var rpcClient = new RPCClient();Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);rpcClient.Close();

关于 RPC 的说明

尽管 RPC 在计算机中是一种很常见的模式,但它经常受到批评。问题出现在当程序员不知道一个函数是本地调用还是一个耗时的 RPC 请求。这样的混淆,会导致系统不可预测,以及给调试增加不必要的复杂性。误用 RPC 可能会导致不可维护的混乱代码,而不是简化软件。

牢记这些限制,请考虑如下建议:

  • 确保可以明显区分哪些函数是本地调用,哪些是远程调用。

  • 为您的系统编写文档,明确组件之间的依赖关系。

  • 捕获异常,当 RPC 服务长时间宕机时客户端该如何应对。

当有疑问的时候可以先避免使用 RPC。如果可以的话,考虑使用异步管道 - 而不是类似 RPC 的阻塞,其会将结果以异步的方式推送到下一个计算阶段。

回调队列

一般来讲,基于 RabbitMQ 进行 RPC 通信是非常简单的,客户端发送一个请求消息,然后服务端用一个响应消息作为应答。为了能接收到响应,我们需要在发送请求过程中指定一个'callback'队列地址。

Copy

var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName;var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",                     routingKey: "rpc_queue",                     basicProperties: props,                     body: messageBytes);// ... then code to read a response message from the callback_queue ...

消息属性

AMQP 0-9-1 协议在消息中预定义了一个包含 14 个属性的集合,大多数属性很少使用,但以下情况除外:
Persistent:将消息标记为持久的(值为2)或者瞬时的(其他值),可以参考 教程[2]。
DeliveryMode:熟悉 AMQP 协议的人可以选择此属性而不是熟悉协议的人可以选择使用此属性而不是Persistent,它们控制的东西是一样的。
ContentType:用于描述编码的 mime 类型。例如,对于经常使用的 JSON 编码,将此属性设置为:application/json是一种很好的做法。
ReplyTo:通常用于命名回调队列。
CorrelationId:用于将 RPC 响应与请求相关联。

关联ID

在上面介绍的方法中,我们建议为每个 RPC 请求创建一个回调队列,但是这种方式效率低。幸运的是我们有一种更好的方式,那就是为每个客户端创建一个独立的回调队列。

这种方式会引出一个新的问题,在收到响应的回调队列中,它无法区分响应属于哪一个请求,此时便是CorrelationId属性的所用之处。我们将为每个请求的CorrelationId设置一个唯一值。之后当我们在回调队列接收到响应的时候,再去检查下这个属性是否和请求中的值匹配,如此一来,我们就可以把响应和请求关联起来了。如果出现一个未知的CorrelationId值,我们可以安全的销毁这个消息,因为这个消息不属于我们的请求。

你可能会问,为什么我们应该忽略回调队列中的未知的消息,而不是用错误来标识失败呢?这是因为于服务器端可能存在竞争条件。虽然不太可能,但是 RPC 服务器可能在仅发送了响应消息而未发送消息确认的情况下挂掉,如果出现这种情况,RPC 服务器重启之后将会重新处理该请求。这就是为什么在客户端上我们必须优雅地处理重复的响应,并且理想情况下 RPC 应该是幂等的。

总结

640?wx_fmt=png

我们的 RPC 会是这样工作:

  • 客户端启动时,会创建一个匿名的独占回调队列。

  • 对于 RPC 请求,客户端发送带有两个属性的消息:ReplyTo(设置为回调队列)和CorrelationId(为每个请求设置唯一值)。

  • 请求被发送到rpc_queue队列。

  • RPC 工作线程(或者叫:服务器)正在等待该队列上的请求。当出现请求时,它会执行该作业,并使用ReplyTo属性设置的队列将带有结果的消息发送回客户端。

  • 客户端等待回调队列上的数据。出现消息时,它会检查CorrelationId属性。如果它与请求中的值匹配,则返回对应用程序的响应。

组合在一起

斐波纳契 任务:

private static int fib(int n){    if (n == 0 || n == 1) return n;    return fib(n - 1) + fib(n - 2);
}

我们宣布我们的斐波那契函数。并假定只允许有效的正整数输入。 (不要期望这个适用于大数字,它可能是最慢的递归实现)。

我们的 RPC 服务端代码 RPCServer.cs 看起来如下所示:

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;class RPCServer{    public static void Main()    {        var factory = new ConnectionFactory() { HostName = "localhost" };        using (var connection = factory.CreateConnection())        using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "rpc_queue", durable: false,exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 1, false);            var consumer = new EventingBasicConsumer(channel);channel.BasicConsume(queue: "rpc_queue",autoAck: false, consumer: consumer);Console.WriteLine(" [x] Awaiting RPC requests");consumer.Received += (model, ea) =>{                string response = null;                var body = ea.Body;                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();replyProps.CorrelationId = props.CorrelationId;                try{                    var message = Encoding.UTF8.GetString(body);                    int n = int.Parse(message);Console.WriteLine(" [.] fib({0})", message);response = fib(n).ToString();}                catch (Exception e){Console.WriteLine(" [.] " + e.Message);response = "";}                finally{                    var responseBytes = Encoding.UTF8.GetBytes(response);channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,basicProperties: replyProps, body: responseBytes);channel.BasicAck(deliveryTag: ea.DeliveryTag,multiple: false);}};Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}    /// /// Assumes only valid positive integer input./// Don't expect this one to work for big numbers, and it's/// probably the slowest recursive implementation possible./// private static int fib(int n)    {        if (n == 0 || n == 1){            return n;}        return fib(n - 1) + fib(n - 2);}
}

服务端代码非常简单:

  • 像往常一样,首先建立连接,通道和声明队列。

  • 我们可能希望运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要设置channel.BasicQos中的prefetchCount值。

  • 使用BasicConsume访问队列,然后注册一个交付处理程序,并在其中完成工作并发回响应。

我们的 RPC 客户端 RPCClient.cs 代码:

using System;using System.Collections.Concurrent;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;public class RpcClient{    private readonly IConnection connection;    private readonly IModel channel;    private readonly string replyQueueName;    private readonly EventingBasicConsumer consumer;    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();    private readonly IBasicProperties props;public RpcClient(){        var factory = new ConnectionFactory() { HostName = "localhost" };connection = factory.CreateConnection();channel = connection.CreateModel();replyQueueName = channel.QueueDeclare().QueueName;consumer = new EventingBasicConsumer(channel);props = channel.CreateBasicProperties();     
   var correlationId = Guid.NewGuid().ToString();props.CorrelationId = correlationId;props.ReplyTo = replyQueueName;consumer.Received += (model, ea) =>{          
     var body = ea.Body;        
      var response = Encoding.UTF8.GetString(body);      
           if (ea.BasicProperties.CorrelationId == correlationId){respQueue.Add(response);}};}  
 
   public string Call(string message)    {      
       var messageBytes = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "",routingKey: "rpc_queue",basicProperties: props,body: messageBytes);channel.BasicConsume(consumer: consumer,queue: replyQueueName,autoAck: true);      
         return respQueue.Take(); ;}  
 
   public void Close()    {connection.Close();} }
   
   public class Rpc{  
    public static void Main()  
     {      
       var rpcClient = new RpcClient();Console.WriteLine(" [x] Requesting fib(30)");      
       var response = rpcClient.Call("30");Console.WriteLine(" [.] Got '{0}'", response);rpcClient.Close();} }

客户端代码稍微复杂一些:

  • 建立连接和通道,并为响应声明一个独有的 'callback' 队列。

  • 订阅这个 'callback' 队列,以便可以接收到 RPC 响应。

  • Call方法用来生成实际的 RPC 请求。

  • 在这里,我们首先生成一个唯一的CorrelationId编号并保存它,while 循环会使用该值来捕获匹配的响应。

  • 接下来,我们发布请求消息,其中包含两个属性:ReplyTo和CorrelationId。

  • 此时,我们可以坐下来稍微一等,直到指定的响应到来。

  • while 循环做的工作非常简单,对于每个响应消息,它都会检查CorrelationId是否是我们正在寻找的那一个。如果是这样,它就会保存该响应。

  • 最后,我们将响应返回给用户。

客户发出请求:


var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();

现在是查看 RPCClient.cs 和 RPCServer.cs 的完整示例源代码(包括基本异常处理)的好时机哦。

像往常一样设置(请参见 教程[1]]):

我们的 RPC 服务现已准备就绪,现在可以启动服务端:


cd RPCServer dotnet run# => [x] Awaiting RPC requests

要请求斐波纳契数,请运行客户端:


cd RPCClient dotnet run# => [x] Requesting fib(30)

这里介绍的设计并不是 RPC 服务的唯一可能实现,但它仍具有一些重要优势:

  • 如果 RPC 服务器太慢,您可以通过运行另一个服务器来扩展。尝试在新开一个控制台,运行第二个 RPCServer。

  • 在客户端,RPC 只需要发送和接收一条消息。不需要像QueueDeclare一样同步调用。因此,对于单个 RPC 请求,RPC 客户端只需要一次网络往返。

我们的代码很简单,也并没有尝试去解决更复杂(但很重要)的问题,比如就像:

  • 如果服务端没有运行,客户端应该如何反应?

  • 客户端是否应该为 RPC 设置某种超时机制?

  • 如果服务端出现故障并引发异常,是否应将其转发给客户端?

  • 在处理之前防止无效的传入消息(例如:检查边界、类型)。

如果您想进行实验,您可能会发现 管理 UI 对于查看队列非常有用。

写在最后

本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的不好请见谅,如有翻译错误还请指正。

  • 原文链接:RabbitMQ tutorial - Remote procedure call (RPC)

  • 实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code

  • 最后更新:2018-11-17

相关文章:

  • 分布式系统消息中间件——RabbitMQ的使用基础篇

  • ASP.NET Core 2.0利用MassTransit集成RabbitMQ

  • 浅谈surging服务引擎中的rabbitmq组件和容器化部署

  • RabbitMQ一个简单可靠的方案(.Net Core实现)

  • .NetCore Cap 结合 RabbitMQ 实现消息订阅

  • [译]RabbitMQ教程C#版 - 发布订阅

  • RabbitMQ教程C#版 - 工作队列

  • RabbitMQ教程C#版 “Hello World”

原文地址:  https://www.cnblogs.com/esofar/p/rabbitmq-rpc.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/318941.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

P7514-[省选联考2021A/B卷]卡牌游戏【贪心】

正题 题目链接:https://www.luogu.com.cn/problem/P7514 题目大意 给出nnn个卡牌有ai/bia_i/b_iai​/bi​&#xff0c;开始都是aia_iai​朝上&#xff0c;将不超过mmm张卡牌变为bib_ibi​面朝上&#xff0c;使得朝上的数字中最大值减去最小值最小。 3≤n≤106,1≤m<n,1≤a…

牛客题霸 [ 旋转数组] C++题解/答案

牛客题霸 [ 旋转数组] C题解/答案 题目描述 一个数组A中存有N&#xff08;N&gt0&#xff09;个整数&#xff0c;在不允许使用另外数组的前提下&#xff0c;将每个整数循环向右移M&#xff08;M>0&#xff09;个位置&#xff0c;即将A中的数据由&#xff08;A0 A1 ………

ML.NET速览

什么是ML.NET&#xff1f;ML.NET是由微软创建&#xff0c;为.NET开发者准备的开源机器学习框架。它是跨平台的&#xff0c;可以在macOS&#xff0c;Linux及Windows上运行。机器学习管道ML.NET通过管道(pipeline)方式组合机器学习过程。整个管道分为以下四个部分&#xff1a;Loa…

P6240 好吃的题目(分治+背包)

P6240 好吃的题目 类似于线段树分治&#xff0c;在每个节点预处理[l,mid],[mid1,r][l,mid],[mid1,r][l,mid],[mid1,r]的背包&#xff0c;然后询问即可 一般代码就类似下面的写法&#xff0c;但是此题有点卡空间于是稍微优化了一下空间。 时间复杂度O{nlog⁡nmax⁡(hi,ti)}O\{…

CF990G-GCD Counting【dfs】

正题 题目链接:https://www.luogu.com.cn/problem/CF990G 题目大意 给出一棵有点权的树&#xff0c;对于每个kkk求有多条路径的点权gcdgcdgcd为kkk 1≤n≤2105,1≤ai≤21051\leq n\leq 2\times 10^5,1\leq a_i\leq 2\times 10^51≤n≤2105,1≤ai​≤2105 解题思路 开始以为要…

牛客题霸 [ 旋转数组的最小数字] C++题解/答案

牛客题霸 [ 旋转数组的最小数字] C题解/答案 题目描述 把一个数组最开始的若干个元素搬到数组的末尾&#xff0c;我们称之为数组的旋转。 输入一个非递减排序的数组的一个旋转&#xff0c;输出旋转数组的最小元素。 NOTE&#xff1a;给出的所有元素都大于0&#xff0c;若数组…

.NET Core实战项目之CMS 第一章 入门篇-开篇及总体规划

写在前面千呼万唤始出来&#xff0c;首先&#xff0c;请允许我长吸一口气&#xff01;真没想到一份来自28岁老程序员的自白 这篇文章会这么火&#xff0c;更没想到的是张善友队长的公众号居然也转载了这篇文章&#xff0c;这就导致两天的时间就有两百多位读者朋友加入了.NET Co…

2018CCF-CSP 5.二次求和(点分治)

5.二次求和 暴力 首先观察询问&#xff0c;树上链u→vu\to vu→v点权加&#xff0c;显然可以用树上差分LOJ dfs序4 O(1)O(1)O(1)完成此操作&#xff0c;然后考虑对这些权值对答案的影响&#xff1f; 设经过某点uuu符合条件的路径条数为pathu\text{path}_upathu​ 当uuu点权ccc…

P4323-[JSOI2016]独特的树叶【换根dp,树哈希】

正题 题目链接:https://www.luogu.com.cn/problem/P4323 题目大意 给出nnn个点的树和加上一个点之后的树&#xff08;编号打乱&#xff09;。 求多出来的是哪个点&#xff08;如果有多少个就输出编号最小的&#xff09;。 1≤n≤1051\leq n\leq 10^51≤n≤105 解题思路 定义…

牛客题霸 [ 数字在升序数组中出现的次数] C++题解/答案

牛客题霸 [ 数字在升序数组中出现的次数] C题解/答案 题目描述 统计一个数字在升序数组中出现的次数。 题解&#xff1a; 直接for循环&#xff0c;if判断一下&#xff0c;如果是目标的话ant 代码&#xff1a; class Solution { public:int GetNumberOfK(vector<int>…

.Net Core微服务系列--理论篇

微服务的由来微服务最早由Martin Fowler与James Lewis于2014年共同提出来的&#xff0c;但是微服务也不是一个全新的概念&#xff0c;它是由一系列在实践中获得成功并流行起来的概念中总结出来的一种模式&#xff0c;一种概念。而这一系列的概念大体上有这些:领域驱动设计(DDD)…

codeforces1496 D. Let‘s Go Hiking(乱搞+讨论)

这题我tm服了&#xff0c;考试中途肯定添加了一组数据&#xff0c;提交完A了之后&#xff0c;还有20min结束&#xff0c;感觉写不了下一个题了&#xff0c;就下班了&#xff0c;谁知道它有填了一组测试数据把我的乱搞给卡过去了&#xff0c;我又被fst了&#xff1f;&#xff1f…

CF1511G-Chips on a Board【倍增】

正题 题目链接:https://www.luogu.com.cn/problem/CF1511G 题目大意 给出n∗mn*mn∗m的棋盘上每一行有一个棋子&#xff0c;双方轮流操作可以把一个棋子向左移动若干步&#xff08;不能不动&#xff09;&#xff0c;无法操作者输。 qqq次询问只留下期盼的l∼rl\sim rl∼r列时…

牛客题霸 [ 调整数组顺序使奇数位于偶数前面] C++题解/答案

牛客题霸 [ 调整数组顺序使奇数位于偶数前面] C题解/答案 题目描述 输入一个整数数组&#xff0c;实现一个函数来调整该数组中数字的顺序&#xff0c;使得所有的奇数位于数组的前半部分&#xff0c;所有的偶数位于数组的后半部分&#xff0c;并保证奇数和奇数&#xff0c;偶数…

Asp.net Core Jenkins Docker 实现一键化部署

写在前面在前段时间尝试过用Jenkins来进行asp.net core 程序在IIS上面的自动部署。大概的流程是Jenkins从git上获取代码最开始Jenkins是放在Ubuntu的Docker中&#xff0c;但是由于Powershell执行的原因&#xff0c;就把Jenkins搬到了windows上。因为我们网站的部署需要停掉IIS站…

P5782-[POI2001]和平委员会【2-SAT】

正题 题目链接:https://www.luogu.com.cn/problem/P5782 题目大意 nnn对人&#xff0c;每对之间恰好有一个人出席。mmm对仇恨关系表示两个人不能同时出席。 求是否有解并输出。 1≤n≤8000,1≤m≤200001\leq n\leq 8000,1\leq m\leq 200001≤n≤8000,1≤m≤20000 解题思路 裸…

codeforces1497 E. Square-free division(数学+dp)

开学了&#xff0c;感觉没时间打cf了&#xff0c;上课听不懂&#xff0c;而且一直在忙转班的事情~~ 下周就要回学校了开心 昨天卡C题太久了&#xff0c;一直在想lcm的性质&#xff0c;还好最后回头了&#xff0c;当成构造题做了&#xff0c;瞎搞了搞就出来了&#xff0c;然后看…

牛客题霸 [合并二叉树] C++题解/答案

牛客题霸 [合并二叉树] C题解/答案 题目描述 已知两颗二叉树&#xff0c;将它们合并成一颗二叉树。合并规则是&#xff1a;都存在的结点&#xff0c;就将结点值加起来&#xff0c;否则空的位置就由另一个树的结点来代替。例如&#xff1a; 两颗二叉树是: Tree 1 1 / \ 3 2 / …

“校长”潘淳:侠之大者,一蓑烟雨任平生

我是与丁磊、蔡文胜同时代的人&#xff0c;他们都是70后大我两岁。我的经历与爱好与丁磊有丁点接近&#xff0c;但是没他下海走一走的胆识。又或者与蔡文胜一样&#xff0c;也算是国内最早的域名代理商&#xff0c;却又没有投资的勇气。—— 潘淳《IT英雄传》这一期的主角儿是江…

P4922-[MtOI2018]崩坏3?非酋之战!【dp】

正题 题目链接:https://www.luogu.com.cn/problem/P4922 题目大意 题目好长直接放了 在崩坏 3 中有一个叫做天命基地的地方&#xff0c;女武神们将在基地中开派对与敌人们厮杀。 女武神们的攻击力为 atkatkatk&#xff0c;她们将进行资源保卫战&#xff01; 天命基地中有 …