Nifi中的Controller Service

Service简介

首先Nifi中的Controller Service 和我们MVC概念中的Controller Service不是一个概念,Nifi中的Controller Service更像是和Processor同级的一个概念,它和Processor在我个人的使用经验来理解的话就是它是预制好的各种服务,可以被Processor引用或者支撑Processor,例如一个SQL读取的Processor,它得需要JDBC的连接,才能访问数据库。这里Controller Service 就可以是一个JDBC的连接池服务。

同理,Controller Service 也是支持扩展的,可以像自定义开发Processor一样,根据自己的业务需求,进行自定义的Controller Service 开发。

当我们使用某些依赖Service的组件(Processor)时,在配置中会出现选择Service或者创建新的Service的情况,这里的Service即是Nifi的Controller Service,一旦创建新的,则会生成一个以Group为范围的 “全局” Service对象,这时,再有依赖同类型Service的Processor时,可以直接选中:

 

 Controller Service的配置

单独查看Controller Service 可以从面板空白处,右键Configure来看,如下图:

 这是一个JDBC的连接池Service,它包含的属性有名称、类型、简介、启用状态、操作;从操作中可以看到配置该Service需要填写基本的各类属性;其中,Service是有启停状态的,如果想修改Service的属性内容,必须先保证该Service是停用状态,然后点击配置标识,则进入配置页面,它的配置和Processor的差不多,通过页签区别,共有三个页签:SETTING(基础属性)、PROPERTIES(使用属性)、COMMENT(页签):

SETTING 基础属性

 基础属性,包含左侧的名称,名称可以进行更改,右侧包含引用此Service的Processor 列表

PROPERTIES 使用属性

核心的业务配置,此标签页的配置项根据不同的Service,配置内容不一致,具体的配置项以及使用,可以参考官方的文档;这里的是JDBC的连接池,所以基本需要连接数据库所需的URL、数据库的账号密码、数据库的驱动类名称、驱动类的依赖 jar包路径 ,这里不少Service可能都需要第三方的jar包依赖才可以使用,长期使用或生产环境下,建议将所有jar资源集中放在统一路径下。

COMMENT 页签 

一个提供Service使用说明的页签,可根据自己实际需求,补充使用Service的用法以及描述

Service 的使用范围

 在Nifi的基本使用中的Group的使用介绍,Group同时也对Services起作用,如果我们在一个Nifi的最外层的平面上 新增Controller Service,那么这些Service的作用域是整个Nifi的任何位置,如果我们在某个Group内创建Controller Service, 那么这个Controller Service 仅在Group范围内可以被引用,Nifi的这种机制也是方便Service的使用和维护

 

全局参数配置

类似于 数据库连接池、Kafka、Redis等各种组件的连接池、客户端Client的Service在实际的使用中会非常多,由此配置的Service也会非常多,于是就会产生很多次的反复配置URL、账号这一系列重复的内容,由于Nifi的特性,这些Service又和组件(Processor)一样,四散在各处,这就使得维护和运维管理变得很繁琐,调试、调整、查看的时候,要不停的各个group来回跳转、调整不同的Service的Configure;为应对此类问题,Nifi 提供了全局配置的机制来弥补。

 使用变量前:

这里的 URL、Driver Class Name、Database User在实际生产环境中,可能都是固定的数据库和固定的服务,几乎不需要变得,可能只需要配置一遍就好,不需要每次创建Service都写一遍;所以可以这里可以使用上下文变量(Parameter Context)

首先,打开Parameter Context,创新一组新的变量:

 

 

 

 

之后进入Service 的管控面板(空白处右键选择Configure),先选择变量组:

 

