NiFi-从部署到开发(图文详解)

NiFi简介

Apache NiFi 是一款强大的开源数据集成工具,旨在简化数据流的管理、传输和自动化。它提供了直观的用户界面和可视化工具,使用户能够轻松设计、控制和监控复杂的数据流程,NiFi 具备强大的扩展性和可靠性,可用于处理海量数据,并且能很好地应对复杂的数据转换需求,还可以设置定时调度任务

特点

  • 可视化操作:提供了图形化界面,用户可以通过拖放组件来构建数据处理流程。
  • 数据处理功能强大:能够实现数据的获取、转换、分发等操作。例如,可以从各种数据源获取数据,对数据进行格式转换、内容过滤等处理,再将数据发送到目标系统。
  • 可靠性高:在数据传输和处理过程中具有良好的容错机制,保障数据的完整性和准确性。
  • 扩展性好:可以轻松地扩展来处理大规模的数据流量和复杂的数据处理任务

NiFi的下载和安装

在本篇文章中主要讲述了四种部署方式,一种是单机部署比较简单,第二种是使用nifi自带的zookeeper部署伪分布模式,如果你的资源不足的话可以选用该模式,第三种是使用nifi自带的zookeeper部署集群模式,当然这种使用内部自带zookeeper的情况在实际开发中不常用,第三种就是比较常用的使用外部zookeeper的nifi集群模式的部署

首先我们先来看一下单机部署

单机部署

以下部署要在java环境下,若没有请事先安装jdk

下载安装包

进入Apache NiFi 官方网站来下载你所需要的版本

也可以直接点击nifi官网所有版本来下载

这里我下载的是1.13.2版本

下载完成后上传至Linux进行解压处理

解压:tar -zxvf  nifi-1.13.2-bin.tar.gz -C /opt/installs/

修改配置文件

cp nifi-1.13.2/conf/nifi.properties nifi-1.13.2/conf/nifi.properties.bak

修改ip和端口号

启动nifi

nifi-1.13.2/bin/nifi.sh

下面可以看到有启动/停止/运行/重启/状态等服务

启动nifi:nifi-1.13.2/bin/nifi.sh start

进入nifi的可视化界面

http://192.168.233.128:5800/nifi/

日志的位置

Nifi 集群部署-伪分布模式

由于nifi内置zookeeper,故我们先使用内置zookeeper进行搭建,可以在一台服务器上搭建伪分布模式,也可以在三台服务器上搭建集群模式,因为伪分布模式在一台机器上搭建,所以不同节点的相同功能端口会不同,如果搭建集群模式,IP不同,那么不同节点的相同功能端口可以相同,此处搭建集群模式

1、修改linux的⼀个安全机制

(1)进入vi /etc/selinux/config,添加SELINUX=disabled,防止后续出现一些问题

三台都需要修改,改完重启一下

(2)因为配置完单机模式后,nifi会产生许多新的数据库来存储数据,为了防止对伪分布模式有影响,这里先删除单机模式的nifi,重新解压一份

解压:tar -zxvf  nifi-1.13.2-bin.tar.gz -C /opt/installs/重命名:mv nifi-1.13.2/ nifi

rm -rf /opt/installs/nifi-1.13.2/

2、准备三个单机NIFI实例

nifi-1 nifi-2 nifi-3

3、修改配置文件

cp nifi/conf/nifi.properties nifi/conf/nifi.properties.bak

(1)修改三台服务器nifi中的zookeeper.properties

# 1节点2181,2节点2182,1节点2183clientPort=12181                                                                                                                    # 不同机器使用不同IPserver.1=bigdata01:12888:13888server.2=bigdata01:14888:15888server.3=bigdata01:16888:17888

(2)新建nifi-1/state/zookeeper,nifi-2/state/zookeeper'nifi-3/state/zookeeper在此文件夹中新建文件myid,分别对应写入1、2、3

(3)编辑节点conf/nifi.properties文件

