Kafka学习征途:.NET Core操作Kafka

4afb10bfeb4373ffb5c7d742384e9dcc.png

【Kafka】| 总结/Edison Zhou

1可用的Kafka .NET客户端

作为一个.NET Developer,自然想要在.NET项目中集成Kafka实现发布订阅功能。那么,目前可用的Kafka客户端有哪些呢?

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。

因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:

efba0e142bd6509587b5ba732694541e.png

接下来,本文就来在.NET Core项目下通过Confluent.Kafka和CAP两个主流开源项目来操作一下Kafka,实现一下发布订阅的功能。

2基于Confluent.Kafka的Sample

要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。

安装相关组件

在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:

PM>Install-Package Confluent.Kafka

编写KafkaService

编写IKafkaService接口:

namespace EDT.Kafka.Core
{public interface IKafkaService{Task PublishAsync<T>(string topicName, T message) where T : class;Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;}
}

编写KafkaService实现类:

namespace EDT.Kafka.Core
{public class KafkaService : IKafkaService{public static string KAFKA_SERVERS = "127.0.0.1:9091";public async Task PublishAsync<T>(string topicName, T message) where T : class{var config = new ProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小为16KLingerMs = 20 // 修改等待时间为20ms};using (var producer = new ProducerBuilder<string, string>(config).Build()){await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class{var config = new ConsumerConfig{BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer",EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起};using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}T messageResult = null;try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}}await Task.CompletedTask;}}
}

为了方便后续的演示,在此项目中再创建一个类 EventData:

public class EventData
{public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; }
}

编写Producer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:

namespace EDT.Kafka.Demo.Producer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();for (int i = 0; i < 50; i++){var eventData = new EventData{TopicName = "testtopic",Message = $"This is a message from Producer, Index : {i + 1}",EventTime = DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine("Publish Done!");Console.ReadKey();}}
}

编写Consumer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:

namespace EDT.Kafka.Demo.Consumer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();var topics = new List<string> { "testtopic" };await kafkaService.SubscribeAsync<EventData>(topics, (eventData) => {Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");});}}
}

测试Pub/Sub效果

将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。

0eb57c565ee5d83e8431b90d680012a7.png

3基于CAP项目的Sample

模拟场景说明

假设我们有两个微服务,一个是Catalog微服务,一个是Basket微服务,当Catalog微服务产生了Product价格更新的事件,就会将其发布到Kafka,Basket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。

5e67dffa1e91a1b651d43cf91bf6760a.png

Catalog API

新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:

PM>Install Package DotNetCore.CAP
PM>Install Package DotNetCore.CAP.MongoDB
PM>Install Package DotNetCore.CAP.Kafka

在Startup中的ConfigureServices方法中注入CAP:

public void ConfigureServices(IServiceCollection services)
{......services.AddCap(x =>{x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");});
}

新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:

namespace EDT.Demo.Catalog.API.Controllers
{[ApiController][Route("[controller]")]public class ProductController : ControllerBase{private static readonly IList<Product> Products = new List<Product>{new Product { Id = "0001", Name = "电动牙刷A", Price = 99.90M,  Introduction = "暂无介绍" },new Product { Id = "0002", Name = "电动牙刷B", Price = 199.90M,  Introduction = "暂无介绍" },new Product { Id = "0003", Name = "洗衣机A", Price = 2999.90M,  Introduction = "暂无介绍" },new Product { Id = "0004", Name = "洗衣机B", Price = 3999.90M,  Introduction = "暂无介绍" },new Product { Id = "0005", Name = "电视机A", Price = 1899.90M,  Introduction = "暂无介绍" },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher = publisher;_mapper = mapper;}[HttpGet]public IList<ProductDTO> Get(){return _mapper.Map<IList<ProductDTO>>(Products); ;}[HttpPut]public async Task<IActionResult> UpdatePrice(string id, decimal newPrice){// 业务代码var product = Products.FirstOrDefault(p => p.Id == id);product.Price = newPrice;// 发布消息await _publisher.PublishAsync("ProductPriceChanged", new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});return NoContent();}}
}

Basket API

参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。

新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。

namespace EDT.Demo.Basket.API.Controllers
{[ApiController][Route("[controller]")]public class BasketController : ControllerBase{private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>{new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0001", Name = "电动牙刷A", Price = 99.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0005", Name = "电视机A", Price = 1899.90M }, Count = 1 },}    },new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0002", Name = "电动牙刷B", Price = 199.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣机B", Price = 3999.90M }, Count = 1 },}}};[HttpGet]public IList<MyBasketDTO> Get(){return Baskets;}[NonAction][CapSubscribe("ProductPriceChanged")]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO == null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id == productDTO.Id){catalog.Product.Price = productDTO.Price;break;}}}await Task.CompletedTask;}}
}

