.NET Core微服务之基于MassTransit实现数据最终一致性(Part 1)

一、预备知识:数据一致性

  关于数据一致性的文章,园子里已经有很多了,如果你还不了解,那么可以通过以下的几篇文章去快速地了解了解,有个感性认识即可。

  (1)左正,《保证分布式系统数据一致性的6种方案》

  (2)成金之路,《分布式系统的数据一致性解决方案》

  (3)E_Star,《分布式环境下数据一致性的设计总结》

  必须要了解的点:ACID、CAP、BASE、强一致性、弱一致性、最终一致性

  640?wx_fmt=jpeg

  CAP理论由加州大学伯克利分校的计算机教授Eric Brewer在2000年提出,其核心思想是任何基于网络的数据共享系统最多只能满足数据一致性(Consistency)、可用性(Availability)和网络分区容忍(Partition Tolerance)三个特性中的两个(由此我们知道在分布式系统中,同时满足CAP三个特性是不可能的),三个特性的定义如下:

C:数据一致性(Consistency):如果系统对一个写操作返回成功,那么之后的读请求都必须读到这个新数据;如果返回失败,那么所有读操作都不能读到这个数据,对调用者而言数据具有强一致性(Strong Consistency)(又叫原子性Atomic或线性一致性Linerizable Consistency)

A:服务可用性(Availability):所有读写请求在一定时间内得到响应,可终止、不会一直等待

P:分区容错性(Partition-Tolerance):在网络分区的情况下,被分隔的节点仍能正常对外服务  

  • 强一致性:当更新操作完成之后,任何多个后续进程或者线程的访问都会返回最新的更新过的值。这种是对用户最友好的,就是用户上一次写什么,下一次就保证能读到什么。根据 CAP 理论,这种实现需要牺牲可用性。=> 在传统单体式应用中,大部分都是强一致性的应用,想想我们写过多少工作单元模式的Code?

  • 弱一致性:系统并不保证续进程或者线程的访问都会返回最新的更新过的值。系统在数据写入成功之后,不承诺立即可以读到最新写入的值,也不会具体的承诺多久之后可以读到。

  • 最终一致性:弱一致性的特定形式。系统保证在没有后续更新的前提下,系统最终返回上一次更新操作的值。在没有故障发生的前提下,不一致窗口的时间主要受通信延迟,系统负载和复制副本的个数影响。

  为保证可用性,互联网分布式架构中经常将强一致性需求转换成最终一致性的需求,并通过系统执行幂等性的保证,保证数据的最终一致性

  在微服务架构中,各个微服务之间通常会使用事件驱动通信和发布订阅系统实现最终一致性。

  640?wx_fmt=png

  更多背景知识,还是得看上面列出的参考文章,这里不再赘述。

二、MassTransit极简介绍

  MassTransit 是一个自由、开源、轻量级的消息总线, 用于使用. NET 框架创建分布式应用程序。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。

  官网地址:http://masstransit-project.com/,GitHub地址:https://github.com/MassTransit/MassTransit (目前:1590Star,719Fork)

  类似的国外开源组件还有NServiceBus,没有用过,据说MassTransit比NServiceBus更加轻量级,并且在开发之初就选用了RabbitMQ作为消息传输组件,当然MassTransit还支持Azure Service Bus。类似的国内开源组件则有园友savorboard(杨晓东)的CAP,这个我会在MassTransit学习结束后去使用使用,CAP在GitHub上已经有了超过1000个Star,是NCC的几个千星项目之一。另外,张善友大队长在他的NanoFabric项目中推荐我们使用Rebus和Ray,如下图所示:

  640?wx_fmt=png

  640?wx_fmt=jpeg

  由于时间和精力,以及文档资料的可见性,我在我的POC和这个系列博文的准备中,只会使用到MassTransit和CAP这两个开源项目。

三、MassTransit Quick Start

  这里以MassTransit + RabbitMQ为例子,首先请确保安装了RabbitMQ,如果没有安装,可以阅读我的《基于EasyNetQ使用RabbitMQ消息队列》去把RabbitMQ先安装到你的电脑上。另外,RabbitMQ的背景知识也有一堆,有机会也还是要了解下Exchange,Channel、Queue等内容。

640?wx_fmt=png

3.1 最简单的发送/接收实例

  (1)准备两个控制台程序,一个为Sender(发送者),一个为Receiver(接收者),并分别通过NuGet安装MassTransit以及MassTransit.RabbitMQ

NuGet>Install-Package MassTransit

NuGet>Install-Package MassTransit.RabbitMQ  

  (2)编写Sender