1 ####################2 # State Management #                                                                                                 3 ####################4 nifi.state.management.configuration.file=./conf/state-management.xml                                             5 nifi.state.management.provider.local=local-provider  6 nifi.state.management.provider.cluster=zk-provider7 #  指定此NiFi实例是否应运行嵌入式ZooKeeper服务器,默认是false                          8 nifi.state.management.embedded.zookeeper.start=true                                                                9 nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties 
10 
11 # web properties #                                                 
12 nifi.web.war.directory=./lib    
13 # HTTP主机。默认为空白                                               
14 nifi.web.http.host=bigdata01
15 # HTTP端口。默认值为8080
16 nifi.web.http.port=18001
17 
18 # cluster node properties (only configure for cluster nodes) #   
19 # 如果实例是群集中的节点,请将此设置为true。默认值为false
20 nifi.cluster.is.node=true 
21 # 节点的完全限定地址。默认为空白
22 nifi.cluster.node.address=bigdata01
23 # 节点的协议端口。默认为空白
24 nifi.cluster.node.protocol.port=28001
25 
26 # 指定在选择Flow作为“正确”流之前等待的时间量。如果已投票的节点数等于nifi.cluster.flow.election.max.candidates属性指定的数量,则群集将不会等待这么长时间。默认值为5 mins
27 nifi.cluster.flow.election.max.wait.time=1 mins 
28 # 指定群集中所需的节点数,以便提前选择流。这允许群集中的节点避免在开始处理之前等待很长时间,如果我们至少达到群集中的此数量的节点
29 nifi.cluster.flow.election.max.candidates=1
30 
31 # cluster load balancing properties #  
32 nifi.cluster.load.balance.host=
33 nifi.cluster.load.balance.port=6342
34 
35 # zookeeper properties, used for cluster management # 
36 # 连接到Apache ZooKeeper所需的连接字符串。这是一个以逗号分隔的hostname:port对列表
37 nifi.zookeeper.connect.string=bigdata01:12181,bigdata01:12182,bigdata01:12183
38 nifi.zookeeper.connect.timeout=3 secs                                                      
39 nifi.zookeeper.session.timeout=3 secs                                                   
40 nifi.zookeeper.root.node=/nifi
bigdata01
143行:nifi.web.http.host=bigdata01
144行:nifi.web.http.port=18001
241行:nifi.cluster.is.node=true
242行:nifi.cluster.node.address=bigdata01
243行:nifi.cluster.node.protocol.port=28001
256行:nifi.cluster.load.balance.port=6342
262行:nifi.zookeeper.connect.string=bigdata01:12181,bigdata01:12182,bigdata01:12183

节点2,节点3内容跟节点1相同,只是nifi.web.http.port,nifi.cluster.node.protocol.port,nifi.cluster.load.balance.port,这三个端口区分开来,避免端口重复

(4)修改conf/state-management.xml文件

61行:<property name="Connect String">bigdata01:12181,bigdata01:12182,bigdata01:12183</property>

4、启动三个实例,启动完成后进入可视化界面

启动:nifi-1/bin/nifi.sh start
启动:nifi-2/bin/nifi.sh start
启动:nifi-3/bin/nifi.sh start
可视化界面:bigdata01:18001

Nifi 集群部署-内置Zookeeper

因为配置完单机模式后,nifi会产生许多新的数据库来存储数据,为了防止对集群模式有影响,这里先删除单机模式的nifi,重新解压一份,重命名为nifi

1、分发nifi至三台服务器

xsync nifi/

2、修改配置文件

(1)修改state-management.xml

<cluster-provider><id>zk-provider</id><class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class><property name="Connect String">bigdata01:2181,bigdata02:2181,bigdata03:2181</property><property name="Root Node">/nifi</property><property name="Session Timeout">10 seconds</property><property name="Access Control">Open</property></cluster-provider>

进行分发

xsync /opt/installs/nifi/conf/state-management.xml

(2)修改nifi.properties

