kafka跨地区跨集群同步工具MirrorMaker2 —— 筑梦之路

MM2简介 

KIP-382: MirrorMaker 2.0 - Apache Kafka - Apache Software Foundation

有四种运行MM2的方法:

As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
As a standalone Connect worker.(作为独立的Connect工作者)
In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

MM2集群模式

配置文件示例1:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2# 定义集群别名
clusters = event-center, event-center-new# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3

注意:

replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作.

参考资料:

【如何保持topic名称一致】

【Kafka】MM2同步Kafka集群时如何自定义复制策略(ReplicationPolicy)_kafka mm2同步的目标topic名不一样-CSDN博客

配置文件示例2:

#定义集群别名
clusters = A, B
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092 # 设置A集群的kafka地址列表
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092 # 设置B集群的kafka地址列表
A->B.enabled = true # 开启A集群向B集群同步
A->B.topics = .* # 允许同步topic的正则B->A.enabled = true # 开启B集群向A集群同步
B->A.topics = .* # 允许同步topic的正则#MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1#自定义参数
sync.topic.configs.enabled=true #是否同步源topic配置信息
sync.topic.acls.enabled=true #是否同步源ACL信息
emit.heartbeats.enabled=true #连接器是否发送心跳
emit.heartbeats.interval.seconds=5 #心跳间隔
emit.checkpoints.enabled=true #是否发送检查点
refresh.topics.enabled=true #是否刷新topic列表
refresh.topics.interval.seconds=5 #刷新间隔
refresh.groups.enabled=true #是否刷新消费者组id
refresh.groups.interval.seconds=5 #刷新间隔
readahead.queue.capacity=500 #连接器消费者预读队列大小
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy #使用LegacyReplicationPolicy模仿MM1
heartbeats.topic.retention.ms=1 day #首次创建心跳主题时,设置心跳数据保留时长
checkpoints.topic.retention.ms=1 day #首次创建检查点主题时,设置检查点数据保留时长
offset.syncs.topic.retention.ms=max long #首次创建偏移量主题时,设置偏移量数据保留时长
replication.factor=2 #远端创建新topic的replication数量设置

Standalone模式运行

需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)

worker.properties配置文件

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAINkey.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverteroffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties配置文件

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

 启动命令

./connect-standalone.sh worker.properties connector.properties

参考资料:

kafka 异步双活方案 mirror maker2 深度解析 - 知乎

Apache Kafka MirrorMaker 2.0 指南 - Azure HDInsight | Microsoft Learn

简述 Kafka Mirror Maker v2 – K's Blog

Getting up to Speed with MirrorMaker 2 - Confluent

9.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据 Red Hat AMQ 2021.q3 | Red Hat Customer Portal

https://blog.51cto.com/u_14049791/5703235

KafkaMirrorMaker2.0架构与实战全解《三》 - 墨天轮

2.4. Kafka MirrorMaker 2 集群配置 Red Hat AMQ Streams 2.4 | Red Hat Customer Portal

kafka:MirrorMaker-V1(MM1)到MirrorMaker-V2(MM2)-腾讯云开发者社区-腾讯云

【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程_kafka数据同步到另外一个kafka-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/17843.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用IDEA远程debug调试

文章目录 应用背景开启方式IDEA设置启动脚本改造 参考资料 应用背景 springboot项目,部署到服务器上,需要开启远程debug跟踪代码。 使用idea开启远程debug。 开启方式 IDEA设置 选择 Edit Configuration 如图,点击加号,选择Re…

【机器学习】利用机器学习优化陆军战术决策与战场态势感知

🔒文章目录: 💥1.引言 🛴2.机器学习在陆军战术决策中的应用 🛣️2.1数据收集与预处理 🌄2.2模型构建与训练: 🌅2.3实时决策支持: 🌅2.4代码实现 &…

排序算法——上

一、冒泡排序: 1、冒泡排序算法的思想 我们从左边开始把相邻的两个数两两做比较,当一个元素大于右侧与它相邻的元素时,交换它们之间位置;反之,它们之间的位置不发生变化。冒泡排序是一种稳定的排序算法。 2、代码实现…

5月20日分割等和子集+最后一块石头的重量Ⅱ

416.分割等和子集 给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集,使得两个子集的元素和相等。 示例 1: 输入:nums [1,5,11,5] 输出:true 解释:数组可以分割成 [1, 5, 5] 和…

【ai】LiveKit Agent 的example及python本地开发模式工程实例

title: ‘LiveKit Agent Playground’ playgroundLiveKit Community playground的环境变量:LiveKit API # LiveKit API Configuration LIVEKIT_API_KEYYOUR_API_KEY LIVEKIT_API_SECRETYOUR_API_SECRET# Public configuration NEXT_PUBLIC_LIVEKIT_URLwss://YOUR_…

JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(MES与ERP系统的区别和联系)

MES与ERP系统的区别和联系 MES制造执行系统,是一套面向制造公司车间执行层的生产信息化管理系统。MES 可觉得公司提供涉及制造数据管理、计划排产管理、生产调度管理、库存管理、质量管理、人力资源管理、工作中心、设备管理、工具工装管理、采购管理、成本管理、项…

