在Asp.Net Core中集成Kafka

  在我们的业务中,我们通常需要在自己的业务子系统之间相互发送消息,一端去发送消息另一端去消费当前消息,这就涉及到使用消息队列MQ的一些内容,消息队列成熟的框架有多种,这里你可以读这篇文章来了解这些MQ的不同,这篇文章的主要目的是用来系统讲述如何在Asp.Net Core中使用Kafka,整篇文章将介绍如何写消息发送方代码、消费方代码、配套的工具的使用,希望读完这篇文章之后对整个消息的运行机制有一定的理解,在这里通过一张图来简要了解一下消息队列中的一些概念。


640?wx_fmt=png

图一 Kafka消息队列

  一 安装NUGET包

  在写代码之前首先要做的就是安装nuget包了,我们这里使用的是Confluent.Kafka 1.0.0-RC4版本,具体项目要根据具体的时间来确定引用包的版本,这些包可能更新比较快。

640?wx_fmt=png

图二 引用Kafka包依赖

  二 消息发送方(Producer)

  1 在项目中添加所有触发事件的接口 IIntegrationEvent,后面所有的触发事件都是继承自这个接口。


/// <summary>
/// 集成事件的接口定义
/// </summary>
public interface IIntegrationEvent {
string Key { get; set; }
}

  2 定义Kafka生产者


/// <summary>
/// Kafka 生产者的 Domain Service
/// </summary>
public class KafkaProducer : DomainService {
private readonly IConfiguration _config;
private readonly ILogger<KafkaProducer> _logger;
public KafkaProducer(IConfiguration config,
ILogger<KafkaProducer> logger) {
_config = config;
_logger = logger;
}
/// <summary>
/// 发送事件
/// </summary>
/// <param name="event"></param>
public void Produce(IIntegrationEvent @event) {
var topic = _config.GetValue<string>($"Kafka:Topics:{@event.GetType().Name}");
var producerConfig = new ProducerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")
};
var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build()) {
try {
var json = JsonConvert.SerializeObject(@event);
var dr = producer.ProduceAsync(topic, new Message<string, string> { Key = @event.Key, Value = json }).GetAwaiter().GetResult();
_logger.LogDebug("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
} catch (ProduceException<string, string> ex) {
_logger.LogError(ex, "发送事件到 {0} 失败,原因 {1} ", topic, ex.Error.Reason);
}
}
}
}

  在这里我们的Producer根据业务的需要定义在领域服务中,这里面最关键的就是Produce方法了,该方法的参数是继承自IIntegrationEvent 接口的各种各样事件,在这个方法中,我们获取配置在appsetting.json中配置的各种Topic以及Kafka服务器的地址,具体的配置如下方截图所示。  

640?wx_fmt=png

图三 配置服务器地址以及各种Topic

  通过当前配置我们就知道我们的消息要发往何处,然后我们就可以创建一个producer来将我们的事件(实际上是定义的数据结构)序列化成Json,然后通过异步的方式发送出去,这里需要注意我们创建的Producer要放在一个using块中,这样在创建完成并发送消息之后就会释放当前生产者。这里如果发送失败会在当前日志中记录发送的值以及错误的原因从而便于进行调试。这里举出其中的一个事件RepairContractFinishedEvent为例来说明。


/// <summary>
/// 维修合同完成的事件
/// </summary>
public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContract RepairContract { get; set; }
//一个维修合同会对应多个调整单
public List<RepairContractAdjust> RepairContractAdjusts { get; set; }
public string Key { get; set; }
}

  这个里面RepairContract以及List集合都是我们定义的一种数据结构。

  最后我们来看看在具体的领域层中我们该如何触发此事件的,这里我们也定义了一个叫做IRepairContractEventManager接口的领域服务,并在里面定义了一个叫做Finished的接口,然后在RepairContractEventManager中实现该方法。


