使用maxwell实时同步mysql数据到kafka

一、软件环境:

操作系统:CentOS release 6.5 (Final)
java版本: jdk1.8
zookeeper版本: zookeeper-3.4.11
kafka 版本: kafka_2.11-1.1.0.tgz
maxwell版本:maxwell-1.16.0.tar.gz
注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

二、环境部署

1、安装jdk

export JAVA_HOME=/usr/java/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH

2、安装maven

参考:https://www.cnblogs.com/wcwen1990/p/7227278.html

3、安装zookeeper

1)下载软件:

wget http://101.96.8.157/archive.apache.org/dist/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz 
tar zxvf zookeeper-3.4.11.tar.gz  
mv zookeeper-3.4.11 /usr/local/zookeeper

2)修改环境变量

编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:

## ZooKeeper Envexport ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

运行以下命令使环境变量生效: source /etc/profile

3)重命名配置文件

初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg

mv  $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

4)单机模式–修改配置文件
创建目录/usr/local/zookeeper/data 和/usr/local/zookeeper/logs 修改配置文件

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181

5)启动zookeeper

bin/zkServer.sh startZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

6)验证zukeeper服务

telnet chavin.king 2181Trying 192.168.72.130...
Connected to chavin.king.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:/192.168.72.130:44054[0](queued=0,recved=1,sent=0)Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x1a4
Mode: standalone
Node count: 147
Connection closed by foreign host.

4、安装zkui

git clone https://github.com/DeemOpen/zkui.git
cd zkui 
mvn clean install

修改配置文件默认值

#vim config.cfgserverPort=9090     #指定端口zkServer=192.168.1.110:2181sessionTimeout=300000

启动程序至后台
2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本

nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar & 

用浏览器访问:

http://chavin.king:9090/

5、安装kafka

wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/kafkamkdir -p /usr/local/kafka/data-logs

修改配置文件

vim server.propertieslog.dirs=/usr/local/kafka/data-logs
zookeeper.connect=chavin.king:2181

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties &

创建topic

bin/kafka-topics.sh --create --zookeeper chavin.king:2181 --replication-factor 1 --partitions 1 --topic maxwell 

查看所有topic

bin/kafka-topics.sh --list --zookeeper chavin.king:2181

启动producer

bin/kafka-console-producer.sh --broker-list chavin.king:9092 --topic maxwell

启动consumer

bin/kafka-console-consumer.sh --zookeeper chavin.king:2181 --topic maxwell --from-beginning

或者

bin/kafka-console-consumer.sh --bootstrap-server chavin.king:9092  --from-beginning --topic maxwell

6、开启mysql binlog

 more /etc/my.cnf
[client]
default_character_set = utf8[mysqld]
basedir = /usr/local/mysql-5.6.24
datadir = /usr/local/mysql-5.6.24/data
port = 3306
#skip-grant-tables
character_set_server = utf8
log_error = /usr/local/mysql-5.6.24/data/mysql.errbinlog_format = row
log-bin = /usr/local/mysql-5.6.24/logs/mysql-bin
sync_binlog = 2
max_binlog_size = 16M
expire_logs_days = 10server_id = 1
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

7、安装maxwell

wget https://github.com/zendesk/maxwell/releases/download/v1.16.0/maxwell-1.16.0.tar.gz
tar -zxvf maxwell-1.16.0.tar.gz -C /usr/local/maxwell 

启动maxwell

nohup bin/maxwell --user='canal' --password='canal' --host='chavin.king' --producer=kafka --kafka.bootstrap.servers=chavin.king:9092 > maxwell.log &

8、开发kafka消费程序

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;public class KafkaTest {public static void main(String[] args){String topicName = "maxwell";String groupID = "example-group";Properties props = new Properties();props.put("bootstrap.servers","192.168.72.130:9092");props.put("group.id",groupID);props.put("auto.offset.reset","earliest");props.put("serializer.encoding","utf-8");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String > consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topicName));try{while(true){ConsumerRecords<String,String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());}}finally{consumer.close();}}}

ideal启动以上消费程序

9、测试

