kafka跨地区跨集群同步工具MirrorMaker2 —— 筑梦之路

MM2简介 

KIP-382: MirrorMaker 2.0 - Apache Kafka - Apache Software Foundation

有四种运行MM2的方法:

As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
As a standalone Connect worker.(作为独立的Connect工作者)
In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

MM2集群模式

配置文件示例1:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2# 定义集群别名
clusters = event-center, event-center-new# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3

注意:

replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作.

参考资料:

【如何保持topic名称一致】

【Kafka】MM2同步Kafka集群时如何自定义复制策略(ReplicationPolicy)_kafka mm2同步的目标topic名不一样-CSDN博客

配置文件示例2:

#定义集群别名
clusters = A, B
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092 # 设置A集群的kafka地址列表
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092 # 设置B集群的kafka地址列表
A->B.enabled = true # 开启A集群向B集群同步
A->B.topics = .* # 允许同步topic的正则B->A.enabled = true # 开启B集群向A集群同步
B->A.topics = .* # 允许同步topic的正则#MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1#自定义参数
sync.topic.configs.enabled=true #是否同步源topic配置信息
sync.topic.acls.enabled=true #是否同步源ACL信息
emit.heartbeats.enabled=true #连接器是否发送心跳
emit.heartbeats.interval.seconds=5 #心跳间隔
emit.checkpoints.enabled=true #是否发送检查点
refresh.topics.enabled=true #是否刷新topic列表
refresh.topics.interval.seconds=5 #刷新间隔
refresh.groups.enabled=true #是否刷新消费者组id
refresh.groups.interval.seconds=5 #刷新间隔
readahead.queue.capacity=500 #连接器消费者预读队列大小
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy #使用LegacyReplicationPolicy模仿MM1
heartbeats.topic.retention.ms=1 day #首次创建心跳主题时,设置心跳数据保留时长
checkpoints.topic.retention.ms=1 day #首次创建检查点主题时,设置检查点数据保留时长
offset.syncs.topic.retention.ms=max long #首次创建偏移量主题时,设置偏移量数据保留时长
replication.factor=2 #远端创建新topic的replication数量设置

Standalone模式运行

需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)

worker.properties配置文件

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAINkey.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverteroffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties配置文件

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

 启动命令

./connect-standalone.sh worker.properties connector.properties

参考资料:

kafka 异步双活方案 mirror maker2 深度解析 - 知乎

Apache Kafka MirrorMaker 2.0 指南 - Azure HDInsight | Microsoft Learn

简述 Kafka Mirror Maker v2 – K's Blog

Getting up to Speed with MirrorMaker 2 - Confluent

9.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据 Red Hat AMQ 2021.q3 | Red Hat Customer Portal

https://blog.51cto.com/u_14049791/5703235

KafkaMirrorMaker2.0架构与实战全解《三》 - 墨天轮

2.4. Kafka MirrorMaker 2 集群配置 Red Hat AMQ Streams 2.4 | Red Hat Customer Portal

kafka:MirrorMaker-V1(MM1)到MirrorMaker-V2(MM2)-腾讯云开发者社区-腾讯云

【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程_kafka数据同步到另外一个kafka-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/17843.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每日一练 - 揭秘高级ACL的奥秘

01 真题题目 以下关于高级 ACL 描述正确的是: A.高级 ACL 支持基于协议类型过滤报文 B.可以过滤的协议号的取值可以是 1-255 C.编号范围 3000-3999 D.可以定义生效时间 E.可以根据 MAC 地址过滤报文 02 真题答案 ABCD 03 答案解析 A. 正确:高级ACL的…

使用IDEA远程debug调试

文章目录 应用背景开启方式IDEA设置启动脚本改造 参考资料 应用背景 springboot项目,部署到服务器上,需要开启远程debug跟踪代码。 使用idea开启远程debug。 开启方式 IDEA设置 选择 Edit Configuration 如图,点击加号,选择Re…

【机器学习】利用机器学习优化陆军战术决策与战场态势感知

🔒文章目录: 💥1.引言 🛴2.机器学习在陆军战术决策中的应用 🛣️2.1数据收集与预处理 🌄2.2模型构建与训练: 🌅2.3实时决策支持: 🌅2.4代码实现 &…

力扣:454. 四数相加 II

454. 四数相加 II 给你四个整数数组 nums1、nums2、nums3 和 nums4 &#xff0c;数组长度都是 n &#xff0c;请你计算有多少个元组 (i, j, k, l) 能满足&#xff1a; 0 < i, j, k, l < nnums1[i] nums2[j] nums3[k] nums4[l] 0 示例 1&#xff1a; 输入&#xff…

排序算法——上

一、冒泡排序&#xff1a; 1、冒泡排序算法的思想 我们从左边开始把相邻的两个数两两做比较&#xff0c;当一个元素大于右侧与它相邻的元素时&#xff0c;交换它们之间位置&#xff1b;反之&#xff0c;它们之间的位置不发生变化。冒泡排序是一种稳定的排序算法。 2、代码实现…

5月20日分割等和子集+最后一块石头的重量Ⅱ

416.分割等和子集 给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 示例 1&#xff1a; 输入&#xff1a;nums [1,5,11,5] 输出&#xff1a;true 解释&#xff1a;数组可以分割成 [1, 5, 5] 和…

【ai】LiveKit Agent 的example及python本地开发模式工程实例

