RabbitMQ一个简单可靠的方案(.Net Core实现)

前言

  最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:

  1. 临时异常,如数据库网络闪断、http请求临时失效等;

  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;

  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;

  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;

  5. 非法异常,一些伪造、攻击类型的消息。 

  针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。

 

方案

 640?wx_fmt=png

 

  1. 消息均使用Exchange进行通讯,方式可以是direct或topic,不建议fanout。

  2. 根据业务在Exchange下分配一个或多个Queue,同时设置一个审计线程(Audit)监听所有Queue,用于记录消息到MongoDB,同时又不阻塞正常业务处理

  3. 生产者(Publisher)在发布消息时,基于AMQP协议,生成消息标识MessageId和时间戳Timestamp,根据消息业务添加头信息Headers便于跟踪。

  640?wx_fmt=png

  4. 消费者(Comsumer)消息处理失败时,则把消息发送到重试交换机(Retry Exchange),并设置过期(重试)时间及更新重试次数;如果超过重试次数则删除消息。

  5. 重试交换机Exchange设置死信交换机(Dead Letter Exchange),消息过期后自动转发到业务交换机(Exchange)。

  6. WebApi可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。

   

  注:选择MongoDB作为存储介质的主要原因是其对头信息(headers)的动态查询支持较好,同等的替代产品还可以是Elastic Search这些。

 

生产者(Publisher)

  1. 设置断线自动恢复

  var factory = new ConnectionFactory{Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),AutomaticRecoveryEnabled = true  };

 

  2. 定义Exchange,模式为direct

  channel.ExchangeDeclare("Exchange", "direct");

 

  3. 根据业务定义QueueA和QueueB

  channel.QueueDeclare("QueueA", true, false, false);channel.QueueBind("QueueA", "Exchange", "RouteA");channel.QueueDeclare("QueueB", true, false, false);channel.QueueBind("QueueB", "Exchange", "RouteB");

 

  4. 启动消息发送确认机制,即需要收到RabbitMQ服务端的确认消息

  channel.ConfirmSelect();

 

  5. 设置消息持久化

  var properties = channel.CreateBasicProperties();properties.Persistent = true;

 

  6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers

  properties.MessageId = Guid.NewGuid().ToString("N");properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Headers = new Dictionary<string, object>  {{ "key", "value" + i}};

 

  7. 发送消息,偶数序列发送到QueueA(RouteA),奇数序列发送到QueueB(RouteB)

  channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

 

  8. 确定收到RabbitMQ服务端的确认消息

  var isOk = channel.WaitForConfirms();if (!isOk){throw new Exception("The message is not reached to the server!");} 

  完整代码

640?wx_fmt=png

效果:QueueA和QueueB各一条消息,QueueAudit两条消息

 640?wx_fmt=png

   注:Exchange下必须先声明Queue才能接收到消息,上述代码并没有QueueAudit的声明;需要手动声明,或者先执行下面的消费者程序进行声明。

 

正常消费者(ComsumerA)

  1. 设置预取消息,避免公平轮训问题,可以根据需要设置预取消息数,这里是1

  _channel.BasicQos(0, 1, false);

  640?wx_fmt=png

 

  2. 声明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueA", true, false, false);_channel.QueueBind("QueueA", "Exchange", "RouteA");

 

  3. 编写回调函数

640?wx_fmt=png

 注:设置了RabbitMQ的断线恢复机制,当RabbitMQ连接不可用时,与MQ通讯的操作会抛出AlreadyClosedException的异常,导致主线程退出,哪怕连接恢复了,程序也无法恢复,因此,需要捕获处理该异常。

 

异常消费者(ComsumerB)

  1. 设置预取消息

  _channel.BasicQos(0, 1, false);

 

  2. 声明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueB", true, false, false);_channel.QueueBind("QueueB", "Exchange", "RouteB");

 

  3.  设置死信交换机(Dead Letter Exchange)

  var retryDic = new Dictionary<string, object>  {

      {"x-dead-letter-exchange", "Exchange"},{"x-dead-letter-routing-key", "RouteB"}};_channel.ExchangeDeclare("Exchange_Retry", "direct");_channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);_channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");


  4. 重试设置,3次重试;第一次1秒,第二次10秒,第三次30秒

  _retryTime = new List<int>  {1 * 1000,10 * 1000,30 * 1000  };

 

  5. 获取当前重试次数

  var retryCount = 0;if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount")){retryCount = (int)ea.BasicProperties.Headers["retryCount"];_logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...");}

 

  6. 发生异常,判断是否可以重试

  private bool CanRetry(int retryCount){return retryCount <= _retryTime.Count - 1;} 

  7. 可以重试,则启动重试机制