offset = 3428, key = {"database":"chavin","table":"dept","_uuid":"0b195622-e7c7-4cf6-8203-5576752f9024"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2276,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3429, key = {"database":"chavin","table":"dept","_uuid":"333b98e3-a597-47fc-95ad-6e59ee0dadf6"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2277,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3430, key = {"database":"chavin","table":"dept","_uuid":"cf9fa656-ed13-4cb0-b909-d1218e402e96"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2278,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3431, key = {"database":"chavin","table":"dept","_uuid":"7f2f683a-39bc-498b-9a4e-920697b3da18"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2279,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3432, key = {"database":"chavin","table":"dept","_uuid":"ef639cd1-9206-4145-8608-372bbaaaa14a"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2280,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3433, key = {"database":"chavin","table":"dept","_uuid":"ebdf15ad-7149-4ac4-b567-627dd910182c"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2281,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
offset = 3434, key = {"database":"chavin","table":"dept","_uuid":"1bc667f4-15f0-438c-8139-6f1cbe8b4db3"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2282,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
offset = 3435, key = {"database":"chavin","table":"dept","_uuid":"1613b695-284a-49e3-9793-74fb2cf8dc5b"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2283,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
offset = 3436, key = {"database":"chavin","table":"dept","_uuid":"f72a800c-92cc-4494-9438-bc61c58b5cb9"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2284,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
offset = 3437, key = {"database":"chavin","table":"dept","_uuid":"9887d144-d75d-46f8-96ba-ad7c3adf45fd"}, value = {"database":"chavin","table":"dept","type":"insert","ts":1499944326,"xid":121,"xoffset":2285,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}

至此数据同步已经可以正常进行了,是不是很简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/766099.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于springboot+vue的旅游推荐系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

ubuntu : 无法修正错误,因为您要求某些软件包保持现状,就是它们破坏了软件包间的依赖关系。

往后看&#xff0c;90%能解决你的问题 原文链接&#xff1a;学一下 (suxueit.com) 我相信很多人刚使用ubuntu都遇到过这个问题&#xff0c;如果没有遇到&#xff0c;可能是你运气好使用了正确的软件源 libprotobuf-dev : 依赖: zlib1g-dev 但是它将不会被安装 zlib1g-dev : 依…

【React】React中将 Props 传递给组件

当使用 React 时&#xff0c;props 是组件之间传递数据的主要方式。以下是针对您提到的五个问题的详细解答&#xff1a; 1. 如何向组件传递 props 在父组件中&#xff0c;你可以通过组件标签的属性&#xff08;attributes&#xff09;将 props 传递给子组件。这些属性在子组件…

【redis】服务器架构演进

架构演进 单机架构应用数据分离架构应⽤服务集群架构读写分离 / 主从分离架构冷热分离架构垂直分库微服务架构 单机架构 所有的应用服务、业务所需的数据、业务处理等都在一台服务器上。 在初期&#xff0c;用户访问量很少&#xff0c;对服务器的的性能和安全没有很高的要求&am…

验证回文串——算法思路

题目链接&#xff1a;125. 验证回文串 - 力扣&#xff08;LeetCode&#xff09; 主要是将大写字母转换成小写&#xff08;将字母异或32即可转换大小&#xff09;&#xff0c;并将数字也存入数组&#xff0c;除去其他字符。反转是通过list反相输入再比较实现的。 public static …

踩了一天Prophet的fbprophet坑

pip怎么安装Prophet 安装了这个不行,要安装fbprophet 然后安装不起 哦豁 anaconda虚拟环境安装好将其导入pycharm from fbprophet import Prophet 然后不报错了,很稀奇对吧,不报错了 但是运行还是给你显示 没有fbprophet 绝望了,找人吧 通过官方网站安装最新版Prophet,但是…

2024年全球生成人工智能全景图【中文】

2024年全球生成人工智能全景图【中文】 在过去的一年中&#xff0c;产生式人工智能&#xff08;GenAI&#xff09;无疑成为了全球各行各业的热门话题。特别是ChatGPT的发布&#xff0c;激发了公众对GenAI强烈的兴趣和激动&#xff0c;唤醒了我们对其变革潜力的认知。 虽然我们…

keepalived高可用

负载均衡集群 底层协议&#xff1a;VRRP协议 优点&#xff1a; 工作原理&#xff1a; 体系结构图 内核空间&#xff1a;IPVS模块 NETLINK模块 用户空间&#xff1a;I/O模型 内存管理方法&#xff1a;Memory mngt 控制面板&#xff08;配置文件解析器&#xff09; 核心组件…

国自然提交状态,NSFC已审核 ≠ 申请书被受理!!!

本 期 推 荐 【SciencePub学术】2024年度国家自然科学基金集中受理期项目申请受理工作已基本结束。到底什么状态才算申请书被NSFC接收成功呢&#xff1f; 01 申请书状态 申请人登录ISIS系统&#xff0c;至此&#xff0c;绝大部分申请人的系统状态为下面三种&#xff1a; …

基础时间线柱状图绘制

from pyecharts.charts import Bar,Timeline # 导入柱状图底部的时间线 from pyecharts.options import LabelOpts # 移动标记的数据位置 from pyecharts.globals import ThemeType # 导入颜色主题 bar1Bar() # 创建一个柱状图对象 bar1.add_xaxis(["中国","美…

安科瑞ANET智能物联网网关 通信管理机-安科瑞 蒋静

概述 本系列智能通信管理机是一款采用嵌入式硬件计算机平台&#xff0c;具有多个下行通信接口及一个或者多个上行网络接口&#xff0c;用于将一个目标区域内所有的智能监控/ 保护装置的通信数据整理汇总后&#xff0c;实时上传主站系统&#xff0c;完成遥信、遥测等能源数据采集…

2024公认口碑最好的洗地机有哪些?若看重清洁力,这四款最值得买

每当我们要清洁卫生时&#xff0c;是否总是感到腰酸背痛、疲劳不堪&#xff0c;甚至头昏眼花&#xff1f;地板是家中的重要门面&#xff0c;不容忽视的卫生焦点。如今&#xff0c;我们终于多了一位家务打扫的救星——家用洗地地机。一次操作&#xff0c;即可完成扫地除尘、地除…

【Java开发过程中的流程图】

流程图由一系列的图形符号和箭头组成&#xff0c;每个符号代表一个特定的操作或决策。下面是一些常见的流程图符号及其含义&#xff1a; 开始/结束符号&#xff08;圆形&#xff09;&#xff1a;表示程序的开始和结束点。 过程/操作符号&#xff08;矩形&#xff09;&#xff…

【排序算法】实现快速排序值(霍尔法三指针法挖坑法优化随即选key中位数法小区间法非递归版本)

文章目录 &#x1f4dd;快速排序&#x1f320;霍尔法&#x1f309;三指针法&#x1f320;挖坑法✏️优化快速排序 &#x1f320;随机选key&#x1f309;三位数取中 &#x1f320;小区间选择走插入&#xff0c;可以减少90%左右的递归&#x1f309; 快速排序改非递归版本&#x1…

代码随想录三刷day32

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、力扣452. 用最少数量的箭引爆气球二、力扣435. 无重叠区间三、力扣763. 划分字母区间四、力扣56. 合并区间 前言 本周的主题就是用贪心算法来解决区间问题&…

瑞士百达资产管理有限公司拟增三大去中心化数字加密货币支付接口!

简介: 瑞士百达集团成立于1805年,欧洲第三大财富管理公司, 集团拥有约 5,300 名员工,其中包括 900 名投资经理。它在金融服务中心拥有 30 个办事处网络,包括在日内瓦、卢森堡、拿骚、香港和新加坡的注册银行,百达集团管理的资产总额达6380亿瑞士法郎(7670亿美元)。 瑞士百达资…

一键部署开源舆情系统

系统展示 部署 docker run -itd --name stonedt_yuqing -p 8085:8085 registry.cn-beijing.aliyuncs.com/stonedt_yuqing/stonedt_yuqing:1.0.6 登录 ip:8085 默认用户名&#xff1a;13900000000 &#xff0c;密码&#xff1a;stonedt

Warning: fread(): Length parameter must be greater than 0

上面的意思是 警告&#xff1a;fread&#xff08;&#xff09;&#xff1a;长度参数必须大于0 在PHP中&#xff0c;fread() 函数用于从打开的文件中读取数据。该函数的原型如下&#xff1a; string fread ( resource $handle , int $length ) 其中&#xff0c;$handle 参数是…

C# Solidworks二次开发:获取主窗口API和创建新活动窗口API详解

今天要讲的是Solidworks中的两个API。 &#xff08;1&#xff09;Frame Method (ISldWorks)&#xff1a;获取SOLIDWORKS主框架。 下面是API中给出的例子&#xff1a; public void Main(){ModelDoc2 swModelDoc default(ModelDoc2);Frame swFrame default(Frame);ModelWindow…

【Linux】安装yum

本文参考的文章[1]&#xff0c;但是文章[1]中存在一些问题&#xff0c;就是文章[1]参考的163 mirror的文档的那部分[2]&#xff0c;因此在最后一步vim repo那里多此一举搞了一遍163的repo。不仅163的repo连接速度很慢&#xff0c;而且最后repo中的镜像都换成了中科大的ustc镜像…