title: ‘LiveKit Agent Playground’ playgroundLiveKit Community playground的环境变量&#xff1a;LiveKit API # LiveKit API Configuration LIVEKIT_API_KEYYOUR_API_KEY LIVEKIT_API_SECRETYOUR_API_SECRET# Public configuration NEXT_PUBLIC_LIVEKIT_URLwss://YOUR_…

持续总结中!2024年面试必问 20 道 Rocket MQ面试题(一)

一、请简述什么是RocketMQ&#xff1f; RocketMQ是一个开源的消息中间件&#xff0c;由阿里巴巴团队开发&#xff0c;主要设计用于分布式系统中的异步通信、应用解耦、流量削峰和消息持久化。它支持高吞吐量、高可用性、可扩展性和容错性&#xff0c;是构建大规模实时消息处理…

Linux系统keepalived实现主备高可用方案

Linux系统keepalived实现主备高可用方案 环境准备 装备两台机器&#xff0c;IP地址信息如下&#xff1a; host1&#xff1a; 192.168.18.180 host2&#xff1a; 192.168.18.183 虚拟vip: 192.168.18.188为了测试&#xff0c;分别在两台机器上安装nginx服务&#xff0c;使下面…

React暴露组件的方法给全局作用域调用

在React中&#xff0c;如果你想要暴露组件的方法给全局作用域调用&#xff0c;你可以使用一个全局变量来引用你的组件实例&#xff0c;或者使用Context API来创建一个全局状态&#xff0c;通过它来传递方法引用。 以下是使用Context API的一个简单例子&#xff1a; 创建一个C…

JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(MES与ERP系统的区别和联系)

MES与ERP系统的区别和联系 MES制造执行系统&#xff0c;是一套面向制造公司车间执行层的生产信息化管理系统。MES 可觉得公司提供涉及制造数据管理、计划排产管理、生产调度管理、库存管理、质量管理、人力资源管理、工作中心、设备管理、工具工装管理、采购管理、成本管理、项…

为什么推荐前端用WebStorm软件编程?

一、介绍 WebStorm是由JetBrains公司开发的一款JavaScript开发工具&#xff0c;被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。它支持JavaScript、ECMAScript 6、TypeScript、CoffeeScript、Dart和Flow等多种语言的代码…

大学搜题软件音乐类?分享三个支持答案和解析的工具 #微信#媒体

高效的学习工具可以帮助我们提高记忆力和理解能力&#xff0c;使知识更加深入人心。 1.彩虹搜题 这是个微信公众号 一款专门供全国大学生使用的查题神器!致力于帮助大学生解决学习上的难题,涵盖了大学生学习所需的学习资料。 下方附上一些测试的试题及答案 1、甲、乙合伙开…

goimghdr,一个有趣的 Python 库!

更多Python学习内容&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个有趣的 Python 库 - goimghdr。 Github地址&#xff1a;https://github.com/corona10/goimghdr 在图像处理和分析过程中&#xff0c;识别图像文件的类型是一个常见的需求。Python自带的imghdr…

开源与闭源:AI模型发展的两条路径

目录 前言1 数据隐私保护与用户数据安全1.1 开源大模型的透明性与挑战1.2 闭源大模型的控制与责任 2 商业应用的优劣比较2.1 开源大模型的灵活性与创新2.2 闭源大模型的可靠性与服务质量 3 社区参与与合作的差异3.1 开源大模型的社区驱动与协作3.2 闭源大模型的企业主导与保密性…

【openlayers系统学习】3.3假彩色图像合成(三个波段合成假彩色图像)

三、假彩色图像合成 在上一步中&#xff0c;我们使用 ol/source/GeoTIFF​ 源从单个多波段源&#xff08;具有红色、绿色、蓝色和Alpha波段&#xff09;渲染真彩色图像。在下面这个例子中&#xff0c;我们将从可见光谱之外提取数据&#xff0c;并使用它来呈现假彩色合成。 我…

ts面试题: 面试题2

31. 计算字符串长度 // 计算字符串的长度&#xff0c;类似于 String#length 。答案 type test Str1<"abc123">; type Str1<T extends string, L extends any[] []> T extends ${infer f}${infer b} ? Str1<b, [...L, f]> : L[length];32. 接…

JavaScript中的var变量详解:定义、提升与注意事项

在JavaScript中&#xff0c;var关键字用于声明变量。虽然ES6引入了let和const作为更现代的变量声明方式&#xff0c;但理解var的工作原理对于学习JavaScript基础依然至关重要。下面将深入探讨var变量的定义、变量提升现象以及一些值得注意的使用细节。 var变量定义 var声明创…

【基于 PyTorch 的 Python 深度学习】9 目标检测与语义分割(2)

前言 文章性质&#xff1a;学习笔记 &#x1f4d6; 学习资料&#xff1a;吴茂贵《 Python 深度学习基于 PyTorch ( 第 2 版 ) 》【ISBN】978-7-111-71880-2 主要内容&#xff1a;根据学习资料撰写的学习笔记&#xff0c;该篇主要介绍了优化候选框的几种方法。 一、优化候选框的…

抖店怎么选品?抖店爆款选品思路技巧,新手直接用!

大家好&#xff0c;我是电商花花。 抖店选品永远是我们做抖店&#xff0c;做电商的核心&#xff0c;店铺想要出单&#xff0c;想要赚钱&#xff0c;我们就一定要学会怎么选品&#xff0c;怎么筛选商品。 而我们绝大多数新手并没有办法保证持续选爆款的能力&#xff0c;如果店…