.NET Core 下使用 Kafka

安装

CentOS 安装 kafka

  • Kafka : http://kafka.apache.org/downloads

  • ZooLeeper : https://zookeeper.apache.org/releases.html

下载并解压

# 下载,并解压
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf  kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1.tgz /data/kafka# 下载 zookeeper,解压
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper

启动 ZooKeeper

# 复制配置模版
$ cd /data/kafka/conf
$ cp zoo_sample.cfg zoo.cfg# 看看配置需不需要改
$ vim zoo.cfg# 命令
$ ./bin/zkServer.sh start    # 启动
$ ./bin/zkServer.sh status   # 状态
$ ./bin/zkServer.sh stop     # 停止
$ ./bin/zkServer.sh restart  # 重启# 使用客户端测试
$ ./bin/zkCli.sh -server localhost:2181
$ quit

启动 Kafka

# 备份配置
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy# 修改配置
$ vim /data/kafka/config/server.properties# 集群配置下,每个 broker 的 id 是必须不同的
# broker.id=0# 监听地址设置(内网)
# listeners=PLAINTEXT://ip:9092# 对外提供服务的IP、端口
# advertised.listeners=PLAINTEXT://106.75.84.97:9092# 修改每个topic的默认分区参数num.partitions,默认是1,具体合适的取值需要根据服务器配置进程确定,UCloud.ukafka = 3
# num.partitions=3# zookeeper 配置
# zookeeper.connect=localhost:2181# 通过配置启动 kafka
$  ./bin/kafka-server-start.sh  config/server.properties&# 状态查看
$ ps -ef|grep kafka
$ jps

docker下安装Kafka

docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeperdocker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

介绍

  • Broker:消息中间件处理节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。

  • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。

  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。

  • Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。

  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

kafka partition 和 consumer 数目关系

  • 如果consumer比partition多是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。

  • 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。

  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

快速开始

在 .NET Core 项目中安装组件

Install-Package Confluent.Kafka

开源地址:https://github.com/confluentinc/confluent-kafka-dotnet

添加IKafkaService服务接口

public interface IKafkaService
{/// <summary>/// 发送消息至指定主题/// </summary>/// <typeparam name="TMessage"></typeparam>/// <param name="topicName"></param>/// <param name="message"></param>/// <returns></returns>Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;/// <summary>/// 从指定主题订阅消息/// </summary>/// <typeparam name="TMessage"></typeparam>/// <param name="topics"></param>/// <param name="messageFunc"></param>/// <param name="cancellationToken"></param>/// <returns></returns>Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
}

实现IKafkaService

public class KafkaService : IKafkaService
{public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class{var config = new ProducerConfig{BootstrapServers = "127.0.0.1:9092"};using var producer = new ProducerBuilder<string, string>(config).Build();await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = message.SerializeToJson()});}public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class{var config = new ConsumerConfig{BootstrapServers = "127.0.0.1:9092",GroupId = "consumer",EnableAutoCommit = false,StatisticsIntervalMs = 5000,SessionTimeoutMs = 6000,AutoOffsetReset = AutoOffsetReset.Earliest,EnablePartitionEof = true};//const int commitPeriod = 5;using var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_, e) =>{Console.WriteLine($"Error: {e.Reason}");}).SetStatisticsHandler((_, json) =>{Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");}).SetPartitionsAssignedHandler((c, partitions) =>{string partitionsStr = string.Join(", ", partitions);Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");}).SetPartitionsRevokedHandler((c, partitions) =>{string partitionsStr = string.Join(", ", partitions);Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");}).Build();consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}TMessage messageResult = null;try{messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}await Task.CompletedTask;}
}

注入IKafkaService,在需要使用的地方直接调用即可。

public class MessageService : IMessageService, ITransientDependency
{private readonly IKafkaService _kafkaService;public MessageService(IKafkaService kafkaService){_kafkaService = kafkaService;}public async Task RequestTraceAdded(XxxEventData eventData){await _kafkaService.PublishAsync(eventData.TopicName, eventData);}
}

以上相当于一个生产者,当我们消息队列发出后,还需一个消费者进行消费,所以可以使用一个控制台项目接收消息来处理业务。

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{e.Cancel = true;cts.Cancel();
};await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>
{// Your logicConsole.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");
}, cts.Token);

IKafkaService中已经写了订阅消息的接口,这里也是注入后直接使用即可。

生产者消费者示例

生产者

