kafka streams_Kafka REST Proxy for MapR Streams入门

kafka streams

介绍

MapR生态系统软件包2.0(MEP)随附了一些与MapR流有关的新功能:

  • 用于MapR Streams的Kafka REST代理为MapR Streams和Kafka集群提供RESTful接口,以使用和生成消息并执行管理操作。
  • Kafka Connect for MapR Streams是一个实用程序,用于在MapR Streams与Apache Kafka和其他存储系统之间流式传输数据。

MapR生态系统软件包(MEP)是一种提供与核心升级脱钩的生态系统升级的方法-允许您独立于聚合数据平台升级工具。 您可以在本文中进一步了解MEP 2.0。

在此博客中,我们描述了如何使用REST代理向MapR流发布消息和使用消息。 REST代理是对MapR融合数据平台的重要补充,允许任何编程语言使用MapR流。

MapR Streams工具随附的Kafka REST Proxy可以与MapR Streams一起使用(默认),也可以与Apache Kafka混合使用。 在本文中,我们将重点介绍MapR流。 <!–更多–>

先决条件

  • 具有MEP 2.0的MapR融合数据平台5.2
    • 使用MapR Streams工具
  • curl,wget或任何HTTP / REST客户端工具

创建MapR流和主题

流是主题的集合,您可以通过以下方式将其分组管理:

  1. 设置适用于该流中所有主题的安全策略
  2. 为流中创建的每个新主题设置默认的分区数
  3. 为流中每个主题的消息设置生存时间

您可以在文档中找到有关MapR Streams概念的更多信息。

在您的Mapr群集或沙盒上,运行以下命令:

$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3

启动Kafka控制台的生产者和消费者

打开两个终端窗口,并使用以下命令运行使用者的Kafka实用程序:

消费者

  • 主题传感器-json
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
  • 主题传感器二进制
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary

这两个终端窗口可让您查看有关不同主题的消息

使用Kafka REST代理

检查主题元数据

端点/topics/[topic_name]允许您获取有关该主题的一些信息。 在MapR Streams中,主题是路径标识的流的一部分; 要使用REST API使用主题,您必须使用完整路径,并在URL中进行编码; 例如:

  • /apps/iot-stream:sensor-json将使用%2Fapps%2Fiot-stream%3Asensor-json进行编码

运行以下命令,以获取有关sensor-json主题的信息

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

注意:为简单起见,我从运行Kafka REST代理的节点上运行命令,因此可以使用localhost

您可以通过添加以下Python命令,以一种漂亮的方式打印JSON:

$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool

默认流

如上所述,流路径是您必须在命令中使用的主题名称的一部分。 但是可以将MapR Kafka REST代理配置为使用默认流。 为此,您应该在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下属性:

  • streams.default.stream=/apps/iot-stream更改Kafka REST代理配置时,必须使用maprcli或MCS重新启动服务。使用streams.default.stream属性的streams.default.stream是简化URL使用的URL。以应用为例
    • 通过streams.default.stream ,可以使用curl -X GET http://localhost:8082/topics/

    在本文中,所有URL都包含编码的流名称,就像您可以开始使用Kafka REST代理而无需更改配置,也可以将其用于其他流。

发布消息

用于MapR流的Kafka REST代理允许应用程序将消息发布到MapR流。 消息可以作为JSON或二进制内容(base64编码)发送。

要发送JSON消息:

  • 查询应该是HTTP POST
  • 内容类型应为: application/vnd.kafka.json.v1+json
  • 身体:
{"records":[{"value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"}  }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

您应该在运行/apps/iot-stream:sensor-json使用者的终端窗口中看到打印的消息。

要发送二进制消息:

  • 查询应该是HTTP POST
  • 内容类型应为: application/vnd.kafka.binary.v1+json
  • 身体:
{"records":[{"value":"SGVsbG8gV29ybGQ="}]
}

请注意, SGVsbG8gV29ybGQ=是在Base64中编码的字符串“ Hello World”。

完整的请求是:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您应该在/apps/iot-stream:sensor-binary使用者正在运行的终端窗口中看到打印的消息。

发送多条消息

HTTP正文的records字段允许您发送多个消息,例如,您可以发送:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"}  }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"}  } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

该命令将发送2条消息,并将偏移量增加2。您可以对二进制内容执行相同的操作,只需在JSON数组中添加新元素即可; 例如:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

您可能知道,可以为消息设置密钥,以确保所有具有相同密钥的消息都将到达同一分区。 为此,将key属性添加到消息中,如下所示:

{"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"}  }]
}

既然您知道如何使用REST代理将消息发布到MapR Stream主题,那么让我们看看如何使用消息。

消费信息

REST代理还可以用于消费主题消息。 为此,您需要:

  1. 创建使用者实例。
  2. 使用第一次调用返回的URL读取消息。
  3. 如果需要,请删除实例的使用者。

创建使用者实例

以下请求创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json

服务器的响应如下所示:

{"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer"
}

请注意,我们已经使用/consumers/[topic_name]创建使用者。

