Apache Nifi挂接MQTT与Kafka实践

目录

1.  说明:

2. 方案设计:

2.1 资源配置:

2.2 交互Topics:  

3. 实现步骤 

3.1 Nifi 桌面

3.2 MqttToKafka

3.2.1 配置

3.2.2 测试

3.2.3 结果

3.3 KafkaToMqtt

3.3.1 配置 

3.3.1 测试

3.3.1 结果 ​编辑

4. 总结:

4.1 知识点

Nifi Kafka Processor 配置字典:

Topic通配符:

5. 参考:


1.  说明:

      在一些方案实现过程中,感觉需要一种接驳器来连接不同的数据源并汇流到一处进行统一处理,于是寻到NIFI(官网)这个工具,它相当于“数据水管+接驳器工具箱”,能丝滑联结不同的数据源,总体思路是把各类数据源汇流到Kafka集中处理,比如日志文件,消息传递,数据库操作等。初步使用感觉很不错,分享之。

2. 方案设计:

- 连接Emqx集群(mqtt服务)与Kafka集群,实现数据流动的双工运作 - 客户端(连mqtt) <=> 应用服务(连kafka)

2.1 资源配置:

简单起见,在docker环境中实施,后续迁移到K8s

服务集群服务入口备注
MQTT (tcp|mqtt)://host001.dev.ia:1883

client id:

nifi-xio1-sub1 订阅者

nifi-xio1-pub1 发布者

Kafkahost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Apache Nifihttp://host001.dev.ia:9080/nifi/

Nifi的docker配置

# 建个卷,持久化数据
docker volume create nifi_data

docker-compose.yml

version: "3.7"
services:nifi:image: apache/nifi:1.9.2container_name: nifirestart: alwaysports:- "9080:8080"environment:- NIFI_WEB_HTTP_HOST=0.0.0.0#- NIFI_HOME=/home/nifi#- NIFI_LOG_DIR=/home/nifi/logsvolumes:- nifi_data:/home/nifivolumes:nifi_data:external: true
2.2 交互Topics:  
Topic备注
test.topic.nifi1测试接收
test.topic.bus总线
test.device.*测试通配符topic

test.device.mw3039kkj001

测试带设备id的topic

3. 实现步骤 

3.1 Nifi 桌面

配好后,访问​http://host001.dev.ia:9080/nifi/​, 中间是配好的两个Processor Group,分别是MqttToKafka与KafkaToMqtt,代表双向流动配置。

3.2 MqttToKafka
3.2.1 配置

 加ConsumeMQTT Processor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
NameConsumeMQTT
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-sub1
Username/Password--
Topic Filtertest.topic.nifi1
Max Queue Size1000

加PublishKafka_2_0 Processor:拉Processor组件下去,点开选PublishKafka_2_0

Properties备注
NamePublishKafka_2_0
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Security ProtocolPLAINTEXT
Topic Nametest.topic.nifi1
Delivery Guarantee

Guarantee Replicated Delivery

Use Transactionstrue

拖动ConsumeMQTT连接PublishKafka, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.2.2 测试

说明:

  1. 用mqtt客户端工具MqttX向topic=tset.topic.nifi1发送json数据包
  2. 用python脚本作为消费者客户端连接kafka,订阅topic=tset.topic.nifi1,获取该数据包

python脚本:

from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import jsonasync def consume_loop(consumer, topics):try:# 订阅主题consumer.subscribe(topics)while True:# 轮询消息msg = consumer.poll(timeout=1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# End of partition eventprint("%% %s [%d] reached end at offset %d\n"% (msg.topic(), msg.partition(), msg.offset()))elif msg.error():raise KafkaException(msg.error())else:# 正常消息raw_message = msg.value()print(f"Raw message: {raw_message}")parsed_message = json.loads(raw_message.decode("utf-8"))print(f"Received message: {type(parsed_message)} : {parsed_message}")await asyncio.sleep(0.01)  # 小睡片刻,让出控制权finally:# 关闭消费者consumer.close()async def consume():# 消费者配置conf = {"bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092","group.id": "mygroup1","auto.offset.reset": "earliest",}# 创建消费者consumer = Consumer(conf)await consume_loop(consumer, ["tset.topic.nifi1"])if __name__ == "__main__":asyncio.run(consume())
3.2.3 结果

