maxwell 基于zookeeper的高可用方案

Maxwell版本1.39.2

一: 添加zk的pox文件

<!-- customize HA -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.4.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.4.0</version>
</dependency>

二: 创建zk工具类

在 com.zendesk.maxwell.util 包下创建 CuratorUtil 类,后面会使用此类实现高可用

package com.zendesk.maxwell.util;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CuratorUtil {private final String zookeeperServers;private final int sessionTimeoutMs;private final int connectionTimeoutMs;private final int baseSleepTimeMs;private final int maxRetries;private CuratorFramework client;public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs,int maxRetries) {this.zookeeperServers = zookeeperServers;this.sessionTimeoutMs = sessionTimeoutMs;this.connectionTimeoutMs = connectionTimeoutMs;this.baseSleepTimeMs = baseSleepTimeMs;this.maxRetries = maxRetries;}/** 构造 zookeeper 客户端,并连接 zookeeper 集群*/public void start() {ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);client = CuratorFrameworkFactory.newClient(this.zookeeperServers,this.sessionTimeoutMs,this.connectionTimeoutMs,retryPolicy);client.start();}/** 实现分布式锁*/public void highAvailable() {// 1.连接 Zookeeper 客户端this.start();// 2.向 zookeeper 注册自己String lockPath = "/maxwell/ha/lock";InterProcessMutex lock = new InterProcessMutex(client, lockPath);try {// 3.获取锁lock.acquire();// 4.将自己信息注册到 leader 路径String leaderPath = "/maxwell/ha/leader";client.create().withMode(CreateMode.EPHEMERAL).forPath(leaderPath);} catch (Exception e) {e.printStackTrace();}}
}

三: 修改 com.zendesk.maxwell 包下的MaxwellConfig类

3.1 添加属性

// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;

3.2 buildOptionParser 方法添加代码

parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" ).withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" ).withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" ).withRequiredArg();
parser.accepts( "max_retries", "max retry times" ).withRequiredArg();

3.3 setup 方法添加代码

this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode && zookeeperServers == null){LOGGER.error("you must specify --zookeeper because you want to use maxwell in ha mode");
}

四:修改 com.zendesk.maxwell.Maxwell 的main函数

将代码段

if ( config.haMode ) {new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {maxwell.start();
}

全部注释掉,修改为

if ( config.haMode ) {CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);curatorUtil.highAvailable();
}
maxwell.start();

然后重新打包就能得到基于zk的高可用版本了,打包时可以将test包删除,防止出现错误

源码下载地址

五: 启动脚本

5.1 创建配置文件 config.properties

log_level=info

# mysql login info
host=localhost
port=3306
user=root
password=root123
schema_database=maxwell
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello

producer=kafka

#       *** kafka ***
producer=kafka
#kafka.bootstrap.servers=hosta:9092,hostb:9092
kafka.bootstrap.servers=localhost:9092
kafka.max.request.size = 104857600

kafka_topic=mysql.%{database}.%{table}
kafka_version=2.7.0

# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl

# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]

# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]

5.2 启动脚本编写 startup.sh

#!/bin/bash

single(){
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --daemon
  echo -e "\033[32m单机版启动成功\n\033[0m"
}

ha(){
  ## zookeeper 多个用,分割
  bin/maxwell --filter 'exclude: *.*, include: cp.*' --kafka_version=2.7.0 --config=config.properties --ha --zookeeper=127.0.0.1:2181 --daemon
  echo -e "\033[32m高可用版启动成功\n\033[0m"
}

case "$1" in
  'ha')
     ha
     ;;
  *)
     single
     ;;
esac

5.2.1 高可用版本启动命令

./startup.sh ha

5.2.2 单机版启动命令

./startup.sh


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/22793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

查看gz文件 linux zcat file.gz mtx.gz