再进入 CONTROLLER SERVICES 对Service的配置进行修改,将具体的RUL、Driver-name、user等参数,全部使用变量替换(变量使用‘#’符 )

 

DBCPConnectionPool的使用样例

下面将使用Nifi 实现一个简单的Demo,从Mysql数据库中读取部分数据,将数据进行筛选,然后将数据输出;

首先,使用ExecuteSQL组件,读取Mysql中的数据,根据上文描述,创建一个DBCPConnectionPool 的Service,然后启动 :

 添加 ExecuteSQL组件,配置相关内容,根据自定义编写的SQL读取数据库内容:

随后添加 ConvertAvroToJSON 组件,这里从数据库读出的数据是不可读的,为了方便查看调试、同时也是为了后续使用groovy处理数据,所以选择转换为JSON进行处理,实际使用可以根据自身情况选择转换器:

添加 ExecuteGroovyScript 组件,使用groovy脚本对数据进行处理,groovy的脚本内容如下: 

 

groovy内容:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;def dataJson = getInputJSONData()
if(null == dataJson){return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){def tem = dataJson.get(i);//在这里可以对数据进行处理rss.add(tem.name);
}// 输出
if(rss.size()>0){sendData(rss,REL_SUCCESS);
}/*** 读取输入流* @author GCC***/
def getInputJSONData(){def flow = session.get()if(null == flow){log.error("the flow is null ...");return;}def dataJson = null;def jsonStr = "";session.read(flow,{inputStream ->jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback);try{dataJson = new JsonSlurper().parseText(jsonStr);}catch(Exception e){log.error("输入流格式错误")}session.remove(flow);return dataJson;
}
/***输出数据至后续管道*@param result 输出的数据*@param outStream 输出的管道*@author GCC***/
void sendData(def result,def outStream){String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());def newflow = session.create();newflow = session.write(newflow, {outputStream ->outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)session.transfer(newflow, outStream);
}

 最后使用LogMessage组件作为接收数据,实际情况可以将数据转为下一处理节点或存储等等

 

在ExcuseGroovyScript组件中使用Service

 在 ExcuseGroovyScript 组件内部使用groovy脚本处理数据时,可能需要再次读取数据库或者使用其他第三方数据来辅助处理,这时候,ExcuteGroovyScript组件支持可以引入Service,提供用户编写的groovy脚本内部使用Service;

首先需要在ExcuteGroovyScript组件的PROPERTIES  配置中新增属性:

 

这里,添加属性时,会让用户输入用户给该属性的命名,如果是普通命名,这里的属性仅仅作为静态数据而已,但是如果使用关键字 ‘SQL.’ 或者 'CTL.'作为名称前缀时,则能够使用Service,后续的属性值则会变成Service的选择。

在groovy的代码中,则可以通过 SQL.mysql.{method}的方式,调用Service的方法,在ExcuseScript组件中配合脚本语言进行数据的操作:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;def dataJson = getInputJSONData()
if(null == dataJson){return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){def tem = dataJson.get(i);def mapdic = [:]//使用Service查询数据库SQL.mysql.eachRow("SELECT id,value FROM tb_dic_detail WHERE u_status = 1 "){row->mapdic.put(row.id.toString(),row.value.toString());    }rss.add(tem.name);
}// 输出
if(rss.size()>0){sendData(rss,REL_SUCCESS);
}/*****************************************************************公共函数*********************************************************************//*** 读取输入流* @author GCC***/
def getInputJSONData(){def flow = session.get()if(null == flow){log.error("the flow is null ...");return;}def dataJson = null;def jsonStr = "";session.read(flow,{inputStream ->jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback);try{dataJson = new JsonSlurper().parseText(jsonStr);}catch(Exception e){log.error("输入流格式错误")}session.remove(flow);return dataJson;
}
/***输出数据至后续管道*@param result 输出的数据*@param outStream 输出的管道*@author GCC***/
void sendData(def result,def outStream){String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());def newflow = session.create();newflow = session.write(newflow, {outputStream ->outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)session.transfer(newflow, outStream);
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/45279.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

综合实验作业

node01&#xff1a;192.168.175.146 node02&#xff1a;192.168.175.147 【node01】 node01 与 node02 防火墙在本实验中都需要放行的服务&#xff1b; [rootlocalhost ~]# firewall-cmd --permanent --add-servicedns success [rootlocalhost ~]# firewall-cmd --permanent -…

基于web、dns、nfs的综合实验

题目&#xff1a; 现有主机 node01 和 node02&#xff0c;完成如下需求&#xff1a; 1、在 node01 主机上提供 DNS 和 WEB 服务 2、dns 服务提供本实验所有主机名解析 3、web服务提供 www.rhce.com 虚拟主机 4、该虚拟主机的documentroot目录在 /nfs/rhce 目录 5、该目录由 no…

Python酷库之旅-第三方库Pandas(018)

目录 一、用法精讲 44、pandas.crosstab函数 44-1、语法 44-2、参数 44-3、功能 44-4、返回值 44-5、说明 44-6、用法 44-6-1、数据准备 44-6-2、代码示例 44-6-3、结果输出 45、pandas.cut函数 45-1、语法 45-2、参数 45-3、功能 45-4、返回值 45-5、说明 4…

python 代码设计贪吃蛇

代码&#xff1a; # -*- codeing utf-8 -*- import tkinter as tk import random from tkinter import messageboxclass Snake:def __init__(self, master):self.master masterself.master.title("Snake")# 创建画布self.canvas tk.Canvas(self.master, width400,…

构造函数的初始化列表,static成员,友元,内部类【类和对象(下)】

P. S.&#xff1a;以下代码均在VS2022环境下测试&#xff0c;不代表所有编译器均可通过。 P. S.&#xff1a;测试代码均未展示头文件stdio.h的声明&#xff0c;使用时请自行添加。 博主主页&#xff1a;LiUEEEEE                        …

基于Python+Flask+MySQL的新冠疫情可视化系统

基于PythonFlaskMySQL的新冠疫情可视化系统 FlaskMySQL 基于PythonFlaskMySQL的新冠疫情可视化系统 项目主要依赖前端&#xff1a;layui&#xff0c;Echart&#xff0c;后端主要是Flask&#xff0c;系统的主要支持登录注册&#xff0c;Ecahrt构建可视化图&#xff0c;可更换主…

【爬虫】爬虫基础

目录 一、Http响应与请求1、Http请求2、Http响应3、状态码 二、Requests库1、发起GET请求2、发起POST请求3、处理请求头 三、BeautifulSoup库1、解析HTML文档2、查找和提取数据Ⅰ、查找单个元素Ⅱ、查找所有元素Ⅲ、使用CSS选择器Ⅳ、获取元素属性 四、爬取豆瓣电影榜 一、Http…

谷粒商城实战笔记-27-分布式组件-SpringCloud-Gateway-创建测试API网关

本节的主要内容是创建网关模块&#xff0c;将网关注册到Nacos&#xff0c;并配置路由进行测试。 一&#xff0c;创建网关模块 右键工程New->Module&#xff0c;创建新模块&#xff0c;模块名称 gulimall-gateway。 填充各种信息。 选中Gateway依赖。 点击Create创建模块。…

为什么使用代理IP无法访问网站

代理IP可以为用户在访问网站时提供更多的便利性和匿名性&#xff0c;但有时用户使用代理IP后可能会遇到无法访问目标网站的问题。这可能会导致用户无法完成所需的业务要求&#xff0c;给用户带来麻烦。使用代理IP时&#xff0c;您可能会因为各种原因而无法访问您的网站。以下是…

电脑录音如何操作?电脑麦克风声音一起录制,分享7款录音软件

电脑录音已经成为我们日常生活和工作中不可或缺的一部分。无论是录制会议、教学、音乐、网络直播、音源采集还是其他声音&#xff0c;电脑录音软件都为我们提供了极大的便利。本文将为大家介绍如何操作电脑录音&#xff0c;并分享七款录音软件&#xff0c;包括是否收费、具体操…

关于 Qt在国产麒麟系统上设置的setFixedSize、setMinimumFixed、setMaxmumFixed设置无效 的解决方法

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/140242881 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

OpenCV中的浅拷贝和深拷贝

文章目录 前言一、浅拷贝二、深拷贝三、比较总结 前言 在数字图像处理中&#xff0c;针对读取到的一张图像&#xff0c;需要反复利用这张图像做各种的变换&#xff0c;以满足我们项目的需求。在这之前&#xff0c;最容易忽略的一点就是图像之间的拷贝问题&#xff0c;其中的浅…

解决在window资源管理器的地址栏中输入\\192.168.x.x\sambashare之后显示无法访问,错误代码 0x80070035,找不到网络路径。

一、错误重现 二、解决方法 1、在cmd中输入gpedit.msc gpedit.msc确定 -> 打开本地组策略编辑器 2、启用不安全的来宾登录 计算机配置 -> 管理模板 -> 网络 -> Lanman工作站 -> 右侧双击编辑"启用不安全的来宾登录"&#xff0c;把状态改为 “已启…

数据结构——查找算法

文章目录 1. 查找算法 2. 顺序查找 2. 二分查找 1. 查找算法 查找算法是用于在数据集中定位特定元素的位置的算法。查找是计算机科学中一项基本操作&#xff0c;几乎在所有应用程序中都需要使用。例如&#xff0c;数据库查询、信息检索、字典查找等都涉及到查找操作。查找算…

【JavaScript 报错】未捕获的类型错误:Uncaught TypeError

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 一、错误原因分析1. 调用不存在的方法2. 访问未定义的属性3. 数据类型不匹配4. 函数参数类型不匹配 二、解决方案1. 检查方法和属性是否存在2. 使用可选链操作符3. 数据类型验证4. 函数参数类型检查 三、实例讲解四、总结 在…

[C++初阶]list类的初步理解

一、标准库的list类 list的底层是一个带哨兵位的双向循环链表结构 对比forward_list的单链表结构&#xff0c;list的迭代器是一个双向迭代器 与vector等顺序结构的容器相比&#xff0c;list在任意位置进行插入删除的效率更好&#xff0c;但是不支持任意位置的随机访问 list是一…

mac生成.dmg压缩镜像文件

mac生成.dmg压缩镜像文件 背景准备内容步骤1&#xff0c;找一个文件夹2&#xff0c;制作application替身1&#xff0c;终端方式2&#xff0c;黄金右手方式 3&#xff0c;.app文件放入文件夹4&#xff0c;制作.dmg压缩镜像文件5&#xff0c;安装.dmg 总结 背景 为绕开App Store…

视频融合共享平台视频共享融合赋能平台数字化升级医疗体系

在当前&#xff0c;医疗健康直接关系到国计民生&#xff0c;然而&#xff0c;由于医疗水平和资源分布不均&#xff0c;以及信息系统老化等问题&#xff0c;整体医疗服务能力和水平的提升受到了限制。视频融合云平台作为数字医疗发展的关键推动力量&#xff0c;在医疗领域的广泛…

Docker部署gitlab私有仓库后查看root默认密码以及修改external_url路径和端口的方法

文章目录 1、docker部署最新版gitlab2、进入gitlab容器3、修改路径地址ip和端口4、检验效果 1、docker部署最新版gitlab #docker安装命令 docker run --detach \--name gitlab \--restart always \-p 1080:80 \-p 10443:443 \-p 1022:22 \-v /gitlab/config:/etc/gitlab \-v …

MacOS 开发 — Packages 程序 macOS新版本 演示选项卡无法显示

MacOS 开发 — Packages 程序 macOS新版本 演示选项卡无法显示 问题描述 &#xff1a; 之前写过 Packages 的使用以及如何打包macOS程序。最近更新了新的macOS系统&#xff0c;发现Packages的演示选项卡无法显示&#xff0c;我尝试从新安转了Packages 也是没作用&#xff0c;…