NIFI实现JSON转SQL并插入到数据库表中

说明

本文中的NIFI是使用docker进行安装的,所有的配置参考:docker安装Apache NIFI

需求背景

现在有一个文件,里面存储的是一些json格式的数据,要求将文件中的数据存入数据库表中,以下是一些模拟的数据和对应的数据库建表语句。

json数据

[{"name": "张三","age": 23,"gender": 1},{"name": "李四","age": 24,"gender": 1},{"name": "小红","age": 18,"gender": 0}
]

建表语句

CREATE TABLE `sys_user` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户ID',`name` varchar(50) NOT NULL DEFAULT '' COMMENT '姓名',`age`  int NOT NULL DEFAULT 0 COMMENT '年龄',`gender` tinyint NOT NULL COMMENT '性别,1:男,0:女',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否已删除',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT  CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='用户表';

json数据中的属性名和数据库字段的名要一一对应,要不然后期还得做转换,比较麻烦

创建文件流

添加处理器:GetFile

点击工具栏的Processor,拖拽到画布中

筛选GetFile,点击ADD添加到画布中

配置GetFile处理器

双击添加的处理器,弹出对应的配置界面

可选操作)点击SETTINGS选项,在Name中输入处理器的名称:获取文件内容

点击SCHEDULING,在Run Schedule中输入定时器的时间,这里设置每10秒运行一次,如果不设置后面运行处理器的时候会无限循环运行

 点击PROPERTIES选项

 配置PROPERTIES,分别填写Input DirectoryFile FilterKeep Source File,其他选项默认即可。

说明:博主的NIFI是使用docker安装的,容器的数据全部挂载到了宿主机中,NIFI的HOME默认是在/opt/nifi/nifi-current,挂载到宿主机的路径为:/root/data/nifi/nifi-current。所以Input Directory中填写的路径/opt/nifi/nifi-current/mydata/file 实际对应宿主机路径为:/root/data/nifi/nifi-current/mydata/file,到时候把测试文件放到宿主机的/root/data/nifi/nifi-current/mydata/file下面即可

将文件放到对应的目录下

说明:mydata/file中的所有文件需要有读写的权限,否则后面读取文件会报错

修改权限:

chmod +777 /root/data/nifi/nifi-current/mydata/file

(可选操作)测试处理器配置是否成功

 添加LogAttribute处理器

连接处理器

将鼠标放到第一个处理器上,然后点击出现的箭头,将其拖拽到第二个处理器中,等待线条由红色变为绿色后,松开鼠标即可。

在弹出的界面中勾选success,然后点击ADD

 第一个显示红色方框的代表当前处理器可以正常使用;第二个出现黄色三角感叹号的代表当前处理器有问题,双击第二个处理器。

在弹出的界面选择RELATIONSHIPS选项卡,在success下勾选terminate,最后点击APPLY

说明:success下面的两个选项:terminate和retry分别代表着当前处理器执行成功的操作

terminate代表成功后终止,retry代表成功后继续尝试

可以看到黄色的三角变成了红色的方框,表示当前处理器没问题了。

运行处理器

运行处理器有两种方式,第一种是一个一个单独运行另一种是直接运行全部

第一种

鼠标放到第一个处理器中然后右键,可以看到有一堆选项,这里运行处理器可以选择Sart或者Run Once,为了方便调试,这里选择Run Once即只运行一次

点击Run Once之后可以看到,在处理器的右上角多了一个标志,这个代表当前有几个线程在运行中

 当处理器的任务执行结束后可以看到两个处理器的连接处会显示当前有几个队列,以及队列数据总的大小

将鼠标放到两个处理器的连接处,鼠标右键,选择List queue

 在弹出的界面中可以看到等待中的队列列表