测试效果

同时启动Catalog API 和 Basket API两个项目。

首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。

2e675b691deb7d5697ca8bfc944aaa4b.png

然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。

18f6e41e8cc48e13b8a4ec3824ebe107.png

最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。

922f592196747ba84c8318fccc03abf5.png

End总结

本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。

参考资料

阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741

麦比乌斯皇,《.NET使用Kafka小结》:https://www.cnblogs.com/hsxian/p/12907542.html

Tony,《.NET Core事件总线解决方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html

极客时间,胡夕《Kafka核心技术与实战》

B站,尚硅谷《Kafka 3.x入门到精通教程》

3b5db3496503b68f3f6146c45eb2b8bb.gif

年终总结:Edison的2020年终总结

数字化转型:我在传统企业做数字化转型

C#刷题:C#刷剑指Offer算法题系列文章目录

.NET面试:.NET开发面试知识体系

.NET大会:2020年中国.NET开发者大会PDF资料

91cc2e3d310148633b7d09b52fc5af49.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/285539.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言试题107之判断 101至200 之间有多少个素数,并输出所有素数。

✅作者简介:大家好我是码莎拉蒂,CSDN博客专家🥇🥇🥇 📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 1、题目 题目:判断 101至200 之间有多少个素数,并输出所有素数 分析:判断素数的方法:用一个数分…

C语言将十进制输出二进制、八进制、十六进制的方法总结