可以使用以下命令来查看 gz 压缩文件的内容&#xff1a; zcat file.gz 1 该命令会将 file.gz 文件解压并输出到标准输出&#xff0c;可以通过管道符将其与 grep 命令结合使用来查找需要的关键词&#xff0c;例如&#xff1a; zcat file.gz | grep keyword 1 该命令会将 file.gz…

Electron 开发,报handshake failed; returned -1, SSL error code 1,错误

代码说明 在preload.js代码中&#xff0c;暴露参数给渲染线程renderer.js访问&#xff0c; renderer.js 报&#xff1a;ERROR:ssl_client_socket_impl.cc(978)] failed; returned -1, SSL error code 1,错误 问题原因 如题所说&#xff0c;跨进程传递消息&#xff0c;这意味…

Rust 开发环境搭建【一】

Rust 开发环境 推荐 搭建&#xff1a; 安装 rust 语言 以及 工具链 推荐安装方法&#xff1a;rustup curl --proto ‘https’ --tlsv1.2 -sSf https://sh.rustup.rs | sh 在国内如果访问速度慢&#xff0c;可以使用清华大学提供的镜像服务&#xff1a; https://mirrors.tu…

Python中实现多个列表、字典、元组、集合的连接

目录 目录 前言 一、列表 1、运算符 2、extend&#xff08;&#xff09;方法 3、解包操作 * 二、字典 1、update&#xff08;&#xff09;方法 2、解包操作 ** 三、元组 1、 运算符 2、解包操作 * 四、集合 1、union方法 2、| 运算符 3、解包操作 * 五、不同类…

Python 多线程

Python 多线程 多线程类似于同时执行多个不同程序&#xff0c;多线程运行有如下优点&#xff1a; 使用线程可以把占据长时间的程序中的任务放到后台去处理。用户界面可以更加吸引人&#xff0c;这样比如用户点击了一个按钮去触发某些事件的处理&#xff0c;可以弹出一个进度条…

学习单片机的秘诀:实践与坚持

在学习单片机时&#xff0c;将实践与学习结合起来是一个很好的方法。不要一上来就死磕指令和名词&#xff0c;而是边学边做实验&#xff0c;循序渐进地理解和应用指令。通过实验&#xff0c;你能亲身感受到指令的控制效果&#xff0c;增强对单片机的理解和兴趣。 学习单片机不…

Android Ble蓝牙App(二)连接与发现服务

Ble蓝牙App&#xff08;二&#xff09;连接与发现服务 前言正文一、GATT回调二、连接和断连三、连接状态回调四、发现服务五、服务适配器六、显示服务七、源码 前言 在上一篇中我们进行扫描设备的处理&#xff0c;本文中进行连接和发现服务的数据处理&#xff0c;运行效果图如下…

Electron 工具进程utilityProcess 使用中遇到的坑点汇集

简介 这是基于 node.js 中的子进程的概念推出来的&#xff0c;可参考链接&#xff1a;utilityProcess | Electron 官网有一句话非常重要&#xff0c;它提供一个相当于 Node.js 的 child_process.fork API&#xff0c;但使用 Chromium 的 Services API 代替来执行子进程。这句话…

AI量化模型预测——baseline学习笔记

一、赛题理解 1. 赛题名称 AI量化模型预测 2. 赛题理解 本赛事是一个量化金融挑战&#xff0c;旨在通过大数据与机器学习的方法&#xff0c;使用给定的训练集和测试集数据&#xff0c;预测未来中间价的移动方向。参赛者需要理解市场行为的原理&#xff0c;创建量化策略&#…

element表格+表单+表单验证结合u

一、结果展示 1、图片 2、描述 table中放form表单&#xff0c;放输入框或下拉框或多选框等&#xff1b; 点击添加按钮&#xff0c;首先验证表单&#xff0c;如果存在没填的就验证提醒&#xff0c;都填了就向下添加一行表单表格&#xff1b; 点击当前行删除按钮&#xff0c;…

剑指Offer05.替换空格

