NIFI使用

1 从Kafka接收消息,存储到数据库中。

在这里插入图片描述
(1) ConsumerKafka processor
在这里插入图片描述
(2)Execute Scripts Processor
我这里是使用JS脚本进行处理。 还有很多其他语言的脚本。
在这里插入图片描述

var flowFile = session.get();
if (flowFile != null) {var IOUtils = Java.type("org.apache.commons.io.IOUtils");var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");var DateFormatUtils=Java.type("org.apache.commons.lang3.time.DateFormatUtils");// var dataType=flowFile.getAttribute('data_type')// var FLAG=flowFile.getAttribute('flag')var tm = null;try {flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {var inputText = IOUtils.toString(inputStream, StandardCharsets.UTF_8);var msg = JSON.parse(inputText);var stationId = msg['stationId'];var stationName = msg['stationName'];var deviceId = msg['deviceId'];var deviceName = msg['deviceName'];var deviceNo = msg['deviceNo'];var receiveType = msg['receiveType'];var createAt = msg['createAt'];var createAtString=DateFormatUtils.format(Number(createAt),'yyyy-MM-dd HH:mm:ss');var obTime = msg['obTime'];var obDate = msg['obDate'];var obDateString=DateFormatUtils.format(Number(obDate),'yyyy-MM-dd HH:mm:ss');var order = msg['order'];var distance = msg['distance'];var channel1SignalStrength = msg['channel1SignalStrength']var powerVoltage = msg['powerVoltage']var sql = 'insert into "SJZT_ODS"."water_data_distance"('+ '"station_id", "station_name", "device_id", "device_name", "device_no", "receive_type", "create_at", "ob_time", "ob_date", "order", "distance", "channel1_signal_strength", "power_voltage")'+ 'VALUES('+ stationId + ', \'' + stationName + '\', ' + deviceId + ', \'' + deviceName + '\', \'' + deviceNo + '\', ' + receiveType + ', \'' + createAtString + '\', \'' + obTime + '\', \'' + obDateString + '\', ' + order + ', ' + distance + ', ' + channel1SignalStrength + ', ' + powerVoltage + ')';outputStream.write(sql.getBytes(StandardCharsets.UTF_8));}));// flowFile = session.putAttribute(flowFile, "tm",tableName);session.transfer(flowFile, REL_SUCCESS);} catch (e) {flowFile = session.putAttribute(flowFile, "rsvr.transfer.error", e);session.transfer(flowFile, REL_FAILURE);}
}

注意: 这里只是生成了一个sql字符串,并没有执行sql,因此需要后面的processor来执行sql语句。
(3)PutSql processor
在这里插入图片描述
注意:autocommit要设置为true,否则看不到数据库里面的数据的。

2 将一堆Processors移动到一个Group里面界面操作

貌似没有直接的移动操作。
(1) Ctrl + A 全选要移动的processors
(2) 点击左边的group按钮
在这里插入图片描述
(3)为新的Group命名
(4)好了。选中的所有的processors都移动到了自己新创建的group中了。

参考材料

[1] https://blog.csdn.net/guijianchouxyz/article/details/120340154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62513.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux系统使用nginx代理mysql数据库

##使用nginx代理mysql数据库 ##安装nginx ./configure --prefix/home/yym/nginx/nginx-install/ --with-http_addition_module --with-http_realip_module --with-stream make && make install ##nginx配置文件 stream { upstream mysqlserver { serv…

kubeadm安装K8s集群之高可用组件keepalived+nginx

系列文章目录 1.kubeadm安装K8s集群之基础环境配置 2.kubeadm安装K8s集群之高可用组件keepalivednginx 3.kubeadm安装K8s集群之master节点加入 4.kubeadm安装K8s集群之worker1节点加入 kubeadm安装K8s集群之高可用组件keepalivednginx 1.安装kubeadm、kubectl、kubelet2.安装高…

子网划分实例

看到有人问这个问题: 想了一下,这是一个子网划分的问题: 处理方法如图: 这是一个子网划分的问题 设备1用三层交换机,端口设置为路由模式,设备2和设备3为傻瓜交换机模式 设备2和设备3下挂设备都是26为掩码&…

【前端知识】微前端框架qiankun

微前端框架qiankun 简介一、核心思想二、主要特性三、关键技术四、使用场景五、使用示例六、优势与劣势七、总结 使用一、创建主应用(Angular CLI项目)二、创建子应用(Vue CLI项目)三、启动并测试 使用场景一、大型前端应用的拆分…

Ubuntu22.04深度学习环境安装【Anaconda+Pycharm】

anaconda可以提供多个独立的虚拟环境,方便我们学习深度学习(比如复现论文); Pycharm编辑器可以高效的编写python代码,也是一个很不错的工具。 下面就记录下Ubuntu22.04的安装流程: 1.Anaconda安装 下载Ana…

Transformer图解