640?wx_fmt=png

 这里首先连接到我的RabbitMQ服务,然后向指定的Queue发送消息(这里是一个Client类型的实例对象)。

  (3)编写Receiver

640?wx_fmt=png

 对于Receiver,要做的事就只有两件:一是连接到RabbitMQ,二是告诉RabbitMQ我要接收哪个消息队列的什么类型的消息。下面是TestConsumerClient和TestConsumerAgent的定义:

640?wx_fmt=png

(4)测试一下:

  640?wx_fmt=png

3.2 最简单的发布/订阅实例

  除了简单的发送/接收模式外,我们用的更多的是发布/订阅这种模式。

  (1)准备下图所示的类库和控制台项目,并对除Messages类库之外的其他项目安装MassTransit以及MassTransit.RabbitMQ。

  640?wx_fmt=png

  (2)Messages类库:准备需要传输的Message

640?wx_fmt=png

(3)Publisher:接收我的消息吧骚年们

640?wx_fmt=png

 这里向RabbitMQ发布了两个不同类型的消息(TestBaseMessage和TestCustomMessage)

  (4)SubscriberA:我只接收TestBaseMessage类型的消息,其他的我不要

640?wx_fmt=png

 这里ConsumerA的定义如下:可以看出,它实现了一个泛型接口IConsumer,然后指定了TestBaseMessage为消费的消息类型。

640?wx_fmt=png

 (5)SubscriberA:我只接收TestCustomMessage类型的消息,其他的我不要

640?wx_fmt=png

这里的ConsumerA的定义如下;它实现的接口是IConsumer<TestCustomMessage>

640?wx_fmt=png

(6)测试一下:由于Publisher发送了两个不同类型的消息,两个Subscriber均只接受其中的一个类型的消息。

  640?wx_fmt=png

3.3 带返回状态消息的示例

  之前的例子都是发布之后,不管订阅者有没有收到以及收到后有没有处理成功(即有没有返回消息,类似于HTTP请求和响应),在MassTransit中提供了这样的一种模式,并且还可以结合GreenPipes的一些扩展方法实现重试、限流以及熔断机制。这一部分详见官方文档:http://masstransit-project.com/MassTransit/usage/request-response.html

  (1)准备下图所示的三个项目:通过NuGet安装MassTransit以及MassTransit.RabbitMQ

  640?wx_fmt=png

  (2)Messages:准备请求和响应的消息传输类型

640?wx_fmt=png

(3)Sender 请求发送端

640?wx_fmt=png

  这里不再解释,请看注释。

  (4)Receiver 接收端

640?wx_fmt=png

 其中,RequestConsumer的定义如下:

640?wx_fmt=png

(5)测试一下:

  640?wx_fmt=png

  可以看出,请求调用方收到了来自接收方返回的状态消息,我们可以借助返回值去check一些状态。这里不再演示发生异常从而启用重试、熔断等的示例,有兴趣的园友可以自行测试。

3.4 Observer模式的发布/订阅示例

   在某些场景中,我们需要针对一个消息进行类似于AoP(面向切面编程)或者监控的操作,比如在发送消息之前和结束后记日志等操作,我们可以借助MassTransit中的Observer模式来实现。(在MassTransit的消息接收中,可以通过两种模式来实现:一种是基于实现IConsumer接口,另一种就是基于实现IObserver接口)关于这一部分,详见官方文档:http://masstransit-project.com/MassTransit/usage/observers.html

  (1)准备以下图所示的项目:

  640?wx_fmt=png

  (2)Messages:定义要使用的Consumer和Observer

  Consumer:

640?wx_fmt=png

Observer:

640?wx_fmt=png

(3)ObserverPublish

640?wx_fmt=png

可以看到,这里我们使用了两个用于发送消息的Observer:SendObserver和PublishObserver

  (3)ObserverReceive

640?wx_fmt=png

(4)测试一下:

  640?wx_fmt=gif

  由此看出,我们可以借助Observer这种方式来截取到消息的一个lifecycle,用以进行不同阶段的业务逻辑操作。

四、小结

  本篇极简的介绍了一下数据一致性和MassTransit这个开源的组件,通过几个例子介绍了在.NET环境下如何使用MassTransit操作RabbitMQ实现消息的接收/发送以及发布/订阅。后续我会继续使用MassTransit结合Quartz.Net和Polly在ASP.NET Core环境下实现一个简单的基于补偿机制的数据一致性的小案例,希望到时也可以和各位园友分享。

示例代码

  Click Here => 点我下载

参考资料