选择其中一个队列,点击左上角的提示,可以看到上一个处理器(GetFile)的一些信息,包括一些属性啊什么的,这个可以自己去看,这里不再仔细说明。点击OK可以关闭当前的弹框

 点击某一个队列的右上角,第一个可以下载当前的内容,中间的小眼睛可以查看队列中的数据

 点击小眼睛,可以看到文件中的内容显示在了页面中,默认是original,也可以选择formatted和hex

 运行第二个处理器(LogAttribute),同样的鼠标放到处理器上,然后选择Run Once即可

然后可以在nifi的日志中看到打印了一些日志,主要包括了处理器的属性和内容

说明:如果要想打印出文件的内容,LogAttribute处理器需要选择以下内容

正常打印数据说明GetFile处理器配置的没问题

Json数组分隔

添加处理器:SplitJson

 配置SplitJson处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

JsonPath Expression(JSON 路径表达式):指定要提取的 JSON 对象的路径。例如,如果要提取根级别的 JSON 对象,可以将路径设置为 $

连接处理器

将GetFile处理器和SplitJson处理器连接起来,勾选For Relationships,然后选择ADD

可选操作)测试处理器配置是否成功

将SplitJson处理器和LogAttribute处理器连接,连接处理器中的For Relationships选择split

 此时发现SplitJson处理器还在告警,双击SplitJson处理器,选择RELATIONSHIPS,按照如图勾选

此时所有的处理器已正常显示

开启所有的处理器(在画布空白处鼠标右键,点击Start),查看nifi容器的日志,可以看到此时日志打印出来的不再是整个文件的内容,而是单独一条一条json数据

停止所有处理器(画布空白处鼠标右键,选择Stop),清空队列中的数据,在连接处鼠标右键,选择Empty queue

Json转为SQL

添加处理器:ConvertJSONToSQL

 配置ConvertJSONToSQL处理器

双击处理器,在弹出的界面点击PROPERTIES选项卡,配置以下内容

配置JDBC Connection Pool

Value下面点击,选择Create new service

根据自己的情况选择对应的services,我这里选择的是默认的 

点击最后面的右箭头

点击右侧的小齿轮

切换到SETTINGS选项卡,给驱动起个名字,方便以后识别

切换到PROPERTIES选项卡 ,配置数据库相关参数,其他按照默认的即可

校验参数配置是否正确,点击右上角的对号

校验通过会出现绿色对钩,如果配置不对会有对应提示,最后点击APPLY

开启JDBC的配置,点击闪电符号,在弹出的界面点击ENABLE,最后点击CLOSE

 最后可以看到state已经变为Enabled,点击右上角的X关闭

到此JDBC的配置结束

配置Statement Type

再次双击处理器,配置Statement Type,选择INSERT,代表生成的是INSERT语句

配置Table Name

校验配置是否正确

最后点击APPLY

连接处理器

将SplitJson处理器和ConvertJSONToSQL处理器进行连接,Relationships选择split

可选操作)测试处理器配置是否成功

这里跳过测试,如果需要测试自己的配置是否正确的,可以自行将处理器和LogAttribute处理器进行连接进行测试,以下是博主自己的测试结果,做个参考,最后面会打印生成的SQL语句

执行生成的SQL

添加处理器:PutSQL

配置PutSQL处理器

双击处理器,在PROPERTIES选项卡中配置以下内容,其他内容默认即可

 

 校验配置是否正确

最后点击APPLY

连接处理器

将ConvertJSONToSQL处理器和PutSQL处理器进行连接,Relationships选择sql

 处理PutSQL处理器的告警

双击处理器,在RELATIONSHIPS选项卡配置勾选以下内容

完整的配置结果

包含四个处理器,依次为GetFile=>SplitJson=>ConvertJSONToSQL=>PutSQL

 开启所有的处理器

数据库是否有数据

可以看到现在的数据库里面还是没有数据的

 开启处理器

在画布的空白位置,鼠标右键选择Start

开启后可以看到所有的处理器左上角都显示为绿色三角,表示处理器已经启动了,过十几秒再看处理器,发现已经有数据流入

 查看数据库数据