static async Task Main(string[] args)
{if (args.Length != 2){Console.WriteLine("Usage: .. brokerList topicName");// 127.0.0.1:9092 helloTopicreturn;}var brokerList = args.First();var topicName = args.Last();var config = new ProducerConfig { BootstrapServers = brokerList };using var producer = new ProducerBuilder<string, string>(config).Build();Console.WriteLine("\n-----------------------------------------------------------------------");Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");Console.WriteLine("-----------------------------------------------------------------------");Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");Console.WriteLine("> key value<Enter>");Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");Console.WriteLine("> value<enter>");Console.WriteLine("Ctrl-C to quit.\n");var cancelled = false;Console.CancelKeyPress += (_, e) =>{e.Cancel = true;cancelled = true;};while (!cancelled){Console.Write("> ");var text = string.Empty;try{text = Console.ReadLine();}catch (IOException){break;}if (string.IsNullOrWhiteSpace(text)){break;}var key = string.Empty;var val = text;var index = text.IndexOf(" ");if (index != -1){key = text.Substring(0, index);val = text.Substring(index + 1);}try{var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>{Key = key,Value = val});Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");}catch (ProduceException<string, string> e){Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");}}
}

消费者

static void Main(string[] args)
{if (args.Length != 2){Console.WriteLine("Usage: .. brokerList topicName");// 127.0.0.1:9092 helloTopicreturn;}var brokerList = args.First();var topicName = args.Last();Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");var cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) =>{e.Cancel = true;cts.Cancel();};var config = new ConsumerConfig{BootstrapServers = brokerList,GroupId = "consumer",EnableAutoCommit = false,StatisticsIntervalMs = 5000,SessionTimeoutMs = 6000,AutoOffsetReset = AutoOffsetReset.Earliest,EnablePartitionEof = true};const int commitPeriod = 5;using var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_, e) =>{Console.WriteLine($"Error: {e.Reason}");}).SetStatisticsHandler((_, json) =>{Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");//Console.WriteLine($"Statistics: {json}");}).SetPartitionsAssignedHandler((c, partitions) =>{Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");}).SetPartitionsRevokedHandler((c, partitions) =>{Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");}).Build();consumer.Subscribe(topicName);try{while (true){try{var consumeResult = consumer.Consume(cts.Token);if (consumeResult.IsPartitionEOF){Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");if (consumeResult.Offset % commitPeriod == 0){try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine($"Commit error: {e.Error.Reason}");}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/307706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

leetcode70. 爬楼梯

一:题目 二:上码 class Solution { public:/**思路&#xff1a;分析题意:爬到第一层楼有一种方法,爬到第二层楼有两种方法那么由第一层到第三层需要跨2步,由第二层到第三层需要跨一步;那么到第三层的方法可以由 到第一层和第二层推导出来(因为只剩下最后一步了)动态规划五步走…

发现一款.NET Core开源爬虫神器:DotnetSpider

没有爬虫就没有互联网&#xff01;爬虫的意义在于采集大批量数据&#xff0c;然后基于此进行加工/分析&#xff0c;做更有意义的事情。谷歌&#xff0c;百度&#xff0c;今日头条&#xff0c;天眼查都离不开爬虫。去开源中国和Github查询C#的爬虫项目&#xff0c;仅有几个非常简…

leetcode746. 使用最小花费爬楼梯

一:题目 二:上码 class Solution { public:/**思路:1.分析题意给出的数组的下标代表楼梯的台阶数2.动态规划五步走1>:确定dp数组以及下标的含义dp[i]:表示到达第i层所需要花费的体力2>:确定dp数组的递推公式那么如何得到dp[i](花费的体力)呢&#xff1f;dp[i]由dp[i-1]或…

SS CMS 全新跨平台 V7.0 版本正式发布

今天&#xff0c;我们很高兴宣布基于.NET CORE平台的全新 SS CMS V7.0正式发布&#xff0c;新版本采用.NET CORE模块化和高性能实现&#xff0c;用于创建在Windows&#xff0c;Linux、Mac以及Docker上运行的Web应用程序和服务。SS CMS 7.0 之旅在此&#xff0c;我们简单回顾一下…

leetcode62. 不同路径

一:题目 二:上码 class Solution { public:/**思路:1.分析题意:2.动态规划五步走:1>:确定dp数组和其下标的含义dp[i][j]为到达二维数组下标为i&#xff0c;j的路径条数,i和j为下标2>:确定dp数组的递推公式那么dp[i][j]是如何求解出来的呢?只能是两个方向左边:dp[i-1][j…

推荐一本基于ASP.NET Core 3.1的实战来了

第一本基于 ASP.NET Core 3.1 的实战书来了我脱产花费了一年时间创作书籍《深入浅出 ASP.NET Core》&#xff0c;终于上架了。目前天猫、京东等主流平台均有销售。这本书是基于.NET Core3.1 平台&#xff0c;从 ASP.NET Core 的基础入门,通过项目实战结合 ASP.NET Core 源代码解…

leetcode63. 不同路径 II

一:题目 二&#xff1a;上码 class Solution { public:/**思路:1.分析题意:这里有障碍物,所以我们的做法会有点变化&#xff0c;如果这个障碍物出现在左侧或则右测的话那么我们确定的是其障碍物包括其后面的下标将均为0,如果障碍物出现在上诉区域外的话&#xff0c;那么就直接…

C#刷剑指Offer | 从上到下打印二叉树

【C#刷题】| 作者 / Edison Zhou这是EdisonTalk的第288篇原创内容我们来用之前学到的数据结构知识来刷《剑指Offer》的一些核心题目&#xff08;精选了其中30道题目&#xff09;&#xff0c;希望对你有帮助&#xff01;本文题目为&#xff1a;从上到下打印二叉树。1题目介绍题目…

跟我一起学.NetCore之自定义配置源-热更新-对象绑定

前言上一篇针对不同的配置源进行举例演示&#xff0c;感受到不同配置源和不同数据格式使用统一操作的便捷(即没有什么加一层解决不了的&#xff0c;这个不是我说的)&#xff0c;这里接着说说自定义配置源、配置热更新、配置绑定对象相关操作&#xff1b;配置源回顾&#xff1a;…

leetcode343. 整数拆分(思路+详解)

一:题目 二:上码 class Solution { public:/**思路:1.分析题意:将一个数拆分为几个数相加的和 然后求取这几个数相乘的最大积,这个就很动态规划也就是我们可以得到多种结果&#xff0c;要在多种结果中取最优2.动态规划:1>:确定dp数组代表啥&#xff0c;以及下标的含义dp[i…

C++ 学习之旅(14)——构造函数constructors和析构函数destructors

首先我们看下一个简单的类&#xff1a; #include <iostream>class Entity { public:float X, Y;void Print(){std::cout << X << "," << Y << std::endl;} };int main() {Entity e;e.Print();std::cin.get(); }输出结果如下&#xf…

跟我一起学.NetCore之配置变更监听

前言通常程序中配置少不了&#xff0c;配置的修改也避免不了&#xff0c;配置的热更新为此给应用程序带来很大的便捷&#xff0c;不用重启&#xff0c;提高用户体验&#xff1b;但往往有时候需要对修改进行审计&#xff0c;也就是需要记录&#xff0c;有时候也会针对配置修改的…

解决Spring boot整合mybatis,xml资源文件放置及路径配置问题

一:问题描述 1:前言 无论你是将mapper.xml文件是和resources建造在一块&#xff0c;还是将mapper.xml文件和mapper放在一块,我们只要修改在yaml当中的mapper-locations的相对路径即可。&#xff08;前提是你在pom文件中导入了相关的resources路径&#xff09; 2:下方是将map…

C++ 学习之旅(15)——继承inheritance

所谓继承&#xff0c;就是在原有的类的基础上&#xff0c;通过继承它并添加一些新的成员&#xff0c;从而产生出一个新的类。例如我们在游戏有实体Entity和玩家Player&#xff0c;它们都有位置X&#xff0c;Y和一个移动的方法Move&#xff0c;但不同的是玩家有名字Name&#xf…

dotNET Core:编码规范

在项目开发过程中&#xff0c;由于时间紧、任务重&#xff0c;很容易导致面向功能编程。实现相同的功能&#xff0c;代码可以写的很优雅&#xff0c;也可以写的很晦涩和复杂。现在的工作&#xff0c;都需要进行团队协作&#xff0c;代码就需要有一定的规范进行指引&#xff0c;…

C++ 学习之旅(16)——虚函数与纯虚函数virtual

关于虚函数的概念讲解&#xff0c;可以看这篇文章&#xff1a; https://blog.csdn.net/siwuxie095/article/details/71159414 以下用例子进行实际说明&#xff1a; #include <iostream> using namespace std;class Shape { public:int width, height;Shape(int a 0,…

.NET 5 自身就是一个 .NET Standard

微软2014年开源.NET的动作是实现一个全新的.NET Core ,从而引入了一个新的问题&#xff1a;选择代码的编译目标变得困难&#xff0c;需要面对3个环境。因此微软为了解决这个问题&#xff0c;引入了两个主要的概念&#xff1a;目标框架别名&#xff08;Target Framework Moniker…

Springboot后台管理(CRUD)

一:前言 这个就是用springboot实现后台管理(CRUD)的小demo,相比于SSM的配置地狱&#xff0c;springboot的自动配置确实很牛&#xff0c;tomcat都内置&#xff0c;我们想要修改相关的信息只要在yaml配置文件修改就行了&#xff0c;关于Dao层&#xff0c;这个mybatis-plus,quo实…

保护 .NET Core 项目的敏感信息

我们的项目中几乎都会有配置文件&#xff0c;里面可能会存储一些敏感信息&#xff0c;比如数据库连接字符串、第三方API的AppKey和SecretKey等。对于开源项目&#xff0c;这些敏感信息肯定不能随着源代码一起提交到托管平台。对于网站应用大多都是要部署到有公开IP的服务器上的…

背包(二维数组版和一维数组版)

一:前言 这是动态规划的经典题型&#xff0c;那么我们也是 按照动态规划五步走的策略分析的 确定dp数组的含义以及下标的含义确定dp数组的递推公式确定dp数组的初始化确定dp数组的遍历顺序举例验证&#xff08;如果不是做题可省略&#xff09; 二:二维数组 1:示例 2:dp数组…