为什么推荐前端用WebStorm软件编程?

一、介绍 WebStorm是由JetBrains公司开发的一款JavaScript开发工具,被广大中国JS开发者誉为“Web前端开发神器”、“最强大的HTML5编辑器”、“最智能的JavaScript IDE”等。它支持JavaScript、ECMAScript 6、TypeScript、CoffeeScript、Dart和Flow等多种语言的代码…

大学搜题软件音乐类?分享三个支持答案和解析的工具 #微信#媒体

高效的学习工具可以帮助我们提高记忆力和理解能力,使知识更加深入人心。 1.彩虹搜题 这是个微信公众号 一款专门供全国大学生使用的查题神器!致力于帮助大学生解决学习上的难题,涵盖了大学生学习所需的学习资料。 下方附上一些测试的试题及答案 1、甲、乙合伙开…

goimghdr,一个有趣的 Python 库!

更多Python学习内容:ipengtao.com 大家好,今天为大家分享一个有趣的 Python 库 - goimghdr。 Github地址:https://github.com/corona10/goimghdr 在图像处理和分析过程中,识别图像文件的类型是一个常见的需求。Python自带的imghdr…

开源与闭源:AI模型发展的两条路径

目录 前言1 数据隐私保护与用户数据安全1.1 开源大模型的透明性与挑战1.2 闭源大模型的控制与责任 2 商业应用的优劣比较2.1 开源大模型的灵活性与创新2.2 闭源大模型的可靠性与服务质量 3 社区参与与合作的差异3.1 开源大模型的社区驱动与协作3.2 闭源大模型的企业主导与保密性…

【openlayers系统学习】3.3假彩色图像合成(三个波段合成假彩色图像)

三、假彩色图像合成 在上一步中,我们使用 ol/source/GeoTIFF​ 源从单个多波段源(具有红色、绿色、蓝色和Alpha波段)渲染真彩色图像。在下面这个例子中,我们将从可见光谱之外提取数据,并使用它来呈现假彩色合成。 我…

【基于 PyTorch 的 Python 深度学习】9 目标检测与语义分割(2)

前言 文章性质:学习笔记 📖 学习资料:吴茂贵《 Python 深度学习基于 PyTorch ( 第 2 版 ) 》【ISBN】978-7-111-71880-2 主要内容:根据学习资料撰写的学习笔记,该篇主要介绍了优化候选框的几种方法。 一、优化候选框的…

抖店怎么选品?抖店爆款选品思路技巧,新手直接用!

大家好,我是电商花花。 抖店选品永远是我们做抖店,做电商的核心,店铺想要出单,想要赚钱,我们就一定要学会怎么选品,怎么筛选商品。 而我们绝大多数新手并没有办法保证持续选爆款的能力,如果店…

【ARM+Codesys案例】T3/RK3568/树莓派+Codesys锂电池测试设备控制解决方案

锂电池诞生于上世纪60年代,90年代开始由日本索尼公司实现商业化。锂离子电池凭借快速充放电、长循环寿命、无记忆效应等众多优点,成为当今数码产品及电动汽车大规模应用的第一选择。与镍氢电池、铅酸电池相比,锂电池可以存储更多电能。现在&a…

Visual Studio 智能代码插件:CodeGeeX

前言 在软件开发领域,高效的编程助手一直是提升开发者效率和质量的关键。 随着人工智能技术的不断发展,智能编程助手逐渐成为开发者们不可或缺的工具。其中,CodeGeeX作为一款专为Visual Studio设计的免费智能编程助手,凭借其强大…

让大模型更聪明——复杂而艰巨的任务

一、引言 在人工智能领域,大模型因其强大的数据处理能力和复杂的结构,成为了推动技术进步的重要力量。然而,要让大模型真正展现出“聪明”的特质,即具备高度的人类智能水平,仍是一项极具挑战性的任务。本文将从数据质…

Java-Stream流-概述、创建、使用:遍历/匹配、筛选、聚合、映射、归约、排序、提取/组合

Java8-Stream: 一、Stream流概述1.Stream流的特点:2.使用步骤:3.常用方法示例: 二、Stream流创建1.常见的创建Stream的方法2. stream()或parallelStream()方法的使用和选择 三、Stream流使用Optional案例中使用的实体类1.遍历/匹配…

MYSQL之安装

一,下载仓库包 wget -i -c https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm二,安装仓库 yum -y install mysql80-community-release-el7-3.noarch.rpmsed -i s/gpgcheck1/gpgcheck0/g mysql-community.repo三,安装MY…

JS——对象

1.什么是对象 对象是什么? 对象是一种数据类型 无序的数据的集合( 数组是有序的数据集合 ) 对象有什么特点? 无序的数据的集合 可以详细地描述某个事物 静态特征 (姓名, 年龄, 身高, 性别, 爱好) > 可以使用数字, 字符串…

618有哪些值得买的好物?这几款好物通宵整理吐血推荐!

随着618购物节越来越近,很多买家终于等到了用好价钱买好东西的好机会。不管是你一直想要的家居电器,还是最新的数码产品,平时挺贵的东西在618期间会便宜不少。不过,这么多东西可选,促销活动也多得让人看花了眼&#xf…