【大数据】Apache NiFi 数据同步流程实践

Apache NiFi 数据同步流程实践

  • 1.环境
  • 2.Apache NIFI 部署
    • 2.1 获取安装包
    • 2.2 部署 Apache NIFI
  • 3.NIFI 在手,跟我走!
    • 3.1 准备表结构和数据
    • 3.2 新建一个 Process Group
    • 3.3 新建一个 GenerateTableFetch 组件
    • 3.4 配置 GenerateTableFetch 组件
    • 3.5 配置 DBCPConnectionPool 服务
    • 3.6 配置 GenerateTableFetch 组件 RelationShip
    • 3.7 配置 ExecuteSQLRecord 组件
    • 3.8 配置 PutDatabaseRecord 组件
    • 3.9 运行整个数据处理流程
    • 3.10 查看运行结果
    • 3.11 持续运行
    • 3.12 GenerateTableFetch 监听增量字段
  • 4.总结

初衷:对于一些新接触 Apache NIFI 的小伙伴来说,他们急于想体验 NIFI,恨不得直接找到一篇文章,照着做就直接能够解决目前遇到的需求或者问题,回想当初的我,也是这个心态。其实这样的心态是不对的。好多加入 NIFI 学习群的新手同学都会有这个问题,一些基本的概念和知识点都没有掌握,然后提出了一堆很初级的问题,对于这些问题,我们可能已经回答了几十上百次,厌倦了,所以大家一般会说 “你先去看文档吧!”。其实,对于一个新手,直接看文档,也是一脸懵。所以在这里,我带领新手的你,新建一个同步的流程,并尽可能在新建流程的同时,穿插一些基本概念。跟随本文一起操作或者只是看看,最后你可能就找到了入门的感觉了。

1.环境

  • Apache NiFi1.11.4 单节点
  • LinuxCentOS Linux release 7.5.1804,16G 内存,48G 存储,虚拟机
  • DBMySQL

2.Apache NIFI 部署

