NIFI使用

1 从Kafka接收消息,存储到数据库中。

在这里插入图片描述
(1) ConsumerKafka processor
在这里插入图片描述
(2)Execute Scripts Processor
我这里是使用JS脚本进行处理。 还有很多其他语言的脚本。
在这里插入图片描述

var flowFile = session.get();
if (flowFile != null) {var IOUtils = Java.type("org.apache.commons.io.IOUtils");var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");var DateFormatUtils=Java.type("org.apache.commons.lang3.time.DateFormatUtils");// var dataType=flowFile.getAttribute('data_type')// var FLAG=flowFile.getAttribute('flag')var tm = null;try {flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {var inputText = IOUtils.toString(inputStream, StandardCharsets.UTF_8);var msg = JSON.parse(inputText);var stationId = msg['stationId'];var stationName = msg['stationName'];var deviceId = msg['deviceId'];var deviceName = msg['deviceName'];var deviceNo = msg['deviceNo'];var receiveType = msg['receiveType'];var createAt = msg['createAt'];var createAtString=DateFormatUtils.format(Number(createAt),'yyyy-MM-dd HH:mm:ss');var obTime = msg['obTime'];var obDate = msg['obDate'];var obDateString=DateFormatUtils.format(Number(obDate),'yyyy-MM-dd HH:mm:ss');var order = msg['order'];var distance = msg['distance'];var channel1SignalStrength = msg['channel1SignalStrength']var powerVoltage = msg['powerVoltage']var sql = 'insert into "SJZT_ODS"."water_data_distance"('+ '"station_id", "station_name", "device_id", "device_name", "device_no", "receive_type", "create_at", "ob_time", "ob_date", "order", "distance", "channel1_signal_strength", "power_voltage")'+ 'VALUES('+ stationId + ', \'' + stationName + '\', ' + deviceId + ', \'' + deviceName + '\', \'' + deviceNo + '\', ' + receiveType + ', \'' + createAtString + '\', \'' + obTime + '\', \'' + obDateString + '\', ' + order + ', ' + distance + ', ' + channel1SignalStrength + ', ' + powerVoltage + ')';outputStream.write(sql.getBytes(StandardCharsets.UTF_8));}));// flowFile = session.putAttribute(flowFile, "tm",tableName);session.transfer(flowFile, REL_SUCCESS);} catch (e) {flowFile = session.putAttribute(flowFile, "rsvr.transfer.error", e);session.transfer(flowFile, REL_FAILURE);}
}

注意: 这里只是生成了一个sql字符串,并没有执行sql,因此需要后面的processor来执行sql语句。
(3)PutSql processor
在这里插入图片描述
注意:autocommit要设置为true,否则看不到数据库里面的数据的。

2 将一堆Processors移动到一个Group里面界面操作

貌似没有直接的移动操作。
(1) Ctrl + A 全选要移动的processors
(2) 点击左边的group按钮
在这里插入图片描述
(3)为新的Group命名
(4)好了。选中的所有的processors都移动到了自己新创建的group中了。

参考材料

[1] https://blog.csdn.net/guijianchouxyz/article/details/120340154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/62513.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kubeadm安装K8s集群之高可用组件keepalived+nginx

系列文章目录 1.kubeadm安装K8s集群之基础环境配置 2.kubeadm安装K8s集群之高可用组件keepalivednginx 3.kubeadm安装K8s集群之master节点加入 4.kubeadm安装K8s集群之worker1节点加入 kubeadm安装K8s集群之高可用组件keepalivednginx 1.安装kubeadm、kubectl、kubelet2.安装高…

子网划分实例

看到有人问这个问题: 想了一下,这是一个子网划分的问题: 处理方法如图: 这是一个子网划分的问题 设备1用三层交换机,端口设置为路由模式,设备2和设备3为傻瓜交换机模式 设备2和设备3下挂设备都是26为掩码&…

Ubuntu22.04深度学习环境安装【Anaconda+Pycharm】

anaconda可以提供多个独立的虚拟环境,方便我们学习深度学习(比如复现论文); Pycharm编辑器可以高效的编写python代码,也是一个很不错的工具。 下面就记录下Ubuntu22.04的安装流程: 1.Anaconda安装 下载Ana…

Transformer图解

前言 transformer是目前NLP甚至是整个深度学习领域不能不提到的框架,同时大部分LLM也是使用其进行训练生成模型,所以transformer几乎是目前每一个机器人开发者或者人工智能开发者不能越过的一个框架。接下来本文将从顶层往下去一步步掀开transformer的面…

种子流和花粉流怎么理解它们之间的大小关系

