kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:

 2、双击步骤打开MQTT consumer 配置窗口,如下图所示:

Step name:自定义步骤名称。

Transformation:设置子转换,该子转换的作用是从中间件读取流数据,然后将字段返回给MQTT consumer步骤进行使用。

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Topics name:在主题名称字段中,输入您希望订阅流数据(消息)的 MQTT 主题的名称。其实这里的topic是RabbitMQ中的routing key(另外这里的routing key 一定不要绑定队列,否则MQTT consumer步骤无法接收数据)。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

 3、安全验证设置,如下图所示:

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的密码。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

 3、批次设置,使用此选项卡来指定在处理之前要拉取多少消息。您可以指定消息数量和/或特定的时间量。

Duration(ms):
请指定一个以毫秒为单位的时间。此值表示在执行转换之前该步骤将花费多少时间来收集记录。
如果将此选项设置为0,则根据参数Number of records记录数触发消费。要运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Number of records
指定一个数字。在每收集到‘X’条记录之后,指定的转换将被执行,并且这些‘X’条记录将被传递给转换过程。
如果将此选项设置为0,则将参数Duration按持续时间触发消费。为了运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Maximum concurrent batches

指定用于同时收集记录的最大批次数。默认值为1,表示使用单个批次来收集记录。仅当您的消费者步骤无法跟上数据流的速度时,才应使用此选项。
您的计算环境必须具备足够的 CPU 和内存来进行此实现。如果您的环境无法处理指定的最大并发批次数,将会出现错误。


Message prefetch limit
请指定此步骤将排队等待处理的传入消息的限制,即从 kfakfa broker接收到的消息。
设置此值会强制kafka broker处理超过指定限制的消息的背压。默认排队消息的数量是100000条。

 4、字段设置,这里采用默认值就行了,不用调整。

5、子转换结果字段设置,选择子转换返回数据的步骤。 

 6、同上一节课,本次不再介绍。

 7、子转换配置,将Get records from stream步骤拉到画布,然后填写Message、Topic两个字段,类型都是设置为String即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/1537.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ROS下机器人系统仿真及部分SLAM建图

文章目录 一、 Launch文件使用二、 参考资料三、 遇到的问题四、 效果演示五、相关代码5.1 一些简介5.2 机器人模型5.2.1 机器人底盘5.2.2 摄像头5.2.3 雷达 5.3 惯性矩阵 六、代码传送门实验结果及分析 温馨提示:如果有幸看到这个文章,不要看里面的内容…

java-单列集合List详解

一、List概述 ​​​​​​​List 接口继承自 Collection 接口。这意味着所有 List 类型的对象都是 Collection 类型的对象,它们共享 Collection 接口中定义的所有方法。 List集合的特点: 1、有序:存和取得元素顺序一致 2、有索引&#xf…

使用FPGA实现比较器

介绍 比较器就是通过比较输入的大小,然后输出给出判断。 在这个比较器中,有两个输入,三个输出。根据输出就可以判断出哪个输入值大了。 设计文件 library ieee; use ieee.std_logic_1164.all; use ieee.std_logic_arith.all; entity compa…

【大语言模型LLM】-使用大语言模型搭建点餐机器人

关于作者 行业:人工智能训练师/LLM 学者/LLM微调乙方PM发展:大模型微调/增强检索RAG分享国内大模型前沿动态,共同成长,欢迎关注交流… 大语言模型LLM基础-系列文章 【大语言模型LLM】-大语言模型如何编写Prompt?【大语言模型LL…

zabbix监控华为网络设备路由器eNSP(3)

如果有没搭建zabbix的网友可以先看我上一个搭建文档,把监控系统搭建上https://blog.csdn.net/weixin_72819498/article/details/137751059 拓扑图: 绑定和服务器同地址的网卡 1.监控端配置 (1)SNMP服务安装 [rootzbx-server ~]# yum -y install net-…

编译一个基于debian/ubuntu,centos,arhlinux第三方系统的问题解答

如果是开机卡boot注意看前面几行会有错误提示,一般会比较好找,下面是过了kernel内核加载后出现的问题 目录 上一篇文章 第一个问题 错误原因 解决办法 第二个问题 注意 第三个问题 上一篇文章 编译一个基于debian/ubuntu,centos,arhlinux第三方系…

什么是0-day漏洞,怎么防护0-day漏洞攻击

随着信息技术的快速发展,网络安全问题日益凸显,其中0day漏洞攻击作为一种高级威胁手段,给企业和个人用户带来了极大的风险。下面德迅云安全就对0day漏洞攻击进行简单讲解下,并分享相应的一些安全措施,以期提高网络安全…

第15届蓝桥杯题解

