Kafka集群数据迁移方案

概述

MirrorMaker2(后文简称 MM2)在 2019 年 12 月随 Kafka 2.4.0 一起推出。顾名思义,是为了解决 Kafka 集群之间数据复制和数据同步的问题而诞生的 Kafka 官方的数据复制工具。在实际生产中,经常被用来实现 Kafka 数据的备份,迁移和灾备等目的。

使用场景

Kafka MM2适用于下列场景:

  • 远程数据同步:通过MM2,Kafka数据可以在不同地域的集群进行传输复制。
  • 灾备场景:通过MM2,可以构建不同数据中心的主备两个集群容灾架构,MM2实时同步两个集群的数据。当其中一个集群不可用时,可以将上面的应用程序切换到另一个集群,从而实现异地容灾功能。
  • 数据迁移场景:在业务上云、混合云、集群升级等场景,存在数据从旧集群迁移到新集群的需求。此时,您可以使用MM2实现新旧数据的迁移,保证业务的连续性。
  • 聚合数据中心场景:通过MM2,可以将多个Kafka子集群的数据同步到一个中心Kafka集群,实现数据的汇聚。

功能

Kafka MM2作为数据复制工具,具有以下功能:

  • 复制topics数据以及配置信息。
  • 复制consumer groups及其消费topic的offset信息。
  • 复制ACLs。
  • 自动检测新的topic以及partition。
  • 提供MM2的metrics。
  • 高可用以及可水平扩展的框架。

任务执行方式

MM2任务有以下执行方式:

  • Distributed Connect集群的connector方式(推荐):在已有Connect集群执行MM2
    connector任务的方式。您可以参照本文使用Connect集群服务的功能来管理MM2任务。
  • Dedicated MirrorMaker集群方式:不需要使用Connect集群执行MM2
    connector任务,而是直接通过Driver程序管理MM2的所有任务。
  • Standalone Connect的worker方式:执行单个MirrorSourceConnector任务,适合在测试场景下使用。

说明

推荐在Distributed Connect集群上启动MM2 connector任务,可以借助Connect集群的Rest服务管理MM2任务。

使用限制

  1. 为保证生产集群的数据完整和安全,必须先在测试集群进行测试
  2. 源集群与目标集群的Kafka软件版本为2.12_2.4.1及以上。
  3. MM2 迁移任务会增加CPU和内存的占用,尽量停止客户端生产与消费,或根据数据量大小,选择在窗口时间进行迁移。

迁移方案

集群情况

集群配置

主题配置:确保目标集群中的主题配置(如分区数、副本数、保留策略等)与源集群一致,或根据业务需求进行调整。
Broker 配置:检查每个 broker 的配置参数,确保两者之间的兼容性。

数据分布与负载

分区分布:了解源集群中各主题的分区分布情况,以便在目标集群中合理安排分区。
负载评估:分析源集群的负载,确保目标集群有足够的能力来处理迁移后的数据流。

安全性和权限

认证与授权:检查源集群和目标集群的安全设置,确保数据迁移过程中涉及的用户和角色具有适当的权限。

兼容性和版本

Kafka 版本:确保两个集群的 Kafka 版本兼容,特别是在使用特性时,避免因版本差异引发的问题。
消息格式:验证消息格式和序列化机制在两个集群中的一致性。

创建测试topic

# 根据目标集群与业务需求进行调整目标topic副本与分区数,测试不做要求

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topictest --replication-factor 3

在这里插入图片描述

2.3 生产消息

bash-5.0$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topictest
>1
>2
>3
>4
>5
>6
>7
>8
>

在这里插入图片描述

2.4 记录偏移量

记录 Kafka 主题的偏移量信息

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic s.topictest --time  -1

在这里插入图片描述

MM2配置文件

# 指定两个集群,以及对应的host
clusters = s,d
s.bootstrap.servers = xxxx:9092
d.bootstrap.servers = yyyy:9092
# 指定同步备份的topic & consumer group,支持正则
s->d.topics = topictest
s->d.groups = .*
# 指定复制链条,可以是双向的
s->d.enabled = true
# us-east->us-west.enabled = true  # 双向,符合条件的两个集群的topic会相互备份

全量配置