public class RepairContractEventManager : DomainService, IRepairContractEventManager {
private readonly KafkaProducer _producer;
private readonly IRepository<RepairContract, Guid> _repairContractRepository;
private readonly IRepository<RepairContractAdjust, Guid> _repairContractAdjustRepository;
public RepairContractEventManager(KafkaProducer producer,
IRepository<RepairContract, Guid> repairContractRepository,
IRepository<RepairContractAdjust, Guid> repairContractAdjustRepository) {
_producer = producer;
_repairContractRepository = repairContractRepository;
_repairContractAdjustRepository = repairContractAdjustRepository;
}
public void Finished(Guid repairContractId) {
var repairContract = _repairContractRepository.GetAll()
.Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials)
.SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();
var repairContractAdjusts = _repairContractAdjustRepository.GetAll()
.Include(a => a.WorkItems).ThenInclude(w => w.Materials)
.Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult();
var @event = new RepairContractFinishedEvent {
Key = repairContract?.Code,
RepairContract = repairContract,
RepairContractAdjusts = repairContractAdjusts
};
_producer.Produce(@event);
}
}

 

 这段代码就是组装RepairContractFinishedEvent的具体实现过程,然后调用我们之前创建的KafkaProducer对象然后将消息发送出去,这样在需要触发当前RepairContractFinishedEvent 的地方来注入IRepairContractEventManager接口,然后调对应的Finished方法,这样就完成了整个消息的发送的过程了。

  三 查看消息的发送

  在发送完消息后我们可以到Kafka 集群 Control Center中查找我们发送的所有消息。选择其中的一条消息,双击,然后选择INSPECT来查看发送的消息

640?wx_fmt=png

图四 Kafka Control Center中查看发送消息

  四 消息的接收方(Consumer)

  在正确创建消息的发送方后紧接着就是定义消息的接收方了,消息的接收方顾名思义就是消费刚才消息的一方,这里的步骤和发送类似,但是也有很大的不同,消息的消费方核心是一个后台服务,并且在单独的线程中监听来自发送方的消息,并进行消费,这里我们先定义一个叫做KafkaConsumerHostedService的基类,我们具体来看看代码。


/// <summary>
/// Kafka 消费者的后台服务基础类
/// </summary>
/// <typeparam name="T">事件类型</typeparam>
public abstract class KafkaConsumerHostedService<T> : BackgroundService where T : IIntegrationEvent {
protected readonly IServiceProvider _services;
protected readonly IConfiguration _config;
protected readonly ILogger<KafkaConsumerHostedService<T>> _logger;
public KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger<KafkaConsumerHostedService<T>> logger) {
_services = services;
_config = config;
_logger = logger;
}
/// <summary>
/// 消费该事件,比如调用 Application Service 持久化数据等
/// </summary>
/// <param name="event">事件内容</param>
protected abstract void DoWork(T @event);
/// <summary>
/// 构造 Kafka 消费者实例,监听指定 Topic,获得最新的事件
/// </summary>
/// <param name="stoppingToken">终止标识</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await Task.Factory.StartNew(() => {
var topic = _config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");
var consumerConfig = new ConsumerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = _config.GetValue<string>("Application:Name"),
EnableAutoCommit = true,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build()) {
consumer.Subscribe(topic);
while (!stoppingToken.IsCancellationRequested) {
try {
var result = consumer.Consume(stoppingToken);
var @event = JsonConvert.DeserializeObject<T>(result.Value);
DoWork(@event);
//consumer.StoreOffset(result);
} catch (OperationCanceledException ex) {
consumer.Close();
_logger.LogDebug(ex, "Kafka 消费者结束,退出后台线程");
} catch (AbpValidationException ex) {
_logger.LogError(ex, $"Kafka {GetValidationErrorNarrative(ex)}");
} catch (ConsumeException ex) {
_logger.LogError(ex, "Kafka 消费者产生异常");
} catch (KafkaException ex) {
_logger.LogError(ex, "Kafka 产生异常");
} catch (ValidationException ex) {
_logger.LogError(ex, "Kafka 消息验证失败");
} catch (Exception ex) {
_logger.LogError(ex, "Kafka 捕获意外异常");
}
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
private string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("验证过程中检测到以下错误");
foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
}
return detailBuilder.ToString();
}
}

  这段代码中我们会创建一个consumer,这里我们会在一个While循环中去订阅特定Topic消息,这里的BootstrapServers是和发送方保持一致,并且也是在当前应用程序中的appsetting.json中进行配置的,而且这里的consumer.Consume方法是一个阻塞式方法,当发送方发送特定事件后,这里会接收到同样名称的Topic的消息,然后将接收到的Json数据进行反序列化,然后交由后面的DoWork方法进行处理。这里还是以之前生成者发送的RepairContractFinished事件为例,这里也需要定义一个RepairContractFinishedEventHandler来处理生产者发送的消息。


