【大数据】NiFi 中的 Controller Service

NiFi 中的 Controller Service

  • 1.Service 简介
    • 1.1 Controller Service 的配置
      • 1.1.1 SETTING 基础属性
      • 1.1.2 PROPERTIES 使用属性
      • 1.1.3 COMMENT 页签
    • 1.2 Service 的使用范围
  • 2.全局参数配置
  • 3.DBCPConnectionPool 的使用样例
  • 4.在 ExcuseGroovyScript 组件中使用 Service

1.Service 简介

首先 NiFi 中的 Controller Service 和我们 MVC 概念中的 Controller Service 不是一个概念,NiFi 中的 Controller Service 更像是和 Processor 同级的一个概念,它和 Processor 在我个人的使用经验来理解的话就是 它是预制好的各种服务,可以被 Processor 引用或者支撑 Processor,例如一个 SQL 读取的 Processor,它得需要 JDBC 的连接,才能访问数据库。这里 Controller Service 就可以是一个 JDBC 的连接池服务。

同理,Controller Service 也是支持扩展的,可以像自定义开发 Processor 一样,根据自己的业务需求,进行自定义的 Controller Service 开发。

当我们使用某些依赖 Service 的组件(Processor)时,在配置中会出现选择 Service 或者创建新的 Service 的情况,这里的 Service 即是 NiFi 的 Controller Service,一旦创建新的,则会生成一个以 Group 为范围的 “全局” Service 对象,这时,再有依赖同类型 Service 的 Processor 时,可以直接选中。

在这里插入图片描述
在这里插入图片描述

1.1 Controller Service 的配置

单独查看 Controller Service 可以从面板空白处,右键 Configure 来看,如下图:

在这里插入图片描述
这是一个 JDBC 的连接池 Service,它包含的属性有 名称类型简介启用状态操作;从操作中可以看到配置该 Service 需要填写基本的各类属性;其中,Service 是有启停状态的,如果想修改 Service 的属性内容,必须先保证该 Service 是停用状态,然后点击配置标识,则进入配置页面,它的配置和 Processor 的差不多,通过页签区别,共有三个页签:SETTING基础属性)、PROPERTIES使用属性)、COMMENT页签)。

1.1.1 SETTING 基础属性

基础属性,包含左侧的名称,名称可以进行更改,右侧包含引用此 Service 的 Processor 列表。

1.1.2 PROPERTIES 使用属性

在这里插入图片描述
核心的业务配置,此标签页的配置项根据不同的 Service,配置内容不一致,具体的配置项以及使用,可以参考官方的文档;这里的是 JDBC 的连接池,所以基本需要连接数据库所需的 URL、数据库的账号密码、数据库的驱动类名称、驱动类的依赖 jar 包路径 ,这里不少 Service 可能都需要第三方的 jar 包依赖才可以使用,长期使用或生产环境下,建议将所有 jar 资源集中放在统一路径下。

1.1.3 COMMENT 页签

在这里插入图片描述
一个提供 Service 使用说明的页签,可根据自己实际需求,补充使用 Service 的用法以及描述。

1.2 Service 的使用范围

在 NiFi 中,Group 同时也对 Service 起作用,如果我们在一个 NiFi 的最外层的平面上新增 Controller Service,那么这些 Service 的作用域是整个 NiFi 的任何位置,如果我们在某个 Group 内创建 Controller Service, 那么这个 Controller Service 仅在 Group 范围内可以被引用,NiFi 的这种机制也是方便 Service 的使用和维护。

在这里插入图片描述

2.全局参数配置

类似于数据库连接池、Kafka、Redis 等各种组件的连接池、客户端 Client 的 Service 在实际的使用中会非常多,由此配置的 Service 也会非常多,于是就会产生很多次的反复配置 URL、账号这一系列重复的内容,由于 NiFi 的特性,这些 Service 又和组件(Processor)一样,四散在各处,这就使得维护和运维管理变得很繁琐,调试、调整、查看的时候,要不停的各个 Group 来回跳转、调整不同的 Service 的 Configure;为应对此类问题,NiFi 提供了全局配置的机制来弥补。

使用变量前:

在这里插入图片描述
这里的 URLDriver Class NameDatabase User 在实际生产环境中,可能都是固定的数据库和固定的服务,几乎不需要变的,可能只需要配置一遍就好,不需要每次创建 Service 都写一遍;所以可以这里可以使用上下文变量(Parameter Context)。

