MM2简介
KIP-382: MirrorMaker 2.0 - Apache Kafka - Apache Software Foundation
有四种运行MM2的方法:
As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
As a standalone Connect worker.(作为独立的Connect工作者)
In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)
MM2集群模式
配置文件示例1:
name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2# 定义集群别名
clusters = event-center, event-center-new# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3
注意:
replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。
官方也给出了解释:
这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作.
参考资料:
【如何保持topic名称一致】
【Kafka】MM2同步Kafka集群时如何自定义复制策略(ReplicationPolicy)_kafka mm2同步的目标topic名不一样-CSDN博客
配置文件示例2:
#定义集群别名
clusters = A, B
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092 # 设置A集群的kafka地址列表
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092 # 设置B集群的kafka地址列表
A->B.enabled = true # 开启A集群向B集群同步
A->B.topics = .* # 允许同步topic的正则B->A.enabled = true # 开启B集群向A集群同步
B->A.topics = .* # 允许同步topic的正则#MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1#自定义参数
sync.topic.configs.enabled=true #是否同步源topic配置信息
sync.topic.acls.enabled=true #是否同步源ACL信息
emit.heartbeats.enabled=true #连接器是否发送心跳
emit.heartbeats.interval.seconds=5 #心跳间隔
emit.checkpoints.enabled=true #是否发送检查点
refresh.topics.enabled=true #是否刷新topic列表
refresh.topics.interval.seconds=5 #刷新间隔
refresh.groups.enabled=true #是否刷新消费者组id
refresh.groups.interval.seconds=5 #刷新间隔
readahead.queue.capacity=500 #连接器消费者预读队列大小
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy #使用LegacyReplicationPolicy模仿MM1
heartbeats.topic.retention.ms=1 day #首次创建心跳主题时,设置心跳数据保留时长
checkpoints.topic.retention.ms=1 day #首次创建检查点主题时,设置检查点数据保留时长
offset.syncs.topic.retention.ms=max long #首次创建偏移量主题时,设置偏移量数据保留时长
replication.factor=2 #远端创建新topic的replication数量设置
Standalone模式运行
需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)
worker.properties配置文件
bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAINkey.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverteroffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.properties配置文件
name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3
启动命令
./connect-standalone.sh worker.properties connector.properties
参考资料:
kafka 异步双活方案 mirror maker2 深度解析 - 知乎
Apache Kafka MirrorMaker 2.0 指南 - Azure HDInsight | Microsoft Learn
简述 Kafka Mirror Maker v2 – K's Blog
Getting up to Speed with MirrorMaker 2 - Confluent
9.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据 Red Hat AMQ 2021.q3 | Red Hat Customer Portal
https://blog.51cto.com/u_14049791/5703235
KafkaMirrorMaker2.0架构与实战全解《三》 - 墨天轮
2.4. Kafka MirrorMaker 2 集群配置 Red Hat AMQ Streams 2.4 | Red Hat Customer Portal
kafka:MirrorMaker-V1(MM1)到MirrorMaker-V2(MM2)-腾讯云开发者社区-腾讯云
【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程_kafka数据同步到另外一个kafka-CSDN博客