Apache Nifi挂接MQTT与Kafka实践

目录

1.  说明:

2. 方案设计:

2.1 资源配置:

2.2 交互Topics:  

3. 实现步骤 

3.1 Nifi 桌面

3.2 MqttToKafka

3.2.1 配置

3.2.2 测试

3.2.3 结果

3.3 KafkaToMqtt

3.3.1 配置 

3.3.1 测试

3.3.1 结果 ​编辑

4. 总结:

4.1 知识点

Nifi Kafka Processor 配置字典:

Topic通配符:

5. 参考:


1.  说明:

      在一些方案实现过程中,感觉需要一种接驳器来连接不同的数据源并汇流到一处进行统一处理,于是寻到NIFI(官网)这个工具,它相当于“数据水管+接驳器工具箱”,能丝滑联结不同的数据源,总体思路是把各类数据源汇流到Kafka集中处理,比如日志文件,消息传递,数据库操作等。初步使用感觉很不错,分享之。

2. 方案设计:

- 连接Emqx集群(mqtt服务)与Kafka集群,实现数据流动的双工运作 - 客户端(连mqtt) <=> 应用服务(连kafka)

2.1 资源配置:

简单起见,在docker环境中实施,后续迁移到K8s

服务集群服务入口备注
MQTT (tcp|mqtt)://host001.dev.ia:1883

client id:

nifi-xio1-sub1 订阅者

nifi-xio1-pub1 发布者

Kafkahost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Apache Nifihttp://host001.dev.ia:9080/nifi/

Nifi的docker配置

# 建个卷,持久化数据
docker volume create nifi_data

docker-compose.yml

version: "3.7"
services:nifi:image: apache/nifi:1.9.2container_name: nifirestart: alwaysports:- "9080:8080"environment:- NIFI_WEB_HTTP_HOST=0.0.0.0#- NIFI_HOME=/home/nifi#- NIFI_LOG_DIR=/home/nifi/logsvolumes:- nifi_data:/home/nifivolumes:nifi_data:external: true
2.2 交互Topics:  
Topic备注
test.topic.nifi1测试接收
test.topic.bus总线
test.device.*测试通配符topic

test.device.mw3039kkj001

测试带设备id的topic

3. 实现步骤 

3.1 Nifi 桌面

配好后,访问​http://host001.dev.ia:9080/nifi/​, 中间是配好的两个Processor Group,分别是MqttToKafka与KafkaToMqtt,代表双向流动配置。

3.2 MqttToKafka
3.2.1 配置

 加ConsumeMQTT Processor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
NameConsumeMQTT
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-sub1
Username/Password--
Topic Filtertest.topic.nifi1
Max Queue Size1000

加PublishKafka_2_0 Processor:拉Processor组件下去,点开选PublishKafka_2_0

Properties备注
NamePublishKafka_2_0
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Security ProtocolPLAINTEXT
Topic Nametest.topic.nifi1
Delivery Guarantee

Guarantee Replicated Delivery

Use Transactionstrue

拖动ConsumeMQTT连接PublishKafka, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.2.2 测试

说明:

  1. 用mqtt客户端工具MqttX向topic=tset.topic.nifi1发送json数据包
  2. 用python脚本作为消费者客户端连接kafka,订阅topic=tset.topic.nifi1,获取该数据包

python脚本:

from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import jsonasync def consume_loop(consumer, topics):try:# 订阅主题consumer.subscribe(topics)while True:# 轮询消息msg = consumer.poll(timeout=1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# End of partition eventprint("%% %s [%d] reached end at offset %d\n"% (msg.topic(), msg.partition(), msg.offset()))elif msg.error():raise KafkaException(msg.error())else:# 正常消息raw_message = msg.value()print(f"Raw message: {raw_message}")parsed_message = json.loads(raw_message.decode("utf-8"))print(f"Received message: {type(parsed_message)} : {parsed_message}")await asyncio.sleep(0.01)  # 小睡片刻,让出控制权finally:# 关闭消费者consumer.close()async def consume():# 消费者配置conf = {"bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092","group.id": "mygroup1","auto.offset.reset": "earliest",}# 创建消费者consumer = Consumer(conf)await consume_loop(consumer, ["tset.topic.nifi1"])if __name__ == "__main__":asyncio.run(consume())
3.2.3 结果

