Kafka 集群数据备份 MirrorMaker 详解

什么是 MirrorMaker?

MirrorMaker是Kafka附带的一个用于在Kafka集群之间制作镜像数据的工具。该工具主要动作就是从源集群中消费并生产到目标群集。

一个集群可以启动多个MirrorMaker配置到多个集群

 

运行 MirrorMaker方法

kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --num.streams 8 --whitelist ".*"

实际操作一遍

consumer.properties文件

root@DESKTOP-I0EG1MJ:~/kafka-client/mirror-maker# cat consumer.properties
bootstrap.servers=192.168.3.194:9092
group.id=mirrormaker
auto.offset.reset=earliest

producer.properties文件

root@DESKTOP-I0EG1MJ:~/kafka-client/mirror-maker# cat producer.properties
bootstrap.servers=192.168.2.123:9092

 运行mirror-maker

root@DESKTOP-I0EG1MJ:~/kafka-client/mirror-maker# cat run-mirrormaker.sh
kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "test0428"

参数说明:

--consumer.config 消费者的配置文件(要消费的集群)

--producer.config 指定生产配置文件(要发送到的目标集群)

--whitelist 要同步的topic白名单,可以匹配正则,也可以指定具体topic

--offset.commit.interval.ms 消费端提交offset时间间隔

--num.streams  MirrorMaker 要创建多少个 KafkaConsumer 实例

更多可以使用 --help查看

root@DESKTOP-I0EG1MJ:~/kafka-client/mirror-maker# kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit onthe entire mirror maker when a send      a failed send. (default: true)failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consumingfrom the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to useA custom rebalance listener of type      for mirror maker consumer.ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will processmessage handler of type                  every record in-between consumer andMirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom messageArguments passed to message handler      handler for mirror maker.constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirrormaker (this is the default so thisoption will be removed in a futureversion).
--num.streams <Integer: Number of        Number of consumption streams.threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.offset commit interval in                (default: 60000)millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalanceArguments passed to custom rebalance     listener for mirror maker consumer.listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.(String)>

 

运行后会提示这么一句话

WARNING: The default partition assignment strategy of the mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'

 主要是说是MirrorMaker使用的消费策略是 Range ,以后可能改成 “轮训策略” ,我们可以手动指定“轮询策略”

在Consuemr.properties中设置即可

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

 

 

此时我们已经运行成功,直接去查看是否可以消费成功即可。

注意:同步前最后将目标集群的Topic创建好,否则会使用Broker默认配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509745.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试的态度

关于面试&#xff0c;如果到了面试现场&#xff0c;可能环境并不是自己所希望的那样&#xff0c;但是也不能消极对待&#xff0c;既然自己付出了时间来到这里&#xff0c; 公司对方也安排了时间给予你面试&#xff0c;不管是本着对自己的负责&#xff0c;还是对公司面试人员的…

域名服务的工作流程

域名服务的工作流程

Kafka 消费者组 Rebalance 详解

Rebalance作用 Rebalance 本质上是一种协议&#xff0c;主要作用是为了保证消费者组&#xff08;Consumer Group&#xff09;下的所有消费者&#xff08;Consumer&#xff09;消费的主体分区达成均衡。 比如&#xff1a;我们有10个分区&#xff0c;当我们有一个消费者时&…

C/C++程序员必须熟悉的开源库

作为一个经验丰富的Linux C/C程序员&#xff0c; 肯定亲手写过各种功能的代码&#xff0c; 比如封装过数据库访问的类&#xff0c; 封装过网络通信的类&#xff0c;封装过日志操作的类&#xff0c; 封装过文件访问的类&#xff0c; 封装过UI界面库等&#xff0c; 也在实际的项目…

CDH kafka JMX 启动

服务正常启动 telnet 127.0.0.1 9393 就可以,直接 telnet ip 9393 就不通 我们查看CDH broker_java_opts 配置项 原内容为 -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 …

java基础之多线程笔记

多线程的优势&#xff1a; 多线程作为一种多任务&#xff0c;并发的工作方式&#xff0c;当然有其存在的优势。 1.线程之间不能共享内存&#xff0c;而线程之间共享内存&#xff08;堆内存&#xff09;&#xff0c;则很简单。 2.系统创建进程&#xff08;需要为该进程重新分…

提高C++程序运行效率的10个简单方法

本文以C/C程序为例讲述了程序运行效率的10个简单方法&#xff0c;分享给大家供大家参考之用。具体分析如下&#xff1a; 对于每一个程序员来说&#xff0c;程序的运行效率都是一个值得重视&#xff0c;并为之付出努力的问题。但是程序性能的优化也是一门复杂的学问&#xff0c;…

