Kafka.net使用编程入门(一)

最近研究分布式消息队列,分享下!

首先zookeeper  和 kafka 压缩包 解压 并配置好!

我本机zookeeper环境配置如下:

D:\Worksoftware\ApacheZookeeper3\conf\zoo.cfg

以下是kafka的配置

D:\Worksoftware\Apachekafka2.11\config\server.properties

我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer

然后执行cmd命令:

 

结果:

 然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:

 

 

重头戏来了,开始kafka C#客户端处理:

首先引用kafka-net.dll,可以用vs2013的nuget下载,

以下是Prorame.cs:

 

[csharp] view plaincopy
  1. class Program  
  2.      {  
  3.          static void Main(string[] args)  
  4.          {  
  5.              const string topicName = "test";  
  6.              var options = new KafkaOptions(new Uri("http://localhost:9092"))  
  7.              {  
  8.                  Log = new ConsoleLog()  
  9.              };  
  10.                
  11.              Task.Run(() =>  
  12.              {  
  13.                  var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() });  
  14.                  foreach (var data in consumer.Consume())  
  15.                  {  
  16.                      Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());  
  17.                  }  
  18.              });  
  19.    
  20.              //创建一个生产者发消息  
  21.              var producer = new Producer(new BrokerRouter(options))  
  22.              {  
  23.                  BatchSize = 100,  
  24.                  BatchDelayTime = TimeSpan.FromMilliseconds(2000)  
  25.              };  
  26.    
  27.              Console.WriteLine("打出一条消息按 enter...");  
  28.              while (true)  
  29.              {  
  30.                  var message = Console.ReadLine();  
  31.                  if (message == "quit") break;  
  32.    
  33.                  if (string.IsNullOrEmpty(message))  
  34.                  {  
  35.                      //  
  36.                      SendRandomBatch(producer, topicName, 200);  
  37.                  }  
  38.                  else  
  39.                  {  
  40.                      producer.SendMessageAsync(topicName, new[] { new Message(message) });  
  41.                  }  
  42.              }  
  43.    
  44.              //释放资源  
  45.              using (producer)  
  46.              {  
  47.    
  48.              }  
  49.          }  
  50.          private static async void SendRandomBatch(Producer producer, string topicName, int count)  
  51.          {  
  52.              //发送多个消息  
  53.              var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString())));  
  54.    
  55.              Console.WriteLine("传送了 #{0} messages.  Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);  
  56.    
  57.              var response = await sendTask;  
  58.    
  59.              Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);  
  60.              foreach (var result in response.OrderBy(x => x.PartitionId))  
  61.              {  
  62.                  Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset);  
  63.              }  
  64.    
  65.          }  
  66.      }  
 结果:

闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/538610.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python包路径有几个_python的搜索路径与包(package)

