.Net Core 集成 Kafka

最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰。考察了一些产品,最终决定使用kafka来当做消息队列。以下是关于kafka的一些知识的整理笔记。

kafka

kafka 是分布式流式平台。它由linkedin开发,后贡献给了Apache开源组织并成为顶级开源项目。它可以应用在高并发场景下的日志系统,也可以当作消息队列来使用,也可以当作消息服务对系统进行解耦。

流处理平台有以下三种特性:

1.可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。2.可以储存流式的记录,并且有较好的容错性。3.可以在流式记录产生时就进行处理。

一般它可以应用于两个场景:

1.构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。(相当于message queue)2.构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流处理,通过kafka stream topic和topic之间内部进行变化)

broker

kafka中的每个节点即每个服务器就是一个broker 。

topic

kafka中的topic是一个分类的概念,表示一类消息。生产者在生产消息的时候需要指定topic,消费者在消费消息的时候也需要指定topic。

partition

partition是分区的概念。kafka的一个topic可以有多个partition。每个partition会分散到不同的broker上,起到负载均衡的作用。生产者的消息会通过算法均匀的分散在各个partition上。

consumer group

kafka的消费者有个组的概念。一个partition可以被多consumer group订阅。每个消息会广播到每一个group中。但是每个消息只会被group中的一个consumer消费。相当于每个group,一个partition只能有一个consumer订阅,所以group中的consumer数量不可以超过topic中partition的数量。并且消息的消费的顺序在每个partition中是保证有序的,但是在多个partition之间是不保证的,因为consumer的消费速度是有快慢的。
所以如果要用kafka实现严格的消息队列点对点模式那么我们可以设置一个partition并且设置一个consumer。如果对消息消费的顺序不是那么敏感,那么可以设置多个partition来并行消费消息,提高吞吐量。

安装kafka

为了能体验下kafka,我们还是要实际安装一下kafka,毕竟空想是没有用的。现在有了docker,安装起来也是相当滴简单。我们只需要定义好docker-compose的yml就行了。

version: '3'
services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117KAFKA_CREATE_TOPICS: "test:3:1"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

我们在yml里定义2个service:

1.zookeeper,kafka的分布式依赖zookeeper,所以我需要先定义它。2.kafka ,kafka的定义有几个地方要注意的。

•depends_on:zookeeper 指定kafka依赖zookeeper这个service,当启动kafka的时候自动会启动zookeeper。•KAFKA_ADVERTISED_HOST_NAME 这里要指定宿主机的ip•KAFKA_CREATE_TOPICS 这个变量只是的默认创建的topic。"test:3:1"代表创建一个名为test的topic并且创建3个分区1个复制。

定义好这些之后我们只需要使用docker-compose命令运行它:

sudo docker-compose up -d

.net 操作 kafka

安装好kafka的docker环境之后,下面演示下如何使用.net操作kafka,进行消息的生产与消费。