public class RepairContractFinishedEventHandler : KafkaConsumerHostedService<RepairContractFinishedEvent> {
public RepairContractFinishedEventHandler(IServiceProvider services,
IConfiguration config, ILogger<KafkaConsumerHostedService<RepairContractFinishedEvent>> logger)
: base(services, config, logger) {
}
/// <summary>
/// 调用 Application Service,新增或更新维修合同及关联实体
/// </summary>
/// <param name="event">待消费的事件</param>
protected override void DoWork(RepairContractFinishedEvent @event) {
using (var scope = _services.CreateScope()) {
var service = scope.ServiceProvider.GetRequiredService<IRepairContractAppService>();
service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);
}
}
}

  这里需要特别注意的是在这里我么也需要定义一个继承自IIntegrationEvent接口的事件,这里也是定义一种数据结构,并且这里的数据结构和生成者定义的要保持一致,否则消费方在反序列化的时候会丢失不能够匹配的信息。


public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContractDto RepairContract { get; set; }
public List<RepairContractAdjustDto> RepairContractAdjusts { get; set; }
public string Key { get; set; }
}

  另外在DoWork方法中我们也需要注意代码也需要用using包裹,从而在消费方消费完后释放掉当前的应用服务。最后需要注意的就是我们的每一个Handle都是一个后台服务,我们需要在Asp.Net Core的Startup的ConfigureServices进行配置,从而将当前的后台服务添加到Asp.Net Core依赖注入容器中。


/// <summary>
/// 注册集成事件的处理器
/// </summary>
/// <param name="services"></param>
private void AddIntegrationEventHandlers(IServiceCollection services) {
services.AddHostedService<RepairContractFinishedEventHandler>();
services.AddHostedService<ProductTransferDataEventHandler>();
services.AddHostedService<PartUpdateEventHandler>();
services.AddHostedService<VehicleSoldFinishedEventHandler>();
services.AddHostedService<AddOrUpdateDealerEventHandler>();
services.AddHostedService<AddOrUpdateProductCategoryEventHandler>();
services.AddHostedService<CustomerFinishedEventHandler>();
services.AddHostedService<VehicleSoldUpdateStatusEventHandler>();
services.AddHostedService<AddCustomerEventHandler>();
}

  最后我们也看看我们的appsetting.json的配置文件关于kafka的配置。


"Kafka": {
"BootstrapServers": "127.0.0.1:9092",
"MessageTimeoutMs": 5000,
"Topics": {
"RepairContractFinishedEvent": "repair-contract-finished",
"AddOrUpdateProductCategoryEvent": "add-update-product-category",
"AddOrUpdateDealerEvent": "add-update-dealer",
"ClaimApproveEvent": "claim-approve",
"ProductTransferDataEvent": "product-update",
"PartUpdateEvent": "part-update",
"VehicleSoldFinishedEvent": "vehiclesold-finished",
"CustomerFinishedEvent": "customer-update",
"VehicleInformationUpdateStatusEvent": "add-update-vehicle-info",
"AddCustomerEvent": "add-customer"
}
},

  这里需要注意的是发送方和接收方必须保证Topic一致,并且配置的服务器名称端口保持一致,这样才能够保证消息的准确发送和接收。最后对于服务端,这里推荐一个VSCode的插件kafka,能够创建并发送消息,这样就方便我们来发送我们需要的数据了,这里同样需要我们先建立一个.kafka的文件,然后配置Kafka服务的地址和端口号。

640?wx_fmt=png

图五 利用VSCode Kafka插件发送消息

原文地址:https://www.cnblogs.com/seekdream/p/10757541.html

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com 
640?wx_fmt=jpeg


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/316139.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分享一个.NET平台开源免费跨平台的大数据分析框架.NET for Apache Spark