脚本 Nifi 

3.3 KafkaToMqtt
3.3.1 配置 

加ConsumeKafkaProcessor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeKafka_2_0
Properties备注
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Topic Name(s)test.topic.bus / test.device.*
Group IDtest

加PublishMQTT Processor:拉Processor组件下去,点开选PublishMQTT

Settings备注
NamePublishMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-pub1
Username/Password--
Topic Filtertest.topic.bus
QoS0

 拖动ConsumeKafka_2_0连接PublishMQTT, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.3.1 测试

说明:

  1. python脚本向Kafka发布消息到 topic = test.topic.bus
  2. MqttX客户端订阅接收

脚本

from confluent_kafka import Producer
import jsondef delivery_report(err, msg):"""Called once for each message produced to indicate delivery result.Triggered by poll() or flush()."""if err is not None:print(f"Message delivery failed: {err}")else:print(f"Message delivered to {msg.topic()} [{msg.partition()}]")def create_async_producer(config):"""Creates an instance of an asynchronous Kafka producer."""return Producer(config)def produce_messages(producer, topic, messages):"""Asynchronously produces messages to a Kafka topic."""for message in messages:# Trigger any available delivery report callbacks from previous produce() callsproducer.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.producer.produce(topic, json.dumps(message).encode("utf-8"), callback=delivery_report)# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.producer.flush()if __name__ == "__main__":# Kafka configuration# Replace these with your server's configurationconf = {"bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",  # Replace with your Kafka server addresses# "client.id": "python-producer",}# Create an asynchronous Kafka producerasync_producer = create_async_producer(conf)# Messages to send to Kafkamessages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]# Produce messages# produce_messages(async_producer, "test.topic.bus", messages_to_send)produce_messages(async_producer, "test.device.mw3039kkj001", messages_to_send)
3.3.1 结果 

MqttX 

Nifi 

4. 总结:

       Nifi支持集群化部署,如此从数据采集,数据流动到数据存储都实现了分布式,而且有可视化的界面可方便地进行数据节点的集聚与增减配置,目前只是浅尝即止,更深入的研究待后续不断补充优化。

4.1 知识点
Nifi Kafka Processor 配置字典:
Delivery Guarantee

数据传递保证

  1. Best Effort (尽力交付,相当于ack=0)
  2. Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):
  3. Guarantee Replicated Delivery(保证复制交付,相当于ack=-1)
Use Transactions

使用事务 

true / false 

Topic通配符:
“/”

主题层级分隔符

如果存在分隔符,它将主题名分割为多个主题层级。

如:room401/tv/contrl/sensor

“#”多层通配符

匹配主题中任意层级的通配符

如果客户端订阅主题 “china/guangzhou/#”, 它会收到使用下列主题名发布的消息

china/guangzhou china/guangzhou/huangpu china/guangzhou/tianhe/zhongshanlu china/guangzhou/tianhe/zhongshanlu/num123

school/#                //也匹配单独的 “school” ,因为 # 包括它的父级。
#                       //是有效的,会收到所有的应用消息。
school/teacher/#        //有效的。
school/teacher#         //无效的。
school/teacher/#/lever  //无效的,必须是主题过滤器的最后一个字符
https://blog.51cto.com/u_16099203/10959511

“+”单层通配符

单个主题层级匹配的通配符。在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。

china/+ 只能匹配 china/guangzhou

china/+/+/zhongshanlu 能匹配china/guangzhou/tianhe/zhongshanlu和china/shenzhen/nanshan/zhongshanlu

“$”匹配一个字符$xx
/$xx
/xx$

5. 参考:

- https://zhuanlan.zhihu.com/p/628628189

- https://zhuanlan.zhihu.com/p/697301397