种子流和花粉流是植物繁殖和遗传多样性研究中的两个重要概念,它们分别描述了种子和花粉在空间上的传播过程。理解它们之间的大小关系,即传播距离和对遗传结构的影响,对于生态学和保护生物学具有重要意义。 种子流(Seed Dispersal&…

唇形同步视频生成工具:Wav2Lip

一、模型介绍 今天介绍一个唇形同步的工具-Wav2Lip;Wav2Lip是一种用于生成唇形同步(lip-sync)视频的深度学习算法,它能够根据输入的音频流自动为给定的人脸视频添加准确的口型动作。 (Paper) Wav2Lip模型…

C—指针初阶(2)

如果看完阁下满意的话,能否一键三连呢,我的动力就是大家的支持与肯定,冲! 二级指针 我们先看概念以及作用:用来存放一级指针的地址的指针 先看例子,我们逐一分析 我们先分析上面那个“1” 标注那里&#x…

PE文件结构:NT头部

NT 头部(NT Header)是 PE 文件格式的核心部分之一,它包含了有关程序如何加载、执行以及一些重要的文件属性。NT 头部常被认为是 PE 头部 的核心或“真正的”PE 头部,因为操作系统加载 PE 文件时,首先会查找 DOS 头部的…

Oracle EBS FA 如何打开关闭的资产会计期间?

用户“运行折旧”,误勾选为“关闭期间”,还有一部分资产还需要操作报废和调整,希望后台打开关闭的资产会计期 系统环境 RDBMS : 12.1.0.2.0 Oracle Applications : 12.2.9 解决方案 由官方提供SQL脚本代码如下: /*rollback120.sql - for Release 12.X only(based on r…

算法基础学习Day6(动态窗口)

文章目录 1.题目2.题目解答1.最大连续1的个数题目及题目解析算法学习思路一:暴力解法思路二:滑动窗口 代码提交 2.将x减到0的最小操作数题目及题目解析算法学习滑动窗口解决问题 代码提交 1.题目 1004. 最大连续1的个数 III - 力扣(LeetCode)1658. 将 x…

基于springboot+vue的公交线路查询系统(全套)

一、系统架构 前端:vue | element-ui | html 后端:springboot | mybatis-plus 环境:jdk1.8 | mysql | maven | nodejs 二、代码及数据库 三、功能介绍 01. web端-首页1 02. web端-首页2 03. web端-注册 04. web端-登录 …

ASP.NET Core8.0学习笔记(二十五)——EF Core Include导航数据加载之预加载与过滤

一、导航属性数据加载 1.在EF Core中可以使用导航属性来加载相关实体。 2.加载实体的三种方式: (1)预先加载:直接在查询主体时就把对应的依赖实体查出来(作为初始查询的一部分) (2)显式加载:使用代码指示稍后显式的从…

Linux 基础环境的开发工具以及使用(下)

1. make / Makefile 自动化构建的工具 1)引入 在我们进行一些大型的工程的时候,代码量是极其大,当我们代码在进行一系列的编译的时候,难免会出现一些错误,当我们对错误进行一系列的更改之后,难道我们需要…

沃丰科技智能客服在跨境电商独立站中的核心角色

随着全球化进程的加速和互联网技术的不断发展,跨境电商行业蓬勃兴起,为消费者提供了更广阔、更便捷的购物选择。在这样一个竞争激烈的市场环境中,优质的客户服务成为了企业脱颖而出的关键。沃丰科技智能客服凭借其先进的技术和人性化的设计理…

Centos7下搭建Prometheus+Grafana监控

Prometheus 监控 Prometheus 监控系统的架构包括以下组件: Prometheus Server: Prometheus 服务器是监控系统的核心组件,负责收集、存储和处理指标数据。它定期从各种数据源(如 Exporter、Agent 等)拉取指标数据&…

MyBatis-Plus(为简化开发而生)

一、MyBatis-Plus概述 官网: baomidou.com MyBatis-Plus(简称 MP) 在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。 (1)单表操作 不需要编写sql语句,封装方法,…

深入解析 C++11 的 `std::atomic`:误区、性能与实际应用

在现代 C 开发中,std::atomic 是处理多线程同步时的重要工具之一。它通过提供原子操作保证了线程安全,但在实际使用时却隐藏着许多不为人知的陷阱和性能影响。本篇文章将带你深入理解 std::atomic 的使用方式、潜在问题,以及如何正确应用于多…

芋道源码,芋道sql,yudao,yudao-vue-pro拒绝割韭菜

芋道的开发指南实际上只需要小小的操作就可以观看啦 为了避免被割韭菜 我们可以使用插件去进行解锁文档 项目地址 otomayss/free-yd (github.com)[这里是图片002]https://github.com/otomayss/free-yd

Mac软件推荐

Mac软件推荐 截图SnipasteXnipBob 快捷启动Raycast 系统检测Stats 解压缩The UnarchiverKeka(付费) 视频播放IINA 视频下载Downie(付费) 屏幕刘海TopNotchMediaMate(付费)NotchDrop(付费&#x…

车站值班员题库

1. 联系用手信号显示十、五、三车距离信号中的“三车”(约33m)信号时,昼间的显示方式为展开的绿色信号旗单臂平伸下压 ( 一 )次。J442 2. 联系用手信号显示股道号码时,昼间右臂向上直伸&#xff0c…