640?wx_fmt=png

640?wx_fmt=png

审计消费者(Audit Comsumer)

  1. 声明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");_channel.QueueDeclare("QueueAudit", true, false, false);_channel.QueueBind("QueueAudit", "Exchange", "RouteA");_channel.QueueBind("QueueAudit", "Exchange", "RouteB");

 

  2. 排除死信Exchange转发过来的重复消息

  if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death")){...}

 

  3. 生成消息实体

  var message = new Message{MessageId = ea.BasicProperties.MessageId,Body = ea.Body,Exchange = ea.Exchange,Route = ea.RoutingKey};

  4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串

  if (ea.BasicProperties.Headers != null){var headers = new Dictionary<string, object>();foreach (var header in ea.BasicProperties.Headers){if (header.Value is byte[] bytes){headers[header.Key] = Encoding.UTF8.GetString(bytes);}else{headers[header.Key] = header.Value;}}message.Headers = headers;}

  5. 把Unix格式的Timestamp转成UTC时间

  if (ea.BasicProperties.Timestamp.UnixTime > 0){message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);message.Timestamp = offset.UtcDateTime;}

 

  6. 消息存入MongoDB

  _mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);

 

  MongoDB记录:

  640?wx_fmt=png

 

  重试记录:

  640?wx_fmt=png

 

消息检索及重发(WebApi)

  1. 通过消息Id检索消息

  640?wx_fmt=png

 

  2. 通过头消息检索消息

  640?wx_fmt=png

  640?wx_fmt=png

 

  3. 消息重发,会重新生成MessageId

  640?wx_fmt=png

  640?wx_fmt=png

 

Ack,Nack,Reject的关系

  1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。

  2. 消息处理失败,执行Nack或者Reject:

  a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;

  b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;

  c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。

 

  3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。

  

RabbitMQ自动恢复

连接(Connection)恢复

  1. 重连(Reconnect)

  2. 恢复连接监听(Listeners)

  3. 重新打开通道(Channels)

  4. 恢复通道监听(Listeners)

  5. 恢复basic.qos,publisher confirms以及transaction设置

   

拓扑(Topology)恢复

  1. 重新声明交换机(Exchanges)

  2. 重新声明队列(Queues)

  3. 恢复所有绑定(Bindings)

  4. 恢复所有消费者(Consumers)

 

异常处理机制

  1. 临时异常,如数据库网络闪断、http请求临时失效等

  通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。

 

  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行

  通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。

  

  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理

  等系统修正后,通过消息重发的方式处理。

 

  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等

  等系统恢复后,通过消息重发的方式处理。

 

  5. 非法异常,一些伪造、攻击类型的消息

  多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。

 

源码地址

https://github.com/ErikXu/RabbitMesage

相关文章:

  • CAP带你轻松玩转ASP.NETCore消息队列

  • .NetCore Cap 结合 RabbitMQ 实现消息订阅

  • [译]RabbitMQ教程C#版 - 发布订阅

原文地址: https://www.cnblogs.com/Erik_Xu/p/9515208.html


.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/320189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CF1120D Power Tree(树形DP/构造+差分+最小生成树)

解法一&#xff1a;树形DP 个人觉得这个方法是比较可能想到的&#xff0c;但是输出方案很恶心 先转换题意&#xff1a;“无论怎样规定叶子的初始点权&#xff0c;都可以通过操作你选择的点来让所有叶子的点权清空”意味着每个叶子节点都可以通过一系列操作单独1、-1 模拟一下…

牛牛染颜色