此时数据库已经有数据插入,重复数据是因为每隔10秒执行一次任务,就会读取一次文件,然后重复往数据库插入数据,如果不想让数据不停插入数据库,可以将GetFile中的PROPERTIES下的Keep Source File设置为false即可(此操作需要停止处理器才能够设置)

结束语

NIFI学习需要花费一定的时间去仔细研究,它里面内置了大概300多个处理器,每个处理器实现的功能都不一样,配置也都不同。博主也正在不断地学习中,后续也会不断分享关于NIFI的内容,如果有什么疑问欢迎评论区进行评论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/72143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centos7使用docker-compose一键搭建mysql高可用主从集群

docker部署 环境准备 卸载旧版本 yum remove -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-selinux \docker-engine-selinux \docker-engine 安装依赖 yum install -y yum-utils \…

伪微分反馈控制(Pesudo-Drivative Feedback Control——PDF)

运动控制-单轴伺服控制带宽分析(二) - 知乎 (zhihu.com) 伪微分反馈控制_百度百科 (baidu.com) 伺服电机控制器的参数整定_老马过河hhh的博客-CSDN博客 伪微分PIIP控制_yukee10的博客-CSDN博客

docker搭建个人网盘和私有仓库Harbor

目录 1、使用mysql:5.7和 owncloud 镜像,构建一个个人网盘 2、安装搭建私有仓库 Harbor 1、使用mysql:5.7和owncloud,构建一个个人网盘 1.拉取mysql:5.6镜像,并且运行mysql容器 [rootnode8 ~]# docker pull mysql:5.7 [rootnode8 ~]# doc…

Excel VSTO开发10 -自定义任务面板

版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 10 自定义任务面板 自定义任务面板(有些地方称为侧边面板)即CustomTaskPane,这个类在Microsoft…

leetcode 第 361 场周赛

2843. 统计对称整数的数目 核心思想:枚举每一个数是否是对称整数,第一种写法为python写法,第二种为一般写法我觉得更好,非常有思想性。 2844. 生成特殊数字的最少操作 核心思想:枚举特殊数字结尾的几种可能。其实自己做的时候一…

2023/09/07 c++qt day2

#include <iostream>using namespace std; //封装一个学生类 struct stu { private://存放学生的成绩int stu_score[256];//记录学生个数int stu_num; public://用于设置学生个数void setNum(){cout<<"请输入学生的个数"<<" ";cin>&g…

Stable Diffuse 之 本地环境部署 WebUI 进行汉化操作

Stable Diffuse 之 本地环境部署 WebUI 进行汉化操作 目录 Stable Diffuse 之 本地环境部署 WebUI 进行汉化操作 一、简单介绍 二、汉化操作 附录&#xff1a; 一、Install from URL 中出现 Failed to connect to 127.0.0.1 port 7890: Connection refused 错误&#xf…

【2023高教社杯数学建模国赛】ABCD题 问题分析、模型建立、参考文献及实现代码

【2023高教社杯数学建模国赛】ABCD题 问题分析、模型建立、参考文献及实现代码 1 比赛时间 北京时间&#xff1a;2023年9月7日 18:00-2023年9月10日20:00 2 思路内容 可以参考我提供的历史竞赛信息内容&#xff0c;最新更新我会发布在博客和知乎上&#xff0c;请关注我获得最…

C++核心编程--类篇

C核心编程 1.内存分区模型 C程序在执行时&#xff0c;将内存大方向分为4个区域 意义&#xff1a;不同区域存放数据&#xff0c;赋予不同的生命周期&#xff0c;更能灵活编程 代码区&#xff1a;存放函数体的二进制代码&#xff0c;由操作系统进行管理的全局区&#xff1a;存放…

Vue3+Element Plus实现el-table跨行显示(非脚手架)

Vue3Element Plus实现el-table跨行显示 app组件内容使用:span-method"objectSpanMethod"自定义方法实现跨行显示查询方法初始化挂载新建一个html即可进行测试&#xff0c;完整代码如下效果图 app组件内容 <div id"app"><!-- 远程搜索 --><e…