bigdata01
51行:nifi.state.management.embedded.zookeeper.start=true(默认为false,如果使用外部的zookeeper集群为false)
143行:nifi.web.http.host=bigdata01
144行:nifi.web.http.port=18001
241行:nifi.cluster.is.node=true
143行:nifi.cluster.node.address=bigdata01
243行:nifi.cluster.node.protocol.port=28001
262行:nifi.zookeeper.connect.string=bigdata01:2181,bigdata02:2181,bigdata03:2181

进行分发并修改主机名

xsync /opt/installs/nifi/conf/nifi.properties
修改143行、143行主机名为相应的另外两台服务器的名字即可

(3)修改zookeeper.properties

clientPort=2181                                                                                                                    
server.1=bigdata01:2888:3888;2181
server.2=bigdata02:2888:3888;2181
server.3=bigdata03:2888:3888;2181

进行分发

xsync /opt/installs/nifi/conf/nifi.properties

(4)新建nifi/state/zookeeper,在此文件夹中新建文件myid,且在三台虚拟机中依次写入1、2、3

3、启动nifi并产看web界面

cd /opt/installs/nifibin/nifi.sh start #三台都要启动bigdata01:18001

Nifi 集群部署-外部Zookeeper

修改使用内置zookeeper状态下的nifi.properties中的第51行即可

web界面简介以及简单读取文件案例

选择自己想使用的处理器

填写文件夹路径(三台服务器都要有)

打勾了说明如果成功了就会停下来

把上述success打的勾取消,再拖拽入一个output类型的处理器进行连接,点击运行,发现getFile运行中,nifi的强大之处,只要有一个处理器能运行就会执行,此时我们点击运行,并往上述文件夹中发送数据,数据会先存储到管道中,数据会管道把数据存到本地磁盘,如果后续处理器发生错误也不会影响前面处理器的运行

点击小眼睛或者感叹号都可以查看内容

接下来配置输出路径,会发现数据传到了输出路径中,管道变成了0字节

从本地文件读取数据到MySQL

1、添加处理组file_to_mysql

开启hdfs服务:start-dfs.sh

2、填写hadoop中core-site-xml和hdfs-site.xml的路径

/opt/installs/hadoop/etc/hadoop/core-site.xml,/opt/installs/hadoop/etc/hadoop/hdfs-site.xml

其他参数可以先不配置

因为默认会加载你hadoop的配置文件

点击运行,往输入路径中写入文件

cp wcc.txt /home/data

离线同步MySQL数据到hdfs

类似于datax的功能

新建一个处理组mysql_to_hdfs

添加mysql端

添加驱动以及驱动地址

jdbc:mysql://bigdata01:3306/mydb01
com.mysql.jdbc.Driver
/opt/installs/hive/lib/mysql-connector-java-8.0.26.jar

这是因为没有填写账号密码

再次启动即可

这时发现文件传输速度特别快,可以修改调度时间,因为我们的离线数仓是一天调度一次,可以改成86400秒

查看hdfs的数据发现是乱码的

这时我们可以添加一个中间处理器将avro格式转换为json格式

添加之后再次启动即可

可以看到我们每次运行都把数据放入了同一个文件夹,这样是不行的,我们之前用datax导数据时放入了不同的文件夹

修改动态目录

添加dt后仍然无法识别,需要再添加一个处理器UpdateAttribute

给前三个处理器执行一下,使数据在第三个管道内

再运行第四个处理器,在hdfs上查看

但是还是要手动去写,能不能调用一些函数呢

双击可查看获取当前时间的方法

这里我们选用${now():format('yyyy')}并改成年月日的形式${now():format('yyyy-MM-dd')}

这样每天都会创建新的文件夹,但是打开文件夹后发现里面的文件名是用UUID命名的,虽然使文件名不重复但是不便于观看

以时间戳和后缀作为文件名,这样可以控制文件的滚动

再次运行可以看到文件名得到了修改

还可以修改成每个小时生成一个新文件

实时监控kafka数据到hdfs

启动kafka集群,添加kafka消费者,选择对应的版本

修改配置