前言 transformer是目前NLP甚至是整个深度学习领域不能不提到的框架,同时大部分LLM也是使用其进行训练生成模型,所以transformer几乎是目前每一个机器人开发者或者人工智能开发者不能越过的一个框架。接下来本文将从顶层往下去一步步掀开transformer的面…

网络安全在数字时代保护库存数据中的作用

如今,通过软件管理库存已成为一种标准做法。企业使用数字工具来跟踪库存水平、管理供应链和规划财务。 然而,技术的便利性也带来了网络威胁的风险。黑客将库存数据视为有价值的目标。保护这些数据不仅重要,而且必不可少。 了解网络安全及其…

种子流和花粉流怎么理解它们之间的大小关系

种子流和花粉流是植物繁殖和遗传多样性研究中的两个重要概念,它们分别描述了种子和花粉在空间上的传播过程。理解它们之间的大小关系,即传播距离和对遗传结构的影响,对于生态学和保护生物学具有重要意义。 种子流(Seed Dispersal&…

唇形同步视频生成工具:Wav2Lip

一、模型介绍 今天介绍一个唇形同步的工具-Wav2Lip;Wav2Lip是一种用于生成唇形同步(lip-sync)视频的深度学习算法,它能够根据输入的音频流自动为给定的人脸视频添加准确的口型动作。 (Paper) Wav2Lip模型…

C编程求助问题:实验报告类型如何画出流程图并编写程序?

求助问题:请问一下怎么做 是实验报告类型的 画出流程图并编写程序: (1) 从键盘上任意输入5个字母,按ASCII从小到大的顺序依次排列输出。 (2) 输入某个字母,查找题(1)数组中是否存在,若存在则输出该字母在数组中的位置。…

C—指针初阶(2)

如果看完阁下满意的话,能否一键三连呢,我的动力就是大家的支持与肯定,冲! 二级指针 我们先看概念以及作用:用来存放一级指针的地址的指针 先看例子,我们逐一分析 我们先分析上面那个“1” 标注那里&#x…

Gradle-学习

本来没有想了解Gradle,但是在想看SpringBoot源码的时候发现,在SpringBoot2.2.8版本之后,不再使用maven进行构建,而是使用Gradle。想着把SpringBoot源码导入idea学习下源码,但是来来回回折腾了好几回,都是报…

PE文件结构:NT头部

NT 头部(NT Header)是 PE 文件格式的核心部分之一,它包含了有关程序如何加载、执行以及一些重要的文件属性。NT 头部常被认为是 PE 头部 的核心或“真正的”PE 头部,因为操作系统加载 PE 文件时,首先会查找 DOS 头部的…

Oracle EBS FA 如何打开关闭的资产会计期间?

用户“运行折旧”,误勾选为“关闭期间”,还有一部分资产还需要操作报废和调整,希望后台打开关闭的资产会计期 系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.9 解决方案 由官方提供SQL脚本代码如下: /*rollback120.sql - for Release 12.X only(based on r…

算法基础学习Day6(动态窗口)

文章目录 1.题目2.题目解答1.最大连续1的个数题目及题目解析算法学习思路一:暴力解法思路二:滑动窗口 代码提交 2.将x减到0的最小操作数题目及题目解析算法学习滑动窗口解决问题 代码提交 1.题目 1004. 最大连续1的个数 III - 力扣(LeetCode)1658. 将 x…

基于springboot+vue的公交线路查询系统(全套)

一、系统架构 前端:vue | element-ui | html 后端:springboot | mybatis-plus 环境:jdk1.8 | mysql | maven | nodejs 二、代码及数据库 三、功能介绍 01. web端-首页1 02. web端-首页2 03. web端-注册 04. web端-登录 …

ASP.NET Core8.0学习笔记(二十五)——EF Core Include导航数据加载之预加载与过滤

一、导航属性数据加载 1.在EF Core中可以使用导航属性来加载相关实体。 2.加载实体的三种方式: (1)预先加载:直接在查询主体时就把对应的依赖实体查出来(作为初始查询的一部分) (2)显式加载:使用代码指示稍后显式的从…

Linux 基础环境的开发工具以及使用(下)

1. make / Makefile 自动化构建的工具 1)引入 在我们进行一些大型的工程的时候,代码量是极其大,当我们代码在进行一系列的编译的时候,难免会出现一些错误,当我们对错误进行一系列的更改之后,难道我们需要…

沃丰科技智能客服在跨境电商独立站中的核心角色

随着全球化进程的加速和互联网技术的不断发展,跨境电商行业蓬勃兴起,为消费者提供了更广阔、更便捷的购物选择。在这样一个竞争激烈的市场环境中,优质的客户服务成为了企业脱颖而出的关键。沃丰科技智能客服凭借其先进的技术和人性化的设计理…

uniapp 弹出软键盘后打开二级页面,解决其UI布局变动

软键盘弹出,此时点击某按钮打开二级页面,position:fixed 位于底部的按钮不见了(通过加高其区域,发现被下移动了),什么原因不清楚? 但是发现是软键盘弹出导致,问题解决通过隐藏键盘再打开二级页…