kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:

 2、双击步骤打开MQTT consumer 配置窗口,如下图所示:

Step name:自定义步骤名称。

Transformation:设置子转换,该子转换的作用是从中间件读取流数据,然后将字段返回给MQTT consumer步骤进行使用。

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Topics name:在主题名称字段中,输入您希望订阅流数据(消息)的 MQTT 主题的名称。其实这里的topic是RabbitMQ中的routing key(另外这里的routing key 一定不要绑定队列,否则MQTT consumer步骤无法接收数据)。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

 3、安全验证设置,如下图所示:

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的密码。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

 3、批次设置,使用此选项卡来指定在处理之前要拉取多少消息。您可以指定消息数量和/或特定的时间量。

Duration(ms):
请指定一个以毫秒为单位的时间。此值表示在执行转换之前该步骤将花费多少时间来收集记录。
如果将此选项设置为0,则根据参数Number of records记录数触发消费。要运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Number of records
指定一个数字。在每收集到‘X’条记录之后,指定的转换将被执行,并且这些‘X’条记录将被传递给转换过程。
如果将此选项设置为0,则将参数Duration按持续时间触发消费。为了运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Maximum concurrent batches

指定用于同时收集记录的最大批次数。默认值为1,表示使用单个批次来收集记录。仅当您的消费者步骤无法跟上数据流的速度时,才应使用此选项。
您的计算环境必须具备足够的 CPU 和内存来进行此实现。如果您的环境无法处理指定的最大并发批次数,将会出现错误。


Message prefetch limit
请指定此步骤将排队等待处理的传入消息的限制,即从 kfakfa broker接收到的消息。
设置此值会强制kafka broker处理超过指定限制的消息的背压。默认排队消息的数量是100000条。

 4、字段设置,这里采用默认值就行了,不用调整。

5、子转换结果字段设置,选择子转换返回数据的步骤。 

 6、同上一节课,本次不再介绍。

 7、子转换配置,将Get records from stream步骤拉到画布,然后填写Message、Topic两个字段,类型都是设置为String即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/1537.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ROS下机器人系统仿真及部分SLAM建图

文章目录 一、 Launch文件使用二、 参考资料三、 遇到的问题四、 效果演示五、相关代码5.1 一些简介5.2 机器人模型5.2.1 机器人底盘5.2.2 摄像头5.2.3 雷达 5.3 惯性矩阵 六、代码传送门实验结果及分析 温馨提示:如果有幸看到这个文章,不要看里面的内容…

java-单列集合List详解

一、List概述 ​​​​​​​List 接口继承自 Collection 接口。这意味着所有 List 类型的对象都是 Collection 类型的对象,它们共享 Collection 接口中定义的所有方法。 List集合的特点: 1、有序:存和取得元素顺序一致 2、有索引&#xf…

使用FPGA实现比较器

介绍 比较器就是通过比较输入的大小,然后输出给出判断。 在这个比较器中,有两个输入,三个输出。根据输出就可以判断出哪个输入值大了。 设计文件 library ieee; use ieee.std_logic_1164.all; use ieee.std_logic_arith.all; entity compa…

UE5 android package

1. plug 里删除所有IOS插件; 2.jdk11必须; 3.setting -windows-compiler Version 设置成你的版本; 4.andorid 变绿; 5.target SDK version :34; 6.package game date inside apk? check 7.allow large OBB files c…

【大语言模型LLM】-使用大语言模型搭建点餐机器人

关于作者 行业:人工智能训练师/LLM 学者/LLM微调乙方PM发展:大模型微调/增强检索RAG分享国内大模型前沿动态,共同成长,欢迎关注交流… 大语言模型LLM基础-系列文章 【大语言模型LLM】-大语言模型如何编写Prompt?【大语言模型LL…

zabbix监控华为网络设备路由器eNSP(3)

如果有没搭建zabbix的网友可以先看我上一个搭建文档,把监控系统搭建上https://blog.csdn.net/weixin_72819498/article/details/137751059 拓扑图: 绑定和服务器同地址的网卡 1.监控端配置 (1)SNMP服务安装 [rootzbx-server ~]# yum -y install net-…

Qt——Qt网络编程之TCP通信服务器端的实现(使用QTcpServer、QTcpSocket实现一个TCP服务器端例程)

【系列专栏】:博主结合工作实践输出的,解决实际问题的专栏,朋友们看过来! 《项目案例分享》 《极客DIY开源分享》 《嵌入式通用开发实战》 《C++语言开发基础总结》 《从0到1学习嵌入式Linux开发》 《QT开发实战》 《Android开发实战》

数据仓库、数据中台、大数据平台的关系?