Kafka JMX 监控 之 jmxtrans + influxdb + grafana

目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩展 安装 jmxtrans 可能遇到的异常 验证jmxtrans是否成功运行 安装 Grafana 安装 influxDB 与 Grafa…

如何解决多线程并发访问一个资源的安全性问题?

原子操作&#xff1a;所谓原子操作是指不会被线程调度机制打断的操作&#xff1b;这种操作一旦开始&#xff0c;就一直运行到结束&#xff0c;中间不会有任何 context switch &#xff08;切[1] 换到另一个线程&#xff09;。 关于我对原子操作的理解&#xff1a;原子操作就类…

实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

一.基本概念 我们通俗一点讲&#xff1a; Level_triggered(水平触发)&#xff1a;当被监控的文件描述符上有可读写事件发生时&#xff0c;epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如…

Kafka开发指南之 如何Kafka 事务型生产者,保证生产者exactly once

目录 至少一次&#xff08;at least once&#xff09; 最多一次&#xff08;at most once&#xff09; 精确一次&#xff08;exactly once&#xff09; 幂等性 幂等性作用范围 实现方法 代码 事务 事务作用范围 实现方法 代码 我们知道Kafka的消息交付可靠性保障分为…

UML序列图

UML学习&#xff08;三&#xff09;-----序列图 UML的模型中可分为两种&#xff0c;动态模型和静态模型。用例图、类图和对象图都是UML中的静态结构模型。而在UML系统动态模型的其中一种就是交互视图&#xff0c;它描述了执行系统功能的各个角色之间相互传递消息的顺序关系。序…

OpenTSDB 开发指南之 查询数据

前面博主写了一篇文章去介绍opentsdb的http接口的使用方法,但是某一些接口的使用还是比较复杂&#xff0c;这篇文章会通过example来详细讲述opentsdb的一些特性。 本文的举的例子有这些&#xff1a; 基本的写入和查询数据的注释和说明子查询查询中的filters使用查询数据的rat…

OpenTSDB 开发指南之 Api操作数据

/api/put 请求方式&#xff1a;post请求参数&#xff1a; 参数说明examplesummary返回主要摘要/api/put?summarydetails返回详细信息/api/put?detailssync是否同步&#xff0c;即是否等待数据都写入成功后才返回结果/api/put?syncsync_timeout返回结果之前的等待时间/api/p…

xshell下利用SFTP传输文件

SFTP是基于SSH的文件传输协议&#xff0c;与ZMODEM相比具有更加安全且更为快速的文件传输功能。 如何利用SFTP接收文件: 在本地提示以sftp命令登陆拟要接收文件的主机。 Xshell:> sftp hostname在sftp提示下以get命令接收需要的文件。 sftp:/home/user21>get filenam…

libcurl使用方法

原文地址&#xff1a;http://curl.haxx.se/libcurl/c/libcurl-tutorial.html 译者&#xff1a;JGood(http://blog.csdn.net/JGood ) 译者注&#xff1a;这是一篇介绍如何使用libcurl的入门教程。文档不是逐字逐句按原文翻译&#xff0c;而是根据笔者对libcurl的理解&#xff0c…

OpenTSDB 开发指南之 Grafana 展示OpenTSDB监控数据

目录 准备数据 在Grafana创建OpenTSDB连接 创建一个仪表盘 统计 准备数据 将数据插入OpenTSDB {"metric":"jast.data","value":1023,"timestamp":1588742563,"tags":{"type":"jast-graph-data"}}…

linux重启后地址不是之前设置的静态地址的解决方案

按照以下步骤进行安装 1. vi /etc/hosts 修给域名匹配 2. vi /etc/sysconfig/network 修改主机名称 3. vi /etc/sysconfig/network-scripts/ifcfg-eth0 修改主机IP 4. vi /etc/udev/rules.d/70-persistent-net.rules 把里面的eth0删掉&#xff0c;把eth1名称改为eth0

CDH 版本 Kafka 外网设置

登陆CDH页面,进入Kafka配置页面 搜索 advertised 修改advertised.host.name,这里我们有三台Broker,我们把每台的外网ip填写到对应的机器上 advertised.port不填写 我们kafka的端口设置的是9099 将外网端口9099开放,允许外网访问 (这里不做介绍

SQLite、MySQL和PostgreSQL 三种关系数据库比较

关系型数据库的使用已经有相当长的时间了。它们变得流行起来托了管理系统的福&#xff0c;关系模型被实现得相当的好&#xff0c;并且被证明是操作数据的好方法&#xff08;特别是事务性强的应用&#xff09;。 在这篇DigitalOcean文章中&#xff0c;我们将尝试理解一些最常用、…