复制一份puthdfs,修改输出路径

和上面一样通过添加日期函数使一个小时生成一个文件夹,这样可以解决小文件问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/63145.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

draggable插件——实现元素的拖动排序——拖动和不可拖动的两种情况处理

最近在写后台管理系统的时候&#xff0c;遇到一个需求&#xff0c;就是关于拖动排序的功能。 我之前是写过一个关于拖动表格的功能&#xff0c;此功能可以实现表格中的每一行数据上下拖动实现排序的效果。 vue——实现表格的拖拽排序功能——技能提升 但是目前我这边的需求是…

Delphi Web前端开发教程(9):基于TMS WEB Core框架

3、REST Servers服务端(后端)框架 REST服务端特点&#xff1a; – 为远程资源提供一个REST API接口。也可以为其他网络内容提供服务&#xff1b; – 包括在Delphi Enterprise & Architect企业版和架构师版中的RAD服务器、DataSnap、WebBroker&#xff1b; – 开源框架&a…

MySQL 函数创建中的 Err 1418:原因解析与解决指南20241203

&#x1f6a8; MySQL 函数创建中的 Err 1418&#xff1a;原因解析与解决指南 &#x1f4d6; 引言 在使用 MySQL 创建函数时&#xff0c;许多开发者会偶然遇到如下报错&#xff1a; [Err] 1418 - This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in its…

前端首屏加载优化

1.首屏加载慢的原因 网络延迟资源太大服务器响应慢 1.网络延迟 首屏优化中网络延迟是一个重要的考虑因素&#xff0c;它直接影响到页面资源的加载速度和用户体验。 影响原因 后端服务器性能原因&#xff0c;导致响应速度慢&#xff0c;从而影响了首屏加载速度。网络传输速度…

利用空闲主机进行Nmap隐匿扫描:IP伪造与空闲扫描技术

IP伪造与空闲扫描技术 在网络安全领域&#xff0c;扫描和识别目标主机的开放端口是攻击者获取目标信息的重要手段。传统的扫描方法可能会暴露扫描者的真实IP地址&#xff0c;从而引起目标主机的警觉。然而&#xff0c;IP地址伪造是一种巧妙的方式&#xff0c;可以帮助攻击者在…

学习视频超分辨率扩散模型中的空间适应和时间相干性(原文翻译)

文章目录 摘要1. Introduction2. Related Work3. Our Approach3.1. Video Upscaler3.2. Spatial Feature Adaptation Module3.3. Temporal Feature Alignment Module3.4. Video Refiner3.5. Training Strategy 4. Experiments4.1. Experimental Settings4.2. Comparisons with …

JavaScript 键盘控制移动

如果你想通过 JavaScript 实现键盘控制对象&#xff08;比如一个方块&#xff09;的移动&#xff0c;下面是一个简单的示例&#xff0c;展示如何监听键盘事件并根据按下的键来移动一个元素。 HTML 和 CSS&#xff1a; <!DOCTYPE html> <html lang"en">…

SpringMVC其他扩展

一、全局异常处理机制: 1.异常处理两种方式: 开发过程中是不可避免地会出现各种异常情况的&#xff0c;例如网络连接异常、数据格式异常、空指针异常等等。异常的出现可能导致程序的运行出现问题&#xff0c;甚至直接导致程序崩溃。因此&#xff0c;在开发过程中&#xff0c;…

AWS S3 权限配置与文件上传下载指南