算法训练day43|动态规划 part05:0-1背包 (LeetCode 1049. 最后一块石头的重量 II、494. 目标和、474.一和零)

文章目录 1049. 最后一块石头的重量 II思路分析代码实现 494. 目标和思路分析动规方法代码实现总结思考 474.一和零思路分析代码实现思考总结 var code "57a5e730-4e5e-43ad-b567-720d69f0371a"1049. 最后一块石头的重量 II 题目链接&#x1f525;&#x1f525; 有…

揭秘拼多多API接口:让商家和用户实现高效连接与便捷操作

随着电商行业的飞速发展&#xff0c;拼多多作为一家新兴电商平台&#xff0c;近年来已逐渐成为市场的焦点。为了满足商家和用户的需求&#xff0c;拼多多不断创新&#xff0c;推出了智能化的API接口&#xff0c;以实现更加高效、便捷的操作和管理。本文将深入探讨拼多多API接口…

提高使用VS Code工作效率的技巧

提高使用VS Code工作效率的技巧 时间轴视图&#xff1a;本地源代码控制 时间轴视图为我们提供了内置的源代码控制。 我们中的许多人都知道 Git 和其他源代码控制工具有多么有用&#xff0c;它们可以帮助我们轻松跟踪文件更改并在需要时恢复到之前的状态。 因此&#xff0c;…

go基础08-map的内部实现

和切片相比&#xff0c;map类型的内部实现要复杂得多。Go运行时使用一张哈希表来实现抽象的map类型。运行时实现了map操作的所有功能&#xff0c;包括查找、插入、删除、遍历等。在编译阶段&#xff0c;Go编译器会将语法层面的map操作重写成运行时对应的函数调用。 下面是大致的…

YOLOV7改进-添加Deformable Conv V2

可变形卷积link class DCNv2(nn.Module):def __init__(self, in_channels, out_channels, kernel_size, stride1,padding1, groups1, actTrue, dilation1, deformable_groups1):super(DCNv2, self).__init__()self.in_channels in_channelsself.out_channels out_channelsse…

QT for andriod

QT for andriod 开发 apk软件&#xff0c;因为一些特殊的原因&#xff0c;在这里简单的记录一哈自己开发apk的流程和心得。 首先说明我采用的环境有哪些&#xff1f; 1、QT的版本&#xff0c;个人建议5.15.2的版本及以上&#xff0c;我是用的5.15.2。 2、andriod studio 可以…

3D数据导出工具HOOPS Publish:3D数据查看、生成标准PDF或HTML文档!

HOOPS中文网http://techsoft3d.evget.com/ 一、3D导出SDK HOOPS Publish是一款功能强大的SDK&#xff0c;可以创作丰富的工程数据并将模型文件导出为各种行业标准格式&#xff0c;包括PDF、STEP、JT和3MF。HOOPS Publish核心的3D数据模型是经过ISO认证的PRC格式(ISO 14739-1:…

STM32移植FAT文件系统

所谓“移植”&#xff0c;就是打通FAT源码和物理设备之间的软件接口。 FAT源码早就被公益组织给写好了&#xff0c;直接下载源码。但是FAT作为顶层应用程序&#xff0c;它需要面对的底层物理设备是不确定的&#xff0c;那么底层的物理设备驱动程序就需要程序员来自己写。物理设…

Android:基于mvvm框架使用viewPage

一、前言&#xff1a; 最近在学习viewpage的使用&#xff0c;加上一直以来用mvvm框架。就想着记录一下。 二、代码展示&#xff1a; 1.引入依赖 //viewPage2引用(微信左右滑动页面)implementation androidx.viewpager2:viewpager2:1.0.0 2.在xml中的使用 3.在代码中找到vie…

脚本:python实现樱花树

文章目录 代码效果 代码 from turtle import * from random import * from math import * def tree(n, l):pd () # 下笔# 阴影效果t cos ( radians ( heading () 45 ) ) / 8 0.25pencolor ( t, t, t )pensize ( n / 3 )forward ( l ) # 画树枝if n > 0:b random () *…