mysql到pg怎么高效_干货 | Debezium实现Mysql到Elasticsearch高效实时同步(示例代码)

题记

来自Elasticsearch中文社区的问题——

MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现MySQL实时增量导数据到es中?

logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式增量同步数据。

回到问题本身:如果库表里没有相关字段,该如何处理呢?

本文给出相关探讨和解决方案。

1、 binlog认知

1.1 啥是 binlog?

binlog是Mysql sever层维护的一种二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中。

作用主要有:

1)复制:达到master-slave数据一致的目的。

2)数据恢复:通过mysqlbinlog工具恢复数据。

3)增量备份。

1.2 阿里的Canal实现了增量Mysql同步

lazy.gif

一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。

目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的:增量数据订阅&消费。

综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。

2、基于binlog的同步方式

1)基于kafka Connect的Debezium 开源工程,地址:. https://debezium.io/

2)不依赖第三方的独立应用: Maxwell开源项目,地址:http://maxwells-daemon.io/

由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对Debezium展开。

3、Debezium介绍

Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。

特点:

1)简单。无需修改应用程序。可对外提供服务。

2)稳定。持续跟踪每一行的每一处变动。

3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。

4、同步架构

lazy.gif

如图,Mysql到ES的同步策略,采取“曲线救国”机制。

步骤1: 基Debezium的binlog机制,将Mysql数据同步到Kafka。

步骤2: 基于Kafka_connector机制,将kafka数据同步到Elasticsearch。

5、Debezium实现Mysql到ES增删改实时同步

软件版本:

confluent:5.1.2;

Debezium:0.9.2_Final;

Mysql:5.7.x.

Elasticsearch:6.6.1

5.1 Debezium安装

Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件目录(share/java)中。

MySQL Connector plugin 压缩包的下载地址:

注意重启一下confluent,以使得Debezium生效。

5.2 Mysql binlog等相关配置。

Debezium使用MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。

核心配置如下,在Mysql机器的/etc/my.cnf的mysqld下添加如下配置。

1[mysqld]

2

3server-id = 223344

4log_bin = mysql-bin

5binlog_format = row

6binlog_row_image = full

7expire_logs_days = 10

然后,重启一下Mysql以使得binlog生效。

1systemctl start mysqld.service

5.3 配置connector连接器。

配置confluent路径目录 : /etc

创建文件夹命令 :

1mkdir kafka-connect-debezium

在mysql2kafka_debezium.json存放connector的配置信息 :

1[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json

2{

3 "name" : "debezium-mysql-source-0223",

4 "config":

5 {

6 "connector.class" : "io.debezium.connector.mysql.MySqlConnector",

7 "database.hostname" : "192.168.1.22",

8 "database.port" : "3306",

9 "database.user" : "root",

10 "database.password" : "XXXXXX",

11 "database.whitelist" : "kafka_base_db",

12 "table.whitlelist" : "accounts",

13 "database.server.id" : "223344",

14 "database.server.name" : "full",

15 "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",

16 "database.history.kafka.topic" : "account_topic",

17 "include.schema.changes" : "true" ,

18 "incrementing.column.name" : "id",

19 "database.history.skip.unparseable.ddl" : "true",

20 "transforms": "unwrap,changetopic",

21 "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

22 "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",

23 "transforms.changetopic.regex":"(.*)",

24 "transforms.changetopic.replacement":"$1-smt"

25 }

26}

注意如下配置:

"database.server.id",对应Mysql中的server-id的配置。

"database.whitelist" : 待同步的Mysql数据库名。

"table.whitlelist" :待同步的Mysq表名。

重要:“database.history.kafka.topic”:存储数据库的Shcema的记录信息,而非写入数据的topic、

"database.server.name":逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。

坑一:transforms相关5行配置作用是写入数据格式转换。

如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。

这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。

5.4 启动connector

1curl -X POST -H "Content-Type:application/json"

2--data @mysql2kafka_debezium.json.json

3http://192.168.1.22:18083/connectors | jq

5.5 验证写入是否成功。

5.5.1 查看kafka-topic

1 kafka-topics --list --zookeeper localhost:2181

此处会看到写入数据topic的信息。

注意新写入数据topic的格式:database.schema.table-smt 三部分组成。

本示例topic名称:

full.kafka_base_db.account-smt

5.5.2 消费数据验证写入是否正常

1./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium实现mysql同步kafka完成。

6、kafka-connector实现kafka同步Elasticsearch

6.1、Kafka-connector介绍

Kafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。

连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或者Kafka数据写入目标数据库,也可以自己开发连接器。

6.2、kafka到ES connector同步配置

配置路径:

1/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

配置内容:

1"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

2"tasks.max": "1",

3"topics": "full.kafka_base_db.account-smt",

4"key.ignore": "true",

5"connection.url": "http://192.168.1.22:9200",

6"type.name": "_doc",

7"name": "elasticsearch-sink-test"

6.3 kafka到ES启动connector

启动命令

1confluent load elasticsearch-sink-test

2-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令行查看。

1curl -X GET http://localhost:8083/connectors

7、坑复盘。

坑2: 同步的过程中可能出现错误,比如:kafka topic没法消费到数据。

排解思路如下:

1)确认消费的topic是否是写入数据的topic;