脚本 Nifi 

3.3 KafkaToMqtt
3.3.1 配置 

加ConsumeKafkaProcessor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeKafka_2_0
Properties备注
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Topic Name(s)test.topic.bus / test.device.*
Group IDtest

加PublishMQTT Processor:拉Processor组件下去,点开选PublishMQTT

Settings备注
NamePublishMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-pub1
Username/Password--
Topic Filtertest.topic.bus
QoS0

 拖动ConsumeKafka_2_0连接PublishMQTT, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.3.1 测试

说明:

  1. python脚本向Kafka发布消息到 topic = test.topic.bus
  2. MqttX客户端订阅接收

脚本

from confluent_kafka import Producer
import jsondef delivery_report(err, msg):"""Called once for each message produced to indicate delivery result.Triggered by poll() or flush()."""if err is not None:print(f"Message delivery failed: {err}")else:print(f"Message delivered to {msg.topic()} [{msg.partition()}]")def create_async_producer(config):"""Creates an instance of an asynchronous Kafka producer."""return Producer(config)def produce_messages(producer, topic, messages):"""Asynchronously produces messages to a Kafka topic."""for message in messages:# Trigger any available delivery report callbacks from previous produce() callsproducer.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.producer.produce(topic, json.dumps(message).encode("utf-8"), callback=delivery_report)# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.producer.flush()if __name__ == "__main__":# Kafka configuration# Replace these with your server's configurationconf = {"bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",  # Replace with your Kafka server addresses# "client.id": "python-producer",}# Create an asynchronous Kafka producerasync_producer = create_async_producer(conf)# Messages to send to Kafkamessages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]# Produce messages# produce_messages(async_producer, "test.topic.bus", messages_to_send)produce_messages(async_producer, "test.device.mw3039kkj001", messages_to_send)
3.3.1 结果 

MqttX 

Nifi 

4. 总结:

       Nifi支持集群化部署,如此从数据采集,数据流动到数据存储都实现了分布式,而且有可视化的界面可方便地进行数据节点的集聚与增减配置,目前只是浅尝即止,更深入的研究待后续不断补充优化。

4.1 知识点
Nifi Kafka Processor 配置字典:
Delivery Guarantee

数据传递保证

  1. Best Effort (尽力交付,相当于ack=0)
  2. Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):
  3. Guarantee Replicated Delivery(保证复制交付,相当于ack=-1)
Use Transactions

使用事务 

true / false 

Topic通配符:
“/”

主题层级分隔符

如果存在分隔符,它将主题名分割为多个主题层级。

如:room401/tv/contrl/sensor

“#”多层通配符

匹配主题中任意层级的通配符

如果客户端订阅主题 “china/guangzhou/#”, 它会收到使用下列主题名发布的消息

china/guangzhou china/guangzhou/huangpu china/guangzhou/tianhe/zhongshanlu china/guangzhou/tianhe/zhongshanlu/num123

school/#                //也匹配单独的 “school” ,因为 # 包括它的父级。
#                       //是有效的,会收到所有的应用消息。
school/teacher/#        //有效的。
school/teacher#         //无效的。
school/teacher/#/lever  //无效的,必须是主题过滤器的最后一个字符
https://blog.51cto.com/u_16099203/10959511

“+”单层通配符

单个主题层级匹配的通配符。在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。

china/+ 只能匹配 china/guangzhou

china/+/+/zhongshanlu 能匹配china/guangzhou/tianhe/zhongshanlu和china/shenzhen/nanshan/zhongshanlu

“$”匹配一个字符$xx
/$xx
/xx$

5. 参考:

- https://zhuanlan.zhihu.com/p/628628189

- https://zhuanlan.zhihu.com/p/697301397

- https://blog.51cto.com/u_16213319/7344183

- Apache NiFi Docker Compose | All About

- https://blog.51cto.com/u_16099203/10959511

- 大数据NiFi(二十一):监控日志文件生产到Kafka-腾讯云开发者社区-腾讯云

