简单实现MySQL数据实时增量同步到Kafka————Maxwell

任务需求:将MySQL里的数据实时增量同步到Kafka

1、准备工作

1.1、MySQL方面:开启BinLog

1.1.1、修改my.cnf文件

vi /etc/my.cnf
[mysqld]
server-id  = 1
binlog_format = ROW

1.1.2、重启MySQL,然后登陆到MySQL之后,查看是否已经修改过来:

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

1.1.3、创建Maxwell用户,并赋予 maxwell 库的一些权限

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

1.2、Kafka准备工作
1.2.1、启动Zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties 

1.2.2、启动kafka

kafka-server-start /usr/local/etc/kafka/server.properties 

2、Maxwell

2.1、下载安装包
https://github.com/zendesk/maxwell/releases/download/v1.20.0/maxwell-1.20.0.tar.gz
2.2、解压到指定位置

tar -zxvf maxwell-1.20.0.tar.gz 

2.3、在MYSQL中创建测试用表(前提你要进入一个库)

CREATE TABLE `test` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`age` int(11) DEFAULT NULL,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.4、打开Maxwell(要在maxwell安装目录下)

bin/maxwell --user=maxwell --password=123456 --host='127.0.0.1' --producer=stdout

2.5、对数据进行操作

insert into test values(1,22,"小明");
update test set name='whirly' where id=1;
delete from test where id=1;

可以看到Maxwell控制台的输出,测试成功!

{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小明锋"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小明"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}

3、实现MySQL数据实时增量同步到Kafka

3.1、开启指定到Kafka的MaxWell

bin/maxwell --user='maxwell' --password='123456' --host='127.0.0.1' \--producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --kafka_version=0.10.2.1

3.2、对数据库进行操作

insert into test values(1,22,"小明");
update test set name='whirly' where id=1;
delete from test where id=1;

3.3、启动一个消费者来消费 maxwell topic的消息,观察其输出;

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

3.4、再次执行数据库结果观察,仍然可以得到相同的输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/535856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【转】mip-semi-fixed 走走又停停

写在前面 MIP 中悬浮元素的特殊情况 其实组件上线已经有一段时间了,最开始看到这个需求是站长提交了一个这中功能的组件过来,不过看过代码立刻就想到了 MIP 页面的特殊性:从结果页打开的 MIP 页面,是嵌套在一个 iframe 之中的。…

Mac使用Homebrew安装Kafka

1、使用brew install命令安装Kafka $ brew install kafka安装过程将依赖安装 zookeeper软件位置 /usr/local/Cellar/zookeeper /usr/local/Cellar/kafka配置文件位置 /usr/local/etc/kafka/zookeeper.properties /usr/local/etc/kafka/server.properties 备注:后…

广州站长沙龙 MIP 问题及答案

1. mip提交几个月时间了,生效量比较少,是什么原因? 答:提交 MIP 页面后,经过收录、校验、和生效三个步骤,才能在结果页看到闪电标。 1)提交 URL 后,spider 会去抓取收录&#xff1…

日常问题——初始化Hive仓库报错com.google.common.base.Preconditions.checkArgument

问题描述: 初始化Hive仓库报错Exception in thread “main” java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V 解决方案(过程): com.google.commo…

【转】百度站长平台MIP引入工具使用心得

MIP引入主动推送流程 对于 MIP 站点改造好了,我们如何提交数据,并且 MIP 提交后,我们能得到哪些数据的反馈,在这里简单的写一篇文章,说一下。 改造 MIP,我们一般是添加了一个二级域名站点进行改造&#x…

Hadoop之HDFS应用

1、通过http://127.0.0.1:8088/即可查看集群所有节点状态: 2、访问http://localhost:9870/即可查看文件管理页面(在3.0.0中在之前的版本中文件管理的端口是50070,替换为了9870端口): ————进入文件系统 ————…

MIP ACCESS细节剖析

什么是 MIP ACCESS MIP ACCESS 由百度 MIP 团队开发的一种页面访问权限控制机制,能够允许网页发布者在页面元素中定义内容标记,并结合用户访问情况进行综合评价,从而展现或隐藏页面中内容,直至用户登录、订阅或付费后才能够查看隐…

HDFS常用Shell命令

1、-ls: 显示目录信息 hadoop fs -ls /2、-mkdir:在HDFS上创建目录 hadoop fs -mkdir -p /demo/test3、-moveFromLocal:从本地剪切粘贴到HDFS hadoop fs -moveFromLocal a.txt /demo/test/a.txt4、-appendToFile:追加一个文件到已经存在…

