Debezium日常分享系列之:向 Debezium 连接器发送信号
- 一、概述
- 二、激活源信号通道
- 三、信令数据集合的结构
- 四、创建信令数据集合
- 五、激活kafka信号通道
- 六、数据格式
- 七、激活JMX信号通道
- 八、自定义信令通道
- 九、Debezium 核心模块依赖项
- 十、部署自定义信令通道
- 十一、信号动作
- 十二、记录信号
- 十三、即席快照信号
- 十四、特别快照停止信号
- 十五、增量快照
- 十六、增量快照暂停信号
- 十七、增量快照恢复信号
- 十八、阻止快照信号
- 十九、应用案例
一、概述
Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如启动表的临时快照)的方法。要使用信号触发连接器执行指定操作,可以将连接器配置为使用以下一个或多个通道:
- 源信号通道:可以发出 SQL 命令将信号消息添加到专门的信令数据集合中。在源数据库上创建的信令数据集合专门用于与 Debezium 进行通信。
- Kafka信号通道;将信号消息提交到可配置的 Kafka 主题。
- Jmx信号通道:通过 JMX 信号操作提交信号。
- 文件信号通道:可以使用文件来发送信号。
- Custom:将信号提交到实施的自定义通道。当 Debezium 检测到新的日志记录或临时快照记录添加到通道时,它会读取信号并启动请求的操作。
信号传输可与以下 Debezium 连接器一起使用:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
可以通过设置 signal.enabled.channels 配置属性来指定启用哪个通道。该属性列出了已启用的通道的名称。默认情况下,Debezium 提供以下渠道:source 和 kafka。源通道默认启用,因为增量快照信号需要它。
二、激活源信号通道
默认情况下,Debezium 源信令通道已启用。
必须为要使用它的每个连接器显式配置信令。
程序:
- 在源数据库上,创建信令数据收集表,用于向连接器发送信号。
- 对于实现本机变更数据捕获 (CDC) 机制的源数据库(例如 Db2 或 SQL Server),为信令表启用 CDC。
- 将信令数据集合的名称添加到 Debezium 连接器配置中。在连接器配置中,添加属性 signal.data.collection,并将其值设置为您在步骤 1 中创建的信令数据集合的完全限定名称。
例如,signal.data.collection = inventory.debezium_signals。
信令集合的完全限定名称的格式取决于连接器。
以下示例显示了每个连接器使用的命名格式:
- Db2:.
- MongoDB:.
- MySQL:.
- Oracle:..
- PostgreSQL:.
- SQL Server:..
三、信令数据集合的结构
信令数据集合或信令表存储您发送到连接器以触发指定操作的信号。信令表的结构必须符合以下标准格式。
- 包含三个字段(列)。
- 字段按特定顺序排列,如表 1 所示。
表 1. 信令数据集合所需的结构
字段 | 类型 | 描述 |
---|---|---|
id(required) | string | 标识信号实例的任意唯一字符串。为提交到信令表的每个信号分配一个 ID。通常,ID 是 UUID 字符串。可以使用信号实例进行日志记录、调试或重复数据删除。当信号触发 Debezium 执行增量快照时,它会生成带有任意 id 字符串的信号消息。生成的消息包含的 id 字符串与提交信号中的 id 字符串无关。 |
type(required) | string | 指定要发送的信号类型。可以将某些信号类型与任何可提供信号传输的连接器一起使用,而其他信号类型仅可用于特定的连接器。 |
data(optional) | string | 指定要传递给信号操作的 JSON 格式的参数。每种信号类型都需要一组特定的数据。 |
数据集合中的字段名称是任意的。上表提供了建议的名称。如果使用不同的命名约定,请确保每个字段中的值与预期内容一致。
四、创建信令数据集合
可以通过向源数据库提交标准 SQL DDL 查询来创建信令表。
先决条件:
- 有足够的访问权限在源数据库上创建表。
程序:
- 向源数据库提交SQL查询,创建符合所需结构的表,如下例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
注意:
分配给 id 变量的 VARCHAR 参数的空间量必须足以容纳发送到信令表的信号 ID 字符串的大小。如果 ID 的大小超出可用空间,连接器将无法处理信号。
以下示例显示了创建三列 debezium_signal 表的 CREATE TABLE 命令:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
五、激活kafka信号通道
可以通过将 Kafka 信令通道添加到 signal.enabled.channels 配置属性,然后将接收信号的主题名称添加到 signal.kafka.topic 属性来启用 Kafka 信令通道。启用信令通道后,将创建 Kafka 消费者来消费发送到配置的信号主题的信号。
可供消费者使用的附加配置:
- Db2 connector Kafka signal configuration properties
- MongoDB connector Kafka signal configuration properties
- MySQL connector Kafka signal configuration properties
- Oracle connector Kafka signal configuration properties
- PostgreSQL connector Kafka signal configuration properties
- SQL Server connector Kafka signal configuration properties
注意:
- 要使用 Kafka 信令触发大多数连接器的临时增量快照,必须首先在连接器配置中启用源信令通道。
- 源通道实现了水印机制,以对可能由增量快照捕获并在流恢复后再次捕获的事件进行重复数据删除。
- 使用信令通道触发启用GTID的只读MySQL数据库的增量快照时,不需要启用源通道。
六、数据格式
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
该值是具有类型和数据字段的 JSON 对象。
当信号类型设置为执行快照时,数据字段必须包括下表中列出的字段:
表 2. 执行快照数据字段
字段 | 默认值 | 值 |
---|---|---|
type | incremental | 要运行的快照的类型。目前 Debezium 支持增量和阻塞类型。 |
data-collections | N/A | 一组以逗号分隔的正则表达式,与要包含在快照中的数据集合的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。 |
additional-condition | N/A | 一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。注意:此属性已弃用,应由附加条件属性替换。 |
additional-conditions | N/A | 一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:数据采集:过滤器应用到的 {data-collection} 的完全限定名称。您可以对每个{data-collection}应用不同的过滤器。过滤:指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。快照进程根据过滤器值评估 {data-collection} 中的记录,并仅捕获包含匹配值的记录。分配给过滤器属性的具体值取决于临时快照的类型:对于增量快照,可以指定一个搜索条件片段,例如“color=‘blue’”,快照会将其附加到查询的条件子句中。对于阻塞快照,可以指定完整的 SELECT 语句,例如您可以在 snapshot.select.statement.overrides 属性中设置的语句。 |
以下示例显示了典型的执行快照 Kafka 消息:
Key = `test_connector`Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
七、激活JMX信号通道
可以通过将 jmx 添加到连接器配置中的 signal.enabled.channels 属性来启用 JMX 信号,然后启用 JMX MBean 服务器来公开信号 Bean。
程序
-
使用首选的 JMX 客户端(例如 JConsole 或 JDK Mission Control)连接到 MBean 服务器。
-
搜索 Mbean debezium.<连接器类型>.management.signals.<服务器>。 Mbean 公开接受以下输入参数的信号操作:
- p0:信号的 ID。
- p1:信号的类型,例如执行快照。
- p2:包含有关指定信号类型的附加信息的 JSON 数据字段。
-
通过提供输入参数的值来发送执行快照信号。
在 JSON 数据字段中,包含下表中列出的信息:
表 2. 执行快照数据字段
字段 | 默认值 | 值 |
---|---|---|
type | incremental | 要运行的快照的类型。目前 Debezium 支持增量和阻塞类型。 |
data-collections | N/A | 一组以逗号分隔的正则表达式,与要包含在快照中的数据集合的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。 |
additional-condition | N/A | 一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。注意:此属性已弃用,应由附加条件属性替换。 |
additional-conditions | N/A | 一个可选数组,指定连接器评估的一组附加条件,以确定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下属性:数据采集:过滤器应用到的 {data-collection} 的完全限定名称。您可以对每个{data-collection}应用不同的过滤器。过滤:指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。快照进程根据过滤器值评估 {data-collection} 中的记录,并仅捕获包含匹配值的记录。分配给过滤器属性的具体值取决于临时快照的类型:对于增量快照,可以指定一个搜索条件片段,例如“color=‘blue’”,快照会将其附加到查询的条件子句中。对于阻塞快照,可以指定完整的 SELECT 语句,例如可以在 snapshot.select.statement.overrides 属性中设置的语句。 |
下图显示了如何使用 JConsole 发送信号的示例:
八、自定义信令通道
信令机制被设计为可扩展的。可以根据需要实施通道,以最适合您环境的方式向 Debezium 发送信号。
添加信令通道涉及几个步骤:
- 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
- 部署自定义信令通道。
- 通过修改连接器配置,使连接器能够使用自定义信令通道。
提供自定义信令通道
自定义信号通道是实现 io.debezium.pipeline.signal.channels.SignalChannelReader 服务提供者接口 (SPI) 的 Java 类。例如:
public interface SignalChannelReader {String name(); void init(CommonConnectorConfig connectorConfig); List<SignalRecord> read(); void close();
}
- 读者姓名。要使 Debezium 能够使用通道,请在连接器的 signal.enabled.channels 属性中指定此名称。
- 初始化通道所需的特定配置、变量或连接。
- 从通道读取信号。 SignalProcessor 类调用此方法来检索要处理的信号。
- 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。
九、Debezium 核心模块依赖项
自定义信令通道 Java 项目具有对 Debezium 核心模块的编译依赖项。必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:
<dependency><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId><version>${version.debezium}</version>
</dependency>
- ${version.debezium} 表示 Debezium 连接器的版本。
- 在 META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader 中声明实现
十、部署自定义信令通道
先决条件
- 有一个自定义信令通道 Java 程序。
程序
- 要将自定义信号通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。
- 例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。
注意:
- 要将自定义信号通道与多个连接器一起使用,必须将自定义信号通道 JAR 文件的副本放置在每个连接器的子目录中。
配置连接器以使用自定义信号通道
- 将自定义信令通道的名称添加到 signal.enabled.channels 配置属性中。
十一、信号动作
可以使用信令来发起以下操作:
- 将消息添加到日志中。
- 触发临时增量快照。
- 停止执行临时快照。
- 暂停增量快照。
- 恢复增量快照。
- 触发临时阻塞快照。
- 自定义动作。
有些信号并不与所有连接器兼容。
十二、记录信号
可以通过创建具有日志信号类型的信令表条目来请求连接器将条目添加到日志中。处理信号后,连接器将指定的消息打印到日志中。或者,可以配置信号,以便生成的消息包含流坐标。
表 4. 用于添加日志消息的信令记录示例
字段 | 值 | 描述 |
---|---|---|
id | 924e3ff8-2245-43ca-ba77-2af9af02fa07 | |
type | log | 信号的动作类型。 |
data | {“message”: “Signal message at offset {}”} | message 参数指定要打印到日志的字符串。 |
如果您向消息添加占位符 ({}),它将被替换为流坐标。 |
十三、即席快照信号
可以通过创建具有执行快照信号类型的信号来请求连接器启动临时快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,临时快照是在连接器已经开始从数据库传输更改事件之后在运行时期间发生的。可以随时启动临时快照。
临时快照可用于以下 Debezium 连接器:
- Db2
- MongoDB
- MySQL
- Oracle
- PostgreSQL
- SQL Server
表 5. 临时快照信号记录示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | execute-snapshot |
data | {“data-collections”: [“public.MyFirstTable”, “public.MySecondTable”]} |
表 6. 即席快照信号消息示例
键 | 值 |
---|---|
test_connector | {“type”:“execute-snapshot”,“data”: {“data-collections”: [“public.MyFirstTable”], “type”: “INCREMENTAL”, “additional-conditions”:[{“data-collection”: “public.MyFirstTable”, “filter”:“color=‘blue’ AND brand=‘MyBrand’”]}} |
其他资源
- Db2 连接器增量快照
- MongoDB 连接器增量快照
- MySQL 连接器增量快照
- Oracle 连接器增量快照
- PostgreSQL 连接器增量快照
- SQL Server 连接器增量快照
十四、特别快照停止信号
可以通过创建具有停止快照信号类型的信号表条目来请求连接器停止正在进行的临时快照。处理完信号后,连接器将停止当前正在进行的快照操作。
表 7. 停止临时快照信号记录示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | stop-snapshot |
data | {“type”:“INCREMENTAL”, “data-collections”: [“public.MyFirstTable”]} |
必须指定信号的类型。数据收集字段是可选的。将数据收集字段留空以请求连接器停止当前快照中的所有活动。如果希望继续执行增量快照,但希望从快照中排除特定集合,请提供要排除的集合或正则表达式的名称的逗号分隔列表。连接器处理信号后,增量快照将继续,但它会排除指定的集合中的数据。
十五、增量快照
增量快照是一种特定类型的临时快照。在增量快照中,连接器捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同,增量快照以块的形式捕获表,而不是一次捕获所有表。连接器使用水印方法来跟踪快照的进度。
通过以块的形式而不是在单个整体操作中捕获指定表的初始状态,增量快照比初始快照过程具有以下优势:
- 当连接器捕获指定表的基线状态时,来自事务日志的近实时事件流将继续不间断。
- 如果增量快照过程中断,可以从停止点恢复。
- 可以随时启动增量快照。
十六、增量快照暂停信号
可以通过创建具有暂停快照信号类型的信号表条目来请求连接器暂停正在进行的增量快照。处理完信号后,连接器将停止暂停当前正在进行的快照操作。因此,无法指定数据收集,因为快照处理将暂停在处理信号时的位置。
表 8. 暂停增量快照信号记录示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | pause-snapshot |
必须指定信号的类型。数据字段被忽略。
十七、增量快照恢复信号
可以通过创建具有恢复快照信号类型的信号表条目来请求连接器恢复暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。
表 9. 恢复增量快照信号记录示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | resume-snapshot |
十八、阻止快照信号
可以通过创建具有执行快照信号类型和具有值阻塞的 data.type 的信号来请求连接器启动临时阻塞快照。处理信号后,连接器运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,临时阻塞快照在连接器停止从数据库传输更改事件后在运行时发生。您可以随时启动临时阻止快照。
表 10. 阻塞快照信号记录示例
字段 | 值 |
---|---|
id | d139b9b7-7777-4547-917d-e1775ea61d41 |
type | execute-snapshot |
data | {“type”: “blocking”, “data-collections”: [“schema1.table1”, “schema1.table2”], “additional-conditions”: [{“data-collection”: “schema1.table1”, “filter”: “SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC”}, {“data-collection”: “schema1.table2”, “filter”: “SELECT * FROM [schema1].[table2] WHERE column2 > 0”}]} |
表 11. 阻塞快照信号消息示例
键 | 值 |
---|---|
test_connector | {“type”:“execute-snapshot”,“data”: {“type”: “blocking”} |
十九、应用案例
- Debezium系列之:实现增量快照incremental技术的详细步骤
- Debezium系列之:基于数据库信号表和Kafka信号Topic两种技术方案实现增量快照incremental技术的详细步骤
- Debezium系列之:深入理解临时阻塞快照
更多Debezium实战应用可以参考博主Debezium专栏:
- Debezium专栏,Debezium实战应用详细总结