- PublishMQTT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/50902.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3 命令运行窗口暴露网络地址,以及修改端口号

一般情况下这里的地址是隐藏的 这里加上 --host 可以暴露网络地址&#xff0c;再加上--port --8080 就可以将端口号修改为8080&#xff08;修改后边的数字就可以修改为你想要的端口号&#xff09;

物联网时代下的5G融合定位,可以实现哪些功能?

5G具有高带宽、高频谱&#xff08;毫米波&#xff09;、多天线阵列等特性&#xff0c;通过提升无线定位算法的能力、室内数字系统建设、完善5G定位服务流程以及与其它定位技术和平台的结合&#xff0c;可提高5G定位精度。室内高精度定位服务为5G定位扩展到更多应用场景和领域构…

JVM:垃圾回收器演进

文章目录 一、演进二、Shenandoah三、ZGC 一、演进 二、Shenandoah Shenandoah是由Red Hat开发的一款低延迟的垃圾收集器&#xff0c;Shenandoah并发执行大部分GC工作&#xff0c;包括并发的整理&#xff0c;堆大小对STW的时间基本没有影响。 三、ZGC ZGC是一种可扩展的低延…

算法——滑动窗口(day8)

30.串联所有单词的子串 30. 串联所有单词的子串 - 力扣&#xff08;LeetCode&#xff09; 必看&#xff01;&#xff01;&#xff01;本题是我们上次写的438.异位词的进阶版&#xff0c;可参考本篇文章&#xff1a;算法——滑动窗口&#xff08;day7&#xff09;-CSDN博客来…

W30-python01-Selenium Web自动化基础--百度搜索案例-chrome浏览器为例

原理图 一、下载webdriver--chrome浏览器 根据本机浏览器的版本号下载对应的webdriver版本 http://chromedriver.storage.googleapis.com/index.html 二、安装selenium库 pip install selenium -i Simple Index 三、第一个Web自动化脚本 selenium实现Web自动化的基本步骤&…

【Qt 】JSON 数据格式详解

文章目录 1. JSON 有什么作用?2. JSON 的特点3. JSON 的两种数据格式3.1 JSON 数组3.2 JSON 对象 4. Qt 中如何使用 JSON 呢&#xff1f;4.1 QJsonObject4.2 QJsonArray4.3 QJsonValue4.4 QJsonDocument 5. 构建 JSON 字符串6. 解析 JSON 字符串 1. JSON 有什么作用? &#x…

1、hadoop环境搭建

1、环境配置 ip(/etc/sysconfig/network-scripts) # 网卡1 DEVICEeht0 TYPEEthernet ONBOOTyes NM_CONTROLLEDyes BOOTPROTOstatic IPADDR192.168.59.11 GATEWAY192.168.59.1 NETMASK 255.255.255.0 # 网卡2 DEVICEeht0 TYPEEthernet ONBOOTyes NM_CONTROLLEDyes BOOTPROTOdh…

算法通关:006_3二分查找:查找数组中<=num 最右边的值

文章目录 说明主要代码全部代码运行结果 说明 大于等于最右不考&#xff0c;意义不大。 直接看&#xff08;arr.length-1&#xff09; 位&#xff08;即数组最后一位&#xff09;&#xff0c;如果大于num&#xff0c;那就说明arr[arr.length-1]是大于等于最右的数字数组最后一…

学习C语言第十四天(指针练习)

1.第一题C 2.第二题C 3.第三题 00345 short类型解引用一次访问两个字节 4.第四题 6&#xff0c;12 5.第五题C 6.第六题 下面代码结果是0x11223300 7.第七题 int main() {int a 0;int n 0;scanf("%d %d",&a,&n);int i 0;int k 0;int sum 0;for (i 0;…

【多线程】定时器

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 1. 定时器是什么&#xff1f;2. 定时器的应用场景3. Timer类的使用3.1 Timer类创建定时器3.2 schedule()方法…

工作流 Flowable