如有其他需要,可添加其他使用参数# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = A,B
#replication.policy.separator=""
#source.cluster.alias=""
#target.cluster.alias=""
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = xxxx:9092
B.bootstrap.servers = yyyy:9092
A.security.protocol=SASL_PLAINTEXT
A.sasl.mechanism=SCRAM-SHA-512
A.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \username="xxx" password="xxx";
B.security.protocol=SASL_PLAINTEXT
B.sasl.mechanism=SCRAM-SHA-512
B.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \username="xxx" password="xxx";
# enable and configure individual replication flows
# 设置同步的流向
A->B.enabled = true
#A.producer.enable.idempotence = true
#B.producer.enable.idempotence = true
# regex which defines which topics gets replicated. For eg "foo-.*"
#A->B.topics = hadoopLogCollection,t_biz_act_mmetric
#设置同步的topic;支持正则
A->B.topics = xxxx,xxxx
#设置排除的topic:支持正则
A->B.topics.exclude= xxxx#B->A.enabled = true
#B->A.topics = .*# Setting replication factor of newly created remote topics
replication.factor=3############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
sync.topic.configs.enabled=true
#同步配置的时间频率
sync.topic.configs.enabled.interval.seconds=60
checkpoints.topic.replication.factor=2
heartbeats.topic.replication.factor=2
offset-syncs.topic.replication.factor=2
#offset-syncs.topic.location = target#启动同步的Task数量----启用几个线程进行同步
tasks.max = 5# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=2
status.storage.replication.factor=2
config.storage.replication.factor=2# customize as needed
# replication.policy.separator = _
sync.topic.acls.enabled = true
emit.heartbeats.interval.seconds = 5#开启topic动态和消费者组 动态同步与同步的周期
refresh.topics.enabled = true
refresh.topics.interval.seconds = 60
refresh.groups.enabled = true
refresh.groups.interval.seconds = 60# 开始消费者组offset同步;设置同步的周期---注意:仅仅同步idle中的消费者的offset
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 5#设置同步的topic Name命名规则;3.0版本提供了两种topic同步命名规则,默认会带上前缀,也可以手动不带前缀的----此时不能做双向同步
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy

执行迁移进程

bash-5.0$ ./bin/connect-mirror-maker.sh mm2.properties

在这里插入图片描述

验证数据同步

topic迁移后会变为s.topic
在这里插入图片描述

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic s.topictest --time  -1

在这里插入图片描述
与源集群偏移量相同,消费消息正常
在这里插入图片描述
确认消息偏移量,保证数据一致性。
源集群停止mm2迁移进程,并将业务连接到新集群中即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/58776.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Prometheus监控平台部署与应用

Prometheus特点 多维数据模型 PromSQL:一种灵活的查询语言,可以利用多维数据完成复杂的查询 不依赖分布式存储,单个服务器节点可直接工作 基于HTTP的pull方式采集时间序列数据 推送时间序列数据通过PushGateway组件支持 通过服务发现或静态配…

vue3 栅栏式拖拽布局组件

先看效果&#xff1a; 使用方法&#xff1a; 1、npm install fencelayout 2、引入使用 <template><Fencelayout><!-- 需要写的模块直接嵌套在这个下面就可以 --><div class"aaaa"><a-button>模块1</a-button></div><…

探索设计模式:命令模式

探索设计模式&#xff1a;命令模式 &#x1f9d0;1. 概念&#x1f3af;2. 作用&#x1f4e6;3. 实现3.1 定义命令接口3.2 实现具体命令3.3 实现接收者3.4 实现调用者3.5 使用 &#x1f4bb;4. 应用场景 命令模式&#xff08;Command Pattern&#xff09;就是一种行为型设计模式…

茅台最新任务脚本

茅台最新任务脚本 –小白教程— 这个脚本的作用是实现i茅台应用的自动预约功能&#xff0c;主要功能包括生成请求头、预约商品、计算距离和库存情况、发送微信推送消息等。 代码如下#!/usr/bin/python3cron: 0 0 9/21 * * * new Env(i茅台) import logging import sysimpor…

​CSS之三

CSS三大特性 CSS 有三个非常重要的三个特性:层圣性、继承性、优先级 层叠性 相同选择器给设置相同的样式&#xff0c;此时一个样式就会覆盖(层曼)另一个冲突的样式。层曼性主要解决样式冲突的问题 层叠性原则: - 样式冲突&#xff0c;遵循的原则是就近原则&#xff0c;哪个…

C++设计模式创建型模式———简单工厂模式、工厂方法模式、抽象工厂模式

文章目录 一、引言二、简单工厂模式三、工厂方法模式三、抽象工厂模式四、总结 一、引言 创建一个类对象的传统方式是使用关键字new &#xff0c; 因为用 new 创建的类对象是一个堆对象&#xff0c;可以实现多态。工厂模式通过把创建对象的代码包装起来&#xff0c;实现创建对…

python爬虫抓取豆瓣数据教程

环境准备 在开始之前&#xff0c;你需要确保你的Python环境已经安装了以下库&#xff1a; requests&#xff1a;用于发送HTTP请求。BeautifulSoup&#xff1a;用于解析HTML文档。 如果你还没有安装这些库&#xff0c;可以通过以下命令安装&#xff1a; pip install requests…

代码-画图函数示例