python的搜索路径其实是一个列表,它是指导入模块时,python会自动去找搜索这个列表当中的路径,如果路径中存在要导入的模块文件则导入成功,否则导入失败: >>> importsys>>>sys.path [, C:\\Python33\…

配置文件管理服务器,06-配置文件管理

1配置文件管理设备运行于FIPS模式时,本特性部分配置相对于非FIPS模式有所变化,具体差异请见本文相关描述。有关FIPS模式的详细介绍请参见“安全配置指导”中的“FIPS”。1.1 配置文件简介配置文件是用来保存配置的文件。配置文件主要用于: …

虚拟机安装spark配置推荐

如果虚拟机配置的内存太少,spark运行计算的时候会报: WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster uito ensure that workers are registered and have sufficient memory 资源不足的问题,导…

Linux常用命令——chattr、lsattr

chattr 改变文件的扩展属性 语法格式:chattr 【option】【mode】【files】chattr [选项] [模式] [文件或目录]注意:chattr 命令及后面的选项和文件里,每个元素之间都要至少要有一个空格参数选项: 参数选项解释说明-a只能向文件中…

两个相邻盒子的边框怎么只显示一个_【前端小课堂】0044 盒子

这是一个面向零基础的前端教程,很简单,用零散时间就可以学习。 推荐早上读一下,晚上复习一下,如果可以奢侈一点,白天稍微练习一下下,总共花费 5~15 分钟。就酱!已经好几次提到块(block)元素了&a…

scala apply方法 笔记

原文出处:http://blog.csdn.net/pzw_0612/article/details/48576569 ----------------------------------------------------- Scala比Java更面向对象的一个方面是Scala没有静态成员。替代品是,Scala有单例对象:singleton object。 当单例对…

转:6.1海量数据处理

本文转自看云,原文地址请移步:https://www.kancloud.cn/kancloud/the-art-of-programming/41608 偶然闲游,偶遇某一站点,发现这里写的关于海量数据处理相关的思路还挺不错,所以在这里采摘收藏,如有侵权之处还请评论区或…

flash一个按钮控制动画_flutter闪屏过渡动画,闪光占位动画

在程序设计的理念中,讲究一切都来源于物理世界,在现实世界中,人们在每接触到一个新的事物或者说在手指触碰到一个事物时,总是心里默许期望有一个反馈效果,这就是来源于心底深处常常被人忽略的一个潜在期望。在程序的世…

动态改变_Excel中如何动态改变可编辑区域?

有这样一个工作场景:我们制作一个工作报表模板给同事填写数据,这个工作表格只可以在预设的可编辑区域编辑,其它都是受密码保护的,而且这个可编辑区域是随着日期——工作周的变化而发生变化的。也就是说在不同的工作周可编辑的区域…

java集合框架之ArrayList与LinkedList的区别

参考http://how2j.cn/k/collection/collection-arraylist-vs-linkedlist/690.html#nowhere ArrayList和LinkedList的区别 ArrayList 插入,删除数据慢LinkedList, 插入,删除数据快ArrayList是顺序结构,所以定位很快,指哪…

Spark集群安装

Spark是独立的,所以集群安装的时候,不像hive,hbase等需要先安装hadoop,除非文件保存在hadoop上,才需要安装hadoop集群。 如果虚拟机安装,点击阅读推荐配置 前提环境: 1、安装了JDK1.7及以上版…

列表逆序排序_【Python自学笔记】集合——列表

list列表类型是一个与元组tuple类似的有序序列。构造函数是list()切片# 切片 fruit ["Apple", "Hawthorn", "Loquat", "Medlar", "Pear", "Quince"] print(fruit[:2]) print(fruit[-1])语法与字符串和元组中的一…

esp8266 阿里云 arduino_NUCLEO-G071RB通过WiFi与NB连接阿里云

开箱体验试用背景去年年初,有新项目要让移动式容器设备的监控数据上云,选型时主要考虑三个系列STM32L0、STM32G0和STM8。最初有意向选用STM32L052RB,主要是为了满足低功耗需求。恰逢G0系列上市,价格亲民,性能却要高很多…

“云上金融,智创未来” 腾讯“云+未来”峰会金融专场在广州举行

5月24日,腾讯“云未来“峰会金融专场在广州举行。来自央行、腾讯公司以及银行、证券、保险、互金公司等腾讯金融云的合作伙伴代表以及行业专家,共同分享了智慧金融、企业数字化转型、腾讯金融云业务布局以及与合作伙伴取得的最新成绩等话题。活动现场&am…

Spark算子reduceByKey深度解析

原文地址:http://blog.csdn.net/qq_23660243/article/details/51435257 -------------------------------------------- 最近经常使用到reduceByKey这个算子,懵逼的时间占据多数,所以沉下心来翻墙上国外的帖子仔细过了一遍,发现一…

绕固定轴分解_3轴 / 5轴 / 3+2到底是什么......??

一、 什么是32定位加工在一个三轴铣削程序执行时,使用五轴机床的两个旋转轴将切削刀具固定在一个倾斜的位置,32加工技术的名字也由此而来,这也叫做定位五轴机床,因为第四个轴和第五个轴是用来确定在固定位置上刀具的方向&#xff…

启动spark shell

spark集群安装教程:http://blog.csdn.net/zengmingen/article/details/72123717 启动spark shell. 在spark安装目录bin文件夹下 ./spark-shell --master spark://nbdo1:7077 --executor-memory 2g --total-executor-cores 2 参数说明: --master spark…

python发送excel文件_Python操作Excel, 开发和调用接口,发送邮件

接口开发: importflaskimporttoolsimportjson,redisimportrandom server flask.Flask(__name__)#新建一个服务,把当前这个python文件当做一个服务 ip 118.24.3.40passwordHK139bc&*r redis.Redis(hostip,passwordpassword,port6379,db10, decode_res…

go conn 读取byte数组后是否要_【技术推荐】正向角度看Go逆向

Go语言具有开发效率高,运行速度快,跨平台等优点,因此正越来越多的被攻击者所使用,其生成的是可直接运行的二进制文件,因此对它的分析类似于普通C语言可执行文件分析,但是又有所不同,本文将会使用…

Confluence 6 选择一个外部数据库

2019独角兽企业重金招聘Python工程师标准>>> 注意: 选择一个合适的数据库通常需要花费很多时间。同时 Confluence 自带的 XML 数据备份和恢复功能通常也不适合合并和备份有大量数据的数据库。如果你想在系统运行后进行数据合并,你通常需要使用…