kafka streams_Kafka REST Proxy MapR Streams入门

kafka streams

MapR生态系统软件包2.0(MEP)随附了一些与MapR流有关的新功能:

  • 用于MapR Streams的Kafka REST代理为MapR Streams和Kafka集群提供了RESTful接口,使其易于使用和产生消息以及执行管理操作。
  • Kafka Connect for MapR Streams是一个实用程序,用于在MapR Streams与Apache Kafka和其他存储系统之间流式传输数据。

MapR生态系统软件包(MEP)是一种与核心升级脱钩的生态系统升级工具,可让您独立于MapR融合数据平台升级工具。 您可以在本文中了解有关MEP 2.0的更多信息 。

在此博客中,我们描述了如何使用Kafka REST代理向MapR流发布消息和从MapR流中使用消息。 REST代理是MapR融合数据平台的重要补充,它允许任何编程语言使用MapR流。

MapR Streams工具随附的Kafka REST Proxy可以与MapR Streams(默认)以及Apache Kafka(在混合模式下)一起使用。 在本文中,我们将重点介绍MapR流。

先决条件

  • 具有MEP 2.0的MapR融合数据平台5.2
    • 使用MapR Streams工具
  • curl,wget或任何HTTP / REST客户端工具

创建MapR流和主题

流是主题的集合,您可以通过以下方式将其分组管理:

  1. 设置适用于该流中所有主题的安全策略
  2. 为流中创建的每个新主题设置默认的分区数
  3. 设置流中每个主题中消息的生存时间

您可以在文档中找到有关MapR Streams概念的更多信息。

在您的MapR群集或沙盒上,运行以下命令:

$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3

启动Kafka控制台的生产者和消费者

打开两个终端窗口,并使用以下命令运行使用者的Kafka实用程序:

消费者

  • 主题传感器-json
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
  • 主题传感器二进制
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary

这两个终端窗口将允许您查看有关不同主题的消息。

使用Kafka REST代理

端点/ topics / [topic_name]允许您获取有关该主题的一些信息。 在MapR Streams中,主题是路径标识的流的一部分; 要通过REST API访问主题,您必须输入完整路径并在URL中进行编码; 例如:

  • / apps / iot-stream:sensor-json将使用%2Fapps%2Fiot-stream%3Asensor-json进行编码

运行以下命令,以获取有关sensor-json主题的信息:

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

注意:为简单起见,我从运行Kafka REST代理的节点上运行命令,因此可以使用localhost

您可以通过添加Python命令,以漂亮的方式打印JSON,例如:

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool

默认流

如上所述,流路径是您必须在命令中使用的主题名称的一部分。 但是,可以将MapR Kafka REST代理配置为使用默认流。 对于此配置,您应该在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下属性:

  • stream.default.stream = / apps / iot-stream

更改Kafka REST代理配置时,必须使用maprcli或MCS重新启动服务。

使用streams.default.stream属性的主要原因是简化应用程序使用的URL。 例如:

  • 通过streams.default.stream ,可以使用curl -X GET http:// localhost:8082 / topics /
  • 如果没有此配置,或者要使用特定的流,则必须在URL中指定它: http:// localhost:8082 / topics /%2Fapps%2Fiot-stream%3Asensor-json

在本文中,所有URL都包含编码的流名称,因此您可以在不更改配置的情况下开始使用Kafka REST代理,也可以将其用于其他流。

用于MapR流的Kafka REST代理允许应用程序将消息发布到MapR流。 消息可以作为JSON或二进制内容(base64编码)发送。

要发送JSON消息:

  • 查询应该是HTTP POST
  • 内容类型应为:application / vnd.kafka.json.v1 + json
  • 身体:
{"records":[{"value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"}  }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

您应该在/ apps / iot-stream:sensor-json使用者正在运行的终端窗口中看到打印的消息。

发送二进制消息:

  • 查询应该是HTTP POST
  • 内容类型应为:application / vnd.kafka.binary.v1 + json
  • 身体:
{"records":[{"value":"SGVsbG8gV29ybGQ="}]
}

请注意, SGVsbG8gV29ybGQ =是在Base64中编码的字符串“ Hello World”。

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您应该在/ apps / iot-stream:sensor-binary使用者正在运行的终端窗口中看到打印的消息。

发送多个消息:

HTTP正文的记录字段允许您发送多个消息; 例如,您可以发送:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"}  }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"}  } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

该命令将发送2条消息,并将偏移量增加2。您可以通过在JSON数组中添加新元素来对二进制内容执行相同的操作; 例如:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您可能知道,可以将密钥绑定到消息,以确保所有具有相同密钥的消息都将到达同一分区。 为此,将key属性添加到消息中,如下所示:

{"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

现在,您知道如何使用REST代理将消息发布到MapR Streams主题,让我们看看如何使用消息。

消费信息

REST代理还可以用于消费主题消息。 为此,您需要:

  1. 创建使用者实例。
  2. 使用第一个调用返回的URL来阅读消息。
  3. 如果需要,请删除使用者实例。

创建使用者实例

以下请求创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json

服务器的响应如下所示:

{"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer"
}

请注意,我们已经使用/ consumers / [topic_name]创建使用者。 后续请求将使用base_uri从主题获取消息。 与任何MapR Streams / Kafka使用者一样, auto.offset.reset定义其行为。 在此示例中,该值设置为最早 ,这意味着使用者将从头开始阅读消息。 您可以在MapR Streams文档中找到有关使用者配置的更多信息。

消费信息

要使用消息,只需将MapR Streams主题添加到使用者实例的URL。

以下请求使用了该主题的消息:

curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json

此调用返回JSON文档中的消息:

[{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3}
]

每次对API的调用都会根据上一次调用的偏移量返回发布的新消息。

请注意,消费者将被销毁:

  • Consumer.instance.timeout.ms设置了一些空闲时间(默认值设置为300000ms / 5分钟)后,使用REST API调用销毁了该空闲时间(请参见下文)。

消费二进制格式的消息

如果需要使用二进制消息,则方法是相同的:您需要更改格式和Accept标头。

调用此URL为二进制主题创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary

然后使用消息,accept标头设置为application / vnd.kafka.binary.v1 + json

curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary

该调用返回JSON文档中的消息,并且该值在Base64中编码:

[{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2}
]

删除使用者实例

如前所述,将根据REST Proxy的consumer.instance.timeout.ms配置自动销毁使用者 。 也可以使用使用者实例URI和HTTP DELETE调用销毁实例,如下所示:

curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer

结论

在本文中,您学习了如何将Kafka REST代理用于MapR流,该代理允许任何应用程序使用在MapR融合数据平台中发布的消息。

您可以在MapR文档和以下资源中找到有关Kafka REST代理的更多信息:

  • MapR Streams入门
  • Ted Dunning和Ellen Friedman撰写的“流传输体系结构:使用Apache Kafka和MapR流的新设计”电子书

翻译自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams-2.html

kafka streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335304.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

r语言三维柱状图_R语言三维图的绘制

R语言在可视化方面的地位是毋庸置疑的,但是呢相对于MatalabR语言在三维图形的展示上存在一定的劣势。当然,作为大众的免费软件,指定不服,很多人为此也基于R语言开发了一些相应的三维图的绘制包,像rgl,gg3D&…

从事仪表专业学c语言有用吗,测控专业就业方向有哪些 就业前景比你想象中的好...

测控专业就业方向有哪些?这个专业的就业前景好不好?这些问题都是小伙伴们比较关心的问题,下面随小编一起来了解一下吧。主要就业方向1.智能仪器仪表方向,我觉得这个方向主要是从事仪器仪表,电子产品的软件,硬件研发,…

c语言 将url图片存到本地_python爬虫:爬取男生喜欢的图片

任务目标:1.抓取不同类型的图片2.编写一个GUI界面爬虫程序,打包成exe重新文件3.遇到的难点1.分析如何抓取不同类型的图片首先打开网站,可以看到有如下6个类型的菜单在这里插入图片描述点击不同菜单,发现URL显示如下大胸妹&#xf…

hazelcast集群配置_使用HazelCast进行Hibernate缓存:基本配置

hazelcast集群配置之前,我们对JPA缓存,机制以及hibernate提供的内容进行了介绍 。 接下来是一个使用Hazelcast作为二级缓存的Hibernate项目。 为此,我们将在JPA中使用一个基本的spring boot项目。 Spring Boot使用Hibernate作为默认的JPA提…

c语言编译后找不到exe,在VS 2015命令提示符中找不到c – rc.exe

我刚刚安装了Windows 10 Creators Update(版本10.0.15063).我安装了多个版本的Visual Studio(2012年,2013年,2015年和2017年).我几周前才安装了VS 2017.问题在“VS2015 x64本机命令提示符”中运行时,CMake(版本3.8.1)不再找到C/C编译器(在VS 2017命令提示符下运行时它可以正常工…

tomcat如何通过配置的方式部署web工程

Workspaces 下有很多工程文件,这个 Workspaces 是 Myeclipse 自动生成的,我们通过 Myeclipse 写的工程都在这个 Workspaces 文件夹下。 我们部署工程到服务器上,就是要每个 WEB 工程里面的 context 文件夹,这个文件夹可以放在 t…

python爬虫源码_Python—爬虫:王者荣耀全套皮肤【附源码】

怎么获取全套皮肤?用钱买,或者用爬虫爬取下来~虽然后者不能穿。这个案例稍微复杂一点,但是一个非常值得学习的项目。具体实现思路:分析网页源代码结构找到合适的入口穷举访问并解析爬取所有英雄所有皮肤图片代码思路/程序流程&…

java ee规范_测试Java EE 8规范

java ee规范Java EE 8平台肯定在过去的几个月中一直在发展。 规范已经发布了早期的草案评审,里程碑甚至最终版本。 实际上,随着JSF 2.3的发布,JSR-372才刚刚进入最终版本。 有关更多信息,请参见 Arjan的帖子 。 它有幸成为JSR-37…

c语言中Gretchen函数的功能,听过很多的歌的音乐达人给我推荐一下

『 挚爱篇 』1 Bloated -- 挚爱篇2 Joy Is WIthin Reach -- Adrienne3 First Love -- 挚爱篇4 First Time -- 挚爱篇5 Get Funky -- 挚爱篇6 Walking on Air -- 挚爱篇7 Por qu te vas -- 挚爱篇8 爵士慢摇曲 -- Milky9 I Just Cant Get You Out of My Head (Club Mix) -- 挚爱…

JavaScript(JS)常用正则表达式汇总

文章目录自定义字符串校验函数常用正则表达式自定义字符串校验函数 校验字符串是否全由数字组成,是则返回true,否则返回false: function isDigit(str) {var regExp /^[0-9]{1,20}$/;// exec方法如果找到符合正则表达式的字符串&#xff0c…

警惕成教自考_不,保持警惕不会伤害Java。 关于Java许可的评论。

警惕成教自考所以。 Oracle希望通过Java赚钱。 然后,The Register发表了一篇非常对立的文章,上面有一个超级吸引人的标题。 根据他们的消息来源,“ Oracle正在大力加强对声称违反其许可证的Java客户的审计”。 当Twitter诗句对人们批评Oracle…

android colorstatelist_Android 样式系统 | 主题背景属性

在 Android 样式系统系列的前几篇文章中,我们介绍了主题背景与样式的区别,以及为什么说通过主题背景和公共主题背景属性来分解您要实现的内容是一个不错的主意,请点击链接回顾:Android 样式系统 | 主题背景和样式Android 样式系统 | 常见的主…

浙江科技学院c语言考试试卷,浙江科技学院c语言C试卷A.doc

浙江科技学院c语言C试卷A浙江科技学院2012 - 2013学年第学期考试试卷A卷信息、经管、 学院 20 年级 理工科专业得分一、判断题(本大题共12小题,每题小1分,共12分)得分二、单选题(本大题共20小题,每题1分,共20分)A.循环控制表达式的…

MyEclipse的Debug功能最基本的操作

使用 Debug 功能最基本的操作: 1.首先在一个 Java 文件中设断点,然后 Debug as --> Open Debug Dialog,然后在对话框中选类后,再点击 Run 运行程序,当程序走到断点处就会自动转到 Debug 视图 2.F5 键与F6 键均为单…

python二级考试可以用pycharm吗_学Python,Pycharm不能不知道怎么用

栏目介绍必会的Pycharm。我决定把去年写的Python文章整理一个专栏,垃圾的就直接删除,将多篇博文整理成一篇。 工欲善其事必先利其器,Pycharm 是最受欢迎的Python开发工具,它提供的功能非常强大,我尽量把自己用的都写写…

C语言写出生命游戏什么水平,我也来汇报~~~生命游戏。

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼int main(void){FILE *wen1,*wen2;system("color f0");char a[1000]{0};int i0;xx ch{0,0};wen1fopen("shuru.txt","r");if (wen1!NULL){while(!feof(wen1)){fread(&a[i],1,1,wen1);}chchinesec…

hazelcast 使用_使用HazelCast进行Hibernate缓存:JPA缓存基础知识

hazelcast 使用HazelCast的最大功能之一就是对Hibernate第二级缓存的支持 。 JPA具有两个级别的缓存。 一级缓存在事务期间缓存对象的状态。 通过两次查询相同的对象,您必须获得第一次获取的对象。 但是,在包含您检索并访问数据库的复杂查询的情况下&…

判断输入的字符串总字节数是否超出限制

function checkByteLength(str) { var bytes 0; for(var i 0;i<str.length;i){ var c str.charAt(i); if(c<256){ bytes 1; }else{ bytes 2; } } if(bytes >6){ alert(“您输入的字数超过限制&#xff01;”); } }

android页面统计代码,android流量统计(示例代码)

android.net.TrafficStats类中&#xff0c;提供了多种静态方法&#xff0c;可以直接调用获取&#xff0c;返回类型均为long型&#xff0c;如果返回等于-1代表 UNSUPPORTED 当前设备不支持统计。static long getMobileRxBytes() //获取通过Mobile连接收到的字节总数&#xff0…

python二维数组排序_Python实现二维数组按照某行或列排序的方法【numpy lexsort】...

本文实例讲述了Python实现二维数组按照某行或列排序的方法。分享给大家供大家参考&#xff0c;具体如下&#xff1a; lexsort支持对数组按指定行或列的顺序排序&#xff1b;是间接排序&#xff0c;lexsort不修改原数组&#xff0c;返回索引。 &#xff08;对应lexsort 一维数组…