- https://blog.51cto.com/u_16213319/7344183

- Apache NiFi Docker Compose | All About

- https://blog.51cto.com/u_16099203/10959511

- 大数据NiFi(二十一):监控日志文件生产到Kafka-腾讯云开发者社区-腾讯云

- PublishMQTT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/50902.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql的RedoLog、BingLog、UndoLog

UndoLog undo log 是一种用于撤销回退的日志。在事务没提交之前&#xff0c;MySQL 会先记录更新前的数据到 undo log 日志文件里面&#xff0c;当事务回滚时&#xff0c;可以利用 undo log 来进行回滚。 每当 InnoDB 引擎对一条记录进行操作&#xff08;修改、删除、新增&#…

vue3 命令运行窗口暴露网络地址,以及修改端口号

一般情况下这里的地址是隐藏的 这里加上 --host 可以暴露网络地址&#xff0c;再加上--port --8080 就可以将端口号修改为8080&#xff08;修改后边的数字就可以修改为你想要的端口号&#xff09;

物联网时代下的5G融合定位,可以实现哪些功能?

5G具有高带宽、高频谱&#xff08;毫米波&#xff09;、多天线阵列等特性&#xff0c;通过提升无线定位算法的能力、室内数字系统建设、完善5G定位服务流程以及与其它定位技术和平台的结合&#xff0c;可提高5G定位精度。室内高精度定位服务为5G定位扩展到更多应用场景和领域构…

JVM:垃圾回收器演进

文章目录 一、演进二、Shenandoah三、ZGC 一、演进 二、Shenandoah Shenandoah是由Red Hat开发的一款低延迟的垃圾收集器&#xff0c;Shenandoah并发执行大部分GC工作&#xff0c;包括并发的整理&#xff0c;堆大小对STW的时间基本没有影响。 三、ZGC ZGC是一种可扩展的低延…

算法——滑动窗口(day8)

30.串联所有单词的子串 30. 串联所有单词的子串 - 力扣&#xff08;LeetCode&#xff09; 必看&#xff01;&#xff01;&#xff01;本题是我们上次写的438.异位词的进阶版&#xff0c;可参考本篇文章&#xff1a;算法——滑动窗口&#xff08;day7&#xff09;-CSDN博客来…

【SOLUTION】Spring Boot 集成 WebSocket

1. Maven 依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dependency>2. 注册ServerEndpointExporter、WebSocketConfigProperties WebSocketConfigProperties…

W30-python01-Selenium Web自动化基础--百度搜索案例-chrome浏览器为例

原理图 一、下载webdriver--chrome浏览器 根据本机浏览器的版本号下载对应的webdriver版本 http://chromedriver.storage.googleapis.com/index.html 二、安装selenium库 pip install selenium -i Simple Index 三、第一个Web自动化脚本 selenium实现Web自动化的基本步骤&…

【Qt 】JSON 数据格式详解

文章目录 1. JSON 有什么作用?2. JSON 的特点3. JSON 的两种数据格式3.1 JSON 数组3.2 JSON 对象 4. Qt 中如何使用 JSON 呢&#xff1f;4.1 QJsonObject4.2 QJsonArray4.3 QJsonValue4.4 QJsonDocument 5. 构建 JSON 字符串6. 解析 JSON 字符串 1. JSON 有什么作用? &#x…

1、hadoop环境搭建

1、环境配置 ip(/etc/sysconfig/network-scripts) # 网卡1 DEVICEeht0 TYPEEthernet ONBOOTyes NM_CONTROLLEDyes BOOTPROTOstatic IPADDR192.168.59.11 GATEWAY192.168.59.1 NETMASK 255.255.255.0 # 网卡2 DEVICEeht0 TYPEEthernet ONBOOTyes NM_CONTROLLEDyes BOOTPROTOdh…

算法通关:006_3二分查找:查找数组中<=num 最右边的值

