消息队列 Kafka 的基本知识及 .NET Core 客户端

前言

最新项目中要用到消息队列来做消息的传输,之所以选着 Kafka 是因为要配合其他 java 项目中,所以就对 Kafka 了解了一下,也算是做个笔记吧。

本篇不谈论 Kafka 和其他的一些消息队列的区别,包括性能及其使用方式。

简介

Kafka 是一个实现了分布式的、具有分区、以及复制的日志的一个服务。它通过一套独特的设计提供了消息系统中间件的功能。它是一种发布订阅功能的消息系统。

一些名词

如果要使用 Kafka ,那么在 Kafka 中有一些名词需要知道,文本不讨论这些名词是否在其他消息队列中具有相同的含义。所有名词均是针对于 Kafka。

Message

消息,就是要发送的内容,一般包装成一个消息对象。

Topic

通俗来讲的话,就是放置“消息”的地方,也就是说消息投递的一个容器。假如把消息看作是信封的话,那么 Topic 就是一个邮筒,如下图所示:

Partition && Log

Partition 分区,可以理解为一个逻辑上的分区,像是我们电脑的磁盘 C:, D:, E: 盘一样,
Kafka 为每个分区维护着一份日志Log文件。

每个分区是一个有序的,不可修改的,消息组成的队列。 当消息过来的时候,会被追加到日志文件中,这个追加是根据 commit 命令来执行的。

分区中的每一条消息都有一个编号,叫做 offset id,这个 id 在当前分区中是唯一的,并且是递增的。

日志,就是用来记录分区中接收到的消息,因为每一个 Topic 可以同时向一个或者多个分区投递消息,所以实际在存储日志的时候,每个分区会对应一个日志目录,其命名规则一般为 <topic_name>-<partition_id>, 目录中就是一个分区的一份 commit log 日志文件。

Kafka 集群会保存一个时间段内所有被发布出来的信息,无论这个消息是否已经被消费过,这个时间段是可以配置的。比如日志保存时间段被设置为2天,那么2天以内发布的消息都是可以消费的;而之前的消息为了释放空间将会抛弃掉。Kafka的性能与数据量不相干,所以保存大量的消息数据不会造成性能问题。

对日志进行分区主要是为了以下几个目的:第一、这可以让log的伸缩能力超过单台服务器上线,每个独立的partition的大小受限于单台服务器的容积,但是一个topic可以有很多partition从而使得它有能力处理任意大小的数据。第二、在并行处理方面这可以作为一个独立的单元。

生产者 Producers

和其他消息队列一样,生产者通常都是消息的产生方。
在 Kafka 中它决定消息发送到指定Topic的哪个分区上。

消费者 Consumers

消费者就是消息的使用者,在消费者端也有几个名词需要区分一下。

一般消息队列有两种模式的消费方式,分别是 队列模式 和 订阅模式

队列模式:一对一,就是一个消息只能被一个消费者消费,不能重复消费。一般情况队列支持存在多个消费者,但是对于一个消息,只会有一个消费者可以消费它。

订阅模式:一对多,一个消息可能被多次消费,消息生产者将消息发布到Topic中,只要是订阅改Topic的消费者都可以消费。

Consumer && Subscriber

Group: 组,是一个消费者的集合,每一组都有一个或者多个消费者,Kafka 中在一个组内,消息只能被消费一次。

在发布订阅模式中,消费者是以组的方式进行订阅的,就是Consumer Group,他们的关系如下图:

每个发布到Topic上的消息都会被投递到每个订阅了此Topic的消费者组中的某一个消费者,也就是每个组都会被投递,但是每个组都只会有一个消费者消费这个消息。

开头介绍了Kafka 是 发布-订阅 功能的消息队列,所以在Kafka中,队列模式是通过单个消费者组实现的,也就是整个结构中只有一个消费者组,消费者之间负载均衡。

Kafka 集群

Borker: Kafka 集群有多个服务器组成,每个服务器称做一个 Broker。同一个Topic的消息按照一定的key和算法被分区存储在不同的Broker上。


上图引用自:http://blog.csdn.net/lizhitao

因为 Kafka 的集群它是通过将分区散布到各个Server的实现的,也就是说集群中每个服务器他们都是彼此共享分区的数据和请求,每个分区的日志文件被复制成指定分数,分散在各个集群机器,这样来实现的故障转移。

对于每一个分区都会有一个服务器作为它的 "leader" 并且有零个或者多个服务器作为"followers" 。leader 服务器负责处理关于这个 partition 所有的读写请求, followers 服务器则被动的复制 leader 服务器。如果有 leader 服务器失效,那么 followers 服务器将有一台被自动选举成为新的 leader 。每个服务器作为某些 partition 的 leader 的同时也作为其它服务器的 follower ,从而实现了集群的负载均衡。