2)确认同步的过程中没有出错。可以借助connector如下命令查看。

1curl -X GET http://localhost:8083/connectors-xxx/status

坑3: Mysql2ES出现日期格式不能识别。

是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。

坑4: kafka2ES,ES没有写入数据。

排解思路:

1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态识别生成。

2)通过connetor/status排查出错原因,一步步分析。

8、小结

binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。

对比:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。

推荐大家使用。大家有好的同步方式也欢迎留言讨论交流。

推荐阅读:

重磅 | 死磕Elasticsearch方法论认知清单(2019春节更新版)

lazy.gif

Elasticsearch基础、进阶、实战第一公众号

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/533062.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java thread safe_Java 线程安全 Thread-Safety

在 Java 的线程安全是老生常谈的问题。经常是各种写法说法一大堆,感觉很多的来源都是在面试的时候,很多考官都喜欢问线程安全的问题。起源这个问题的起源就是 Java 是支持多线程的。如果对进程和线程是什么不太清楚的话,可以恶补下大学课程《…

java socket调用接口_Java中socket接口调用

最近一个项目中接口通讯这一块主要是调用银联系统的socket接口,我方是客户端,即发送请求接收返回报文的一方。在贴代码之前,还是要了解一下关于socket的基础知识。Socket的基本概念1.建立连接当需要建立网络连接时,必须…

protobuf java 编译_Maven项目中,编译proto文件成Java类

新建Maven项目新建一个 Maven 项目:pom定义了最小的maven2元素,即:groupId,artifactId,version。 groupId:项目或者组织的唯一标志,并且配置时生成的路径也是由此生成,如org.codehaus.mojo生成的相对路径为&#xff1a…

python灰色关联度分析代码_灰色关联分析法步骤 - osc_uwnmtz9n的个人空间 - OSCHINA - 中文开源技术交流社区...

https://wenku.baidu.com/view/dc356290af1ffc4fff47ac0d.html?rec_flagdefault&sxts1538121950212利用灰色关联分析的步骤是:1.根据分析目的确定分析指标体系,收集分析数据。设n个数据序列形成如下矩阵:其中m为指标的个数&a…

aio 系统原理 Java_Java新一代网络编程模型AIO原理及Linux系统AIO介绍

从JDK 7版本开始,Java新加入的文件和网络io特性称为nio2(new io 2, 因为jdk1.4中已经有过一个nio了),包含了众多性能和功能上的改进,其中最重要的部分,就是对异步io的支持,称为Java AIO(asynchronous IO)。因为AIO的实…

java请假审批怎么实现_java实现请假时间判断