工作流包括业务流和审批流等业务流程。 在一个流程系统中&#xff0c;任务间往往存在复杂的依赖关系&#xff0c;为保证pipeline的正确执行&#xff0c;就是要解决各任务间依赖的问题&#xff0c;这样DAG结合拓扑排序是解决存在依赖关系的一类问题的利器。DAG ( Directed Acyc…

池化层pytorch最大池化练习

神经网络构建 class Tudui(nn.Module):def __init__(self):super(Tudui, self).__init__()self.maxpool1 MaxPool2d(kernel_size3, ceil_modeFalse)def forward(self, input):output self.maxpool1(input)return output Tensorboard 处理 writer SummaryWriter("./l…

【React】详解如何获取 DOM 元素

文章目录 一、基础概念1. 什么是DOM&#xff1f;2. 为什么需要获取DOM&#xff1f; 二、使用 ref 获取DOM元素1. 基本概念2. 类组件中的 ref3. 函数组件中的 ref 三、 ref 的进阶用法1. 动态设置 ref2. ref 与函数组件的结合 四、处理特殊情况1. 多个 ref 的处理2. ref 与条件渲…

基于STM32F103的FreeRTOS系列(四)·FreeRTOS资料获取以及简介

目录 1. FreeRTOS简介 1.1 FreeRTOS介绍 1.2 为何选择FreeRTOS 1.3 FreeRTOS资料获取 1.3.1 官网下载 1.3.2 Github下载 1.3.3 托管网站下载 1.4 FreeRTOS的编程风格 1.4.1 数据类型 1.4.2 变量名 1.4.3 函数名 1.4.4 宏 1. FreeRTOS简介 1.1 Free…

11. Hibernate 持久化对象的各种状态

1. 前言 本节课和大家聊聊持久化对象的 3 种状态。通过本节课程&#xff0c;你将了解到&#xff1a; 持久化对象的 3 种状态&#xff1b;什么是对象持久化能力。 2. 持久化对象的状态 程序运行期间的数据都是存储在内存中。内存具有临时性。程序结束、计算机挂机…… 内存中…

Web前端浅谈ArkTS组件开发

本文由JS老狗原创。 有幸参与本厂APP的鸿蒙化改造&#xff0c;学习了ArkTS以及IDE的相关知识&#xff0c;并有机会在ISSUE上与鸿蒙各路大佬交流&#xff0c;获益颇丰。 本篇文章将从一个Web前端的视角出发&#xff0c;浅谈ArkTS组件开发的基础问题&#xff0c;比如属性传递、插…

hamcrest 断言框架使用示例和优势分析

引言 在软件测试领域&#xff0c;断言是验证代码行为是否符合预期的关键环节。Hamcrest 断言框架&#xff0c;以其独特的匹配器&#xff08;Matcher&#xff09;概念和清晰的失败信息&#xff0c;赢得了广泛的赞誉。尽管 Python 标准库中没有内置的 Hamcrest 库&#xff0c;但…

【Linux】-----工具篇(编译器gcc/g++,调试器gdb)

目录 一、gcc/g 简单认识 程序的翻译过程认识gcc 预处理(宏替换) 编译 汇编 链接 宏观认识 如何理解&#xff08;核心&#xff09; 什么是链接&#xff1f; 链接的分类 二、gdb 基本的认识 基本操作及指令 安装gdb 启动gdb ​编辑 显示源代码(list) 运行程序…

SQL labs-SQL注入(三,sqlmap使用)

本文仅作为学习参考使用&#xff0c;本文作者对任何使用本文进行渗透攻击破坏不负任何责任。 引言&#xff1a; 盲注简述&#xff1a;是在没有回显得情况下采用的注入方式&#xff0c;分为布尔盲注和时间盲注。 布尔盲注&#xff1a;布尔仅有两种形式&#xff0c;ture&#…

学习笔记:MySQL数据库操作3

1. 创建数据库和表 创建数据库 mydb11_stu 并使用该数据库。创建 student 表&#xff0c;包含字段&#xff1a;学号&#xff08;主键&#xff0c;唯一&#xff09;&#xff0c;姓名&#xff0c;性别&#xff0c;出生年份&#xff0c;系别&#xff0c;地址。创建 score 表&…