文章目录 方法一:直接输出方法二:itoa函数方法一:直接输出 C语言中,控制printf函数输出格式的是格式字符,printf没有直接打出2进制数的格式符,直接打出16进制的格式符是x格式符,即%x。 printf函数中输出的格式为printf("<格式化字符串>", <参量表&…

四叉树算法

2019独角兽企业重金招聘Python工程师标准>>> title: 四叉树算法 date: 2016-1-11 15:10 categories: IOS tags: 算法 小小程序猿我的博客&#xff1a;http://daycoding.com 转载&#xff1a;http://blog.csdn.net/zhanxinhang/article/details/6706217 高德iOS聚合…

2019年中国教育信息化行业研究报告

2019年中国教育信息化行业研究报告 教育行业丨研究报告 本文转自&#xff1a;艾瑞咨询 核心摘要&#xff1a; 教育信息化2.0时代&#xff0c;教育相关政府/学校以更开放的姿态对待社会各类业态的进入&#xff0c;共建共享优质教育资源&#xff0c;提升教育公平与教育质量。同…

C语言试题109之将一个正整数分解质因数。例如:输入 90,打印出 90=2乘3乘3乘5

✅作者简介:大家好我是码莎拉蒂,CSDN博客专家🥇🥇🥇 📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 1、题目 题目:将一个正整数分解质因数。例如:输入 90,打印出 90=233*5。 分析:对 n 进行分解质因…

dotnet 使用 Crossgen2 对 DLL 进行 ReadyToRun 提升启动性能

我对几个应用进行严格的启动性能评估&#xff0c;对比了在 .NET Framework 和 dotnet 6 下的应用启动性能&#xff0c;非常符合预期的可以看到&#xff0c;在用户的设备上&#xff0c;经过了 NGen 之后的 .NET Framework 可以提供非常优越的启动性能&#xff0c;再加上 .NET Fr…

使用myeclipse建立maven项目(重要)

maven是管理项目的&#xff0c;myeclipse是编写代码的。第一次写项目都要配置好多东西&#xff0c;很麻烦&#xff0c;now 来看看怎样新建一个maven项目。 工具/原料 myeclipsemaven方法/步骤 因为教程使用的maven是自己下载配置的&#xff0c;并没有使用myeclipse自带的&#…

LeetCode 每日一题 Day 22 || 枚举(数学方法)/二分

1954. 收集足够苹果的最小花园周长 给你一个用无限二维网格表示的花园&#xff0c;每一个 整数坐标处都有一棵苹果树。整数坐标 (i, j) 处的苹果树有 |i| |j| 个苹果。 你将会买下正中心坐标是 (0, 0) 的一块 正方形土地 &#xff0c;且每条边都与两条坐标轴之一平行。 给你…

不用@微信官网了,用python给自己的微信头像加个小国旗

国旗LOGO&#xff08;png透明格式&#xff09;&#xff1a; 微信头像 合成结果&#xff1a; import base64 import os import re from io import BytesIO from PIL import Image import tkinter as tk from tkinter import filedialog# 水印图片 可以自己指定 #markImageImage…

C语言试题106之有一对兔子问题

✅作者简介:大家好我是码莎拉蒂,CSDN博客专家🥇🥇🥇 📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 1、题目 题目:有一对兔子,从出生后第 3 个月起每个月都生一对兔子,小兔子长到第三个月 后每个月又…

Blazor University (34)表单 —— 获得表单状态

原文链接&#xff1a;https://blazor-university.com/forms/accessing-form-state/获得表单状态源代码[1]有时&#xff0c;我们需要获得 <EditForm> 子内容中的表单状态。最常见的用途是当我们需要访问输入的 CSS 类时&#xff0c;指示输入是否被修改或有效/无效。例如&a…

[转]c# 中间件 的扩展模型(.net webapi/.net Core 的 MiddleWare 处理模型)

在学习 asp.net WebApi 或者asp.net Core 的时候&#xff0c;它们管道的处理模型跟 asp.net MVC/WebForm 的管道模型是不一样的。 asp.net WebApi 或者asp.net Core 他们使用了一种叫做“中间件”的处理模型&#xff0c;相对于传统管道模型&#xff0c;剔除了很多非必要的处理…

【ArcGIS Engine二次开发】入门基础(1):ArcGIS Engine简介及开发环境搭建

文章目录ArcGIS Engine概述ArcGIS Engine与ArcObjects的关系ArcGIS Engine下载及安装ArcGIS Engine概述 ArcGIS Engine简介 ArcGIS Engine是ESRI公司在2004年推出的用于开发C/S架构GIS应用软件的工具包&#xff0c;是将用于构建ArcGIS整套产品的组件库——ArcObjects的比分功…

微软Visual Studio 2019版本16.3 正式发布,支持 .NET Core 3.0

微软正式发布了Visual Studio 2019 16.3版本&#xff0c;主要更新内容如下&#xff1a; .NET Core 3.0 Visual Studio版本16.3包括对 .NET Core 3.0 的支持。 注意&#xff1a;如果使用的是.NET Core 3.0&#xff0c;则需要使用Visual Studio 16.3或更高版本。 .NET Core桌…

C语言试题120之输入两个正整数 m 和 n,求其最大公约数和最小公倍数

✅作者简介:大家好我是码莎拉蒂,CSDN博客专家🥇🥇🥇 📃个人主页:个人主页 🔥系列专栏:C语言试题200例 💬推荐一款模拟面试、刷题神器👉 点击跳转进入网站 1、题目 题目:输入两个正整数 m 和 n,求其最大公约数和最小公倍数 分析:利用辗除法 2 、温馨提示…

OSChina 周一乱弹 ——致我们终将逝去的青春

2019独角兽企业重金招聘Python工程师标准>>> 我们的青春是这样的。 从幼儿园午睡开始&#xff0c; 做了一万遍的广播体操&#xff0c; 一条充满了“血”和“泪”的三八线 遍地开花的煎饼果子摊 五毛钱只能养活三天的小鸡 象征着财富和地位的弹珠 放学后 奔向世界 放…

【ArcGIS Engine二次开发】入门基础(2):ArcGIS开发方式(VBA、DLL、Add-in、Engine)对比

文章目录2.1 使用VBA进行桌面软件开发2.1.1 VBA开发方式2.1.1 VBA代码的安全性2.2 使用DLL进行桌面软件开发2.2.1 DLL开发方式2.2.2 DLL功能的应用部署2.3 使用Add-in进行桌面软件开发2.4 使用ArcGIS Engine构建独立应用程序对于桌面GIS应用来说&#xff0c;ArcGIS平台不仅提供…

使用.Net驱动Jetson Nano的OLED显示屏

背景最近几年&#xff0c;边缘计算的需求急剧爆发。人工智能、物联网和 5G 的演进给边缘计算带来了无限的可能性。因为工作原因&#xff0c;有幸在2019开始接触了英伟达 Jetson 家族的各种边缘计算设备&#xff1a;Nano&#xff0c;TX2&#xff0c;AGX&#xff0c;NX等&#xf…

工程测量不用愁,120个视频动画帮你忙,简单易学直观明了!

工程测量不用愁,120个视频动画帮你忙,简单易学直观明了!文末提供下载地址。

2016-2017-2学期《程序设计与数据结构》教学进程

2016-2017-2学期《程序设计与数据结构》教学进程 目录 考核方式课前准备教学进程 第00周学习任务和要求第01周学习任务和要求第02周学习任务和要求第03周学习任务和要求第04周学习任务和要求第05周学习任务和要求第06周学习任务和要求第07周学习任务和要求第08周学习任务和要求…