后续请求将使用base_uri从主题获取消息。 与任何MapR Streams / Kafka使用者一样, auto.offset.reset定义其行为。 在此示例中,该值设置为earliest ,这意味着使用者将从头开始阅读消息。 您可以在MapR Streams文档中找到有关使用者配置的更多信息。

消费信息

要使用这些消息,只需将Mapr Streams主题添加到使用者实体的URL。

以下请求使用了该主题的消息:

curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json

此调用返回JSON文档中的消息:

[{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3}
]

每次对API的调用都会根据上一次调用的偏移量返回发布的新消息。

请注意,消费者将被销毁:

  • consumer.instance.timeout.ms实例。超时。毫秒设置的空闲时间后(默认值设置为300000毫秒/ 5分钟)
  • 使用REST API调用销毁它(见下文)。

消费二进制格式的消息

如果需要使用二进制消息,则需要更改格式并接受标头,该方法是相同的。

调用此URL为二进制主题创建使用者实例:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary

然后使用消息,accept标头设置为application/vnd.kafka.binary.v1+json

curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary

该调用返回JSON文档中的消息,并且该值在Base64中编码

[{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2}
]

删除使用者实例

如前所述,将根据REST Proxy的consumer.instance.timeout.ms配置自动销毁consumer.instance.timeout.ms 。 也可以使用使用者实例URI和HTTP DELETE调用销毁实例,如下所示:

curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer

结论

在本文中,您学习了如何将Kafka REST代理用于MapR流,该代理允许任何应用程序使用在MapR聚合数据平台中发布的消息。

您可以在MapR文档和以下资源中找到有关Kafka REST代理的更多信息:

  • MapR Streams入门
  • Ted Dunning和Ellen Friedman撰写的“流传输体系结构:使用Apache Kafka和MapR流的新设计”电子书

翻译自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams.html

kafka streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335252.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JSP 活动元素 <jsp:directive.pageimport=“zero.space.ch03.Bookbean“/> 解读

<jsp:directive.pageimport"zero.space.ch03.Bookbean"/> 是 JSP 的活动元素&#xff0c;其作用相当于 JSP 指令 <% page import"zero.space.ch03.Bookbean" %> 但是也有区别&#xff1a; 两个用法的作用范围不同&#xff0c;<% page impo…

centos一键清理磁盘空间_如何清理 Docker 占用的磁盘空间

Docker 很占用空间&#xff0c;每当我们运行容器、拉取镜像、部署应用、构建自己的镜像时&#xff0c;我们的磁盘空间会被大量占用。如果你也被这个问题所困扰&#xff0c;咱们就一起看一下 Docker 是如何使用磁盘空间的&#xff0c;以及如何回收。docker 占用的空间可以通过下…

计算机基础应用形考5access答案,计算机应用基础形考作业五答案.doc

1 对关系模型叙述错误的是()。选择一项&#xff1a;用二维表表示关系模型是其一大特点建立在严格的数学理论&#xff0c;集合论和谓词演算公式的基础之上微机 DBMS 绝大部分采取关系数据模型不具有连接操作的 DBMS 也可以是关系数据库系统正确答案是&#xff1a;建立在严格的数…

mongodb dsl_具有Java DSL的Spring Integration MongoDB适配器

mongodb dsl1引言 这篇文章解释了如何使用Spring Integration从MongoDB数据库中保存和检索实体。 为了完成此任务&#xff0c;我们将使用Java DSL配置扩展来配置入站和出站MongoDB通道适配器。 例如&#xff0c;我们将构建一个应用程序&#xff0c;使您可以将订单写入MongoDB存…

python工作方法_用python开始一天工作

run1pm.py 全部源码 python #!D:\appsoft\python\python.exe # -* - coding: UTF-8 -* - import os import sys import time # # 启动: # 1)run1pmEn.py # 退出: # 1) run1pmEn.py quit # # _tARG1start # start quit if len(sys.argv)>1 and sys.argv[1]quit: _tARG1sys.…

Oracle 数据库中较为复杂或典型的 SQL 语句的解读

文章目录批量生成 SQL 语句/拼接字符串多表关联查询 where 子句示例&#xff08;一&#xff09;示例&#xff08;二&#xff09;普通的表间内连接查询语句关键字 distinct 用法说明Oracle 数据库的分组排序查询Oracle 数据库 cast 函数Oracle 数据库 sum 函数的高级用法Oracle…

html ios视频播放器,良心推荐!iOS端的视频播放应用

不知道大家还记不记得之前小编我推荐的最好用的iOS音乐播放器&#xff0c;如果没看过&#xff0c;可以点击这里。既然之前介绍了一款iOS端的音乐播放器给大家&#xff0c;那么今天就介绍一款iOS端十分好用的万能解码的视频播放器。使用iPhone的用户都明白&#xff0c;iPhone自带…

私有方法与静态私有方法_每个私有静态方法都是新类的候选人

私有方法与静态私有方法您是否有私有的静态方法来帮助您将算法分解为更小的部分&#xff1f; 我做。 每当我编写一个新方法时&#xff0c;我就会意识到它可以是一个新类。 当然&#xff0c;我不会从所有课程中选修课程&#xff0c;但这必须是目标。 私有静态方法不可重用&#…