生产者

        static async Task Main(string[] args){Console.WriteLine("Hello World Producer!");var config = new ProducerConfig{BootstrapServers = "192.168.0.117:9092",ClientId = Dns.GetHostName(),};using (var producer = new ProducerBuilder<Null, string>(config).Build()){string topic = "test";for (int i = 0; i < 100; i++){var msg = "message " + i;Console.WriteLine($"Send message:   value {msg}");var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");Thread.Sleep(500);}}Console.ReadLine();}

新建一个控制台项目,从nuget安装kafka的官方client。

Install-Package Confluent.Kafka

代码非常简单,使用ProducerBuilder构造一个producer,然后调用ProduceAsync方法发送消息。
其中需要注意的是如果你的场景并发非常之高,官方文档推荐的方法是Produce而不是ProduceAsync。这是一个比较迷的地方。按常理使用ProduceAsync应该比使用同步方法Produce能获得更高的并发才对。但是文档确确实实说高并发场景请使用Produce。可能是为了避免ProduceAsync结果返回的时候异步线程上下文切换造成的性能开销。
原文:

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.

消费者

        static void Main(string[] args){Console.WriteLine("Hello World kafka consumer !");var config = new ConsumerConfig{BootstrapServers = "192.168.0.117:9092",GroupId = "foo",AutoOffsetReset = AutoOffsetReset.Earliest};var cancel = false;using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()){var topic = "test";consumer.Subscribe(topic);while (!cancel){var consumeResult = consumer.Consume(CancellationToken.None);Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");}consumer.Close();}}

消费者的演示代码同样很简单。我们需要指定groupId,然后订阅topic。使用ConsumerBuilder构造一个consumer,然后调用Consume方法进行消费就可以。
注意:
这里默认是自动commit消费。你也可以根据情况手动提交commit。

运行一下

我们运行一个生产者进程,按照500ms的速度生产消息。运行三个consumer进行消费,可以看到消息被均匀的推送到三个consumer上去。

总结

以上简单的介绍了kafka的背景、安装方法、使用场景。还简单演示了如何使用.net来操作kafka。它可以当作流式计算平台来使用,也可以当作传统的消息队列使用。它当前非常流行,网上的资料也多如牛毛。官方也提供了简单易用的.net sdk ,为.net 平台集成kafka提供了便利。

关注我的公众号一起玩转技术

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/302880.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如果生活中没有数学,那么。。。

随着科技的快速发展&#xff0c;人工智能的重要性日渐显现。对于大多数新手来说&#xff0c;弄清楚入门人工智能需要哪些数学基础、需要熟悉什么框架等&#xff0c;都至关重要。机器学习是一个异常丰富的研究领域&#xff0c;有大量未解决的问题&#xff1a;公正、可解释性、易…

mysql5.6查看归档_MySQL5.6 使用 pt-archiver 分批删除、归档数据

pt-archiver是一个十分高效的表数据归档工具&#xff0c;归档数据可以分批进行事务处理&#xff0c;减少性能消耗&#xff1b;如果实例开启了GTID&#xff0c;因为GTID不支持CTAS创建表的语法&#xff0c;可以使用pt-archiver处理&#xff1b;对于跨实例或者跨服务器的表数据归…

Microsoft宣布将停止支持多个 .NET Framework版本

Microsoft 宣布&#xff0c;使用传统的、不安全的安全哈希算法1&#xff08;SHA-1&#xff09;签名的多个 .NET 框架版本将在明年停止支持。据 .NET 首席工程经理 Jamshed Damkewala 表示&#xff0c;.NET 框架 4.5.2、4.6 和 4.6.1 将在 2022 年 4 月 26 日后停止支持&#xf…

算法有偏见?总比人类识别强吧!

在讨伐算法导致的偏见和产生的作用时&#xff0c;更重要的问题是&#xff1a;与完全没有使用算法的情况相比是怎样的&#xff1f;我们应该比较算法的缺陷与人类的缺陷&#xff0c;而不是简单地询问算法是否存在缺陷。一场革命正在悄然进行。这场革命与大部分新闻报道中出现的人…

通过Dapr实现一个简单的基于.net的微服务电商系统(八)——一步一步教你如何撸Dapr之链路追踪

Dapr提供了一些开箱即用的分布式链路追踪解决方案&#xff0c;今天我们来讲一讲如何通过dapr的configuration来实现非侵入式链路追踪的目录&#xff1a;一、通过Dapr实现一个简单的基于.net的微服务电商系统二、通过Dapr实现一个简单的基于.net的微服务电商系统(二)——通讯框架…

21副酷炫的动图让你了解各种数学概念

数学是很难的科学&#xff0c;但因为它是科学家用数学来解释宇宙的语言&#xff0c;我们无可避免的要学习它。看看下面的这些GIF动图&#xff0c;它们提供了视觉的方式来帮助你理解各种数学技巧。1椭圆的画法2杨辉三角问题(Pascal triangles)解法3使用“FOIL”轻松的解决二项式…

VMware 虚拟机(linux)增加根目录磁盘空间

今天查看学校的监控报修系统&#xff0c;不能访问了&#xff01;&#xff01;&#xff01;系统运行很慢&#xff0c;用top命令查看发现内存使用率90%&#xff0c;用"df -h ”查看“/”目录使用率已达到80%&#xff0c;导致系统运行很慢。我用以下方法扩大根目录磁盘空间。…

网关Ocelot功能演示完结,久等了~~~

前言关于网关(Ocelot)的分享&#xff0c;还遗留一些功能没演示呢&#xff0c;接着来聊聊&#xff1b;这次重点针对网关Ocelot使用缓存、集成Polly做服务治理、集成IdentityServer4做认证授权来详细说说&#xff1b;如果对上一篇感兴趣&#xff0c;点这里(网关Ocelot功能演示安排…

数学课本上的几大变态

数学课本上的几大变态数据与算法之美用数据解决不可能长按扫码关注

牛逼顿的一生:当智商高到一定程度,情商就不重要了

牛顿老师在科学圈里曾经很有权势&#xff0c;被女王封了爵位成了贵族&#xff0c;人称牛爵爷&#xff0c;官至皇家造币局局长兼皇家学会会长。如果阿尔伯特没有辞了以色列总统的话和他有一拼。说他有权势并不仅是官大&#xff0c;主要是贡献大。如果17世纪就有诺贝尔奖的话&…

趣味图解+源码分析,轻松吃透Linux

如今的软件开发行业&#xff0c;服务器端市场基本被 Linux 系统占领了。移动端中的 Android 系统是基于 Linux 内核开发的&#xff0c;那些很火的虚拟化、消息队列、云计算、大数据等技术&#xff0c;都默认支持 Linux 操作系统。而对软件工程师来说&#xff0c;也几乎一定会遇…

java 发送tcp_Java TCP发送与接收

IP地址&#xff1f;端口号&#xff1f;主机名&#xff1f;什么是Socket?什么是UDP&#xff1f;什么是TCP&#xff1f;UDP和TCP区别&#xff1f;以上问题请自行百度&#xff0c;有标准解释&#xff0c;此处不再赘述&#xff0c;直接上干货&#xff01;实例&#xff1a;发送端&a…

从零维到十维空间

事情是这样的&#xff0c;这周我给学生讲3dmax的课。为了让学生了解三视图我就顺便科普了一下什么是零维、一维、二维、三维空间。讲完不过瘾&#xff0c;感觉一支粉笔一块黑板讲维度是一件很爽的事情&#xff0c;那么.........接下来请同学们打开脑洞&#xff0c;看我用一支笔…

如何更好使用多线程

说到线程相信很多开发人员都会认为只要使用了多线程技术服务性能就会提高很多&#xff0c;但涉及过渡使用问题就很少人去了解。在使用上更多是了解是创建&#xff0c;使用&#xff0c;销毁或使用线程池之类的。但这些资料更多是如何使用线程&#xff0c;但对于应用怎样针对性规…

前端又一本升级版图书上市了,听说比第一版还好看

哇&#xff01;听说《Node.js实战&#xff08;第2版&#xff09;》来了&#xff1f;没错&#xff01;这本让读者久等了的书&#xff0c;终于上市啦&#xff01;最近的升级版图书还是很多的&#xff0c;但是小伙伴对这本的期待值依旧不减&#xff01;毕竟第一版在豆瓣上获得了 8…

人气TOP|当红炸子鸡「小明机器人」,出道走花路啦

在全球新一轮技术革命的时代背景下&#xff0c;越来越多的企业走上了数字化之路。伴随着企业对数字化转型的持续关注&#xff0c;各行各业对“数字化员工”即RPA&#xff08;机器人流程自动化&#xff0c;Robotic Process Automation&#xff09;的需求也越发旺盛&#xff0c;都…

MFC和Win32之三___CGdiObject类和windows Gdi对象

小结&#xff1a; 前面讲到的windows窗口对象&#xff0c;在windows下用句柄来代表之&#xff0c;并且用了一个数据结构WNDCLASS&#xff08;窗口类&#xff09;来描述之。同理&#xff0c;windows的Gdi对象也有一些句柄来代表之&#xff08;比如hPen等&#xff09;&#xff0c…

java io流 教程_Java基础教程:IO流与文件基础

Java:IO流与文件基础说明&#xff1a;本章内容将会持续更新&#xff0c;大家可以关注一下并给我提供建议&#xff0c;谢谢啦。走进流什么是流流&#xff1a;指的是从源到目的地的字节的有序序列。在Java中&#xff0c;可以从其中读取一个字节序列的对象称作 输入流&#xff0c;…

用数学模型向你解释离婚

A Mathematical Model of Sentimental Dynamics Accounting for Marital Dissolution解释离婚的情感动力学数学模型背景西方社会的离婚是普遍存在的。它提出了重大的科学和社会学问题&#xff0c;不管是理论上还是解决方式上。学者和问题处理专家认为存在一种情感关系热力学第二…

记一次 .NET医疗布草API程序 内存暴涨分析

一&#xff1a;背景 1. 讲故事我在年前写过一篇关于CPU爆高的分析文章 再记一次 应用服务器 CPU 暴高事故分析 &#xff0c;当时是给同济做项目升级&#xff0c;看过那篇文章的朋友应该知道&#xff0c;最后的结论是运维人员错误的将 IIS 应用程序池设成 32bit 导致了事故的发生…