.NET Core Kafka 客户端

在 .NET Core 中,有相对应的开源 kafka sdk 项目,就是 Rdkafka。它同时支持 .NET 4.5,并且支持跨平台,可以运行于Linux,macOS 和 Windows。

RdKafka Github :https://github.com/ah-/rdkafka-dotnet

RdKafka Nuget :Install-Package RdKafka

生产者 API

// Producer 接受一个或多个 BrokerList
using (Producer producer = new Producer("127.0.0.1:9092"))//发送到一个名为 testtopic 的Topic,如果没有就会创建一个u
sing
(Topic topic = producer.Topic("testtopic")) {    
//将message转为一个 byte[]byte[] data = Encoding.UTF8.GetBytes("Hello RdKafka");DeliveryReport deliveryReport = await topic.Produce(data);Console.WriteLine($"发送到分区:{deliveryReport.Partition}, Offset 为: {deliveryReport.Offset}"); }

消费者 API

由于 Kafka 是以消费者组的形式进行消费的,所以需要指定一个GroupId。

在内部实现上,消费者是通过一个轮询机制来实现的对 Topic 消息的监控,这也是Kafka推荐的方式,在 Rdkafka 中轮询的间隔为 1 秒钟。

//配置消费者组
var config = new Config()
{
  GroupId = "example-csharp-consumer" };
  using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
 {    //注册一个事件consumer.OnMessage += (obj, msg) =>{    
       string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");};    //订阅一个或者多个Topicconsumer.Subscribe(new[] { "testtopic" });    //启动consumer.Start();Console.WriteLine("Started consumer, press enter to stop consuming");Console.ReadLine(); }

原文地址:http://www.cnblogs.com/savorboard/p/dotnetcore-kafka.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/327317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入解读Service Mesh背后的技术细节

转载自 深入解读Service Mesh背后的技术细节 在Kubernetes称为容器编排的标准之后&#xff0c;Service Mesh开始火了起来&#xff0c;但是很多文章讲概念的多&#xff0c;讲技术细节的少&#xff0c;所以专门写一篇文章&#xff0c;来解析Service Mesh背后的技术细节。 一、…

CentOS7查看和关闭防火墙

https://blog.csdn.net/ytangdigl/article/details/79796961 CentOS7查看和关闭防火墙 蔚蓝色天空sky 2018-04-02 23:22:21 708762 收藏 236 分类专栏&#xff1a; linux 文章标签&#xff1a; centos 防火墙 CentOS 7.0默认使用的是firewall作为防火墙 查看防火墙状态 f…

Visual Studio Code 1.8版本添加了Hot Exit、Zen Mode及更多调试选项

最新发布的Visual Studio Code 1.8版本有许多改进和新功能&#xff0c;包括防止丢失任何编辑信息的Hot Exit&#xff0c;方便开发人员把注意力集中在代码上的Zen Mode&#xff0c;新的调试功能以及更方便的设置等。 Hot Exit是一项新功能&#xff0c;目的是在应用程序崩溃或退出…

ElasticSearch(笔记)

简介 本教程基于ElasticSearch7.6.1, 注意ES7的语法与ES6的API调用差别很大, 教程发布时最新版本为ES7.6.2(20200401更新);ES是用于全文搜索的工具: SQL: 使用like %关键词%来进行模糊搜索在大数据情况下是非常慢的, 即便设置索引提升也有限;ElasticSearch: 搜索引擎(baidu, …

漫画:什么是冒泡排序

转载自 漫画&#xff1a;什么是冒泡排序 什么是冒泡排序&#xff1f; 冒泡排序的英文Bubble Sort&#xff0c;是一种最基础的交换排序。 大家一定都喝过汽水&#xff0c;汽水中常常有许多小小的气泡&#xff0c;哗啦哗啦飘到上面来。这是因为组成小气泡的二氧化碳比水要轻…

CentOS - 修改主机名教程(将 localhost.localdomain 改成其它名字)

https://www.cnblogs.com/gudi/p/7846978.html 需要关闭防火墙&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; Linux修改主机名称 碰到这个问题的时候&#xff0c;是在安装Zookeeper集群的时候&#xff0c;碰到如下问题 java.net.U…

.net core 源码解析-web app是如何启动并接收处理请求(二) kestrel的启动

上篇讲到.net core web app是如何启动并接受请求的&#xff0c;下面接着探索kestrel server是如何完成此任务的。 1.kestrel server的入口KestrelServer.Start (Microsoft.AspNetCore.Hosting.Server.IHttpApplication ) FrameFactory创建的frame实例最终会交给libuv的loop回调…

MySQL中的any_value()函数

https://blog.csdn.net/u014079773/article/details/93722761 https://www.thinbug.com/q/37089347 https://blog.csdn.net/Peacock__/article/details/90608246 https://www.itranslater.com/qa/details/2109775246877262848 4.any_value()会选择被分到同一组的数据里…

MybatisPlus使用

Mybatisplus 导入依赖 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</gro…

.net core 源码解析-mvc route的注册,激活,调用流程(三)

.net core mvc route的注册&#xff0c;激活&#xff0c;调用流程 mvc的入口是route&#xff0c;当前请求的url匹配到合适的route之后&#xff0c;mvc根据route所指定的controller和action激活controller并调用action完成mvc的处理流程。下面我们看看服务器是如何调用route的。…

高可用性的几个级别

转载自 高可用性的几个级别 大家常说高可用&#xff0c;High Availablility&#xff0c;但是一般说到这个词的时候&#xff0c;具体指的什么方案呢&#xff1f; 级别一&#xff1a;FT (Fault Tolerance) 双击热备 通过创建与主实例保持虚拟同步的虚拟机&#xff0c;使应用在服…

mysql - Docker Wordpress连接到本地主机上的数据库服务器

视频上面的 docker service create --name mysql -p 3306:3306 --env MYSQL_ROOT_PASSWORDroot \ --env MYSQL_DATABASEwordpress \ --network demo \ --mount typevolume,sourcemysql-data,destination/var/lib/mysql \ mysql:5.7 docker service create -…

CoreCRM 开发实录——开始之新项目的技术选择

2016年11月&#xff0c;接受了一个工作&#xff0c;是对“悟空CRM”进行一些修补。这是一个不错的 CRM&#xff0c;开源&#xff0c;并提供一个 SaaS 的服务。正好微软的 .NET Core 和 ASP.NET Core 也发布了。于是就有了这个想法&#xff1a;使用 ASP.NET Core 来开发一个 CRM…

80%的程序员都不了解的调试技巧

转载自 80%的程序员都不了解的调试技巧 程序员的工作内容&#xff0c;除了大部分时间写代码之外&#xff0c;因为有不少的时间是用在调试代码上。甚至说不是在调试代码&#xff0c;就是即将调试代码。 :) 今天我们来谈谈调试代码的一些技巧&#xff0c;在使用IDE提供的debu…

复制vmware overLay网络无法ping通 ping www.baidu.com可以

因为忘记关闭防火墙了&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 要永久关闭&#xff01;&#xff01; 修改hostname https://blog.csdn.net/qq_27327261/article/details/109100219 关闭防火墙 https://blog.csdn.net/qq_27327261/article/details/1…

2016.NET Core相关内容回顾

每一年的脚步的确是快&#xff0c;转眼间马上就2017。.NET Core 2014年宣布开源以来&#xff0c;在2016年发布了第一个版本&#xff0c;2017年将发布第二个版本&#xff0c;在这新年之际&#xff0c;我们回顾2016年&#xff0c;新的一年&#xff0c;带着理想和抱负继续出发。 1…

微服务化的数据库设计与读写分离

转载自 微服务化的数据库设计与读写分离 数据库永远是应用最关键的一环&#xff0c;同时越到高并发阶段&#xff0c;数据库往往成为瓶颈&#xff0c;如果数据库表和索引不在一开始就进行良好的设计&#xff0c;则后期数据库横向扩展&#xff0c;分库分表都会遇到困难。 对于…

centos7 切换中文输入法 无需安装

*************** 当你发现自己的才华撑不起野心时&#xff0c;就请安静下来学习吧&#xff01;***************

Consul 服务注册与服务发现

1. 服务注册 对 Consul 进行服务注册之前&#xff0c;需要先部署一个服务站点&#xff0c;我们可以使用 ASP.NET Core 创建 Web 应用程序&#xff0c;并且部署到 Ubuntu 服务器上。 ASP.NET Core Hell World 应用程序示例代码&#xff0c;只需要三个文件&#xff0c;Startup.cs…

tar (child): .tgz\r:无法 open: 没有那个文件或目录

Linux下运行bash脚本显示“: /usr/bin/env: "bash\r": 没有那个文件或目录 程序员小熊 2017-12-18 14:45:45 18395 收藏 7 分类专栏&#xff1a; Linux 版权 用 ./ 运行bash脚本文件出现 报错信息 /usr/bin/env: "bash\r": 没有那个文件或目录 错误原…