今天早上六点半左右微信群里就看到张队发的关于.NET Spark大数据的链接https://devblogs.microsoft.com/dotnet/introducing-net-for-apache-spark/ &#xff0c;正印证了“微软在不断通过.NET Core补齐各领域开发&#xff0c;真正实现一种语言的跨平台”这句话。那么我们今天就…

acwing3132. 食物(BZOJ3028)

acwing3132. 食物 题意&#xff1a; 你当然要帮他计算携带 N 件物品的方案数。 承德汉堡&#xff1a;偶数个。 可乐&#xff1a;0 个或 1 个。 鸡腿&#xff1a;0 个&#xff0c;1 个或 2 个。 蜜桃多&#xff1a;奇数个。 鸡块&#xff1a;4 的倍数个。 包子&#xff1a;0 个…

持续畅销20年的《C#高级编程》出第11版了!

TA是谁&#xff1f;Wrox精品红皮书&#xff0c;引领无数程序员进入程序开发殿堂&#xff0c;C#专家级指南&#xff0c;是经验丰富的程序员提高效率的更快捷方式&#xff0c;连续畅销20年&#xff0c;累计销量超30万册。TA出生名门&#xff1a;TA战绩辉煌&#xff1a;2019新的征…

.NET微服务体系结构中为什么使用Ocelot实现API网关

为什么要使用API网关而不是直接通信&#xff1f;在微服务架构中&#xff0c;客户端应用程序通常需要使用来自多个微服务的功能。如果直接执行该消费&#xff0c;则客户端需要处理多个微服务端点以进行呼叫。当应用程序发展并引入新的微服务或更新现有的微服务时会发生什么&…

基于Jenkins Pipeline的ASP.NET Core持续集成实践

最近在公司实践持续集成&#xff0c;使用到了Jenkins的Pipeline来提高团队基于ASP.NET Core API服务的集成与部署&#xff0c;因此这里总结一下。一、关于持续集成与Jenkins Pipeline1.1 持续集成相关概念互联网软件的开发和发布&#xff0c;已经形成了一套标准流程&#xff0c…

编程语言之父谈语言设计,龟叔大赞TypeScript

争论哪门编程语言孰优孰劣&#xff0c;长期以来都是程序员乐此不疲的“娱乐活动”。之所以说是娱乐活动&#xff0c;因为这些争论到最后往往只是各自在发泄情绪&#xff0c;再则就是&#xff0c;脱离使用场景去讨论所谓哪门语言更好并没意义。但如果让编程语言作者坐在一起讨论…

你必须知道的 SmartSql

介绍SmartSql MyBatis Cache(Memory | Redis) R/W Splitting Dynamic Repository Diagnostics ......简洁、高效、高性能、扩展性、监控、渐进式开发&#xff01;她是如何工作的&#xff1f;SmartSql 借鉴了 MyBatis 的思想&#xff0c;使用 XML 来管理 SQL &#xff0c;并…

OsharpNS轻量级.net core快速开发框架简明入门教程

OsharpNS官方资源项目地址&#xff1a;https://github.com/i66soft/osharp-ns20演示地址&#xff1a;https://www.osharp.org 直接使用QQ登录可以查看效果文档地址&#xff1a;https://docs.osharp.org 正在完善中....发布博客&#xff1a;https://www.cnblogs.com/guomingfeng…

.net core 注入机制与Autofac

本来是要先出注入机制再出 管道 的&#xff0c;哈哈哈……就是不按计划来……这里扯扯题外话&#xff1a;为什么要注入&#xff08;DI&#xff0c;dependency-injection&#xff09;&#xff0c;而不用 new 对象&#xff1f;可能我们都很清楚&#xff0c;new 对象所造成的影响就…

浅析 .Net Core中Json配置的自动更新

Pre很早在看 Jesse 的Asp.net Core快速入门的课程的时候就了解到了在Asp .net core中,如果添加的Json配置被更改了,是支持自动重载配置的,作为一名有着严重"造轮子"情节的程序员,最近在折腾一个博客系统,也想造出一个这样能自动更新以Mysql为数据源的ConfigureSource…

E. Don‘t Really Like How The Story Ends(代码未补)