链接&#xff1a; 文章目录题目描述题意&#xff1a;题解&#xff1a;核心代码&#xff1a;时间限制&#xff1a;C/C 1秒&#xff0c;其他语言2秒 空间限制&#xff1a;C/C 131072K&#xff0c;其他语言262144K 64bit IO Format: %lld题目描述 牛牛最近得到了一颗树&#xff0…

【数学期望】【LCA】【树形DP】树

树 题目大意 给你一棵有n个节点的树&#xff0c;以及m个询问&#xff0c;每个询问需要你回答一个点到另一个点要经过的期望边数 输入样例 4 2 1 2 2 3 3 4 1 4 3 4输出样例 9 5 数据范围 对于 20%20\%20%的数据,N⩽10.N \leqslant 10.N⩽10. 对于 40%40\%40%的数据,N⩽10…

P3768-简单的数学题【莫比乌斯反演,杜教筛】

正题 题目链接:https://www.luogu.com.cn/problem/P3768 题目大意 给出n,pn,pn,p求∑i1n∑j1ngcd(i,j)∗i∗j\sum_{i1}^n\sum_{j1}^ngcd(i,j)*i*ji1∑n​j1∑n​gcd(i,j)∗i∗j模ppp的值。 解题思路 下文中定义Hy(x)∑i1xiyH_y(x)\sum_{i1}^xi^yHy​(x)∑i1x​iy 首先显然是…

.netcore consul实现服务注册与发现-单节点部署

一、Consul的基础介绍Consul是HashiCorp公司推出的开源工具&#xff0c;用于实现分布式系统的服务发现与配置。与其他分布式服务注册与发现的方案&#xff0c;比如 Airbnb的SmartStack等相比&#xff0c;Consul的方案更“一站式”&#xff0c;内置了服务注册与发现框 架、分布一…

MST(最小生成树)上的确定性和存在性问题

题目1&#xff1a; 给定一个n个点m条边的连通图&#xff0c;保证没有自环和重边。对于每条边求出,在其他边权值不变的情况下,它能取的最大权值&#xff0c;使得这条边在连通图的所有最小生成树上。假如最大权值为无限大&#xff0c;则输出-1。 题解&#xff1a; 先求出图的一…

牛客网 【每日一题】[SCOI2009]粉刷匠

链接&#xff1a; 题目描述 windy有 N 条木板需要被粉刷。 每条木板被分为 M 个格子。 每个格子要被刷成红色或蓝色。 windy每次粉刷&#xff0c;只能选择一条木板上一段连续的格子&#xff0c;然后涂上一种颜色。 每个格子最多只能被粉刷一次。 如果windy只能粉刷 T 次&#…

CF932E-Team Work【斯特林数,组合数学】

正题 题目链接:https://www.luogu.com.cn/problem/CF932E 题目大意 给出n,kn,kn,k&#xff0c;求∑i1nC(n,i)∗ik\sum_{i1}^nC(n,i)*i^ki1∑n​C(n,i)∗ik 解题思路 上式子的话&#xff0c;大体是先拆开iki^kik变成∑i1n(ni)∑j0k{kj}(ij)j!\sum_{i1}^n\binom{n}{i}\sum_{j0…

【数学】异或

异或 题目大意 问你不小于nnn的数对(a,b)(a,b)(a,b)&#xff0c;有多少个满足gcd(a,b)a⊕bgcd(a,b)a \oplus bgcd(a,b)a⊕b 输入样例#1 12输出样例#1 8输入样例#2 123456输出样例#2 214394数据范围 测试点数据规模110210031000450005100006100000750000081000000950000…

分布式事务解决方案以及 .Net Core 下的实现(上)

数据一致性是构建业务系统需要考虑的重要问题 &#xff0c; 以往我们是依靠数据库来保证数据的一致性。但是在微服务架构以及分布式环境下实现数据一致性是一个很有挑战的的问题。最近在研究分布式事物&#xff0c;分布式的解决方案有很多解决方案&#xff0c;也让我在研究的同…

[AGC014D] Black and White Tree(树形DP,博弈)

每次找到所有叶子节点&#xff0c;把它们的父亲染白&#xff0c;自己染黑。这个时候染完的叶子节点及其父亲节点对树的其他部分已无影响 ,可以直接删掉。 那么只需要判断树的其他部分是否有先手必胜策略即可。用递归遍历。 边界条件&#xff1a;若树为单一节点先手必胜。 代…

