Bundle
org.apache.nifi | nifi-standard-nar
Description
生成一个SQL选择查询,或使用提供的语句,并执行它以获取指定最大值列中值大于之前所见最大值的所有行。查询结果将转换为Avro格式。支持对多个属性使用表达式语言,但不允许输入连接。可使用环境/系统属性为任何包含表达式语言的属性提供值。若希望利用流文件属性执行这些查询,可使用GenerateTableFetch和/或ExecuteSQL处理器。采用流式处理,因此支持任意大的结果集。此处理器可定时运行或基于cron表达式调度,使用标准调度方法。此处理器应仅在主节点上运行。FlowFile属性'querydbtable.row.count'指示选择了多少行。
Tags:database, jdbc, query, select, sql
Input Requirement: FORBIDDEN
Supports Sensitive Dynamic Properties:false
属性
- Columns to Return 以逗号分隔的列名列表,用于构建查询。若数据库需要对名称进行特殊处理(如引号包裹),每个名称应包含此类处理。若未提供列名,则返回指定表中的所有列。注意:为增量获取正常工作,必须对给定表使用一致的列名。
Display Name : Columns to Return
Description:以逗号分隔的列名列表,用于构建查询。若数据库需要对名称进行特殊处理(如引号包裹),每个名称应包含此类处理。若未提供列名,则返回指定表中的所有列。注意:为增量获取正常工作,必须对给定表使用一致的列名。
API Name:Columns to Return
Expression Language Scope:环境变量与FlowFile属性
Sensitive:false
Required:false
- Database Connection Pooling Service 用于获取数据库连接的控制器服务。
Display Name:Database Connection Pooling Service
Description:用于获取数据库连接的控制器服务。
API Name:Database Connection Pooling Service
Service Interface:org.apache.nifi.dbcp.DBCPService
Service Implementations:
- org.apache.nifi.dbcp.DBCPConnectionPool
- org.apache.nifi.dbcp.DBCPConnectionPoolLookup
- org.apache.nifi.dbcp.HikariCPConnectionPool
Expression Language Scope:不支持
Sensitive:false
Required:true
- Database Dialect Service 用于生成特定数据库方言语句的数据库方言服务。
Display Name:Database Dialect Service
Description:用于生成特定数据库方言语句的数据库方言服务。
API Name:Database Dialect Service
Service Interface:org.apache.nifi.database.dialect.service.api.DatabaseDialectService
Service Implementations:org.apache.nifi.database.dialect.service.StandardDatabaseDialectService
Expression Language Scope:不支持
Sensitive:false
Required:true
Dependencies:
- Database Type设置为 [Database Dialect Service] 中的任意值
- Database Type 数据库类型,用于生成特定供应商的语句。通用类型支持大多数情况,但选择特定类型可启用优化处理或附加功能。
Display Name:Database Type
Description:数据库类型,用于生成特定供应商的语句。通用类型支持大多数情况,但选择特定类型可启用优化处理或附加功能。
API Name:db-fetch-db-type
Default Value:Generic
Allowable Values
- Database Dialect Service
- Generic
- Oracle
- Oracle 12+
- MS SQL 2012+
- MS SQL 2008
- MySQL
- PostgreSQL
- Phoenix
Expression Language Scope:不支持
Sensitive:false
Required:true
- Custom Query 用于检索数据的自定义SQL查询。此查询将作为子查询包装,而非根据其他属性构建SQL查询。查询不得包含ORDER BY语句。
Display Name:Custom Query
Description:用于检索数据的自定义SQL查询。此查询将作为子查询包装,而非根据其他属性构建SQL查询。查询不得包含ORDER BY语句。
API Name:db-fetch-sql-query
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:false
- Additional WHERE clause 构建SQL查询时添加到WHERE条件中的自定义子句。
Display Name:Additional WHERE clause
Description:构建SQL查询时添加到WHERE条件中的自定义子句。
API Name:db-fetch-where-clause
Expression Language Scope:环境变量与FlowFile属性
Sensitive:false
Required:false
- Default Decimal Precision 当DECIMAL/NUMBER值作为Avro逻辑类型'decimal'写入时,必须指定表示可用位数的精度。通常精度由列数据类型定义或数据库引擎默认值决定,但某些数据库引擎可能返回未定义精度(0)。此属性用于写入此类未定义精度的数值。
Display Name:Default Decimal Precision
Description:当DECIMAL/NUMBER值作为Avro逻辑类型'decimal'写入时,必须指定表示可用位数的精度。通常精度由列数据类型定义或数据库引擎默认值决定,但某些数据库引擎可能返回未定义精度(0)。此属性用于写入此类未定义精度的数值。
API Name:dbf-default-precision
Default Value:10
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:true
- Default Decimal Scale 当DECIMAL/NUMBER值作为Avro逻辑类型'decimal'写入时,必须指定表示小数位数的刻度。通常刻度由列数据类型定义或数据库引擎默认值决定,但当返回未定义精度(0)时,某些数据库引擎可能无法确定刻度。此属性用于写入此类未定义数值。若值的十进制位数超过指定刻度,则值将被四舍五入,例如1.53在刻度0时变为2,刻度1时变为1.5。
Display Name:Default Decimal Scale
Description:当DECIMAL/NUMBER值作为Avro逻辑类型'decimal'写入时,必须指定表示小数位数的刻度。通常刻度由列数据类型定义或数据库引擎默认值决定,但当返回未定义精度(0)时,某些数据库引擎可能无法确定刻度。此属性用于写入此类未定义数值。若值的十进制位数超过指定刻度,则值将被四舍五入,例如1.53在刻度0时变为2,刻度1时变为1.5。
API Name:dbf-default-scale
Default Value:0
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:true
- Normalize Table/Column Names 是否将列名中的非Avro兼容字符更改为Avro兼容字符。例如,冒号和句点将更改为下划线以构建有效的Avro记录。
Display Name:Normalize Table/Column Names
Description:是否将列名中的非Avro兼容字符更改为Avro兼容字符。例如,冒号和句点将更改为下划线以构建有效的Avro记录。
API Name:dbf-normalize
Default Value:false
Allowable Values:
- true
- false
Expression Language Scope:不支持
Sensitive:false
Required:true
- Use Avro Logical Types 是否对DECIMAL/NUMBER、DATE、TIME和TIMESTAMP列使用Avro逻辑类型。若禁用,将作为字符串写入。若启用,逻辑类型将使用其底层类型写入:DECIMAL/NUMBER作为逻辑'decimal'(以字节形式写入,附加精度和刻度元数据),DATE作为逻辑'date-millis'(以自Unix纪元1970-01-01起的天数int形式写入),TIME作为逻辑'time-millis'(以自Unix纪元起的毫秒数int形式写入),TIMESTAMP作为逻辑'timestamp-millis'(以自Unix纪元起的毫秒数long形式写入)。若Avro记录的读取器也支持这些逻辑类型,则可根据读取器实现反序列化这些值。
Display Name:Use Avro Logical Types
Description:是否对DECIMAL/NUMBER、DATE、TIME和TIMESTAMP列使用Avro逻辑类型。若禁用,将作为字符串写入。若启用,逻辑类型将使用其底层类型写入:DECIMAL/NUMBER作为逻辑'decimal'(以字节形式写入,附加精度和刻度元数据),DATE作为逻辑'date-millis'(以自Unix纪元1970-01-01起的天数int形式写入),TIME作为逻辑'time-millis'(以自Unix纪元起的毫秒数int形式写入),TIMESTAMP作为逻辑'timestamp-millis'(以自Unix纪元起的毫秒数long形式写入)。若Avro记录的读取器也支持这些逻辑类型,则可根据读取器实现反序列化这些值。
API Name:dbf-user-logical-types
Default Value:false
Allowable Values:
- true
- false
Expression Language Scope:不支持
Sensitive:false
Required:true
- Fetch Size 每次从结果集中获取的行数。这是对数据库驱动程序的提示,可能不被遵循或精确执行。若值为零,则忽略此提示。若使用PostgreSQL,必须将'Set Auto Commit'设置为'false'才能使'Fetch Size'生效。
Display Name:Fetch Size
Description:每次从结果集中获取的行数。这是对数据库驱动程序的提示,可能不被遵循或精确执行。若值为零,则忽略此提示。若使用PostgreSQL,必须将'Set Auto Commit'设置为'false'才能使'Fetch Size'生效。
API Name:Fetch Size
Default Value:0
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:true
- Initial Load Strategy 首次启动处理器(或其状态被清除)时如何处理数据库表中的现有行。若配置了任何'initial.maxvalue.*'动态属性,此属性将被忽略。
Display Name:Initial Load Strategy
Description:首次启动处理器(或其状态被清除)时如何处理数据库表中的现有行。若配置了任何'initial.maxvalue.*'动态属性,此属性将被忽略。
API Name:initial-load-strategy
Default Value:Start at Beginning
Allowable Values:
- Start at Beginning
- Start at Current Maximum Values
Expression Language Scope:不支持
Sensitive:false
Required:true
- Max Wait Time SQL选择查询允许运行的最长时间,零表示无限制。小于1秒的时间将视为零。
Display Name:Max Wait Time
Description:SQL选择查询允许运行的最长时间,零表示无限制。小于1秒的时间将视为零。
API Name:Max Wait Time
Default Value:0 seconds
Expression Language Scope:环境变量与FlowFile属性
Sensitive:false
Required:true
- Maximum-value Columns 以逗号分隔的列名列表。处理器将跟踪自启动以来每个列返回的最大值。使用多列意味着列列表的顺序性,且每列的值增长应比前一列更慢。通常用于分区表。此处理器可用于仅检索自上次检索后新增/更新的行。注意:某些JDBC类型(如bit/boolean)不适合维护最大值,不应在此属性中列出,否则会导致处理错误。若未提供列,则考虑表中所有行,可能影响性能。注意:为增量获取正常工作,必须对给定表使用一致的max-value列名。
Display Name:Maximum-value Columns
Description:以逗号分隔的列名列表。处理器将跟踪自启动以来每个列返回的最大值。使用多列意味着列列表的顺序性,且每列的值增长应比前一列更慢。通常用于分区表。此处理器可用于仅检索自上次检索后新增/更新的行。注意:某些JDBC类型(如bit/boolean)不适合维护最大值,不应在此属性中列出,否则会导致处理错误。若未提供列,则考虑表中所有行,可能影响性能。注意:为增量获取正常工作,必须对给定表使用一致的max-value列名。
API Name:Maximum-value Columns
Expression Language Scope:环境变量与FlowFile属性
Sensitive:false
Required:false
- Maximum Number of Fragments 最大片段数。若值为零,则返回所有片段。防止因处理超大表导致内存不足错误。注意:设置此属性可能导致数据丢失,因为结果集未排序,片段可能在任意边界结束,导致部分行未被包含。
Display Name:Maximum Number of Fragments
Description:最大片段数。若值为零,则返回所有片段。防止因处理超大表导致内存不足错误。注意:设置此属性可能导致数据丢失,因为结果集未排序,片段可能在任意边界结束,导致部分行未被包含。
API Name:qdbt-max-frags
Default Value:0
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:true
- Max Rows Per Flow File 单个FlowFile中包含的最大行数。用于将超大结果集拆分为多个FlowFile。若值为零,则所有行写入单个FlowFile。
Display Name:Max Rows Per Flow File
Description:单个FlowFile中包含的最大行数。用于将超大结果集拆分为多个FlowFile。若值为零,则所有行写入单个FlowFile。
API Name:qdbt-max-rows
Default Value:0
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:true
- Output Batch Size 在提交进程会话前要排队的输出FlowFile数量。设置为零时,会话将在所有结果集行处理完毕且FlowFile准备传输到下游关系时提交。对于大型结果集,这可能导致FlowFile在处理器执行结束时集中传输。若设置此属性,则当指定数量的FlowFile准备传输时提交会话,从而释放FlowFile到下游关系。注意:设置此属性时,FlowFile不会包含maxvalue.*和fragment.count属性。
Display Name:Output Batch Size
Description:在提交进程会话前要排队的输出FlowFile数量。设置为零时,会话将在所有结果集行处理完毕且FlowFile准备传输到下游关系时提交。对于大型结果集,这可能导致FlowFile在处理器执行结束时集中传输。若设置此属性,则当指定数量的FlowFile准备传输时提交会话,从而释放FlowFile到下游关系。注意:设置此属性时,FlowFile不会包含maxvalue.*和fragment.count属性。
API Name:qdbt-output-batch-size
Default Value:0
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:true
- Set Auto Commit 允许启用或禁用数据库连接的自动提交功能。默认值为'No value set'。'No value set'将保持数据库连接的自动提交模式不变。对于PostgreSQL等JDBC驱动,需禁用自动提交以使'Fetch Size'生效。启用自动提交时,PostgreSQL驱动会忽略'Fetch Size'并将结果集所有行一次性加载到内存中,可能导致处理大数据集时内存占用过高。此行为的更多细节参见https://jdbc.postgresql.org//documentation/head/query.html。
Display Name:Set Auto Commit
Description:允许启用或禁用数据库连接的自动提交功能。默认值为'No value set'。'No value set'将保持数据库连接的自动提交模式不变。对于PostgreSQL等JDBC驱动,需禁用自动提交以使'Fetch Size'生效。启用自动提交时,PostgreSQL驱动会忽略'Fetch Size'并将结果集所有行一次性加载到内存中,可能导致处理大数据集时内存占用过高。此行为的更多细节参见
https://jdbc.postgresql.org//documentation/head/query.html。
API Name:Set Auto Commit
Allowable Values:
- true
- false
Expression Language Scope:JVM级环境变量与系统属性
Sensitive:false
Required:false
- Table Name 要查询的数据库表名。使用自定义查询时,此属性用于为查询设置别名,并作为属性出现在FlowFile上。
Display Name:Table Name
Description:要查询的数据库表名。使用自定义查询时,此属性用于为查询设置别名,并作为属性出现在FlowFile上。
API Name:Table Name
Expression Language Scope:环境变量与FlowFile属性
Sensitive:false
Required:true
- Transaction Isolation Level 此设置将为支持此功能的数据库驱动设置连接的事务隔离级别。
Display Name:Transaction Isolation Level
Description:此设置将为支持此功能的数据库驱动设置连接的事务隔离级别。
API Name:transaction-isolation-level
Allowable Values:
- TRANSACTION_NONE
- TRANSACTION_READ_COMMITTED
- TRANSACTION_READ_UNCOMMITTED
- TRANSACTION_REPEATABLE_READ
- TRANSACTION_SERIALIZABLE
Expression Language Scope:不支持
Sensitive:false
Required:false
动态属性
- initial.maxvalue.
添加。此值仅在首次访问表时使用(当指定Maximum Value Column时)。
Name:initial.maxvalue.
Description:为max-value列指定初始最大值。属性需按格式initial.maxvalue.
Value:指定列的初始最大值
Expression Language Scope:ENVIRONMENT
状态管理
Scopes | 描述 |
CLUSTER | 对指定表执行查询后,将保留指定列的最大值以供后续执行使用。这使得处理器仅获取最大值超过保留值的记录。可用于增量获取、新增行获取等。要清除最大值,请根据状态管理文档清除处理器状态。 |
Relationships
名称 | 描述 |
success | 从SQL查询结果集成功创建FlowFile。 |
Writes Attributes
名称 | 描述 |
tablename | 被查询的表名 |
querydbtable.row.count | 查询选择的行数 |
fragment.identifier | 若设置'Max Rows Per Flow File',则同一查询结果集的所有FlowFile将具有相同的fragment.identifier属性值,用于关联结果。 |
fragment.count | 若设置'Max Rows Per Flow File',则表示单个结果集生成的FlowFile总数。可与fragment.identifier结合使用以了解同一结果集包含的FlowFile数量。若设置Output Batch Size,则不填充此属性。 |
fragment.index | 若设置'Max Rows Per Flow File',则表示此FlowFile在源自同一结果集的FlowFile列表中的位置。可与fragment.identifier结合以确定FlowFile来源及顺序。 |
maxvalue.* | 每个属性包含指定'Maximum-value Column'的观测最大值。属性后缀为列名。若设置Output Batch Size,则不填充此属性。 |
See Also
- org.apache.nifi.processors.standard.ExecuteSQL
- org.apache.nifi.processors.standard.GenerateTableFetch