首先,打开 Parameter Context,创新一组新的变量:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
再进入 CONTROLLER SERVICES 对 Service 的配置进行修改,将具体的 URLDriver Class NameDatabase User 等参数,全部使用变量替换(变量使用 # 符 )。

在这里插入图片描述

3.DBCPConnectionPool 的使用样例

下面将使用 NiFi 实现一个简单的 Demo,从 MySQL 数据库中读取部分数据,将数据进行筛选,然后将数据输出;

首先,使用 ExecuteSQL 组件,读取 MySQL 中的数据,根据上文描述,创建一个 DBCPConnectionPool 的 Service,然后启动:

在这里插入图片描述
添加 ExecuteSQL 组件,配置相关内容,根据自定义编写的 SQL 读取数据库内容:

在这里插入图片描述
随后添加 ConvertAvroToJSON 组件,这里从数据库读出的数据是不可读的,为了方便查看调试、同时也是为了后续使用 Groovy 处理数据,所以选择转换为 JSON 进行处理,实际使用可以根据自身情况选择转换器:

在这里插入图片描述
添加 ExecuteGroovyScript 组件,使用 Groovy 脚本对数据进行处理。

在这里插入图片描述
Groovy 的脚本内容如下:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;def dataJson = getInputJSONData()
if(null == dataJson){return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){def tem = dataJson.get(i);//在这里可以对数据进行处理rss.add(tem.name);
}// 输出
if(rss.size()>0){sendData(rss,REL_SUCCESS);
}/*** 读取输入流* @author GCC***/
def getInputJSONData(){def flow = session.get()if(null == flow){log.error("the flow is null ...");return;}def dataJson = null;def jsonStr = "";session.read(flow,{inputStream ->jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback);try{dataJson = new JsonSlurper().parseText(jsonStr);}catch(Exception e){log.error("输入流格式错误")}session.remove(flow);return dataJson;
}/***输出数据至后续管道*@param result 输出的数据*@param outStream 输出的管道*@author GCC***/
void sendData(def result,def outStream){String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());def newflow = session.create();newflow = session.write(newflow, {outputStream ->outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)session.transfer(newflow, outStream);
}

最后使用 LogMessage 组件作为接收数据,实际情况可以将数据转为下一处理节点或存储等等。

在这里插入图片描述
在这里插入图片描述

4.在 ExcuseGroovyScript 组件中使用 Service

ExcuseGroovyScript 组件内部使用 Groovy 脚本处理数据时,可能需要再次读取数据库或者使用其他第三方数据来辅助处理,这时候,ExcuteGroovyScript 组件支持可以引入 Service,提供用户编写的 Groovy 脚本内部使用 Service;

首先需要在 ExcuteGroovyScript 组件的 PROPERTIES 配置中新增属性:

在这里插入图片描述
这里,添加属性时,会让用户输入用户给该属性的命名,如果是普通命名,这里的属性仅仅作为静态数据而已,但是如果使用关键字 SQL. 或者 CTL. 作为名称前缀时,则能够使用 Service,后续的属性值则会变成 Service 的选择。

在 Groovy 的代码中,则可以通过 SQL.mysql.{method} 的方式,调用 Service 的方法:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;def dataJson = getInputJSONData()
if(null == dataJson){return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){def tem = dataJson.get(i);def mapdic = [:]// 使用 Service 查询数据库SQL.mysql.eachRow("SELECT id, value FROM tb_dic_detail WHERE u_status = 1 "){row->mapdic.put(row.id.toString(),row.value.toString());    }rss.add(tem.name);
}// 输出
if(rss.size()>0){sendData(rss,REL_SUCCESS);
}/***************************************************公共函数***************************************************//*** 读取输入流* @author GCC***/
def getInputJSONData(){def flow = session.get()if(null == flow){log.error("the flow is null ...");return;}def dataJson = null;def jsonStr = "";session.read(flow,{inputStream ->jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback);try{dataJson = new JsonSlurper().parseText(jsonStr);}catch(Exception e){log.error("输入流格式错误")}session.remove(flow);return dataJson;
}/***输出数据至后续管道*@param result 输出的数据*@param outStream 输出的管道*@author GCC***/
void sendData(def result,def outStream){String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());def newflow = session.create();newflow = session.write(newflow, {outputStream ->outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)session.transfer(newflow, outStream);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/240351.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记一次 Nginx 调参的踩坑经历

最近在基于SSE&#xff08;Server Sent Events&#xff09;做服务端单向推送服务&#xff0c;本地开发时一切顺利&#xff0c;但是在部署到预发环境时就碰到1个很诡异的问题&#xff0c;这里需要简单介绍下我们的整体架构&#xff1a; 整体架构 可以看到所有的请求都会先到统一…

2024 年 22 款顶级免费数据恢复软件比较 [Windows 和 Mac]

适用于 Windows 和 Mac 用户的最佳数据恢复软件下载列表和比较&#xff0c;可快速恢复丢失的数据、已删除的文件、照片或格式化的分区数据&#xff1a; 数据恢复软件是一种从任何存储介质恢复丢失文件的应用程序。它可以恢复由于病毒攻击、硬盘故障或任何其他原因而意外删除或…

NIO的实战教程(简单且高效)

1. 参考 建议按顺序阅读以下三篇文章 为什么NIO被称为同步非阻塞&#xff1f; Java IO 与 NIO&#xff1a;高效的输入输出操作探究 【Java.NIO】Selector&#xff0c;及SelectionKey 2. 实战 我们将模拟一个简单的HTTP服务器&#xff0c;它将响应客户端请求并返回一个固定的…

Maven核心概念

1 Maven工程的GAVP Maven 中的 GAVP 是指 GroupId、ArtifactId、Version、Packaging 等四个属性的缩写&#xff0c;其中前三个是必要的&#xff0c;而 Packaging 属性为可选项。 这四个属性主要为每个项目在maven仓库中做一个标识&#xff0c;方便项目之间相互引用。 GAV G 即…

桶装水送水小程序:提升服务质量的利器

随着移动互联网的发展&#xff0c;越来越多的消费者通过手机在线购物和订购商品。如果你是一名桶装水供应商&#xff0c;想要拓展线上业务&#xff0c;那么开发一个桶装水微信小程序将是一个明智的选择。本文将指导你从零开始开发一个桶装水微信小程序&#xff0c;让你轻松完成…

Coze在手,GPTsDALLE免费用

1. 关于Coze Coze 是一个应用程序编辑平台&#xff0c;旨在开发下一代人工智能聊天机器人。 你可以使用无代码创建各种类型的聊天机器人&#xff0c;并将其部署到各种社交平台和消息应用程序。 链接: Coze 2. Coze的特点 Coze有5个特点。下面由我来详细介绍一下&#xff01;…

高级数据结构 <二叉搜索树>

本文已收录至《数据结构(C/C语言)》专栏&#xff01; 作者&#xff1a;ARMCSKGT 目录 前言正文二叉搜索树的概念二叉搜索树的基本功能实现二叉搜索树的基本框架插入节点删除节点查找函数中序遍历函数析构函数和销毁函数(后序遍历销毁)拷贝构造和赋值重载(前序遍历创建)其他函数…

蓝牙物联网与嵌入式开发如何结合?

蓝牙物联网与嵌入式开发可以紧密结合&#xff0c;以实现更高效、更智能的物联网应用。以下是一些结合的方式&#xff1a; 嵌入式开发为蓝牙设备提供硬件基础设施和控制逻辑&#xff1a;嵌入式系统可以利用微处理器和各种外设组成的系统&#xff0c;为蓝牙设备提供硬件基础设施和…

基于ERC20代币协议实现的去中心化应用平台

文章目录 内容简介设计逻辑ERC20TokenLoanPlatform 合约事件结构体状态变量函数 Remix 运行实现部署相关智能合约存款和取款贷款和还款 源码地址 内容简介 使用 solidity 实现的基于 ERC20 代币协议的借贷款去中心化应用平台(极简版)。实现存款、取款、贷款、还款以及利息计算的…

爬虫API|批量抓取电商平台商品数据,支持高并发

随着互联网的快速发展&#xff0c;电商平台如雨后春笋般涌现&#xff0c;为消费者提供了丰富的购物选择。然而&#xff0c;对于许多商家和数据分析师来说&#xff0c;如何快速、准确地获取电商平台上的商品数据成为了一个难题。为了解决这个问题&#xff0c;我们开发了一个爬虫…

ModuleNotFoundError: No module named ‘tensorflow‘

直接运行pip install tensorflow安装成功之后&#xff0c;发现版本是tensorflow2.15.0 python的版本是3.9版本 导入包&#xff1a;import tensorflow 打包xxx.exe,调用之后提示错误 ModuleNotFoundError: No module named tensorflow 最后发现特定的python的版本对应特定的t…

基础数据结构(2):栈

1.栈的定义 栈是仅限在表尾进行插入和删除的线性表&#xff0c;栈又被称为后进先出的线性表 1.1栈顶和栈底 栈是一个线性表&#xff0c;我们允许插入和删除的一端称为栈顶 栈底和栈顶相对&#xff0c;实际上栈底的元素不需要关心 1.2入栈和出栈 栈元素的插入操作叫做入栈&…

润和软件HopeStage与亚信安全云主机深度安全防护系统完成产品兼容性互认证

近日&#xff0c;江苏润和软件股份有限公司&#xff08;以下简称“润和软件”&#xff09;HopeStage 操作系统与亚信科技&#xff08;成都&#xff09;有限公司&#xff08;以下简称“亚信安全”&#xff09;云主机深度安全防护系统完成兼容性测试。 测试结果表明&#xff0c;企…

12 Vue3中使用v-if指令实现条件渲染

概述 v-if指令主要用来实现条件渲染&#xff0c;在实际项目中使用得也非常多。 v-if通常会配合v-else-if、v-else指令一起使用&#xff0c;可以达到多个条件执行一个&#xff0c;两个条件执行一个&#xff0c;满足一个条件执行等多种场景。 下面&#xff0c;我们分别演示这三…

tamarin manual总结笔记2(tamarin实例)

最初的例子 我们将从一个简单的协议示例开始&#xff0c;该协议仅由两条消息组成&#xff0c;在这里以所谓的Alice-and-Bob表示法编写: C -> S: aenc(k, pkS) C <- S: h(k)在该协议中&#xff0c;客户端C生成一个新的对称密钥k&#xff0c;用服务器S的公钥pkS (aenc代表…

2023 英特尔On技术创新大会直播 |AI科技创新的引路者

英特尔大会 前言英特尔人工智能英特尔创新技术基于英特尔架构的科学计算总结 前言 英特尔技术创新大会是一个令人激动和启发的盛会。在这次大会上&#xff0c;我有幸观看了许多令人瞩目的科技创新和前沿技术的展示。这些展示不仅展示了英特尔作为科技巨头的实力&#xff0c;更…

浏览器原理篇—渲染阻塞

渲染阻塞 1.DOM 的解析 html 文档 边加载边解析 的&#xff1b;网络进程和渲染进程之间会建立一个共享数据的管道&#xff0c;网络进程接收到数据实时传递给渲染进程&#xff0c;渲染进程的 HTML 解析器&#xff0c;它会动态接收字节流&#xff0c;并将其解析为 DOM 2.字节流…

[每周一更]-(第38期):Go常见的操作消息队列

在Go语言中&#xff0c;常见的消息队列有以下几种&#xff1a; RabbitMQ&#xff1a;RabbitMQ是一个开源的AMQP&#xff08;高级消息队列协议&#xff09;消息代理软件&#xff0c;用于支持多种编程语言&#xff0c;包括Go语言。RabbitMQ提供了可靠的消息传递机制和灵活的路由…

基于SSM的在线学习系统的设计与实现论文

基于SSM的在线学习系统的设计与实现 摘要 随着信息互联网购物的飞速发展&#xff0c;一般企业都去创建属于自己的管理系统。本文介绍了在线学习系统的开发全过程。通过分析企业对于在线学习系统的需求&#xff0c;创建了一个计算机管理在线学习系统的方案。文章介绍了在线学习…

【习题】运行Hello World工程

判断题 1. DevEco Studio是开发HarmonyOS应用的一站式集成开发环境。 正确(True)错误(False) 正确(True) 2. main_pages.json存放页面page路径配置信息。 正确(True)错误(False) 正确(True) 单选题 1. 在stage模型中&#xff0c;下列配置文件属于AppScope文件夹的是&am…