(1)桂素伟,《基于.NET Core的微服务》

(2)richieyangs(张阳),《如何优雅的使用RabbitMQ》,《使用Masstransit开发基于消息传递的分布式应用》

(3)青客宝团队,《MassTransit&Sagas分布式服务开发ppt分享》

(4)成天,《MassTransit实现应用程序间的交互》

(5)娃娃都会打酱油了,《MassTransit学习记录》

(6)MassTransit 官方文档,http://masstransit-project.com/MassTransit/

原文地址

https://www.cnblogs.com/edisonchou/p/dnc_microservice_masstransit_foundation_part1.html

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com

640?wx_fmt=jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/320793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2017-2018 ACM-ICPC Nordic Collegiate Programming Contest (NCPC 2017)

D.Distinctive Character 看到样例&#xff0c;第一个反应贪心。先写了个按这一位1和0的数目多少&#xff0c;确定0还是1的东西。感觉不够真&#xff0c;又写了个尽量加到相似的比较小的串上的贪心。在和前边的那个组合一下&#xff0c;换了换顺序。。。好吧就过了13组样例。。…

【二分】买礼物的艰辛

买礼物的艰辛 题目大意&#xff1a; 有n个物品的价格&#xff0c;还有m个人&#xff0c;m个人中的每个人都要购买一定的连续物品&#xff0c;问买下n个物品&#xff0c;花钱最大的人最少花多少钱 原题&#xff1a; 题目描述 小X同学给小C同学选了N件礼物&#xff0c;决定顺…

P4449-于神之怒加强版【莫比乌斯反演】

正题 题目链接:https://www.luogu.com.cn/problem/P4449 题目大意 TTT组询问给出n,mn,mn,m求∑i1n∑j1mgcd(i,j)k\sum_{i1}^n\sum_{j1}^mgcd(i,j)^ki1∑n​j1∑m​gcd(i,j)k 解题思路 ∑i1n∑j1mgcd(i,j)k\sum_{i1}^n\sum_{j1}^mgcd(i,j)^ki1∑n​j1∑m​gcd(i,j)k ∑d1ndk∑i…

Asp.NetCoreWebApi图片上传接口(二)集成IdentityServer4授权访问(附源码)

写在前面本文地址&#xff1a;http://www.cnblogs.com/yilezhu/p/9315644.html作者&#xff1a;yilezhu上一篇关于Asp.Net Core Web Api图片上传的文章使用的是mongoDB进行图片的存储&#xff0c;文章发布后&#xff0c;张队就来了一句&#xff0c;说没有使用GridFS。的确博主只…

Codeforces40E[Number Table]

Codeforces40E[Number Table] 解法一&#xff0c;如果有一维很小&#xff0c;可以考虑状压dp之类的&#xff0c;显然非常不优秀。 解法二&#xff0c;如果n*m较小&#xff0c;可以考虑&#xff0c;设出每个位置是否为-1&#xff0c;解xor线性方程组。再bitset优化一下之类的。然…

【DP】天平问题

天平问题 解题思路&#xff1a; 有n个砝码&#xff0c;问可以称出多少种重量&#xff0c;可以在左边或者右边放&#xff0c;也可以不放 原题&#xff1a; 题目描述 小C为了试验小X&#xff0c;便为物竞的小X出了一道物理相关的题&#xff1a;现在给出n个质量的砝码&#x…

.NET Core开发日志——从ASP.NET Core Module到KestrelServer

ASP.NET Core程序现在变得如同控制台(Console)程序一般&#xff0c;同样通过Main方法启动整个应用。而Main方法要做的事情很简单&#xff0c;创建一个WebHostBuilder类&#xff0c;调用其Build方法生成一个WebHost类&#xff0c;最后启动之。实现代码一目了然&#xff1a;要想探…

BZOJ5358: [Lydsy1805月赛]口算训练

题解&#xff1a;判断d是否整除&#xff0c;可以转化为求这段区间内d的因子的指数是否均大于d中的指数。容易想到把每个数字都分解为素因子形式&#xff0c;对每个素数出现的次数求个前缀和即可。然而&#xff0c;这样时间空间都不行。注意到对于一个数x&#xff0c;小于sqrt(x…

jzoj6800-NOIP2020.9.19模拟spongebob【枚举】

正题 题目链接:https://gmoj.net/senior/#contest/show/3222/0 题目大意 nnn个ai,bia_i,b_iai​,bi​&#xff0c;求一个xxx使得最小化∑i1n∣aixbi∣\sum_{i1}^n|a_ixb_i|i1∑n​∣ai​xbi​∣ 解题思路 每个∣aixbi∣|a_ixb_i|∣ai​xbi​∣可以视为一个分两段的函数&#…

