NiFi 中的处理器(二):PutDatabaseRecord
- 1.基本介绍
- 2.属性配置
- 3.连接关系
- 4.应用场景
1.基本介绍
PutDatabaseRecord
处理器使用指定的 RecordReader
从传入的流文件中读取(可能是多个,说数组也成)记录。这些记录将转换为 SQL 语句,并作为一个批次执行。如果发生任何错误,则将流文件路由到 failure
或 retry
,如果执行成功,则将传入的流文件路由到 success
。处理器执行的 SQL 语句类型通过 Statement Type
属性指定,该属性接受一些硬编码的值,例如 INSERT
,UPDATE
和 DELETE
,使用 "Use statement.type Attribute"
可以使处理器获取流文件属性中的语句类型。
说明:如果语句类型为 UPDATE
,正常的不应该修改主键的值。如果记录中修改主键的值,那么有可能找不到数据进行修改或者修改破坏了一些数据(说白了,代码是按照根据主键值为条件进行 update
的)。
当然,隐藏的功能是 statement.type
的值时 'SQL'
的时候,可以从 record
中的某个字段读取值,此值应该是一个可以执行的 SQL 语句,该处理器就执行这个 SQL 就可以了。
2.属性配置
在下面的列表中,必需属性的名称以粗体显示。任何其他属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值),以及属性是否支持表达式语言。
| | | |
---|---|---|---|
Record Reader | Controller Service API: RecordReaderFactory Implementations: JsonPathReader XMLReader ScriptedReader CSVReader Syslog5424Reader GrokReader AvroReader JsonTreeReader ParquetReader SyslogReader | 指定用于解析传入数据和确定数据模式的 Controller Service。 | |
Database Type | Generic | Generic Oracle Oracle 12+ MSSQL 2012+ MSSQL 2008 MySQL PostgreSQL | 数据库的类型/风格,用于生成特定于数据库的代码。在许多情况下,通用类型就足够了,但是某些数据库(例如 Oracle)需要自定义 SQL 子句。 |
Statement Type | UPDATE INSERT UPSERT DELETE Use statement.type Attribute | 指定要生成的 SQL 语句的类型。请参考数据库文档以获取每个操作行为的描述。请注意,某些数据库类型可能不支持某些语句类型。如果选择了 "Use statement.type Attribute" ,则该值取自 FlowFile 中的 statement.type 属性。 "Use statement.type Attribute" 选项是唯一允许使用 "SQL" 语句类型的选项。如果指定了 "SQL" ,则 "Field ContainingSQL" 属性指定的字段的值应为目标数据库上的有效 SQL 语句,并将按原样执行。 | |
Database Connection Pooling Service | Controller Service API: DBCPService Implementations: DBCPConnectionPool HiveConnectionPool DBCPConnectionPoolLookup | Controller Service,用于获得与数据库的连接以发送记录。 | |
Catalog Name | 语句应更新的目录的名称。这可能不适用于你要更新的数据库。在这种情况下,请将该字段留空。 Supports Expression Language: true (will be evaluated using flow file attributes and variable registry) | ||
Schema Name | 表所属的schema的名称。这可能不适用于你要更新的数据库。在这种情况下,请将该字段留空。 Supports Expression Language: true (will be evaluated using flow file attributes and variable registry) | ||
Table Name | 语句应影响的表的名称。 Supports Expression Language: true (will be evaluated using flow file attributes and variable registry) | ||
Translate Field Names | true | true false | 如果为 true ,则处理器将尝试将字段名称转换为指定表的适当列名称。如果为 false ,则字段名称必须与列名称完全匹配,否则该列将不会更新。 |
Unmatched Field Behavior | Ignore Unmatched Fields | Ignore Unmatched Fields Fail on Unmatched Fields | 如果输入的记录有一个字段没有映射到数据库表的任何列,该属性会指定如何处理这种情况。 |
Unmatched Column Behavior | Fail on Unmatched Columns | Ignore Unmatched Columns Warn on Unmatched Columns Fail on Unmatched Columns | 如果输入的记录没有数据库表所有列的字段映射,该属性会指定如何处理这种情况。 |
Update Keys | 列名的逗号分隔列表,可唯一标识数据库中 UPDATE 语句的行。如果语句类型为 UPDATE 且未设置此属性,则使用表的主键。在这种情况下,如果不存在主键,并且如果 Unmatched Column Behaviour 设置为 FAIL ,则到 SQL 的转换将失败。如果语句类型为 INSERT ,则忽略此属性。Supports Expression Language: true (will be evaluated using flow file attributes and variable registry) | ||
Field Containing SQL | 如果语句类型为 "SQL" (在 statement.type 属性中设置),则此字段指示记录中的哪个字段包含要执行的 SQL 语句。该字段的值必须是单个 SQL 语句。如果语句类型不是 "SQL" ,则忽略此字段。Supports Expression Language: true (will be evaluated using flow file attributes and variable registry) | ||
Allow MultipleSQL Statements | false | true false | 如果语句类型为 "SQL" (在 statement.type 属性中设置),则此字段指示是否用分号分隔字段值并分别执行每个语句。如果有任何语句导致错误,则将回滚整个语句集。如果语句类型不是 "SQL" ,则忽略此字段。 |
Quote Column Identifiers | false | true false | 启用此选项将导致所有列名都被引用,从而允许你将保留字用作表中的列名。 |
Quote Table Identifiers | false | true false | 启用该选项后,表名将加引号,以支持在表名中使用特殊字符。 |
Max Wait Time | 0 0 0 seconds | 运行的 SQL 语句所允许的最长时间, 0 0 0 表示没有限制。少于 1 1 1 秒的最长时间将等于 0 0 0。 Supports Expression Language: true (will be evaluated using variable registry only) | |
Rollback On Failure | false | true false | 指定如何处理错误。默认情况下(false ),如果在处理 FlowFile 时发生错误,则 FlowFile 将根据错误类型路由到 "failure" 或 "retry" 关系,处理器可以继续使用下一个 FlowFile。相反,你可能想回滚当前已处理的 FlowFile,并立即停止进一步的处理。在这种情况下,你可以通过启用此 Rollback On Failure 属性来实现。如果启用,失败的 FlowFiles 将保留在输入关系中,而不会受到惩罚,并会反复处理,直到成功处理或通过其他方式将其删除。重要的是要设置足够的 "Yield Duration" ,以免重试太频繁。 |
Table Schema Cache Size | 100 100 100 | 指定应缓存多少个表模式 | |
Maximum Batch Size | 0 0 0 | 指定 INSERT 和 UPDATE 语句的最大批处理大小。该参数对 Statement Type 中指定的其他语句无效。 0 0 0 表示批量不受限制。Supports Expression Language: true (will be evaluated using flow file attributes and variable registry) |
3.连接关系
| |
---|---|
retry | 如果无法更新数据库,但再次尝试操作可能会成功,将 FlowFile 路由到此关系。 |
success | 从 SQL 查询结果集中成功创建了 FlowFile。 |
failure | 如果无法更新数据库,并且无法重试该操作(例如无效查询或违反完整性约束),也会将 FlowFile 路由到此关系。 |
4.应用场景
在 PutDatabaseRecord
之前,我们想要写入数据到数据库,往往需要使用 ConvertJsonToSql
+ PutSQL
组合,尤其是当数据格式不是 json
的时候还需要先将数据转换为 json
,而使用 ConvertJsonToSql
属于一边连接了目标库,一边要在内存解析一次数据,转成了参数化的 SQL,并且参数也是放到 FlowFile 的属性中,平白无故的这个 FlowFile 也就更吃内存了。PutDatabaseRecord
的好处就是我们可以将任何 NIFI 支持的 Record 写入指定目的,在内存解析一次数据就可以了。当然了,前后两种方式写数据到数据库的基本原理都是一样的,只是 PutDatabaseRecord
的效率更好一些。
最早,PutDatabaseRecord
支持将特定的 Record 集合转成 Insert
,Update
,Delete
语句,我们只要选择 Statement Type
即可。然后为了更灵活,增加了 Use statement.type Attribute
选项,我们可以在上游的 FlowFile 中指定 statement.type
属性,这期间又暗地里加了 "statement.type=SQL"
的功能,当 Statement Type 的值为 "SQL"
的时候,我们要配合 Field Containing SQL
配置进行工作。Field Containing SQL
指的是上游来的 FlowFile 中的一个字段,这个字段值是一个可执行的 SQL。
可能让我们比较迷茫的是 Unmatched Field Behavior
和 Unmatched Column Behavior
,我们如果纠结这两个配置的描述就会很难受,我们只关注两个单词 Field
和 Column
就可以分清楚了。
Column
我们知道,(目标)表的列嘛,就是说如果你手里的数据中的列没有与我目标表的 Column
对应会怎么样。而 Field
针对的是 Record(博主注:可以理解为一行行数据),是具体的数据,就是说如果你目标表里的列没有与我 Record 中的 Field
相对应会怎么样。具体的关系我描述一下:首先 Record 中会携带 schema
元数据信息(或推断出 schema
信息),信息里会有若干个 Field
。我们在生成 SQL 的时候,会从目标数据库查询指定表的元数据信息(放缓存里),而数据库里设置成非 null
的且非自增长的没有设置默认值的则认为是 required
字段。
- 然后针对
insert
、delete
大体有三个步骤:- 第一步是遍历
required
字段,看 Record 里是否都有这几个字段,如果没有就用到Unmatched Column Behavior
,如果我们配置了ignore
了,就继续执行。 - 第二步是对这几个 Field 的遍历,查询是否在指定表的元数据里有对应的列信息,当遇到没有的情况时,就是
Unmatched Field Behavior
,如果我们配置了ignore
了,就继续执行。如果存在,我们就放到一个集合set
里存起来。 - 第二步遍历结束后,第三步我们再判断这个集合
set
有没有值,如果是空的,就直接报"None of the fields in the record map to the columns defined by the " + tableName + " table"
的 SQLDataException 异常了。
- 第一步是遍历
update
的话稍微有些不一样,第一步就检测 Update Keys,如果没有对应值就默认使用目标表的主键,如果都没有值就报"Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified"
异常了,然后紧接着检测 Record 里是否有这些字段,没有就要Unmatched Column Behavior
。第二步跟上面一样,就是对这几个 Field 的遍历,查询是否在指定表的元数据里有对应的列信息,当遇到没有的情况时,就是Unmatched Field Behavior
,如果我们配置了ignore
了,就继续执行。- 最后
upset
的检查就是融合了insert
和update
。
然后得说一下这个 Translate Field Names
,这个功能点其实非常好,其实就是将列名转大写并替换下划线(Record 中的列和指定表的列都做此转换,指定表的列信息会做成一个 Map 映射,转换的列名 : 列元数据信息)。
private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {return colName == null ? null : (translateColumnNames ? colName.toUpperCase().replace("_", "") : colName);}
将 fieldName
转大写并替换下划线,然后跟指定表的同样转换过后的列元数据信息映射进行匹配,记录下 Field
的那个索引值,然后组 SQL 设置参数的时候根据索引值找到 Record 中对应的 value
就行了。这个功能其实就是帮助我们更好的对 Record 列和目标表列进行匹配。而 SQL 中的列名其实用的还是从指定表查询出来的列元数据信息。