Linux环境下Flume的安装

1、在官网http://flume.apache.org/download.html下载flume的压缩包 2、解压到指定位置并重命名 tar -zxvf apache-flume-1.9.0-bin.tar.gz3、配置环境并生效 #vi ~/.bashrc export FLUME_HOME/usr/local/APP/flume export PATH$PATH:$FLUME_HOME/bin #使变量设置生效 #sour…

MIPCache 域名升级

一、MIPCache URL 是什么 举个例子,MIP 官网的 URL 为: https://www.mipengine.org 对应的 MIPCache 的 URL 为: https://mipcache.bdstatic.com/c/s/www.mipengine.org 所谓 MIPCache URL 是经过 MIP-Cache CDN 缓存后的 MIP 页面地址&…

Flume监听端口,输出端口数据案例

1、在flume目录下新建/myconf目录,并在目录下新建socket-console.conf 文件! mkdir myconf cd myconf touch socket-console.conf2、编辑文件vim socket-console.conf,添加以下内容: # 定义这个agent中各组件的名字 a1.sources r1 a1.sink…

MIP 移动网页加速器视频教程全新发布

MIP (Mobile Instant Pages - 移动网页加速器) 是百度推出的开源项目,用于移动端页面加速。MIP 技术通过优化浏览器资源加载,前端代码执行及 CDN 缓存加速来加速页面,打造秒开的页面浏览体验。目前,有 5000 多家站点的 MIP 页已经…

日常问题——flume连接hive时报错Caused by: java.lang.NoSuchMethodError

问题描述: 今天新安装的flume,使用flume来做kafka与hive对接时出现了以下两个的错误: Caused by: org.apache.hive.hcatalog.streaming.ConnectionError: HiveEndPoint{metaStoreUrithrift://localhost:9083, databasedb, tablestudent, pa…

MIP 技术进展月报:储存功能全新上线,MIP-Cache域名升级,校验更严谨

集 * 瞬时触达用户、高转化率、炫酷闪电标、优质展现形式 * 等诸多特性为一体的 MIP 页面吸引了众多站点进行改造。为了更好地服务于广大站长,更快地倾听站长们的声音,MIP 技术团队特推出《MIP 技术进展月报》,欢迎大家对 MIP 技术提建议&…

通过Flume简单实现Kafka与Hive对接(Json格式)

将以下存储在kafka的topic中的JSON格式字符串,对接存储到Hive的表中 {"id":1,"name":"小李"} {"id":2,"name":"小张"} {"id":3,"name":"小刘"} {"id":4,&qu…

改造MIP获得搜索青睐,轻松完成SEO

搜索引擎目标及页面排序方法 搜索引擎作为互联网流量的入口,承担着流量分发的职责。但排序成千上万的网页,决定哪些网页在第一页,是由网页本身的用户体验决定的。权重算法会从内容优质性,广告多少,加载速度等多个角度…

日常问题———Attempting to operate on hdfs namenode as root

写在最前注意: 1、master,slave都需要修改start-dfs.sh,stop-dfs.sh,start-yarn.sh,stop-yarn.sh四个文件 2、如果你的Hadoop是另外启用其它用户来启动,记得将root改为对应用户 HDFS格式化后启动dfs出现以…

WebP 在减少图片体积和流量上的效果如何?MIP技术实践分享

作者 | Jackson 编辑 | 尾尾 不论是 PC 还是移动端,图片一直占据着页面流量的大头,在图片的大小和质量之间如何权衡,成为了长期困扰开发者们的问题。而 WebP 技术的出现,为解决该问题提供了好的方案。本文将为大家详细介绍 WebP …

日常问题——pdsh localhost Connection refused

问题描述: 本地安装hadoop单机模式的时候需要启动namenode时报错 pdshxxx: localhost: connect: Connection refused解决方案(过程): 原因是pdsh默认采用的是rsh登录,修改成ssh登录即可,在环境变量/etc/…

MIP技术进展月报第2期: 数据绑定,异步脚本加速

一、 功能更新 1. mip-bind 上线,实现复杂交互 MIP bind 双向绑定机制和组件上线,提供双向绑定的特性;能够允许页面实现数据驱动功能,开发者可以在任意场景修改数据,并驱动页面元素变动。 MIP 小姐姐画外音&#xf…