A题 结果:2429042904288 思路很简单 前20个数分别是 20 24 40 48 60 72 80 96 100 120 140 144 160 168 180 192 200 216 220 240 第2 4 6 8 12 ...n个数分别是24的 1倍 2倍 3倍 4倍 6倍 n/2倍 所以第202420242024 个数就是 24的 101210121012倍 B题 答案&am…

二维前缀和与差分

前言 延续前面所讲的一维前缀和以及差分,现在来写写二维前缀和与差分 主要这个画图就比前面的一维前缀和与差分复杂一点,不过大体思路是一样的 一维和二维的主要思路在于一维是只针对对一行一列,而二维是针对与一个矩阵的 好吧&#xff0…

ACE框架学习2

目录 ACE Service Configurator框架 ACE_Server_Object类 ACE_Server_Repository类 ACE_Server_Config类 ACE Task框架 ACE_Message_Queue类 ACE_TASK类 在开始之前&#xff0c;首先介绍一下模板类的实例化和使用。给出以下代码 //ACCEPTOR代表模板的方法 template <…

VSCode插件开发学习

一、环境准备 0、参考文档&#xff1a;VS Code插件创作中文开发文档 1、大于18版本的nodejs 2、安装Yeoman和VS Code Extension Generator&#xff1a; npm install -g yo generator-code 3、生成脚手架 yo code 选择内容&#xff1a; ? What type of extension do yo…

DASCTF X GFCTF 2024|四月开启第一局

前言 题目都比较简单&#xff0c;&#xff0c;&#xff0c;没啥好说的&#xff0c;很久没做题了&#xff0c;简单记录一下 dynamic_but_static 仅仅开了 NX 保护栈溢出 先泄漏 libc 地址&#xff0c;然后栈溢出打 ret2libc&#xff0c;开了沙箱得 orw from pwn import * c…

Vue3+TS版本Uniapp:项目前置操作

作者&#xff1a;前端小王hs 阿里云社区博客专家/清华大学出版社签约作者✍/CSDN百万访问博主/B站千粉前端up主 环境&#xff1a;使用vscode进行开发 如果一开始是使用的HbuilderX&#xff0c;请看hbuilderX创建的uniapp项目转移到vscode 为什么选择vscode&#xff1f;有更好…

Windows进入黑屏,操作CMD提示命令提示符已被系统管理员停用

背景 由于安装或者卸载某些服务导致主机无法正常显示桌面&#xff0c;从控制台进入打开操作执行命令提示禁用。 操作步骤 注意务必做好快照备份后再操作。 打开注册表中将其重新启用&#xff1a; 依次打开“运行”命令&#xff0c;然后在打开的“运行”对话框中输入 “regedit…

一个简单的记工tkinter窗口

代码分享: 导入datetime模块&#xff0c;用于获取当前日期 import datetime as da 导入csv模块&#xff0c;用于读写csv文件 import csv 导入tkinter模块&#xff0c;用于创建窗口和按钮 from tkinter import * 创建主窗口 appTk() 设置窗口大小为1048x2048&#xff0…

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第六套

华为海思校园招聘-芯片-数字 IC 方向 题目分享——第六套 (共9套&#xff0c;有答案和解析&#xff0c;答案非官方&#xff0c;未仔细校正&#xff0c;仅供参考&#xff09; 部分题目分享&#xff0c;完整版获取&#xff08;WX:didadidadidida313&#xff0c;加我备注&#x…

【OpenHarmony-NDK技术】简单将cJson移植到OpenHarmony中,并在c层修改参数值再返回json

1、cJson的简单介绍 cJson - github网址 概述 一般使用cJson是&#xff0c;需要将json文本转化为json对象–编码&#xff0c;将json对象转化为json文本–解析。 git clone https://github.com/DaveGamble/cJSON.git 后留意cJSON.h和cJSON.h两个文件。 1、cJson的介绍 cJso…

Golang那些违背直觉的编程陷阱

目录 知识点1&#xff1a;切片拷贝之后都是同一个元素 知识点2&#xff1a;方法集合决定接口实现&#xff0c;类型方法集合是接口方法集合的超集则认定为实现接口&#xff0c;否则未实现接口 切片拷贝之后都是同一个元素 package mainimport ("encoding/json"&quo…

Redis 如何实现分布式锁

课程地址 单机 Redis naive 版 加锁&#xff1a; SETNX ${lockName} ${value} # set if not exist如果不存在则插入成功&#xff0c;返回 1&#xff0c;加锁成功&#xff1b;否则返回 0&#xff0c;加锁失败 解锁&#xff1a; DEL ${lockName}问题1 2 个线程 A、B&#…

前后端交互概念

前后端交互概念 1前后端分离开发概念2搭建后端环境2.1配置文件commomcommon-utilservice-utilmodelservice gitee使用 1前后端分离开发概念 前段&#xff1a;运用html、css、js和现成库&#xff0c;对数据作展示。 后端&#xff1a;运用Java和Java框架&#xff0c;提供数据或操…