笔记:需求分析:每周上班6天夏季早上8:30-12:00下午14:00-17:30冬季早上8:30-12:00下午14:30-18:00请假最低为半天按照上午8:00-12:00,下午14:00-18:00计算,包括了夏季和冬季时间,规律分布如下public String getDouble(HttpServletRequest request) throws ParseException {//参…

java原子整数_多线程(四、原子类-AtomicInteger)

案例10个线程并发累加一个整数,每个线程累加1000,保证线程安全Unsafe类,来源于sun.misc包。该类封装了许多类似指针操作,可以直接进行内存管理、操纵对象、阻塞/唤醒线程等操作。package com.jane;import java.util.ArrayList;imp…

java 极客_Java极客思维

​开篇介绍大家好,公众号【Java极客思维】近期会整理一些Java高频面试题分享给小伙伴,也希望看到的小伙伴在找工作过程中能够用得到!本章节主要针对Java一些消息中间件高频面试题进行分享。通知:公众号【Java极客思维】正在送书福…

java拼三级魔方_魔方秘籍(详细解法)《三阶》

魔方根据视频理解:上 下 左 右先将白面变好:(1).变一个白十字(如图所示)(2).转好以后检查十字的四个角的颜色(蓝绿红橙)与旁边面上的中心块的颜色是否相同。(有两个相同的时,如果它们相邻,就一个放在后面,一个放在左面…

pHp30充电宝能用快充吗,65W快充 30分钟充满电 是时候淘汰充电宝了吗?

在过去的一年里,手机快充技术有了新的突破,OPPO推出了65W快充。无独有偶,联想拯救者电竞手机的预热宣传中,号称搭载90W快充。有评测称,使用65W快充,30分钟可以充满一块4000mAh容量的电池,使用90…

matlab画圆柱,Matlab 画三维圆柱体

主要学习了画空间圆柱体和空间长方形的绘制方法。有两个surface property:FaceColor和EdgeColor’;先讲FaceColor’,它指定了surface画出曲面的颜色,可以是[r,g,b]的一个向量,分别表示了红绿蓝的颜色配比;也可以是inte…

matlab类间散度矩阵,协方差矩阵和散布矩阵(散度矩阵)的意义

在机器学习模式识别相关算法中,经常需要求样本的协方差矩阵C和散布矩阵S。如在PCA主成分分析中,就需要计算样本的散度矩阵,而有的教材资料是计算协方差矩阵。实质上协方差矩阵和散度矩阵的意义就是一样的,散布矩阵(散度矩阵)前乘以…

把树分成森林 matlab,20170106RF_Matlab 随机森林指的是利用多棵树对样本进行训练并预测的一种分类器,包括两个方面:数据的随 269万源代码下载- www.pudn.com...

文件名称: 20170106RF_Matlab下载 收藏√ [5 4 3 2 1 ]开发工具: matlab文件大小: 441 KB上传时间: 2017-01-06下载次数: 0提 供 者: yanxiu详细说明:随机森林指的是利用多棵树对样本进行训练并预测的一种分类器,包括两个方面:数据的随…

php绘制频谱图,一步一步教你实现iOS音频频谱动画(二)

本文是系列文章中的第二篇,上篇讲述了音频播放和频谱数据计算,本篇讲述数据处理和动画的绘制。前言在上篇文章中我们已经拿到了频谱数据,也知道了数组每个元素表示的是振幅,那这些数组元素之间有什么关系呢?根据FFT的原…

php删除尾部字符,php如何删除字符串末尾字符

我们知道字符串删除字符的方式有好几种,今天就来介绍三种php删除字符串最后一个字符的函数,有需要的小伙伴可以参考一下。方法一:substr()函数substr()函数返回字符串的一部分。语法如下:substr(string string, int start, int [l…

PHP 蒙太奇马赛克拼图,AndreaMosaic制作一幅马赛克拼图

大家在网上应该都见过用很多幅图片拼成的马赛克图片,今天小编就为大家介绍AndreaMosaic制作一幅马赛克拼图方法,不会的朋友快快来学习吧!软件名称:AndreaMosaic(蒙太奇图片制作软件) V6.1.0.4 中文安装免费版软件大小:…

oracle字段类型设计,Oracle字段类型设计与实际业务不符引发的问题

在Oracle表的设计过程中,开发人员总是对字段的类型不以为然,下面来演示一个例子,按照应该设计为number的,结果设计成了varcha在Oracle表的设计过程中,开发人员总是对字段的类型不以为然,下面来演示一个例子…

linux下进程监控6,Linux进程监控技术—精通软件性能测试与LoadRunner最佳实战(6)...

8.2.5 Linux操作系统进程监控技术Linux在进程监控方面同样出色,不仅可以通过图形用户界面的管理工具,还可以用命令方式显示进程相关信息。像“Windows的任务管理器”一样,在RedHat 9中可以通过单击“系统工具”→“系统监视器”,…

linux 命令行 迅雷替代,Mac/Linux下迅雷替代方案

还记得我两年前写的《DIY了家用NAS》吗?现在又带来新的升级啦。当初的NAS最多能使用Transmission来进行BT下载,那时就在想,如果能下载普通的http资源就好了。再进一步,有什么方案可以通吃所有下载方式呢? 记得那个时候…

linux好用的编译器,推荐几款Linux下比Notepad++好的编辑器软件

Notepad这一段又出风头了,好好的做你软件多好,非得参杂入政治。前两天开源文本编辑器 Notepad 发布了 7.8.1 版本,然后在该版本中作者居然摸黑中国,具体的内容请大家自行百度。而且这已经不是 Notepad 第一次这么干了!…