.Net Core微服务入门全纪录(六)——EventBus-事件总线

系列文章目录

1、.Net Core微服务入门系列(一)——项目搭建
2、.Net Core微服务入门全纪录(二)——Consul-服务注册与发现(上)
3、.Net Core微服务入门全纪录(三)——Consul-服务注册与发现(下)
4、.Net Core微服务入门全纪录(四)——Ocelot-API网关(上)
5、.Net Core微服务入门全纪录(五)——Ocelot-API网关(下)
6、.Net Core微服务入门全纪录(六)——EventBus-事件总线
7、.Net Core微服务入门全纪录(八)——Docker Compose与容器网络


在这里插入图片描述

文章目录

  • 系列文章目录
  • 前言📃
  • 一、EventBus-事件总线
    • 1.1 什么是事件总线?
    • 1.2 为什么要用EventBus
  • 二、CAP使用
    • 2.1 环境准备
    • 2.2 代码修改
  • 三、运行测试
  • 四、总结


前言📃

关于 微服务 的概念解释网上有很多, 个人理解微服务是一种系统架构模式,它和语言无关,和框架无关,和工具无关,和服务器环境无关。

微服务思想 是将传统的单体系统按照业务拆分成多个职责单一、且可独立运行的接口服务。至于服务如何拆分,没有明确的定义。几乎任何后端语言都能做微服务开发。微服务也并不是完美无缺的,微服务架构会带来更多的问题,增加系统的复杂度,引入更多的技术栈。

上一篇【.Net Core微服务入门全纪录(五)——Ocelot-API网关(下)】中已经完成了 Ocelot + Consul 的搭建,这一篇简单说一下 EventBus

一、EventBus-事件总线

1.1 什么是事件总线?

🌈事件总线 是对观察者(发布-订阅)模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种 解耦 的目的。

如果没有接触过 EventBus ,可能不太好理解。其实 EventBus 在客户端开发中应用非常广泛android,ios,web 前端等,用于多个组件(或者界面)之间的相互通信。

1.2 为什么要用EventBus

就拿当前的项目举例,我们有一个订单服务,一个产品服务。客户端有一个下单功能,当用户下单时,调用订单服务的下单接口,那么下单接口需要调用产品服务的减库存接口,这涉及到服务与服务之间的调用。那么服务之间又怎么调用呢?直接 RESTAPI?或者效率更高的gRPC?可能这两者各有各的使用场景,但是他们都存在一个服务之间的耦合问题,或者难以做到异步调用。

试想一下:假设我们下单时调用订单服务,订单服务需要调用产品服务,产品服务又要调用物流服务,物流服务再去调用xx服务 等等。。。如果每个服务处理时间需要2s,不使用异步的话,那这种体验可想而知。

如果使用 EventBus 的话,那么订单服务只需要向 EventBus 发一个“下单事件”就可以了。产品服务会订阅“下单事件”,当产品服务收到下单事件时,自己去减库存就好了。这样就避免了两个服务之间直接调用的耦合性,并且真正做到了异步调用。

既然涉及到多个服务之间的异步调用,那么就不得不提分布式事务。分布式事务并不是微服务独有的问题,而是所有的分布式系统都会存在的问题。

关于分布式事务,可以查一下 “CAP原则”“BASE理论” 了解更多。当今的分布式系统更多的会追求事务的最终一致性。

下面使用国人开发的优秀项目 “CAP”,来演示一下 EventBus 的基本使用。之所以使用 “CAP”是因为它既能解决分布式系统的最终一致性,同时又是一个 EventBus,它具备 EventBus 的所有功能!
作者介绍:https://www.cnblogs.com/savorboard/p/cap.html

二、CAP使用

2.1 环境准备

Docker 中准备一下需要的环境,首先是数据库,数据库我使用 PostgreSQL,用别的也行。CAP 支持:SqlServer,MySql,PostgreSql,MongoDB。

然后是MQ,这里我使用 RabbitMQKafka 也可以。

Docker运行RabbitMQ:

docker pull rabbitmq:management
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management

🔑默认用户:guest,密码:guest

环境准备就完成了,Docker 就是这么方便。

2.2 代码修改

为了模拟以上业务,需要修改大量代码,下面代码如有遗漏的直接去github找。

NuGet安装:

Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Tools
Npgsql.EntityFrameworkCore.PostgreSQL

在这里插入图片描述

CAP相关:

DotNetCore.CAP
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.PostgreSql

在这里插入图片描述

Order.API/Controllers/OrdersController.cs 增加下单接口:

[Route("[controller]")]
[ApiController]
public class OrdersController : ControllerBase
{private readonly ILogger<OrdersController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly OrderContext _context;public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【订单服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 下单 发布下单事件/// </summary>/// <param name="order"></param>/// <returns></returns>[Route("Create")][HttpPost]public async Task<IActionResult> CreateOrder(Models.Order order){using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true)){//业务代码order.CreateTime = DateTime.Now;_context.Orders.Add(order);var r = await _context.SaveChangesAsync() > 0;if (r){//发布下单事件await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID });return Ok();}return BadRequest();}}}

Order.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 产品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 购买数量/// </summary>public int Count { get; set; }
}

Order.API/Models/Order.cs订单实体类:

public class Order
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 下单时间/// </summary>[Required]public DateTime CreateTime { get; set; }/// <summary>/// 产品ID/// </summary>[Required]public int ProductID { get; set; }/// <summary>/// 购买数量/// </summary>[Required]public int Count { get; set; }
}

Order.API/Models/OrderContext.cs数据库Context:

public class OrderContext : DbContext
{public OrderContext(DbContextOptions<OrderContext> options): base(options){}public DbSet<Order> Orders { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){}
}

Order.API/appsettings.json增加数据库连接字符串:

"ConnectionStrings": {"OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;"
}

Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:

public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<OrderContext>();x.UseRabbitMQ("host.docker.internal");});
}

在这里插入图片描述
以上是订单服务的修改。

Product.API/Controllers/ProductsController.cs增加减库存接口:

[Route("[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{private readonly ILogger<ProductsController> _logger;private readonly IConfiguration _configuration;private readonly ICapPublisher _capBus;private readonly ProductContext _context;public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context){_logger = logger;_configuration = configuration;_capBus = capPublisher;_context = context;}[HttpGet]public IActionResult Get(){string result = $"【产品服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" +$"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}";return Ok(result);}/// <summary>/// 减库存 订阅下单事件/// </summary>/// <param name="message"></param>/// <returns></returns>[NonAction][CapSubscribe("order.services.createorder")]public async Task ReduceStock(CreateOrderMessageDto message){//业务代码var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID);product.Stock -= message.Count;await _context.SaveChangesAsync();}}

Product.API/MessageDto/CreateOrderMessageDto.cs:

/// <summary>
/// 下单事件消息
/// </summary>
public class CreateOrderMessageDto
{/// <summary>/// 产品ID/// </summary>public int ProductID { get; set; }/// <summary>/// 购买数量/// </summary>public int Count { get; set; }
}

Product.API/Models/Product.cs产品实体类:

public class Product
{[Key][DatabaseGenerated(DatabaseGeneratedOption.Identity)]public int ID { get; set; }/// <summary>/// 产品名称/// </summary>[Required][Column(TypeName = "VARCHAR(16)")]public string Name { get; set; }/// <summary>/// 库存/// </summary>[Required]public int Stock { get; set; }
}

Product.API/Models/ProductContext.cs数据库Context:

public class ProductContext : DbContext
{public ProductContext(DbContextOptions<ProductContext> options): base(options){}public DbSet<Product> Products { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){base.OnModelCreating(modelBuilder);//初始化种子数据modelBuilder.Entity<Product>().HasData(new Product{ID = 1,Name = "产品1",Stock = 100},new Product{ID = 2,Name = "产品2",Stock = 100});}
}

Product.API/appsettings.json增加数据库连接字符串:

"ConnectionStrings": {"ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;"
}

Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:\

public void ConfigureServices(IServiceCollection services)
{services.AddControllers();services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext")));//CAPservices.AddCap(x =>{x.UseEntityFramework<ProductContext>();x.UseRabbitMQ("host.docker.internal");});
}

在这里插入图片描述
以上是产品服务的修改。

订单服务和产品服务的修改到此就完成了,看着修改很多,其实功能很简单。就是各自增加了自己的数据库表,然后订单服务增加了下单接口,下单接口会发出 “下单事件”。产品服务增加了减库存接口,减库存接口会订阅 “下单事件”。然后客户端调用下单接口下单时,产品服务会减去相应的库存,功能就这么简单。

关于 EF数据库迁移 之类的基本使用就不介绍了。使用 Docker 重新构建镜像,运行订单服务,产品服务:

docker build -t orderapi:1.1 -f ./Order.API/Dockerfile .
docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060"
docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061"
docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062"docker build -t productapi:1.1 -f ./Product.API/Dockerfile .
docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050"
docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051"
docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"

最后 Ocelot.APIGateway/ocelot.json 增加一条路由配置:

在这里插入图片描述

好了,进行到这里,整个环境就有点复杂了。确保我们的PostgreSQL,RabbitMQ,Consul,Gateway,服务实例都正常运行。

服务实例运行成功后,数据库应该是这样的:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
📃产品表种子数据:

在这里插入图片描述
cap.published 表和 cap.received 表是由 CAP自动生成的,它内部是使用本地消息表+MQ来实现异步确保。

三、运行测试

这次使用 Postman 作为客户端调用下单接口( 9070 是之前的 Ocelot 网关端口):

在这里插入图片描述
订单库 published 表:

在这里插入图片描述

订单库 order 表:

在这里插入图片描述

产品库 received 表:

在这里插入图片描述

产品库 product 表:

在这里插入图片描述

再试一下:

在这里插入图片描述
在这里插入图片描述
OK,完成。虽然功能很简单,但是我们实现了服务的解耦,异步调用,和最终一致性。

四、总结

注意,上面的例子纯粹是为了说明 EventBus 的使用,实际中的下单流程绝对不会这么做的!希望大家不要较真。

可能有人会说如果下单成功,但是库存不足导致减库存失败了怎么办,是不是要回滚订单表的数据?如果产生这种想法,说明还没有真正理解最终一致性的思想。

首先下单前肯定会检查一下库存数量,既然允许下单那么必然是库存充足的。这里的事务是指:订单保存到数据库,和下单事件保存到 cap.published 表(保存到 cap.published 表理论上就能够发送到MQ)这两件事情,要么一同成功,要么一同失败。如果这个事务成功,那么就可以认为这个业务流程是成功的,至于产品服务的减库存是否成功那就是产品服务的事情了(理论上也应该是成功的,因为消息已经确保发到了MQ,产品服务必然会收到消息),CAP也提供了失败重试,和失败回调机制。

如果非要数据回滚也是能实现的,CAPICapPublisher.Publish 方法提供一个 callbackName参数,当减库存时,可以触发这个回调。其本质也是通过发布订阅完成,这是不推荐的做法,就不详细说了,有兴趣自己研究一下。
另外,CAP 无法保证消息不重复,实际使用中需要自己考虑一下消息的重复过滤和幂等性。


在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66567.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#防止重复提交

C#防止重复提交 文章目录 C#防止重复提交前言防止重复提交的思路Web API 防止重复提交代码实现代码讲解使用方法 MVC防止重复提交总结 前言 当用户在前端进行提交数据时&#xff0c;如果网络出现卡顿和前端没有给出响应的话顾客通常都会狂点提交按钮&#xff0c;这样就很容易导…

python学opencv|读取图像(三十九 )阈值处理Otsu方法

【1】引言 前序学习了5种阈值处理方法&#xff0c;包括(反)阈值处理、(反)零值处理和截断处理&#xff0c;还学习了一种自适应处理方法&#xff0c;相关文章链接为&#xff1a; python学opencv|读取图像&#xff08;三十三&#xff09;阈值处理-灰度图像-CSDN博客 python学o…

嵌入式硬件篇---PID控制

文章目录 前言第一部分&#xff1a;连续PID1.比例&#xff08;Proportional&#xff0c;P&#xff09;控制2.积分&#xff08;Integral&#xff0c;I&#xff09;控制3.微分&#xff08;Derivative&#xff0c;D&#xff09;控制4.PID的工作原理5..实质6.分析7.各种PID控制器P控…

日志收集Day001

1.ElasticSearch 作用&#xff1a;日志存储和检索 2.单点部署Elasticsearch与基础配置 rpm -ivh elasticsearch-7.17.5-x86_64.rpm 查看配置文件yy /etc/elasticsearch/elasticsearch.yml&#xff08;这里yy做了别名&#xff0c;过滤掉空行和注释行&#xff09; yy /etc/el…

《offer 来了:Java 面试核心知识点精讲 -- 框架篇》(附资源)

继上篇文章介绍了《offer 来了&#xff1a;Java 面试核心知识点精讲 -- 原理篇》书后&#xff0c;本文章再给大家推荐兄弟篇 《offer来了&#xff1a;Java面试核心知识点精讲--框架篇》&#xff0c; 简直就是为Java开发者量身定制的面试神器。 本书是对Java程序员面试中常见的…

Low-Level 大一统:如何使用Diffusion Models完成视频超分、去雨、去雾、降噪等所有Low-Level 任务?

Diffusion Models专栏文章汇总:入门与实战 前言:视频在传输过程中常常因为各种因素(如恶劣天气、噪声、压缩和传感器分辨率限制)而出现质量下降,这会严重影响计算机视觉任务(如目标检测和视频监控)的性能。现有的视频修复方法虽然取得了一些进展,但通常只能针对特定的退…

Video-RAG:一种将视频RAG新框架

1. 摘要及主要贡献点 摘要&#xff1a; 检索增强生成&#xff08;RAG&#xff09;是一种强大的策略&#xff0c;通过检索与查询相关的外部知识并将其整合到生成过程中&#xff0c;以解决基础模型生成事实性错误输出的问题。然而&#xff0c;现有的RAG方法主要集中于文本信息&…

Docker Load后存储的镜像及更改镜像存储目录的方法

Docker Load后存储的镜像及更改镜像存储目录的方法 Docker Load后存储的镜像更改镜像存储目录的方法脚本说明注意事项Docker作为一种开源的应用容器引擎,已经广泛应用于软件开发、测试和生产环境中。通过Docker,开发者可以将应用打包成镜像,轻松地进行分发和运行。而在某些场…

Amazon MSK 开启 Public 访问 SASL 配置的方法

1. 开启 MSK Public 1.1 配置 MSK 参数 进入 MSK 控制台页面&#xff0c;点击左侧菜单 Cluster configuration。选择已有配置&#xff0c;或者创建新配置。在配置中添加参数 allow.everyone.if.no.acl.foundfalse修改集群配置&#xff0c;选择到新添加的配置。 1.2 开启 Pu…

Windows FileZila Server共享电脑文件夹 映射21端口外网连接

我有这样一个使用场景&#xff0c;在外部网络环境下&#xff0c;通过手机便捷地读取存储在电脑上的视频文件。比如在外出旅行、出差&#xff0c;身边没有携带电脑&#xff0c;仅依靠手机设备&#xff0c;就能随时获取电脑里存储的各类视频&#xff0c;无论是学习资料视频、工作…

MySQL 实战 4 种将数据同步到ES方案

文章目录 1. 前言2. 数据同步方案 2.1 同步双写2.2 异步双写2.3 定时更新2.4 基于 Binlog 实时同步 3. 数据迁移工具选型 3.1 Canal3.2 阿里云 DTS3.3 Databus3.4 Databus和Canal对比3.4 其它 4. 后记 上周听到公司新同事分享 MySQL 同步数据到 ES 的方案&#xff0c;发现很有…

虚幻基础-1:cpu挑选(14600kf)

能帮到你的话&#xff0c;就给个赞吧 &#x1f618; 文章目录 ue非常吃cpu拉满主频打开项目编写蓝图运行原因 时间长 关于压力测试 本文以14600kf为例&#xff0c;双12购入&#xff0c;7月份产。 ue非常吃cpu 经本人测试&#xff0c;ue是非常吃cpu的。 拉满主频 无论任何时间…

QTableWidget的简单使用

1.最简单的表格示例&#xff1a; ui->tableWidget->setRowCount(2);// 设置行数ui->tableWidget->setColumnCount(3);// 设置列数&#xff0c;一定要放在设置行表头之前QStringList rowHeaderList;// 行表头rowHeaderList << QStringLiteral("姓名"…

深入探究分布式日志系统 Graylog:架构、部署与优化

文章目录 一、Graylog简介二、Graylog原理架构三、日志系统对比四、Graylog部署传统部署MongoDB部署OS或者ES部署Garylog部署容器化部署 五、配置详情六、优化网络和 REST APIMongoDB 七、升级八、监控九、常见问题及处理 一、Graylog简介 Graylog是一个简单易用、功能较全面的…

2024年我的技术成长之路

2024年我的技术成长之路 大家好&#xff0c;我是小寒。又到年底了&#xff0c;一年过得真快啊&#xff01;趁着这次活动的机会&#xff0c;和大家聊聊我这一年在技术上的收获和踩过的坑。 说实话&#xff0c;今年工作特别忙&#xff0c;写博客的时间比去年少了不少。不过还是…

嵌入式硬件篇---基本组合逻辑电路

文章目录 前言基本逻辑门电路1.与门&#xff08;AND Gate&#xff09;2.或门&#xff08;OR Gate&#xff09;3.非门&#xff08;NOT Gate&#xff09;4.与非门&#xff08;NAND Gate&#xff09;5.或非门&#xff08;NOR Gate&#xff09;6.异或门&#xff08;XOR Gate&#x…

数据库管理-第285期 Oracle 23ai:深入浅出向量索引(20250117)

数据库管理285期 20245-01-17 数据库管理-第285期 Oracle 23ai&#xff1a;深入浅出向量索引&#xff08;20250117&#xff09;1 HNSW事务支持解读 2 IVF分区支持解读 3 混合向量索引何时选择混合向量索引为何选择混合向量索引 总结 数据库管理-第285期 Oracle 23ai&#xff1a…

行人识别检测数据集,yolo格式,PASICAL VOC XML,COCO JSON,darknet等格式的标注都支持,准确识别率可达99.5%

作者简介&#xff1a; 高科&#xff0c;先后在 IBM PlatformComputing从事网格计算&#xff0c;淘米网&#xff0c;网易从事游戏服务器开发&#xff0c;拥有丰富的C&#xff0c;go等语言开发经验&#xff0c;mysql&#xff0c;mongo&#xff0c;redis等数据库&#xff0c;设计模…

【Spring】原型 Bean 被固定

问题描述 在定义 Bean 时&#xff0c;有时候我们会使用原型 Bean&#xff0c;例如定义如下&#xff1a; Service Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class ServiceImpl { }然后我们按照下面的方式去使用它&#xff1a; RestController public class Hello…

2024年美赛C题评委文章及O奖论文解读 | AI工具如何影响数学建模?从评委和O奖论文出发-O奖论文做对了什么?

模型假设仅仅是简单陈述吗&#xff1f;允许AI的使用是否降低了比赛难度&#xff1f;还在依赖机器学习的模型吗&#xff1f;处理题目的方法有哪些&#xff1f;O奖论文的优点在哪里&#xff1f; 本文调研了当年赛题的评委文章和O奖论文&#xff0c;这些问题都会在文章中一一解答…