热力图 import matplotlib.pyplot as plt import seaborn as sns import numpy as npdef create_heatmap(people, categories, dataNone, title热力图, xlabel类别, ylabel人员,value_range(0.6, 0.95), figsize(10, 6),cmapYlOrRd, decimal_places3):"""创建热…

2024最新Twitter养号全面指南,品牌起号必看!

X (Twitter)作为活跃用户数以亿计的社交媒体平台&#xff0c;用户数依然在不断增长&#xff0c;其中巨大的流量吸引着个人用户与品牌和卖家。 Twitter养号是有必要的&#xff0c;有大量案例表明养好号&#xff0c;可以大幅度降低账号被冻结的几率&#xff0c;并提升账号的稳定…

百度如何打造AI原生研发新范式?

&#x1f449;点击即可下载《百度AI原生研发新范式实践》资料 2024年10月23-25日&#xff0c;2024 NJSD技术盛典暨第十届NJSD软件开发者大会、第八届IAS互联网架构大会在南京召开。本届大会邀请了工业界和学术界的专家&#xff0c;优秀的工程师和产品经理&#xff0c;以及其它行…

基于大语言模型(LLM)自主Agent 智能体综述

近年来,LLM(Large Language Model)取得了显著成功,并显示出了达到人类智能的巨大潜力。基于这种能力,使用LLM作为中央控制器来构建自助Agent,以获得类人决策能力。 Autonomous agents 又被称为智能体、Agent。指能够通过感知周围环境、进行规划以及执行动作来完成既定任务。…

电脑怎么设置开机密码:保障个人信息安全的第一步

在数字化时代&#xff0c;个人信息的安全至关重要。电脑作为我们日常工作和生活中不可或缺的设备&#xff0c;存储了大量的私人数据和敏感信息。为了防止未经授权的访问&#xff0c;设置开机密码是保护个人隐私和信息安全的基本措施之一。本文将详细介绍如何在不同操作系统下为…

分析 std::optional 的使用与常见错误

文章目录 引言常见错误及解决方案1. 错误使用 std::optional 变量进行算术运算2. 错误检查 std::optional 是否有值3. 忽视 std::optional 的默认值 结论 引言 std::optional 是 C17 引入的一个模板类&#xff0c;用于表示可能有也可能没有值的情况。它特别适用于函数返回值&a…

DB-GPT系列(二):DB-GPT部署(镜像一键部署、源码部署)

一、简介 DB-GPT 是一个开源项目&#xff0c;其将大语言模型 LLM 与数据库紧密结合。该项目主要致力于探索如何让预训练的大规模语言模型&#xff08;例如 GPT&#xff09;能够直接与数据库进行交互&#xff0c;从而生成更为准确且信息丰富的回答。 DB-GPT部署后能否直接使用…

Web组件之 Listener (监听器)

文章目录 1.1 Listener概述1.2 Listener快速入门① xml版本② 注解版本 1.3 案例&#xff1a;模拟spring框架 1.1 Listener概述 ​ JavaWeb 中的监听器是监听 ServletContext HttpSession HttpServletRequest 三个数据域对象创建和销毁以及监听数据域对象中数据的变化&#xf…

【论文翻译】IJCAI 2019 | Graph WaveNet:用于深度时空图建模的Graph WaveNet

论文题目Graph WaveNet for Deep Spatial-Temporal Graph Modeling作者团队Zonghan Wu, Shirui Pan, Guodong Long, Jing Jiang, Chengqi Zhang机构澳大利亚悉尼科技大学人工智能中心 (UTS) 和 澳大利亚莫纳什大学发表会议IJCAI 2019论文链接https://www.ijcai.org/proceedings…

Java数组的定义与使用

今天来学习Java数组的定义与使用 目录 1 数组的基本概念1.1 数组的意义1.2 数组的定义1.3 数组的创建及初始化1.3.1 数组的创建1.3.2 数组的初始化 1.4 数组的使用1.4.1 数组中的元素访问1.4.2 遍历数组运行结果运行结果 2 数组是引用类型2.1 初始 JVM 的内存分布2.2 基本类型变…

https://tieba.baidu.com/p/9247698007

微深节能的库区智能化无人天车管理系统结合了格雷母线技术&#xff0c;提供了一种高精度的定位解决方案。格雷母线系统能够实现连续或断续的位置检测&#xff0c;精度高达≤5mm&#xff0c;适用于需要高精度作业的场景&#xff0c;如货物搬运和堆放。这种系统通过实时交互&…

创作里程碑:纪念日回顾与展望

目录 机缘&#xff1a;创作者初心 1. 实战项目 2. 日常学习 3. 技术交流 4. 总结 收获&#xff1a;创作者动力 创作与工作、学习的关系 憧憬&#xff1a;职业规划与创作规划 职业规划&#xff1a; 创作规划&#xff1a; 机缘&#xff1a;创作者初心 回望自己踏上…

软考(中级-软件设计师)数据库篇(1101)

第6章 数据库系统基础知识 一、基本概念 1、数据库 数据库&#xff08;Database &#xff0c;DB&#xff09;是指长期存储在计算机内的、有组织的、可共享的数据集合。数据库中的数据按一定的数据模型组织、描述和存储&#xff0c;具有较小的冗余度、较高的数据独立性和扩展…