本文介绍如何配置 AWS S3 存储桶的访问权限,实现 EC2 实例上传文件和本地用户下载文件的功能。 权限配置 © ivwdcwso (ID: u012172506) 1. EC2 角色上传权限 创建 IAM 角色并附加以下策略,允许 EC2 实例上传文件到 S3: {"Version": "2012-10-17&qu…

Flink随笔 20241203 Flink重点内容

Flink 是一个强大的流处理框架&#xff0c;它的设计理念是高吞吐量、低延迟的流式计算。你提到的这些重点是 Flink 的核心组成部分&#xff0c;下面我将详细解析每一个方面。 1. 窗口&#xff08;Window&#xff09; 窗口是 Flink 流处理中一个非常重要的概念&#xff0c;主要…

Linux-异步IO和存储映射IO

异步IO 在 I/O 多路复用中&#xff0c;进程通过系统调用 select()或 poll()来主动查询文件描述符上是否可以执行 I/O 操作。而在异步 I/O 中&#xff0c;当文件描述符上可以执行 I/O 操作时&#xff0c;进程可以请求内核为自己发送一个信号。之后进程就可以执行任何其它的任务…

docker更换容器存储位置

一&#xff1a;原因 今天之前在某个服务器上使用docker搭建的服务突然无法访问了&#xff0c;进入服务器查看发现服务运行正常&#xff0c;但是就是无法使用&#xff0c;然后我这边准备将docker服务重新启动下看看&#xff0c;发现docker服务无法重启&#xff0c;提示内存已满…

工业—使用Flink处理Kafka中的数据_ProduceRecord2

使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入HBase 中的 gyflinkresult:Produce5minAgg 表, rowkey“

什么是TCP的三次握手

TCP&#xff08;传输控制协议&#xff09;的三次握手是一个用于在两个网络通信的计算机之间建立连接的过程。这个过程确保了双方都有能力接收和发送数据&#xff0c;并且初始化双方的序列号。以下是三次握手的详细步骤&#xff1a; 第一次握手&#xff08;SYN&#xff09;&…

外卖开发(二)开发笔记——DTO、自定义全局异常处理、ThreadLocal、日期格式化

外卖开发&#xff08;二&#xff09;开发笔记 一、DTO二、自定义全局异常处理三、ThreadLocal存入、提取当前登陆用户的id四、日期格式化1、实体类属性上加入注解JsonFormat2、在WebMvcConfiguration中扩展SpringMVC的消息转换器 一、DTO 数据传输对象&#xff08;DTO&#xf…

Java 中tableaw 实战教程

java中tableaw库通过简单的API实现过滤、连接、绘制和操作表格数据。支持CSV&#xff0c;数据库&#xff0c;Excel等数据源。 安装依赖 tableaw是用于分析表格数据的开源Java库&#xff0c;构建在Java 8流之上。它可以从GitHub下载&#xff0c;也可以作为Maven或Gradle项目的…

jvm-46-jvm Thread Dump 线程的信息获取+可视分析化工具 FastThread

拓展阅读 JVM FULL GC 生产问题 I-多线程通用实现 JVM FULL GC 生产问题 II-如何定位内存泄露&#xff1f; 线程通用实现 JVM FULL GC 生产问题 III-多线程执行队列的封装实现&#xff0c;进一步抽象 jvm-44-jvm 内存性能分析工具 Eclipse Memory Analyzer Tool (MAT) / 内…

手机上怎么拍证件照,操作简单且尺寸颜色标准的方法

在数字化时代&#xff0c;手机已成为我们日常生活中不可或缺的一部分。它不仅是通讯工具&#xff0c;更是我们拍摄证件照的便捷利器。然而&#xff0c;目前证件照制作工具鱼龙混杂&#xff0c;很多打着免费名号的拍照软件背后却存在着泄漏用户信息、照片制作不规范导致无法使用…

PHP使用RabbitMQ(正常连接与开启SSL验证后的连接)

代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法&#xff08;我这边消费队列是使用接口请求的方式&#xff0c;每次只从中取出一条&#xff09; 安装amqp扩展 PHP使用RabbitMQ前&#xff0c;需要安装amqp扩展&#xff0c;之前文章中介绍了Windows环…

【Go 基础】channel

Go 基础 channel 什么是channel&#xff0c;为什么它可以做到线程安全 Go 的设计思想就是&#xff1a;不要通过共享内存来通信&#xff0c;而是通过通信来共享内存。 前者就是传统的加锁&#xff0c;后者就是 channel。也即&#xff0c;channel 的主要目的就是在多任务间传递…