2.1 获取安装包

  • 我是直接编译的 1.11.4 的源码获取的安装包,将 zip 包上传到 Linux,解压即可。

  • 通过官方网站下载对应的 NIFI 安装包:https://nifi.apache.org/download.html

  • 通过国内镜像来下载最新发布的 NIFI 安装包:

    • 北理镜像(http://mirror.bit.edu.cn/apache/nifi/)
    • 清华镜像(https://mirrors.tuna.tsinghua.edu.cn/apache/nifi/)

如果不是编译源码,建议使用国内镜像去下载,一般直接下载 zip 包就可以了,加入最新版本时 1.11.4,那就下载 nifi-1.11.4-bin.zip

2.2 部署 Apache NIFI

  • 上传 Apache NIFI 包到 Linux 上,解压安装包;或者将你的本地作为服务器,直接解压 zip 包。

  • 在解压的目录下,找到 conf 目录,编辑 bootstrap.conf 文件,修改 NIFI 的内存配置,默认的值比较小,比如这里我改成启动 2g,最大 10g

java.arg.2=-Xms2g
java.arg.3=-Xmx10g
  • 在解压的目录下,找到 bin 目录,可以看到里面有一些脚本。
    • Linux 或者 Mac,使用 nifi.sh start 启动 NIFI,nifi.sh stop 停止 NIFI,nifi.sh restart 重启 NIFI。
    • Windows 下,直接双击 run-nifi.bat 即可,退出的时候关闭运行窗口就可以了。
dump-nifi.bat  
nifi-env.bat  
nifi-env.sh  
nifi.sh         
run-nifi.bat  
status-nifi.bat
  • 在解压的目录下,找到 log 目录,当看到类似于下面的内容时,NIFI 就启动成功了
2020-05-27 14:26:02,844 INFO [main] o.eclipse.jetty.server.AbstractConnector Started ServerConnector@58516c91{HTTP/1.1,[http/1.1]}{0.0.0.0:8080}
2020-05-27 14:26:02,845 INFO [main] org.eclipse.jetty.server.Server Started @151563ms
2020-05-27 14:26:02,900 INFO [main] org.apache.nifi.nar.NarAutoLoader Starting NAR Auto-Loader for directory ./extensions ...
2020-05-27 14:26:02,901 INFO [main] org.apache.nifi.nar.NarAutoLoader NAR Auto-Loader started
2020-05-27 14:26:02,902 INFO [main] org.apache.nifi.web.server.JettyServer NiFi has started. The UI is available at the following URLs:
2020-05-27 14:26:02,903 INFO [main] org.apache.nifi.web.server.JettyServer http://******:8080/nifi
2020-05-27 14:26:02,903 INFO [main] org.apache.nifi.web.server.JettyServer http://127.0.0.1:8080/nifi
2020-05-27 14:26:02,906 INFO [main] org.apache.nifi.BootstrapListener Successfully initiated communication with Bootstrap
2020-05-27 14:26:02,907 INFO [main] org.apache.nifi.NiFi Controller initialization took 65344195582 nanoseconds (65 seconds).
2020-05-27 14:27:58,182 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@2407f1a8 checkpointed with 0 Records and 0 Swap Files in 27 milliseconds (Stop-the-world time = 4 milliseconds, Clear Edit Logs time = 4 millis), max Transaction ID -1

3.NIFI 在手,跟我走!

在浏览器输入 http://127.0.0.1:8080/nifi,进入 Apache NIFI 的交互界面。我们看到背景是网格式的,通常我习惯把它叫做设计页面,在这个设计页面上,我们可以通过拖拉拽的形式设计 DataFlow。

3.1 准备表结构和数据

我在 MySQL 里新建了两张表,一个叫 source 来源表,一个叫 target 目标表。两张表的结构是一样的,其中 increase 设计成自动递增的,这样 increase 是一个增量字段。

在这里插入图片描述
增量字段顾名思义,数据库表里每次新来的数据的这个增量字段的值,都比上一次的大,严格意义上增量字段是递增且不重复的(区别于将时间戳字段作为增量字段,通常业务里的时间戳字段都不是严格意义上的增量字段)。

现在 source 表里还没有数据,这里我随意在 NIFI 里拉了两个组件往 source 表里写数据,你不用关心这里的处理,我只是在准备来源表的数据,你可以使用任何方式向 source 表里写数据。

在这里插入图片描述
最终我一共向 source 表里写了 253001 253001 253001 条数据。

在这里插入图片描述

3.2 新建一个 Process Group

在 NIFI 交互界面的顶层,有一排的可拖拽按钮,按照如下图示,拖拽一个 Process Group 出来。

在这里插入图片描述

我们给这个 Process Group 起一个名字叫 体验流程,在这里你可以简单的理解 Process Group 是一个组,在这个组里面可以设计你的流程,甚至设计其他的组。

双击体验流程这个 Process Group,我们会进入一个新的空白的设计页面。

3.3 新建一个 GenerateTableFetch 组件

进入到体验流程这个 Process Group 后,如下图所示,我们左键点顶层第一个按钮,按住左键向空白页面拖拽,会弹出一个 Add Processor 的对话框。

在这里插入图片描述
在这个对话框里,我们找到 GenerateTableFetch 这个组件。

简单说一下 GenerateTableFetch 这个组件,它的作用就是根据指定的表和表字段(通常是一个增量字段),生成一批 SQL 语句,这些 SQL 是分页的(或者说分片的),这样一张有很多数据的一张表,我们就可以通过多个 SQL 分批的查询出来,这样会更高效。如果直接去全表扫描一张大表,有可能会等待很长时间,有可能会因为数据太多发生一些异常,这都不是我们想看到的。

双击 GenerateTableFetch 这个组件,这个组件就会出现在我们的设计页面上了。双击这个 GenerateTableFetch 组件或者鼠标右键选择 Configure,会弹出 Configure Processor 对话框。

在这里插入图片描述
Configure Processor 对话框有四个页签,这里我简单说明一下,不必纠结没有提及的那些配置究竟是什么意思。其他没有提及的配置你就当成是高级应用,不明白这些配置并不影响本文的阅读。

  • SETTINGS:可以配置 Terminate 哪些 RelationShip,Terminate 你可以简单理解为我们忽略、不在意这个 RelationShip。

RelationShip:每一个 Processor 处理完数据得到的结果,一般会将他们传输出去,RelationShip 就是他们传输的方向。具体哪部分数据传输到哪个方向,由具体的程序代码决定。每个组件的 RelationShip 可能都不一样,具体的含义需要查看各个组件的详细说明文档,你可以右键组件,选择 View usage 来查看这个组件具体的用法。这里我们要注意一点就是,每个组件的所有 RelationShip 都应该有所指向(下面会提到将一个组件连接到另一个组件,组件中间会有一个 Connection 的东西,这个 Connection 会包含一个或多个 RelationShip,那么这样的 RelationShip 就是有所指向的。另外 Terminate 也算是有所指向)。

  • SCHEDUING:配置调度的地方,可用的调度的策略(Scheduling Strategy)有两种,一个是 Timer driven,配置每多久执行一次调度。另一个是 CRON driven,可以配置比如每天在几点几分执行一次调度。具体的调度时间是在 Run Schedule 里配置的。Concurrent Tasks 是说一次调度,这个组件最多可以同时启动多少个任务来处理数据。Execution 是针对集群的,你可以先不用理解,它是设置组件只在主节点运行还是在所有节点运行。

  • PROPERTIES:这个是每个组件的核心功能配置,每个组件的配置都是不一样的。

  • COMMENTS:注释,可以在里面添加一些描述信息。

3.4 配置 GenerateTableFetch 组件

SCHEDUING 页签,我们选择 Timer drivenRun Schedule 配置成 5 s e c 5\ sec 5 sec,即每 5 5 5 秒调度一次。

在这里插入图片描述
PROPERTIES 页签,如下图,我们通过 Database Connection Pooling Service 新建了一个 DBCPConnectionPool 数据库连接池服务,Database Type 选择了 MySQLTable Name 配置填写了来源表名称 sourceMaximum-value Columns 我们配置了增量字段 increase

在这里插入图片描述
点击 Apply 后,回到设计页面,我们发现组件左上角是一个感叹号,感叹号你可以理解为这个组件还没有满足运行的条件,把鼠标停留在感叹号上会有提示信息。

在这里插入图片描述
通过提示信息我们看到两个问题,第一个是 DBCPConnectionPool 数据库连接池服务还不可用,另一个是当前的组件 successfailure 两个 RelationShip 还没有指向。

3.5 配置 DBCPConnectionPool 服务

如下图所示,鼠标右键点击空白页面,选择 Configure,进入 Process Group 的配置页面。

在这里插入图片描述
进入 Process Group 的配置页面后,选择 CONTROLLER SERVICES 页签,我们可以看到我们之前建的 DBCPConnectionPool 数据库连接池服务。点击齿轮形状的配置按钮,如下图所示会弹出 Configure Controller Service 的对话框。

在这里插入图片描述
点击对话框的 PROPERTIES 页签,按如下图所示配置 MySQL 数据库的连接信息。

在这里插入图片描述
这里我们把本文的数据库连接列出来:

jdbc:mysql://ip:port/nifi?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=falsecom.mysql.jdbc.Driver驱动包需要自己去下载,然后添加到 NIFI 本机的某个地方
/data/nifi-1.11.4/jdbc/mysql-connector-java-5.1.46.jar

配置完毕后,点击 APPLY,然后如下图,点击闪电符号按钮,启用 DBCPConnectionPool 数据库连接池服务。

在这里插入图片描述

3.6 配置 GenerateTableFetch 组件 RelationShip

回到设计页面,我们看到 GenerateTableFetch 这个组件黄色感叹号的提示信息关于 DBCPConnectionPool 的已经没有了,现在提示的是关于 RelationShip 的。

在这里插入图片描述
现在我们通过新建 GenerateTableFetch 同样的方式,在设计页面新增一个 ExecuteSQLRecord 组件,然后将鼠标停留在 GenerateTableFetch 组件上,会出现一个箭头,点击拉取这个箭头然后指向 ExecuteSQLRecord

在这里插入图片描述
指向的过程中会弹出 Create Connection 的配置页面,在里面的 For Relationships 勾选 success(后期我们可以右键点击 Connection 进入配置页面)。

在这里插入图片描述
点击 ADD 后,再次回到设计页面。

在这里插入图片描述
我们看到 GenerateTableFetch 这个组件黄色感叹号的提示信息只剩下 failure RelationShip 的了。

此时我们再次到 GenerateTableFetch 的配置页面,在 SETTINGS 页面的 Automatically Terminate Relationships 里勾选 failure

在这里插入图片描述
点击 APPLY 后,GenerateTableFetch 这个组件黄色感叹号就会消失了。

3.7 配置 ExecuteSQLRecord 组件

简单说一下 ExecuteSQLRecord 组件,执行上游传输过来的 SQL 语句,然后将查询结果以指定的数据格式输出到下游。

与配置 GenerateTableFetch 配置的操作流程都是大体相似的,这里不做重复性的叙述了,看过程图就可以了。

在这里插入图片描述
在这里插入图片描述

3.8 配置 PutDatabaseRecord 组件

我们在设计页面上新增一个 PutDatabaseRecord 组件,并做相应配置。

简单说一下 PutDatabaseRecord 组件,以指定格式读取上游的数据,然后将数据 insert / update / delete 到指定的数据库表。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3.9 运行整个数据处理流程

右键点击每个组件选择 start 或者点击空白出选择 start

在这里插入图片描述
可以看到已经有数据在流动被处理了。

写入数据总是比较慢的,这个时候我们可以适当的停止 PutDatabaseRecord 组件修改配置提高它的并发任务数(注意,只有 stop 这个组件,才可以对它进行配置)。

在这里插入图片描述
当我们再次运行 PutDatabaseRecord 组件,在设计页面会发现流程报错了(这并不是意外,这是我设计好展示给你们看的效果)。

在这里插入图片描述

我们可以看到 PutDatabaseRecord 的右上角有一个 8 8 8,并且 GenerateTableFetch 组件右上角有一个红色告警,将鼠标停留在红色告警,会有提示信息。看报错信息的意思是说 GenerateTableFetch 无法获取到数据库连接(DBCPConnectionPool 数据库连接池默认是 8 8 8 个连接,但这 8 8 8 个全被 PutDatabaseRecord 拿去用了,GenerateTableFetch 拿不到连接所以报错了)。

好了,上面的报错设计只是为了让你看到多任务时组件右上角会有任务数的提示,当发生异常时组件的右上角也会有红色告警信息。

3.10 查看运行结果

等待一段时间,流程中的数据都被处理完了(Connection 中没有数据了)。然后我们去查询 target 表里一共被同步了多少数据,结果一看,也是 253001 253001 253001 条。

在这里插入图片描述

3.11 持续运行

那么这就完了吗?不,我们这个流程不是一个一次性任务,它是持续的。如下图所示:

在这里插入图片描述
此时我们向 source 添加一条数据,它是第 253002 253002 253002 条。

在这里插入图片描述
添加完后观察我们正在运行的流程,发现原本组件上那些 InOut 已经为 0 0 0 的状态现在变成了 1 1 1,说明刚才有数据流过了。

在这里插入图片描述
然后我们去查看 target 表,发现第 253002 253002 253002 条数据已经被同步过来了。

在这里插入图片描述

3.12 GenerateTableFetch 监听增量字段

这里简单说一下 GenerateTableFetch 增量同步数据的原理,右键点击 GenerateTableFetch,选择 View state

在这里插入图片描述
如下图,我们可以看到 Compontent State 这个对话框里记录了 increase 的值。

在这里插入图片描述
state 是 NIFI 提供的稳定、可靠的存储机制。它适合存储少量的数据,一般是一些状态信息。

GenerateTableFetch 利用 state 记录了每次扫描 sourceincrease 最大的值,然后在下一次扫描生成 SQL 时,会扫描那些 increase 值大于 state 中记录的行,相应的生成查询这些行数据的 SQL。这样就达到了增量抽取数据的目的。

4.总结

如果说你跟着本文一起操作或者仔细的阅读了整篇文章,那么我相信你已经入门 Apache NIFI 了 。

好吧,和你开玩笑的,你还没有入门。这篇文章只是简单带你(替你)体验了一把 Apache NIFI,如果想要入门,还需要更多的阅读和实践。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/130640.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

答题测评考试小程序的效果如何

在线答题系统是一种在线练习、考试、测评的智能答题系统,适用于企业培训、测评考试、知识竞赛、模拟考试等场景,管理员可任意组题、随机出题,答题者成功提交后,系统自动判分。 多种题目类型,两种答题模式 练习模式&a…

Apache Flink 1.12.0 on Yarn(3.1.1) 所遇到的問題

Apache Flink 1.12.0 on Yarn(3.1.1) 所遇到的問題 新搭建的FLINK集群出现的问题汇总 1.新搭建的Flink集群和Hadoop集群无法正常启动Flink任务 查看这个提交任务的日志无法发现有用的错误信息。 进一步查看yarn日志: 发现只有JobManager的错误日志出现了如下的…

请求地址‘/operlog‘,发生未知异常

👨🏻‍💻 热爱摄影的程序员 👨🏻‍🎨 喜欢编码的设计师 🧕🏻 擅长设计的剪辑师 🧑🏻‍🏫 一位高冷无情的编码爱好者 大家好,我是全栈工…

[nodejs] 爬虫加入并发限制并发实现痞客邦网页截图

今晚想给偶像的相册截个图,避免某一天网站挂了我想看看回忆都不行,用的是js的木偶师来爬虫台湾的部落格,效果图大概是这样,很不错 问题来了.我很贪心, 我想一次性把相册全爬了,也就是并发 ,这个人的相册有19个!!我一下子要开19个谷歌浏览器那个什么进程, 然后程序就崩了, 我就想…

软件设计模式原则(二)开闭原则

继续讲解第二个重要的设计模式原则——开闭原则~ 一.定义 开闭原则,在面向对象编程领域中,规定“软件中的对象(类,模块,函数等等)应该对于扩展是开放的,但是对于修改是封闭的”,这意…

半导体芯片制造行业MES系统解决方案

半导体产业作为现代电子科技的重要支柱,驱动着电子设备和通信技术的飞速发展。随着技术不断演进,半导体制造企业面临着越来越多的挑战,如高度复杂的工艺流程、全球化的竞争、质量控制的要求以及能源效率等问题。 为了应对这些挑战&#xff0…

Python测试之Pytest详解

概要 当涉及到python的测试框架时,pytest是一个功能强大且广泛应用的第三方库。它提供简洁而灵活的方式来编写和执行测试用例,并具有广泛的应用场景。下面是pytest的介绍和详细使用说明: pytest是一个用于python单元测试的框架,它…

Dubbo篇---第一篇

系列文章目录 文章目录 系列文章目录一、说说一次 Dubbo 服务请求流程?二、说说 Dubbo 工作原理三、Dubbo 支持哪些协议?一、说说一次 Dubbo 服务请求流程? 基本工作流程: 上图中角色说明: 二、说说 Dubbo 工作原理 工作原理分 10 层: 第一层:service 层,接口层,…

Flutter 05 组件状态、生命周期、数据传递(共享)、Key

一、Android界面渲染流程UI树与FlutterUI树的设计思路对比 二、Widget组件生命周期详解 1、Widget组件生命周期 和其他的视图框架比如android的Activity一样,flutter中的视图Widget也存在生命周期,生命周期的回调函数体现在了State上面。组件State的生命…

mysql索引深度学习

索引是什么? 索引是一种用于加快查询和索引的数据结构,其本质上就是一种排序好的数据结构,就类似书的目录。 索引的底层有多种实现的结构:b树,b树,Hash,红黑树。InnoDB和MyISAM的索引都是通过…

Python模块psutil:系统进程管理与Selenium效率提升的完美结合

前言 在前面编写一个Selenium的自动化程序时候,发现一个问题。 因笔记本配置较为差,所以每次初始化Selenium的WebDriver都会非常慢,整个等待过程是不友好的。 所以我就想到: 在程序中初始化一个全局的WebDriver对象&#xff0c…

算法——多数相和

三数 15. 三数之和 - 力扣&#xff08;LeetCode&#xff09; 所以代码实现应该是 vector<vector<int>> threeSum(vector<int>& nums) {int n nums.size();sort(nums.begin(), nums.end()); // 对数组进行排序&#xff0c;以便后续操作vector<vector…

快速了解推荐引擎检索技术

目录 一、推荐引擎和其检索技术 二、推荐引擎的整体架构和工作过程 &#xff08;一&#xff09;用户画像 &#xff08;二&#xff09;文章画像 &#xff08;三&#xff09;推荐算法召回 三、基于内容的召回 &#xff08;一&#xff09;召回算法 &#xff08;二&#xf…

C#高级--IO详解

零、文章目录 IO详解 1、IO是什么 &#xff08;1&#xff09;IO是什么 IO是输入/输出的缩写&#xff0c;即Input/Output。在计算机领域&#xff0c;IO通常指数据在内部存储器和外部存储器或其他周边设备之间的输入和输出。输入和输出是信息处理系统&#xff08;例如计算器&…

分享者 - 携程旅游创作者搬砖项目图文教程

大家好&#xff01;携程这个出行旅游平台相信大家都不陌生吧。 每天都有大量的旅客在里面浏览攻略&#xff0c;寻找灵感和旅游建议。 那么&#xff0c;我们的项目就是把一些优质的小红书平台上的旅游攻略或作品&#xff0c;经过处理后搬运到携程平台上发布。 这个项目如何操作呢…

Portraiture4.1.2最新中文汉化版

提起PS后期修图人像美白磨皮&#xff0c;大家会想到各种磨皮工具&#xff0c;其中Portraiture这款磨皮效率超高&#xff0c;是99%摄影师的必备插件&#xff0c;一秒磨皮&#xff0c;无卡顿&#xff0c;效果好&#xff01;人像摄影师人均一款&#xff0c;磨皮质感非常好&#xf…

独创改进 | RT-DETR 引入双向级联特征融合结构 RepBi-PAN | 附手绘结构图原图

本专栏内容均为博主独家全网首发,未经授权,任何形式的复制、转载、洗稿或传播行为均属违法侵权行为,一经发现将采取法律手段维护合法权益。我们对所有未经授权传播行为保留追究责任的权利。请尊重原创,支持创作者的努力,共同维护网络知识产权。 文章目录 YOLOv6贡献RepBi-…

实习记录--(海量数据如何判重?)--每天都要保持学习状态和专注的状态啊!!!---你的未来值得你去奋斗

海量数据如何判重&#xff1f; 判断一个值是否存在&#xff1f;解决方法&#xff1a; 1.使用哈希表&#xff1a; 可以将数据进行哈希操作&#xff0c;将数据存储在相应的桶中。 查询时&#xff0c;根据哈希值定位到对应的桶&#xff0c;然后在桶内进行查找。这种方法的时间复…

一站式解决方案:体验亚马逊轻量服务器/VPS的顶级服务与灵活性

文章目录 一、什么是轻量级服务器/VPS 二、服务器创建步骤 三、服务器连接客户端(私钥登录) 四、使用服务器搭建博客网站 五、个人浅解及总结 一、什么是轻量级服务器/VPS 亚马逊推出的轻量级服务器/VPS&#xff1a;是一种基于云计算技术的虚拟服务器解决方案。它允许用户…

0005Java安卓程序设计-ssm基于Android的网店系统

文章目录 **摘要**目录系统设计开发环境 编程技术交流、源码分享、模板分享、网课教程 &#x1f427;裙&#xff1a;776871563 摘要 随着Internet的发展&#xff0c;人们的日常生活已经离不开网络。未来人们的生活与工作将变得越来越数字化&#xff0c;网络化和电子化。网上管…