【bfs】极其简单的最短路问题

极其简单的最短路问题 题目大意&#xff1a; 求最短路&#xff0c;权值只有1或2 原题&#xff1a; 题目描述 小C终于被小X感动了&#xff0c;于是决定与他看电影&#xff0c;然而小X距离电影院非常远&#xff0c;现在假设每条道路需要花费小X的时间为1&#xff0c;由于有数…

GraphQL 的前世今生

GraphQL是什么GraphQL是一种新的API标准&#xff0c;它提供了一种更高效、强大和灵活的数据提供方式。它是由Facebook开发和开源&#xff0c;目前由来自世界各地的大公司和个人维护。GraphQL本质上是一种基于api的查询语言&#xff0c;现在大多数应用程序都需要从服务器中获取数…

jzoj6801-NOIP2020.9.19模拟patrick【树状数组】

正题 题目大意 nnn个连续的数&#xff0c;第iii个为hih_ihi​。有操作 给出一个HHH&#xff0c;询问大于等于HHH的数能组成多少个联通块修改一个位置的数。 解题思路 考虑计算连通块尾的数量&#xff0c;我们可以发现一个位置作为联通块尾部当且仅当hi≥Hh_i\geq Hhi​≥H且h…

Codeforces Round #485 (Div. 2)

Codeforces986B [Petr and Permutations] 看到两个随机的swap次数&#xff0c;很容易想到跟奇偶性有关。然后就凉了。赛后思考了一下&#xff0c;这个思路应该没问题&#xff0c;那就需要考虑swap的奇偶性与排列的关系。因此&#xff0c;我们考虑如何把两个不相邻数的swap&…

初一模拟赛总结(5.18)

成绩&#xff1a; rankrankranknamenamenamescorescorescoreT1T1T1T2T2T2T3T3T3T4T4T4111lyflyflyf210210210505050606060303030707070111hkyhkyhky210210210100100100404040000707070333wjjwjjwjj190190190100100100000000909090444fyfyfy170170170100100100000000707070555tj…

Codeforces Round #486 (Div. 3)

E. Divisibility by 25 能被25整除的充要条件就是末两位是00&#xff0c;25&#xff0c;50&#xff0c;75。如果没有过程中不出现前导0这一限制&#xff0c;显然对每种情况&#xff0c;贪心取尽量低位即可。本题的关键就在于如何满足这个条件&#xff0c;首先有个”显然”的方法…

C#:如何将坏的代码重新编译为好的代码

自己的前言说明&#xff1a;本文原作者&#xff1a;Radoslaw Sadowski&#xff0c;原文链接为&#xff1a;C# BAD PRACTICES: Learn how to make a good code by bad example。本系列还有其他文章&#xff0c;后续将慢慢翻译。引言&#xff1a;我的名字叫Radoslaw Sadowski&…

P6855-「EZEC-4.5」走方格【dp】

正题 题目链接:https://www.luogu.com.cn/problem/P6855 题目大意 n∗mn*mn∗m的网格&#xff0c;每个格子有一个数&#xff0c;可以选择一个位置变为000。要求最小化最大权值和路径。 解题思路 考虑枚举哪个位置变为000&#xff0c;一个位置变为000后我们将路径分为两种路径…

【搜索树】高级打字机(luogu 1383)

高级打字机 luogu 1383 题目大意&#xff1a; 有三种操作&#xff1a;添加一个字符&#xff08;更改操作&#xff09;&#xff0c;撤回前iii步步更改操作&#xff08;更改操作&#xff0c;可以撤回自己&#xff09;&#xff0c;输出某一位的字符&#xff0c;现在要按要求输出…

AtCoder Grand Contest 025

B.RGB Coloring 绿色可以看作拿红和蓝都涂了&#xff0c;那么只需要满足A*a B*b K && 0 ≤ a,b ≤ n&#xff0c;答案加上C(n,a)*C(n,b) #include <bits/stdc.h> typedef long long ll; const ll mod 998244353; const int N 300000 100; using namespace s…

走进 Cake for .NET

一、什么是 CakeCake&#xff08;C# Make&#xff09; 是一个使用 C# DSL 面向 Task 的跨平台构建自动化系统&#xff0c;像编译代码&#xff0c;复制文件和文件夹&#xff0c;运行单元测试&#xff0c;压缩文件和构建 NuGet 包。更多内容请访问官网二、使用 Cake先尝试一下 P…