一、数据中台 数据中台是一个数据运营的概念,主要功能是将跨领域的数据集中聚合和治理,将其抽象为服务,提供具有业务价值的逻辑概念。 相较于传统的大数据平台,数据中台是升级版的概念,并不再简单地将各个功能混在一起…

编译一个基于debian/ubuntu,centos,arhlinux第三方系统的问题解答

如果是开机卡boot注意看前面几行会有错误提示,一般会比较好找,下面是过了kernel内核加载后出现的问题 目录 上一篇文章 第一个问题 错误原因 解决办法 第二个问题 注意 第三个问题 上一篇文章 编译一个基于debian/ubuntu,centos,arhlinux第三方系…

CloudCompare 二次开发(28)——最小二乘拟合空间直线

目录 一、概述二、代码集成三、结果展示一、概述 使用CloudCompare与PCL实现的最小二乘拟合直线。具体计算原理见:PCL 最小二乘拟合空间直线。 二、代码集成 1、mainwindow.h文件public中添加: void doActionPCLLeastSquareFit3DLine(); // 最小二乘拟合空间直线2、mainw…

什么是0-day漏洞,怎么防护0-day漏洞攻击

随着信息技术的快速发展,网络安全问题日益凸显,其中0day漏洞攻击作为一种高级威胁手段,给企业和个人用户带来了极大的风险。下面德迅云安全就对0day漏洞攻击进行简单讲解下,并分享相应的一些安全措施,以期提高网络安全…

第15届蓝桥杯题解

A题 结果:2429042904288 思路很简单 前20个数分别是 20 24 40 48 60 72 80 96 100 120 140 144 160 168 180 192 200 216 220 240 第2 4 6 8 12 ...n个数分别是24的 1倍 2倍 3倍 4倍 6倍 n/2倍 所以第202420242024 个数就是 24的 101210121012倍 B题 答案&am…

二维前缀和与差分

前言 延续前面所讲的一维前缀和以及差分,现在来写写二维前缀和与差分 主要这个画图就比前面的一维前缀和与差分复杂一点,不过大体思路是一样的 一维和二维的主要思路在于一维是只针对对一行一列,而二维是针对与一个矩阵的 好吧&#xff0…

flutter组件 ThemeData

这里只讲组件的定义&#xff0c;需要各位自己去尝试。 ThemeData({// 常规配置Iterable<Adaptation<Object>>? adaptations, // 定义主题自适应方案的列表。bool? applyElevationOverlayColor, // 是否应用海拔叠加颜色。NoDefaultCupertinoThemeData? cuperti…

从零开始精通RTSP之深入理解RTP协议

概述 RTP&#xff0c;即实时传输协议&#xff0c;英文全称为Real-Time Transport Protocol&#xff0c;是一种用于在互联网上传输视频、音频等实时数据的网络协议。RTP本身不提供任何服务质量保证&#xff0c;而是依赖于底层传输协议&#xff08;比如&#xff1a;UDP&#xff0…

ACE框架学习2

目录 ACE Service Configurator框架 ACE_Server_Object类 ACE_Server_Repository类 ACE_Server_Config类 ACE Task框架 ACE_Message_Queue类 ACE_TASK类 在开始之前&#xff0c;首先介绍一下模板类的实例化和使用。给出以下代码 //ACCEPTOR代表模板的方法 template <…

VSCode插件开发学习

一、环境准备 0、参考文档&#xff1a;VS Code插件创作中文开发文档 1、大于18版本的nodejs 2、安装Yeoman和VS Code Extension Generator&#xff1a; npm install -g yo generator-code 3、生成脚手架 yo code 选择内容&#xff1a; ? What type of extension do yo…

DASCTF X GFCTF 2024|四月开启第一局

前言 题目都比较简单&#xff0c;&#xff0c;&#xff0c;没啥好说的&#xff0c;很久没做题了&#xff0c;简单记录一下 dynamic_but_static 仅仅开了 NX 保护栈溢出 先泄漏 libc 地址&#xff0c;然后栈溢出打 ret2libc&#xff0c;开了沙箱得 orw from pwn import * c…

Vue3+TS版本Uniapp:项目前置操作

作者&#xff1a;前端小王hs 阿里云社区博客专家/清华大学出版社签约作者✍/CSDN百万访问博主/B站千粉前端up主 环境&#xff1a;使用vscode进行开发 如果一开始是使用的HbuilderX&#xff0c;请看hbuilderX创建的uniapp项目转移到vscode 为什么选择vscode&#xff1f;有更好…

Docker容器的原理及应用详解(三)

本系列文章简介&#xff1a; Docker是一种开源的容器化技术&#xff0c;它将应用程序及其依赖项打包为一个容器&#xff0c;以便在任何环境下运行。与传统的虚拟机相比&#xff0c;Docker容器更加轻量级且快速&#xff0c;可以在几秒钟内启动和停止。Docker的原理和应用非常广泛…