【每日一题】5月7日题目精讲 「火」皇家烈焰

链接&#xff1a; 「火」皇家烈焰 文章目录题目描述题解&#xff1a;代码&#xff1a;时间限制&#xff1a;C/C 1秒&#xff0c;其他语言2秒 空间限制&#xff1a;C/C 262144K&#xff0c;其他语言524288K 64bit IO Format: %lld题目描述 帕秋莉掌握了一种火属性魔法 由于钟爱扫…

【最小生成树】灌水

灌水 题目大意 给你n个点&#xff0c;你可以在某个点建水库&#xff08;生产水&#xff09;&#xff0c;或从其他有水的点建水管送过来 现在问你让所有点都有水的最小代价是多少 输入样例 4 5 4 4 3 0 2 2 2 2 0 3 3 2 3 0 4 2 3 4 0输出样例 9样例解释 FarmerJohnFarmer …

P6257-[ICPC2019 WF]First of Her Name【AC自动机】

正题 题目链接:https://www.luogu.com.cn/problem/P6257 题目大意 给出一个字典树&#xff0c;kkk次询问求每个节点出发到根节点的路径有多少包含前缀sis_isi​。 解题思路 我们按照所有询问串的反串构造ACACAC自动机。 那么此时如果我们用一个串SSS上去跑匹配的话&#xf…

微软MVP张善友告诉你,微服务选型要注意这些地方

周六的下午&#xff0c;广州周大福金融中心的写字楼静悄悄的&#xff0c;53楼的实盈多功能会议室却异常火爆&#xff0c;热闹非凡。来自广州各大科技公司的技术小伙伴们齐聚一堂&#xff0c;他们都在期待着一个人&#xff0c;那就是——微软MVP张善友和他带来的 .NET Core 微服…

像鱼

链接&#xff1a; 时间限制&#xff1a;C/C 1秒&#xff0c;其他语言2秒 空间限制&#xff1a;C/C 262144K&#xff0c;其他语言524288K 64bit IO Format: %lld题目描述 给你一个边长为 n 的用硬币摆成的实心三角形&#xff0c;请问把他倒过来最少需要多少步&#xff1f; 例子…

[ZJOI2005]午餐(贪心+dp)

首先若只有一个窗口&#xff0c;利用贪心&#xff0c;按吃饭时间从大到小排序即可 正确性证明&#xff1a; 定义 eat[i] 第i个人的吃饭时间&#xff0c;time[i] 第i个人的打饭时间 延长时间T[i]max(eat[i]- ∑ji1ntimej\sum\limits_{ji1}^ntime_jji1∑n​timej​ ,0) 最后…

【图论】【模板】静态仙人掌(luogu 5236)

【模板】静态仙人掌 题目大意 给你一个无向仙人掌图&#xff08;保证每条边至多出现在一个简单回路中的无向图&#xff09;&#xff0c;问你两个点之间的最短路距离 输入样例#1 9 10 2 1 2 1 1 4 1 3 4 1 2 3 1 3 7 1 7 8 2 7 9 2 1 5 3 1 6 4 5 6 1 1 9 5 7输出样例#1 5 …

Wannafly挑战赛24D-无限手套【dp,生成函数】

正题 题目链接:https://ac.nowcoder.com/acm/contest/186/D 题目大意 mmm个二元组(ai,bi)(a_i,b_i)(ai​,bi​)&#xff0c;对于一个序列xxx的贡献是∏i1n(aixi2bixi1)\prod_{i1}^n(a_ix_i^2b_ix_i1)i1∏n​(ai​xi2​bi​xi​1) qqq次询问给出nnn求在xi≥0x_i\geq 0xi​≥0且…

SCF: 简单配置门面

Simple Configuration Facade, 简写为 SCF。是 代码 和 外部配置 (properties文件, 环境变量&#xff0c;系统/命令行参数, yaml文件, 等等)之间的一层抽象. 命名上和另一个著名组件slf4j (Simple Logging Facade for Java)相似, 在配置领域的地位也和slf4j &#xff08;.NET可…