剑指Offer05.替换空格 目录 剑指Offer05.替换空格题目描述解法一&#xff1a;遍历添加解法二&#xff1a;原地修改 题目描述 请实现一个函数&#xff0c;把字符串s中的每个空格都替换成“%20”。 解法一&#xff1a;遍历添加 由于每次替换都要把一个空格字符变成三个字符&a…

Godot 4 源码分析 - 碰撞

碰撞功能应该是一个核心功能&#xff0c;它能自动产生相应的数据&#xff0c;比如目标对象进入、离开本对象的检测区域。 基于属性设置&#xff0c;能碰撞的都具备这样的属性&#xff1a;Layer、Mask. 在Godot 4中&#xff0c;Collision属性中的Layer和Mask属性是用于定义碰撞…

Unity 编辑器选择器工具类Selection 常用函数和用法

Unity 编辑器选择器工具类Selection 常用函数和用法 点击封面跳转下载页面 简介 在Unity中&#xff0c;Selection类是一个非常有用的工具类&#xff0c;它提供了许多函数和属性&#xff0c;用于操作和管理编辑器中的选择对象。本文将介绍Selection类的常用函数和用法&#xff…

Redis-Java客户端-Jedis

目录 01.导入依赖 02.进行测试连接 03.使用JedisPool 04.修改测试的代码 01.导入依赖 新建一个mevan工程&#xff0c;在pom文件下导入相应的依赖&#xff0c;相依的依赖可以去官网查找 spring官网&#xff1a;Spring Data Redis <dependencies><!-- jedis -->…

伊语IM即时通讯源码/im商城系统/纯源码IM通讯系统安卓+IOS前端纯原生源码

伊语IM即时通讯源码/im商城系统/纯源码IM通讯系统安卓IOS前端纯原生源码&#xff0c; 后端是java源码。

2.4 网络安全新技术

数据参考&#xff1a;CISP官方 目录 云计算安全大数据安全移动互联网安全物联网安全工业互联网安全 一、云计算安全 1、云计算定义 云计算是指通过网络访问可扩展的、灵活的物理或虚拟共享资源池&#xff0c;并按需自助获取和管理资源的模式。在云计算中&#xff0c;计算资…

深度学习之双线性插值

1、单线性插值 单线性插值是一种用于估计两个已知数据点之间未知点的方法。它基于线性关系&#xff0c;通过计算目标位置的值&#xff0c;使用已知点之间的线性函数进行插值。这在图像处理中常用于放缩、旋转等操作&#xff0c;计算简单&#xff0c;产生平滑结果&#xff0c;但…

小白也能懂!业务中台与数据中台究竟是什么?

大家好&#xff0c;今天我们要讨论的是业务中台与数据中台&#xff0c;或许你对这些名词还不太熟悉&#xff0c;但别担心&#xff0c;接下来我将为你详细解释这两个概念&#xff0c;并且用通俗易懂的语言来解释它们。 业务中台是什么&#xff1f; 首先&#xff0c;让我们来了解…

ubuntu搭建wifi热点,共享网络(x86、arm相同)

目录 1 首先检查网络管理器服务是否开启 &#xff08;ubuntu需要界面&#xff09; 2 创建并配置需要共享的wifi 首先&#xff0c;明确下这篇文章说的是啥&#xff0c;是为了在ubuntu系统的电脑上&#xff0c;搭建一个wifi热点&#xff0c;供其他移动设备连接上网。就像你…

java实现钉钉群机器人@机器人获取信息后,机器人回复(机器人接收消息)

1.需求 鉴于需要使用钉钉群机器人回复&#xff0c;人们提出的问题&#xff0c;需要识别提出的问题中的关键词&#xff0c;后端进行处理实现对应的业务逻辑 2.实现方式 用户群机器人&#xff0c;附带提出的问题&#xff0c;后端接收消息后识别消息内容&#xff0c;读取到关键…