Don’t Really Like How The Story Ends 题意&#xff1a; 有n个点&#xff0c;m个边&#xff0c;现在要从1号边开始求dfs序&#xff0c;问最少加多少边可以是的dfs序是从1到n&#xff1f; 题解&#xff1a; dfs序的过程中&#xff0c;不走到叶子节点我们是无法回溯的&…

.NET Core 迁移躺坑记续集--Win下莫名其妙的超时

继上一集.NET Core 迁移躺坑记里说到遇到的各种问题并且弄了n个解决方案之后&#xff0c;特别是对于问题4的解决方案对于切换了HttpClientFactory我用了你家netcore 2.1下专门解决之前HttpClient口病已久的灵丹妙药了&#xff0c;信心满满的上线…..然后挂了&#xff0c;该超时…

使用Entity Framework Core访问数据库(Oracle篇)

前言哇。。看看时间 真的很久很久没写博客了 将近一年了。最近一直在忙各种家中事务和公司的新框架 终于抽出时间来更新一波了。本篇主要讲一下关于Entity Framework Core访问oracle数据库的采坑。。强调一下&#xff0c;本篇文章发布之前 关于Entity Framework Core访问oracl…

Asp.Net Core Docker镜像更新系统从wheezy改为stretch

之前写过一个在Asp.Net Core里调用System.Drawing.Common绘图的DEMO&#xff0c;部署到Docker里运行&#xff0c;需要更新Asp.Net Core镜像的操作系统。https://www.cnblogs.com/sunnytrudeau/p/9384620.html当时用的阿里云的源RUN echo "deb http://mirrors.aliyun.com/d…

Monster Hunter(2020南京M)

Monster Hunter(2020南京M) 题意&#xff1a; 给你一颗树&#xff0c;树上每个节点都是一个hpi 血量的怪物。打败每个怪物所需要的能量值为hpi 所 有 存 活 的 直 接 子 节 点 的 hpj 。每次必须要消灭父节点后才能消灭子节点。此外你还有m个魔咒&#xff0c;每个魔咒可以不…

网络数据采集(AngleSharp)-使用AngleSharp做html解析

有这么一本Python的书: <<Python 网络数据采集>>我准备用.NET Core及第三方库实现里面所有的例子. 这是第一部分, 主要使用的是AngleSharp: https://anglesharp.github.io/(文章的章节书与该书是对应的)发送Http请求在python里面这样发送http请求, 它使用的是pytho…

ASP.NET Core在Azure Kubernetes Service中的部署和管理

目标部署&#xff1a;掌握将aspnetcore程序成功发布到Azure Kubernetes Service&#xff08;AKS&#xff09;上管理&#xff1a;掌握将AKS上的aspnetcore程序扩容、更新版本准备工作注册 Azure 账户官网免费帐户Azure 免费帐户仅适用于新用户&#xff0c;并且仅限每个客户一个免…

深入研究 Mini ASP.NET Core,看看 ASP.NET Core 内部到底是如何运行的

几年前&#xff0c;Artech 老师写过一个 Mini MVC&#xff0c;用简单的代码告诉读者 ASP.NET MVC 内部到底是如何运行的。当时我研究完以后&#xff0c;受益匪浅&#xff0c;内心充满了对 Artech 老师的感激&#xff0c;然后用我自己理解的 MVC 知识&#xff0c;写了一篇 深入研…

一文读懂Asp.net core 依赖注入(Dependency injection)

一、什么是依赖注入首先在Asp.net core中是支持依赖注入软件设计模式&#xff0c;或者说依赖注入是asp.net core的核心&#xff1b;依赖注入&#xff08;DI&#xff09;和控制反转&#xff08;IOC&#xff09;基本是一个意思&#xff0c;因为说起来谁都离不开谁&#xff1b;或者…

P4619 [SDOI2018]旧试题

P4619 [SDOI2018]旧试题 题意&#xff1a; 求个式子&#xff1a; (∑i1A∑j1B∑k1Cd(i∗j∗k))mod(1097)(\sum_{i1}^{A}\sum_{j1}^{B}\sum_{k1}^{C}d(i*j*k))mod(10^97)(i1∑A​j1∑B​k1∑C​d(i∗j∗k))mod(1097) 题解&#xff1a; 原创博文1k纪念 很明显&#xff0c;莫比…