文章目录 说明主要代码全部代码运行结果 说明 大于等于最右不考&#xff0c;意义不大。 直接看&#xff08;arr.length-1&#xff09; 位&#xff08;即数组最后一位&#xff09;&#xff0c;如果大于num&#xff0c;那就说明arr[arr.length-1]是大于等于最右的数字数组最后一…

Redux +Toolkit 工具包快速入门

您将学到什么 如何设置并使用 Redux Toolkit 和 React-Redux 先决条件 熟悉ES6 语法和功能了解 React 术语&#xff1a;JSX、State、Function Components 、 Props和Hooks理解Redux 术语和概念 1、基本使用 1.1、安装 Redux Toolkit 和 React- Redux 将 Redux Toolkit 和 Rea…

学习C语言第十四天(指针练习)

1.第一题C 2.第二题C 3.第三题 00345 short类型解引用一次访问两个字节 4.第四题 6&#xff0c;12 5.第五题C 6.第六题 下面代码结果是0x11223300 7.第七题 int main() {int a 0;int n 0;scanf("%d %d",&a,&n);int i 0;int k 0;int sum 0;for (i 0;…

sklearn聚类算法用于图片压缩与图片颜色直方图分类

上期文章:机器学习之SKlearn(scikit-learn)的K-means聚类算法 我们分享了sklearn的基本知识与基本的聚类算法,这里主要是机器学习的算法思想,前期文章我们也分享过人工智能的深度学习,二者有如何区别,可以先参考如下几个实例来看看机器学习是如何操作的 不同K值下的聚…

算法解决海量数据的 topK

目录 设计算法解决海量数据的 topK 问题如何统计不同电话号码的个数&#xff1f;题目描述解答思路与实现步骤注意点 如何在大量的数据中判断一个数是否存在&#xff1f;题目描述解决方案具体步骤优化与注意事项 如何从大量的 URL 中找出相同的 URL&#xff1f;题目描述解答思路…

系统架构设计师教程 第4章 信息安全技术基础知识-4.5 密钥管理技术4.6 访问控制及数字签名技术-解读

系统架构设计师教程 第4章 信息安全技术基础知识-4.5 密钥管理技术&4.6 访问控制及数字签名技术 4.5 密钥管理技术4.5.1 对称密钥的分配与管理4.5.1.1 密钥的使用控制4.5.1.1.1 密钥标签4.5.1.1.2 控制矢量4.5.1.2 密钥的分配4.5.1.2.1物理方式14.5.1.2.2 物理方式24.5.1…

【多线程】定时器

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 1. 定时器是什么&#xff1f;2. 定时器的应用场景3. Timer类的使用3.1 Timer类创建定时器3.2 schedule()方法…

C语言——结构体(struct)对齐

目录 前言 一、结构体对齐规则 1、结构体的总大小对齐规则 2、结构体成员的对齐规则 3、数组和结构体的对齐规则 二、改变编译器对齐数&#xff08;#pragma pack&#xff09; 三、如何减小结构体占用内存 1、 重新排列成员顺序 2、使用#pragma pack指令 3、使用位域 4、其他 总…

使用sheetjs导出CSV文本为excel

使用SheetJS&#xff08;也称为xlsx库&#xff09;导出CSV文本为Excel文件&#xff0c;你可以先将CSV文本解析为SheetJS支持的工作表格式&#xff0c;然后再将其写入为一个新的Excel文件。以下是一个简单的示例代码&#xff1a; const XLSX require(xlsx); const fs requir…

.net core 8.0 新建的项目无法使用 IApplicationBuilder

1、在项目文件中添加 <ItemGroup><FrameworkReference Include"Microsoft.AspNetCore.App" /> </ItemGroup> 2、在使用的地方添加 using Microsoft.AspNetCore.Builder;

工作流 Flowable

工作流包括业务流和审批流等业务流程。 在一个流程系统中&#xff0c;任务间往往存在复杂的依赖关系&#xff0c;为保证pipeline的正确执行&#xff0c;就是要解决各任务间依赖的问题&#xff0c;这样DAG结合拓扑排序是解决存在依赖关系的一类问题的利器。DAG ( Directed Acyc…