亚马逊云科技Amazon MSK基于S3云服务器实现导出导入、备份还原、迁移方案

亚马逊云科技Amazon MSK是Amazon云平台提供的托管Kafka服务。在系统升级或迁移时,用户常常需要将一个Amazon MSK集群中的数据导出(备份),然后在新集群或另一个集群中再将数据导入(还原)。通常,Kafka集群间的数据复制和同步多采用Kafka MirrorMaker,但是,在某些场景中,受环境限制,两个于Kafka集群之间的网络可能无法连通,或者两个亚马逊云科技账号相互隔离,亦或是需要将Kafka的数据沉淀为文件存储以备他用。此时,基于Kafka Connect S3 Source/Sink Connector的方案会是一种较为合适的选择,本文就将介绍一下这一方案的具体实现。

 数据的导出、导入、备份、还原通常都是一次性操作,为此搭建完备持久的基础设施并无太大必要,省时省力,简单便捷才是优先的考量因素。为此,本文将提供一套开箱即用的解决方案,方案使用Docker搭建Kafka Connect,所有操作均配备自动化Shell脚本,用户只需设置一些环境变量并执行相应脚本即可完成全部工作。这种基于Docker的单体模式可以应对中小型规模的数据同步和迁移,如果要寻求稳定、健壮的解决方案,可以考虑将Docker版本的Kafka Connect迁移到Kubernetes或Amazon MSK Connect,实现集群化部署。

 整体架构

 首先介绍一下方案的整体架构。导出/导入和备份/还原其实是两种高度类似的场景,但为了描述清晰,我们还是分开讨论。先看一下导出/导入的架构示意图:

 在这个架构中,Source端的MSK是数据流的起点,安装了S3 Sink Connector的Kafka Connect会从Source端的MSK中提取指定Topic的数据,然后以Json或Avro文件的形式存储到S3上;同时,另一个安装了S3 Source Connector的Kafka Connect会从S3上读取这些Json或Avro文件,然后写入到Sink端MSK的对应Topic中。如果Source端和Sink端的MSK集群不在同一个Region,可以在各自的Region分别完成导入和导出,然后在两个Region之间使用S3的Cross-Rejion Replication进行数据同步。

 该架构只需进行简单的调整,即可用于MSK集群的备份/还原,如下图所示:先将MSK集群的数据备份到S3上,待完成集群的升级、迁移或重建工作后,再从S3上将数据恢复到新建集群即可。

 预设条件

 本文聚焦于Kafka Connect的数据导出/导入和备份/还原操作,需要提前准备:

 一台基于Amazon Linux2的EC2实例(建议新建纯净实例),本文所有的实操脚本都将在该实例上执行,该实例也是运行Kafka Connect Docker Container的宿主机。

 两个MSK集群,一个作为Source,一个作为Sink;如果只有一个MSK集群也可完成验证,该集群将既作Source又作Sink。

 为聚焦Kafka Connect S3 Source/Sink Connector的核心配置,预设MSK集群没有开启身份认证(即认证类型为Unauthenticated),数据传输方式为PLAINTEXT,以便简化Kafka Connect的连接配置。

 网络连通性上要求EC2实例能访问S3、Source端MSK集群、Sink端MSK集群。如果在实际环境中无法同时连通Source端和Sink端,则可以在两台分属于不同网络的EC2上进行操作,但它们必须都能访问S3。如果是跨Region或账号隔离,则另需配置S3 Cross-Region Replication或手动拷贝数据文件。

 全局配置

 由于实际操作将不可避免地依赖到具体的亚马逊云科技账号以及本地环境里的各项信息(如AKSK,服务地址,各类路径,Topic名称等),为了保证本文给出的操作脚本具有良好的可移植性,将所有与环境相关的信息抽离出来,以全局变量的形式在实操前集中配置。以下就是全局变量的配置脚本,读者需要根据个人环境设定这些变量的取值:

 为了便于演示和解读,本文将使用下面的全局配置,其中前6项配置与账号和环境强相关,仍需用户自行修改,脚本中给出的仅为示意值,而后5项配置与MSK数据的导入导出息息相关,不建议修改,因为后续的解读将基于这里设定的值展开,待完成验证后,您可再根据需要灵活修改后5项配置以完成实际的导入导出工作。

 回到操作流程,登录准备好的EC2实例,修改下面脚本中与账号和环境相关的前6项配置,然后执行修改后的脚本。此外,需要提醒注意的是:在后续操作中,部分脚本执行后将不再返回,而是持续占用当前窗口输出日志或Kafka消息,因此需要新开命令行窗口,每次新开窗口都需要执行一次这里的全局配置脚本。

 关于上述脚本中的后5项配置,有如下详细说明:

 我们就以脚本中设定的值为例,解读一下这5项配置联合起来将要实现的功能,同时也是本文将演示的主要内容:

 在Source端的MSK集群上存在两个名为source-topic-1和source-topic-2的Topic,通过安装有S3 Sink Connector的Kafka Connect(Docker容器)将两个Topic的数据导出到S3的指定存储桶中,然后再通过安装有S3 Source Connector的Kafka Connect(Docker容器,可以和S3 Source Connector共存为一个Docker容器)将S3存储桶中的数据写入到Sink端的MSK集群上,其中原source-topic-1的数据将被写入sink-topic-1,原source-topic-2的数据将被写入sink-topic-2。

 特别地,如果是备份/还原场景,需要保持导出/导入的Topic名称一致,此时,可直接删除S3 Source Connector中以transforms开头的4项配置(将在下文中出现),或者将下面两项改为:

 如果只有一个MSK集群,同样可以完成本文的验证工作,只需将SOURCE_KAFKA_BOOTSTRAP_SEVERS和SINK_KAFKA_BOOTSTRAP_SEVERS同时设置为该集群即可,这样,该集群既是Source端又是Sink端,由于配置中的Source Topics和Sink Topics并不同名,所以不会产生冲突。

 环境准备

 安装工具包

 在EC2上执行以下脚本,安装并配置jq,yq,docker,jdk,kafka-console-client五个必须的软件包,可以根据自身EC2的情况酌情选择安装全部或部分软件。建议使用纯净的EC2实例,完成全部的软件安装:

 创建S3存储桶

 整个方案以S3作为数据转储媒介,为此需要在S3上创建一个存储桶。Source端MSK集群的数据将会导出到该桶中并以Json文件形式保存,向Sink端MSK集群导入数据时,读取的也是存储在该桶中的Json文件。

 在源MSK上创建Source Topics

 为了确保Topics数据能完整备份和还原,S3 Source Connector建议Sink Topics的分区数最好与Source Topics保持一致,如果让MSK自动创建Topic,则很有可能会导致Source Topics和Sink Topics的分区数不对等,所以,选择手动创建Source Topics和Sink Topics,并确保它们的分区数一致。以下脚本将创建source-topic-1和source-topic-2两个Topic,各含9个分区:

 在目标MSK上创建Sink Topics

 原因同上,以下脚本将创建:sink-topic-1和sink-topic-2两个Topic,各含9个分区:

 制作Kafka Connect镜像

 接下来是制作带S3 Sink Connector和S3 Source Connector的Kafka Connect镜像,镜像和容器均以kafka-s3-syncer命名,以下是具体操作:

 配置并启动Kafka Connect

 镜像制作完成后,就可以启动了Kafka Connect了。Kafka Connect有很多配置项,需要提醒注意的是:在下面的配置中,使用的是Kafka Connect内置的消息转换器:JsonConverter,如果你的输入/输出格式是Avro或Parquet,则需要另行安装对应插件并设置正确的Converter Class。

 上述脚本执行后,命令窗口将不再返回,而是会持续输出容器日志,因此下一步操作需要新开一个命令行窗口。

 配置并启动S3 Sink Connector

 在第5节的操作中,已经将S3 Sink Connector安装到了Kafka Connect的Docker镜像中,但是还需要显式地配置并启动它。新开一个命令行窗口,先执行一遍《实操步骤(1):全局配置》,声明全局变量,然后执行以下脚本:

 配置并启动S3 Source Connector

 同上,在第5节的操作中,已经将S3 Source Connector安装到了Kafka Connect的Docker镜像中,同样需要显式地配置并启动它:

 至此,整个环境搭建完毕,一个以S3作为中转媒介的MSK数据导出、导入、备份、还原链路已经处于运行状态。

 测试

 现在,来验证一下整个链路是否能正常工作。首先,使用kafka-console-consumer.sh监控source-topic-1和sink-topic-1两个Topic,然后使用脚本向source-topic-1持续写入数据,如果在sink-topic-1看到了相同的数据输出,就说明数据成功地从source-topic-1导出然后又导入到了sink-topic-1中,相应的,在S3存储桶中也能看到“沉淀”的数据文件。

 打开Source Topic

 新开一个命令行窗口,先执行一遍《实操步骤(1):全局配置》,声明全局变量,然后使用如下命令持续监控source-topic-1中的数据:

 打开Sink Topic

 新开一个命令行窗口,先执行一遍《实操步骤(1):全局配置》,声明全局变量,然后使用如下命令持续监控sink-topic-1中的数据:

 向Source Topic写入数据

 新开一个命令行窗口,先执行一遍《实操步骤(1:全局配置》,声明全局变量,然后使用如下命令向source-topic-1中写入数据:

 现象与结论

 执行上述写入操作后,从监控source-topic-1的命令行窗口中可以很快看到写入的数据,这说明Source端MSK已经开始持续产生数据了,随后(约1分钟),即可在监控sink-topic-1的命令行窗口中看到相同的输出数据,这说明目标端的数据同步也已开始正常工作。此时,打开S3的存储桶会发现大量Json文件,这些Json是由S3 Sink Connector从source-topic-1导出并存放到S3上的,然后S3 Source Connector又读取了这些Json并写入到了sink-topic-1中,至此,整个方案的演示与验证工作全部结束。

 清理

 在验证过程中,可能需要多次调整并重试,每次重试最好恢复到初始状态,以下脚本会帮助清理所有已创建的资源:

 小结

 本方案主要定位于轻便易用,在S3 Sink Connector和S3 Source Connector中还有很多与性能、吞吐量相关的配置,例如:s3.part.size, flush.size, s3.poll.interval.ms, tasks.max等,可以在实际需要自行调整,此外,Kafka Connect也可以方便地迁移到Kubernetes或Amazon MSK Connect中以实现集群化部署。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/584762.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flink generic log-based incremental checkpoints 设计

背景 flink 在1.15版本后开始提供generic log-based incremental checkpoints的检查点方案,目的在于减少checkpoint的耗时,尽量缩短端到端的数据处理延迟,本文就来看下这种新类型的checkpoint的设计 generic log-based incremental checkpo…

uniapp门店收银,点击右边商品,商品会进入左边的购物车,并且,当扫码枪扫描商品条形码,商品也会累计进入购物车

效果&#xff1a; 代码&#xff1a; <template><view class"container"><view class"top" style"height: 10%; margin-bottom: 20rpx; box-shadow: 0px 2px 4px rgba(0, 0, 0, 0.2);"><view class"box" style&q…

【HarmonyOS开发】案例-记账本开发

OpenHarmony最近一段时间&#xff0c;简直火的一塌糊度&#xff0c;学习OpenHarmony相关的技术栈也有一段时间了&#xff0c;做个记账本小应用&#xff0c;将所学知识点融合记录一下。 1、记账本涉及知识点 基础组件&#xff08;Button、Select、Text、Span、Divider、Image&am…

Maven配置教程

一&#xff1a;下载 Maven – Download Apache Maven 二&#xff1a;解压 三&#xff1a;修改setting 1.在<localRepository>标签内添加自己的本地位置路径 <!-- localRepository| The path to the local repository maven will use to store artifacts.|| Default:…

IDEA使用HDFS的JavaApi

注&#xff1a;以下代码操作是利用junit在java测试文件夹中实现。 1. 准备工作 1.1 创建测试类 创建测试类&#xff0c;并定义基本变量 public class HDFSJAVAAPI {// 定义后续会用到的基本变量public final String HDFS_PATH "hdfs://hadoop00/";Configuration …

Android studio CMakeLists.txt 打印的内容位置

最近在学习 cmake 就是在安卓中 , 麻烦的要死 , 看了很多的教程 , 发现没有 多少说对打印位置在哪里 , 先说一下版本信息 , 可能你们也不一样 gradle 配置 apply plugin: com.android.applicationandroid {compileSdkVersion 29buildToolsVersion "29.0.3"defau…

GPT编程(1)八分类图像数据集转换为二分类

一个核心问题就是要将这八类数据图片全部重命名&#xff0c;尝试了一步到位 有一个图像数据集&#xff0c;有八个类别amusement,anger,awe,contentment,disgust, excitement, fear,sadness的图片&#xff0c;每张图片被命名为“类别数字”。采用遍历的方式&#xff0c;按顺序阅…

Go语言学习第二天

Go语言数组详解 var 数组变量名 [元素数量]Type 数组变量名&#xff1a;数组声明及使用时的变量名。 元素数量&#xff1a;数组的元素数量&#xff0c;可以是一个表达式&#xff0c;但最终通过编译期计算的结果必须是整型数值&#xff0c;元素数量不能含有到运行时才能确认大小…

阿里云2核2G3M服务器放几个网站?

阿里云2核2g3m服务器可以放几个网站&#xff1f;12个网站&#xff0c;阿里云服务器网的2核2G服务器上安装了12个网站&#xff0c;甚至还可以更多&#xff0c;具体放几个网站取决于网站的访客数量&#xff0c;像阿里云服务器网aliyunfuwuqi.com小编的网站日访问量都很少&#xf…

java 企业工程管理系统软件源码+Spring Cloud + Spring Boot +二次开发+ 可定制化

工程项目管理软件是现代项目管理中不可或缺的工具&#xff0c;它能够帮助项目团队更高效地组织和协调工作。本文将介绍一款功能强大的工程项目管理软件&#xff0c;该软件采用先进的Vue、Uniapp、Layui等技术框架&#xff0c;涵盖了项目策划决策、规划设计、施工建设到竣工交付…

springboot整合hadoop遇错

错误一&#xff1a; Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. 解决&#xff1a; 下载&#xff1a;https://github.com/steveloughran/winutils 选择一个版本 例如&#xff1a;3.0.0 &#xff0c;将里面的hadoop.dll文件复制…

在IntelliJ IDEA中精通Git配置与使用:全面指南

目录 1 前言2 idea中使用git的准备2.1 在 IntelliJ IDEA 中配置 Git2.2 配置 Git 忽略文件 3 在IntelliJ IDEA中使用Git的基本步骤3.1 项目导入到 Git3.2 查看与切换版本信息 4 在 IntelliJ IDEA 中使用分支4.1 创建分支4.2 无冲突合并4.3 冲突合并 5 结语 1 前言 版本控制是现…

Linux(ubuntu)下git / github/gitee使用

先附上git命令 linuxchenxiao:~$ cd Templates/ 先进入一个目录&#xff0c;也可mkdir新建一个目录&#xff1a;用于接下来初始化为git可以管理的仓库 这个目录就是所说的工作目录&#xff0c;指当前正在进行开发的项目的本地目录。 linuxchenxiao:~/Templates$ git init 已…

[每周一更]-(第79期):Apache代理的配置

反向代理逻辑类似Nginx&#xff0c;以下具体展示属于apache的配置和参数说明 局部代理配置方式&#xff1a; # 配置包含https的需要打开 SSLProxyEngine on ProxyPass /api/small https://api.web.com/version1/small/ ProxyPassReverse /api/small https://api.web.com/versio…

直方图与均衡化

直方图 统计图像中相同像素点的数量。 使用cv2.calcHist(images, channels, mask, histSize, ranges)函数 images&#xff1a;原图像图像格式为uint8或float32&#xff0c;当传入函数时应用[]括起来&#xff0c;例如[img]。 channels&#xff1a;同样用中括号括起来&#xff…

如何确保云中高可用?聊聊F5分布式云DNS负载均衡

在当今以应用为中心的动态化市场中&#xff0c;企业面临着越来越大的压力&#xff0c;不仅需要提供客户所期望的信息、服务和体验&#xff0c;而且要做到快速、可靠和安全。DNS是网络基础设施的重要组成部分&#xff0c;拥有一个可用的、智能的、安全和可扩展的DNS基础设施是至…

工程(十六)——自己数据集跑Fast_livo

一、基础环境 Ubuntu20.04 ROS noetic PCL 1.8 Eigen 3.3.4 Sophus git clone https://github.com/strasdat/Sophus.git cd Sophus git checkout a621ff mkdir build && cd build && cmake .. make sudo make install 下面两个直接把包下载下来一起编译…

2023-12-29 服务器开发-Centos部署LNMP环境

摘要: 2023-12-29 服务器开发-Centos部署LNMP环境 centos7.2搭建LNMP具体步骤 1.配置防火墙 CentOS 7.0以上的系统默认使用的是firewall作为防火墙&#xff0c; 关闭firewall&#xff1a; systemctl stop firewalld.service #停止firewall systemctl disable fire…

Windows上ModbusTCP模拟Master与Slave工具的使用

场景 Modbus Slave 与 Modbus Poll主从设备模拟软件与Configure Virtual Serial串口模拟软件使用&#xff1a; Modebus Slave 与 Modbus Poll主从设备模拟软件与Configure Virtual Serial串口模拟软件使用_modbus poll激活-CSDN博客 数据对接协议为Modbus TCP,本地开发需要使…

C语言编程入门 – 编写第一个Hello, world程序

C语言编程入门 – 编写第一个Hello, world程序 C Programming Entry - Write the first application called “Hello, world!” By JacksonML C语言编程很容易&#xff01; 本文开始&#xff0c;将带领你走过C语言编程之旅&#xff0c;通过实例使你对她颇感兴趣&#xff0c;一…