c语言插入排序_还有这种操作?C语言插入排序算法,一点就透

插入排序算法是所有排序方法中最简单的一种算法&#xff0c;其主要的实现思想是将数据按照一定的顺序一个一个的插入到有序的表中&#xff0c;最终得到的序列就是已经排序好的数据。更多C/C资料群文件&#xff1a;569268376直接插入排序是插入排序算法中的一种&#xff0c;采用…

Mac 如何操控远程的 Windows 电脑

文章目录使用 Remote Desktop Connection for mac 客户端第 1 步&#xff1a;Windows 电脑进行远程设置第 2 步&#xff1a;Windows 电脑设置管理员账号和密码第 3 步&#xff1a;获取 Windows 电脑的 IP 地址第 4 步&#xff1a;Mac 电脑安装远程桌面连接客户端第 5 步&#x…

map iterator_一个简单的Map Iterator性能测试

map iteratorJava Map性能有很多方面可以衡量&#xff0c;但是关键的一个是简单的单线程扫描。 这是一些针对Iterators和Java 8 Map.forEach()简单测试代码&#xff0c;以及一些图形结果。 1.性能测试困难 性能测试是一项非常困难的工作&#xff0c;精确的可重复性测试需要Jav…

学生用计算机中sto,STO 文件扩展名: 它是什么以及如何打开它?

STO 疑难解答常见的 STO 打开问题Ecru Software PRO100 不存在你尝试加载 STO 文件并收到错误&#xff0c;例如 “%%os%% 无法打开 STO 文件扩展名”。 如果是这种情况&#xff0c;通常是因为 你的计算机上没有安装 Ecru Software PRO100 for %%os%%。 由于您的操作系统不知道如…

MacBook 使用 Loopback 录屏和录音频(MacBook 录屏教程/录视频教程/Loopback 教程)

文章目录一、下载软体二、Loopback 界面介绍三、设置系统的声音输入/输出设备&#xff08;一&#xff09;设置声音输入设备&#xff08;二&#xff09;设置声音输出设备四、录制程序中选择声音输入设备五、开始录制一、下载软体 在網路上可以找到破解版的軟體 Loopback 二、L…

惠普照片打印软件_被看错的打印机?原来打印机还可以这么玩

孩提时代&#xff0c;经常弄丢试卷的小值君曾频繁地与打印店打交道&#xff0c;那是我最早接触打印机的时候。白驹过隙&#xff0c;时至当下&#xff0c;打印设备已然成为家庭不可或缺的部分。印象中&#xff0c;打印机要不就是打打文档&#xff0c;要不就是打打照片&#xff0…

dagger2 注入_使用Dagger 2在GWT中进行依赖注入

dagger2 注入依赖注入是一种软件开发概念&#xff0c;其中为对象提供了创建所需的所有对象或值。 GWT用户已经熟悉GIN&#xff0c;但已经不推荐使用此工具&#xff0c;因此不再支持它&#xff0c;因此使用GIN的应用程序当前确实需要告别。 Dagger是GWT的新依赖注入框架。 对于那…

oracle创建一个表同已存在表结构一样

oracle创建一个表同已存在表结构一样&#xff08;或者同时将数据导入&#xff09; –只是建立ta表&#xff0c;与emp表结构相同&#xff0c;并不添加数据 –这种构造与现存表相同结构的表&#xff0c;是不会将comment带过来的 create table ta as select * from scott.emp wh…

周敏教授计算机编码与密码学,中国科学院计算机与控制学院博士生导师:张玉清教授...

张玉清 男 博导 信息科学与工程学院电子邮件&#xff1a;zhangyqgucas.ac.cn通信地址&#xff1a;北京石景山区玉泉路19号甲邮政编码&#xff1a;100049部门/实验室&#xff1a;计算机与控制学院职务: 教授&#xff0c;副院长电子邮箱: zhangyqucas.ac.cn研究方向:计算机网络&a…

戴尔G3笔记本使用U盘重装操作系统

戴尔G3笔记本 下载安装大白菜U盘启动盘制作软件根据使用说明完成启动盘制作下载操作系统ISO文件重启电脑&#xff0c;连续按F12&#xff0c;打开如下界面后选择红色线框选项&#xff1a; 进入PE系统界面&#xff0c;打开【大白菜】&#xff0c;看到如下界面&#xff1a;

华南理工网络教育计算机概论,2020年《计算机概论》平时作业华南理工网络教育学院.pdf...

计算机概论平时作业1. 简 述人工智能答&#xff1a;人工智能是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新的技术科学。人工智能是计算机科学的一个分支&#xff0c; 它企图了解智能的实质&#xff0c; 并生产出一种新的能以人类智能相似的方…

python合并csv文件_Python学习——pandas 合并csv文件

import pandas as pd import os import re import numpy as np Folder_Path rC:/Users/Cable-Ching\Desktop/New folder/New folder # 要拼接的文件夹及其完整路径&#xff0c;注意不要包含中文 SaveFile_Path rC:/Users/Cable-Ching\Desktop/New folder/New folder # 拼接后…