Debezium official documentation reference: Debezium connector for MySQL :: Debezium Documentation
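When capturing data with CDC, some columns contain sensitive values that need to be masked. Debezium provides four options for this. Note that the truncation, asterisk, and hash options apply only to character-based (string) columns; numeric columns are not supported.
1. Column exclude/include lists (blacklist/whitelist)
The exclude list and the include list are mutually exclusive; configure only one of them.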
column.exclude.list
column.include.list
column.exclude.list | empty string | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values. Fully-qualified names for columns are of the form databaseName.tableName.columnName. |
column.include.list | empty string | An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to include in change event record values. Fully-qualified names for columns are of the form databaseName.tableName.columnName. |
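The list semantics described above can be sketched in Java. This is only an illustration, not Debezium's implementation: the class name ColumnListMatcher is hypothetical, and the sketch assumes each expression must match the whole fully-qualified column name. It also splits naively on commas, so an expression that itself contains a comma (such as a {1,3} quantifier) would need different handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Debezium.
class ColumnListMatcher {
    private final List<Pattern> patterns = new ArrayList<>();

    // Takes a comma-separated list of regular expressions,
    // in the same shape as the value of column.exclude.list.
    ColumnListMatcher(String commaSeparatedRegexes) {
        for (String regex : commaSeparatedRegexes.split(",")) {
            String trimmed = regex.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
    }

    // Returns true when any expression matches the entire
    // databaseName.tableName.columnName string (assumed anchored matching).
    boolean matches(String fullyQualifiedColumn) {
        for (Pattern p : patterns) {
            if (p.matcher(fullyQualifiedColumn).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ColumnListMatcher excluded =
                new ColumnListMatcher("dbname\\.order\\.address, dbname\\.user\\..*_secret");
        System.out.println(excluded.matches("dbname.order.address"));
        System.out.println(excluded.matches("dbname.order.id"));
    }
}
```

Escaping the dots keeps "." from matching arbitrary characters, which matters when column names share prefixes.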
2. Column truncation
column.truncate.to._length_.chars
column.truncate.to._length_.chars | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be truncated in the change event record values if the field values are longer than the specified number of characters. You can configure multiple properties with different lengths in a single configuration. The length must be a positive integer. Fully-qualified names for columns are of the form databaseName.tableName.columnName. |
_length_ is the number of characters to keep.
For example: column.truncate.to.8.chars: dbname.order.address
In this example, the address column keeps only its first 8 characters.
Original: 上海市浦东新区川沙路2301弄
Masked: 上海市浦东新区川
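The truncation rule can be pictured as a simple prefix cut. The sketch below is an assumption about the behavior, not Debezium's code; TruncateSketch and truncate are hypothetical names, and the length is counted in Java char units.

```java
// Hypothetical illustration of the truncation rule; not part of Debezium.
class TruncateSketch {
    // Keeps at most maxLength characters; shorter values pass through unchanged.
    static String truncate(String value, int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("length must be a positive integer");
        }
        if (value == null || value.length() <= maxLength) {
            return value;
        }
        return value.substring(0, maxLength);
    }

    public static void main(String[] args) {
        // Same input as the address example, truncated to 8 characters.
        System.out.println(truncate("上海市浦东新区川沙路2301弄", 8));
    }
}
```

Note that truncation leaks the leading part of the value, so it suits fields where the prefix is not sensitive.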
3. Masking with asterisks (*)
column.mask.with._length_.chars
column.mask.with._length_.chars | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be replaced in the change event message values with a field value consisting of the specified number of asterisk (*) characters. |
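_length_ is the number of asterisks to output.
For example: column.mask.with.8.chars: dbname.order.address
In this example, the value of the address column is replaced with 8 asterisks.
Original: 上海市浦东新区川沙路2301弄
Masked: ********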
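A minimal sketch of this rule: the replacement is a fixed string of asterisks that does not depend on the original value. MaskSketch is a hypothetical name used only for illustration, and String.repeat assumes Java 11 or later.

```java
// Hypothetical illustration of fixed-length asterisk masking; not part of Debezium.
class MaskSketch {
    private final String maskValue;

    // Builds the mask once; every converted value becomes the same string.
    MaskSketch(int length) {
        if (length < 0) {
            throw new IllegalArgumentException("length must not be negative");
        }
        this.maskValue = "*".repeat(length);
    }

    // The input is ignored on purpose: the output reveals neither content nor length.
    String convert(Object value) {
        return maskValue;
    }

    public static void main(String[] args) {
        MaskSketch mask = new MaskSketch(8);
        System.out.println(mask.convert("上海市浦东新区川沙路2301弄"));
    }
}
```

Unlike truncation, the output here carries no information about the original value.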
4. Hash masking
column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt | n/a | An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns. Fully-qualified names for columns are of the form databaseName.tableName.columnName. In the change event record, the values of the matched columns are replaced with pseudonyms. A pseudonym consists of the hashed value that results from applying the specified hashAlgorithm and salt. Based on the hash function that is used, referential integrity is maintained, while column values are replaced with pseudonyms. Supported hash functions are described in the MessageDigest section of the Java Cryptography Architecture Standard Algorithm Name Documentation. For example: column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName. If necessary, the pseudonym is automatically shortened to the length of the column. The connector configuration can include multiple properties that specify different hash algorithms and salts. |
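hashAlgorithm is the name of the hash function (for example SHA-256 or MD5), and salt is the salt string that is mixed into the hash.
For example: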
column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName
column.mask.hash.v2.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName
column.mask.hash.MD5.with.salt.CzQMA0cB5K = inventory.orders.customerName
column.mask.hash.v2.MD5.with.salt.CzQMA0cB5K = inventory.orders.customerName
The examples above use two hash algorithms (SHA-256 and MD5) and two versions (v1 and v2), giving four rules in total.
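To see how the property name itself carries the version, algorithm, and salt, here is a small Java sketch that takes such a key apart. The regular expression and the HashRuleKey class are hypothetical and written only for this illustration; Debezium's own parsing may differ.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for illustration; not Debezium's implementation.
class HashRuleKey {
    // Assumed key layout: column.mask.hash[.v2].<algorithm>.with.salt.<salt>
    private static final Pattern KEY =
            Pattern.compile("column\\.mask\\.hash\\.(v2\\.)?([^.]+)\\.with\\.salt\\.(.+)");

    final boolean v2;
    final String algorithm;
    final String salt;

    private HashRuleKey(boolean v2, String algorithm, String salt) {
        this.v2 = v2;
        this.algorithm = algorithm;
        this.salt = salt;
    }

    // Splits a property name into version flag, algorithm name, and salt.
    static HashRuleKey parse(String propertyName) {
        Matcher m = KEY.matcher(propertyName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a hash masking property: " + propertyName);
        }
        return new HashRuleKey(m.group(1) != null, m.group(2), m.group(3));
    }

    public static void main(String[] args) {
        HashRuleKey key = parse("column.mask.hash.v2.SHA-256.with.salt.CzQMA0cB5K");
        System.out.println(key.v2 + " " + key.algorithm + " " + key.salt);
    }
}
```

Because the salt is part of the property name, it is visible to anyone who can read the connector configuration.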
Masking source code, from io.debezium.relational.mapping.MaskStrings (MaskingValueConverter, HashValueConverter, and HashingByteArrayStrategy):
@Immutable
protected static final class MaskingValueConverter implements ValueConverter {
    protected final String maskValue;

    public MaskingValueConverter(String maskValue) {
        this.maskValue = maskValue;
        assert this.maskValue != null;
    }

    @Override
    public Object convert(Object value) {
        return maskValue;
    }
}

@Immutable
protected static final class HashValueConverter implements ValueConverter {
    private static final Logger LOGGER = LoggerFactory.getLogger(HashValueConverter.class);
    private final byte[] salt;
    private final MessageDigest hashAlgorithm;
    private final HashingByteArrayStrategy hashingByteArrayStrategy;

    public HashValueConverter(byte[] salt, String hashAlgorithm, HashingByteArrayStrategy hashingByteArrayStrategy) {
        this.salt = salt;
        this.hashingByteArrayStrategy = hashingByteArrayStrategy;
        try {
            this.hashAlgorithm = MessageDigest.getInstance(hashAlgorithm);
        }
        catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException(e);
        }
    }

    @Override
    public Object convert(Object value) {
        if (value instanceof Serializable) {
            try {
                return toHash((Serializable) value);
            }
            catch (IOException e) {
                if (LOGGER.isErrorEnabled()) {
                    LOGGER.error("can't calculate hash", e);
                }
            }
        }
        return null;
    }

    private String toHash(Serializable value) throws IOException {
        hashAlgorithm.reset();
        hashAlgorithm.update(salt);
        byte[] valueToByteArray = hashingByteArrayStrategy.toByteArray(value);
        return convertToHexadecimalFormat(hashAlgorithm.digest(valueToByteArray));
    }

    private String convertToHexadecimalFormat(byte[] bytes) {
        StringBuilder hashString = new StringBuilder();
        for (byte b : bytes) {
            hashString.append(String.format("%02x", b));
        }
        return hashString.toString();
    }
}

/**
 * V1 default and previous version. Because ObjectOutputStream is used, some characters are added before the actual value.
 * V2 should be used to fidelity for the value being hashed the same way in different places. The byte array also has only the actual value.
 */
public enum HashingByteArrayStrategy {
    V1 {
        @Override
        byte[] toByteArray(Serializable value) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutput out = new ObjectOutputStream(bos);
            out.writeObject(value);
            return bos.toByteArray();
        }
    },
    V2 {
        @Override
        byte[] toByteArray(Serializable value) {
            return value.toString().getBytes();
        }
    };

    abstract byte[] toByteArray(Serializable value) throws IOException;
}
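The practical difference between V1 and V2 can be seen by hashing the same value both ways. The standalone sketch below mirrors the two byte-conversion strategies from the source above; the class name HashStrategyDemo, the sample value, and the explicit UTF-8 charset are assumptions for this illustration (the quoted source uses the platform default charset).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical demo class; mirrors the V1/V2 strategies for comparison only.
class HashStrategyDemo {
    // V1 style: Java serialization, which prepends stream header bytes to the value.
    static byte[] v1Bytes(String value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        return bos.toByteArray();
    }

    // V2 style: only the bytes of the value itself (UTF-8 is assumed here).
    static byte[] v2Bytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Salt first, then the value bytes, rendered as lowercase hex.
    static String saltedHex(String algorithm, String salt, byte[] valueBytes)
            throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance(algorithm);
        digest.update(salt.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest(valueBytes)) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        String salt = "CzQMA0cB5K";
        String name = "Alice"; // sample value, made up for this demo
        System.out.println("V1: " + saltedHex("SHA-256", salt, v1Bytes(name)));
        System.out.println("V2: " + saltedHex("SHA-256", salt, v2Bytes(name)));
    }
}
```

The two hashes differ because V1 hashes the serialization header along with the value. A system outside the JVM that wants to reproduce the pseudonym (for example, to join on it) can do so far more easily with V2, since it only needs the hash of salt followed by the raw value bytes.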