ES迁mysql_使用kafka连接器迁移mysql数据到ElasticSearch

概述

把 mysql 的数据迁移到 es 有很多方式,比如直接用 es 官方推荐的 logstash 工具,或者监听 mysql 的 binlog 进行同步,可以结合一些开源的工具比如阿里的 canal。

这里打算详细介绍另一个也是不错的同步方案,这个方案基于 kafka 的连接器。流程可以概括为:

mysql连接器监听数据变更,把变更数据发送到 kafka topic。

ES 监听器监听kafka topic 消费,写入 ES。

Kafka Connect有两个核心概念:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector,也就是连接器。在本例中,mysql的连接器是source,es的连接器是sink。

这些连接器本身已经开源,我们之间拿来用即可。不需要再造轮子。

过程详解

准备连接器工具

我下面所有的操作都是在自己的mac上进行的。

首先我们准备两个连接器,分别是 kafka-connect-elasticsearch 和 kafka-connect-elasticsearch, 你可以通过源码编译他们生成jar包,源码地址:

我个人不是很推荐这种源码的编译方式,因为真的好麻烦。除非想研究源码。

我是直接下载 confluent 平台的工具包,里面有编译号的jar包可以直接拿来用,下载地址:

我下载的是 confluent-5.3.1 版本, 相关的jar包在 confluent-5.3.1/share/java 目录下

我们把编译好的或者下载的jar包拷贝到kafka的libs目录下。拷贝的时候要注意,除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar,相关的依赖包也要一起拷贝过来,比如es这个jar包目录下的http相关的,jersey相关的等,否则会报各种 java.lang.NoClassDefFoundError 的错误。

另外mysql-connector-java-5.1.22.jar也要放进去。

数据库和ES环境准备

数据库和es我都是在本地启动的,这个过程具体就不说了,网上有很多参考的。

我创建了一个名为test的数据库,里面有一个名为login的表。

配置连接器

这部分是最关键的,我实际操作的时候这里也是最耗时的。

首先配置jdbc的连接器。

我们从confluent工具包里拷贝一个配置文件的模板(confluent-5.3.1/share目录下),自带的只有sqllite的配置文件,拷贝一份到kafka的config目录下,改名为sink-quickstart-mysql.properties,文件内容如下:

# tasks to create:

name=mysql-login-connector

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

tasks.max=1

connection.url=jdbc:mysql://localhost:3306/test?user=root&password=11111111

mode=timestamp+incrementing

timestamp.column.name=login_time

incrementing.column.name=id

topic.prefix=mysql.

table.whitelist=login

connection.url指定要连接的数据库,这个根据自己的情况修改。mode指示我们想要如何查询数据。在本例中我选择incrementing递增模式和timestamp 时间戳模式混合的模式, 并设置incrementing.column.name递增列的列名和时间戳所在的列名。

混合模式还是比较推荐的,它能尽量的保证数据同步不丢失数据。具体的原因大家可以查阅相关资料,这里就不详述了。

topic.prefix是众多表名之前的topic的前缀,table.whitelist是白名单,表示要监听的表,可以使组合多个表。两个组合在一起就是该表的变更topic,比如在这个示例中,最终的topic就是mysql.login。

connector.class是具体的连接器处理类,这个不用改。

其它的配置基本不用改。

接下来就是ES的配置了。同样也是拷贝 quickstart-elasticsearch.properties 文件到kafka的config目录下,然后修改,我自己的环境内容如下:

name=elasticsearch-sink

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

tasks.max=1

topics=mysql.login

key.ignore=true

connection.url=http://localhost:9200

type.name=mysqldata

topics的名字和上面mysql设定的要保持一致,同时这个也是ES数据导入的索引。从里也可以看出,ES的连接器一个实例只能监听一张表。

type.name需要关注下,我使用的ES版本是7.1,我们知道在7.x的版本中已经只有一个固定的type(_doc)了,使用低版本的连接器在同步的时候会报错误,我这里使用的5.3.1版本已经兼容了。继续看下面的章节就知道了。

关于es连接器和es的兼容性问题,有兴趣的可以看看下面这个issue:

启动测试

当然首先启动zk和kafka。

然后我们启动mysql的连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties &

接着手动往login表插入几条记录,正常情况下这些变更已经发到kafka对应的topic上去了。为了验证,我们在控制台启动一个消费者从mysql.login主题读取数据:

./bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning

可以看到刚才插入的数据。

把数据从 MySQL 移动到 Kafka 里就算完成了,接下来把数据从 Kafka 写到 ElasticSearch 里。

首先启动ES和kibana,当然后者不是必须的,只是方便我们在IDE环境里测试ES。你也可以通过控制台给ES发送HTTP的指令。

先把之前启动的mysql连接器进程结束(因为会占用端口),再启动 ES 连接器,

./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties &

如果正常的话,ES这边应该已经有数据了。打开kibana的开发工具,在console里执行

GET _cat/indices

这是获取节点上所有的索引,你应该能看到,

green open mysql.login 1WqRjkbfTlmXj8eKBPvAtw 1 1 4 0 12kb 7.8kb

说明索引已经正常创建了。然后我们查询下,

GET mysql.login/_search?pretty=true

结果如下,

{

"took" : 1,

"timed_out" : false,

"_shards" : {

"total" : 1,

"successful" : 1,

"skipped" : 0,

"failed" : 0

},

"hits" : {

"total" : {

"value" : 4,

"relation" : "eq"

},

"max_score" : 1.0,

"hits" : [

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+0",

"_score" : 1.0,

"_source" : {

"id" : 1,

"username" : "lucas1",

"login_time" : 1575870785000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+1",

"_score" : 1.0,

"_source" : {

"id" : 2,

"username" : "lucas2",

"login_time" : 1575870813000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+2",

"_score" : 1.0,

"_source" : {

"id" : 3,

"username" : "lucas3",

"login_time" : 1575874031000

}

},

{

"_index" : "mysql.login",

"_type" : "mysqldata",

"_id" : "mysql.login+0+3",

"_score" : 1.0,

"_source" : {

"id" : 4,

"username" : "lucas4",

"login_time" : 1575874757000

}

}

]

}

}

参考:

1.《kafka权威指南》

关注公众号:犀牛饲养员的技术笔记

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/561567.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql not exists很慢_查询速度优化用not EXISTS 代替 not in

exists : 强调的是是否返回结果集,不要求知道返回什么, 比如:select name from student where sex m and mark exists(select 1 from grade where ...) ,只要exists引导的子句有结果集返回,那么exists这个条件就算成立了,大家注意返回的字段…

python自定义函数func_python自定义函数与面向对象

前言python的最大特点就是dynamically typed就是动态类型,不像java需要定义数据类型引入先从一段代码引入:12345678910class Dog():def __init__(self,name,age):self.namenameself.ageagedef getName(self):return self.namedogDog(name,16)print(dog.g…

java的继承实例_java继承(实例讲解一)

Java继承(Java inheritance)Java继承是使用已存在的类的定义作为基础建立新类的技术,新类的定义可以增加新的数据或新的功能,也可以用父类的功能,但不能选择性地继承父类。这种技术使得复用以前的代码非常容易,能够大大缩短开发周…

java 调用groovy_Java调用Groovy脚本

在idea下,标准的Java maven项目中展示如何调用Groovy脚本和方法。maven项目引进Groovy libFile -> Project Structure -> Global Libaries 添加下载好的Groovy下lib里面的jar包调用Groovy脚本首先,Java调用Groovy脚本需要该Groovy脚本里面有个main…

java struts 框架_java中struts 框架的实现

该文章主要简单粗暴的实现了struts的请求转发功能。 其他的功能后续会慢慢补上。最近在学习javassist的内容,看到一篇文章 大家一起写mvc 主要简单的描述了mvc的工作流程,同时实现了简单的struts2功能。这里仿照的写了个简单的struts2框架,…

memcached 使用 java_使用Java java_memcached client的陷阱

这2天,才发现之前我们的某个开发人员使用java_memcached-release_2.0.1.jar是有问题的在我们的某个模块里,需要2个memcached,分别提供不同的服务于是,开发的人员就从网上粘贴来如下的码,分别生成2个MemcacheUtil类stat…

java final 变量只读_java final的使用总结

final 变量:是只读的;final 方法:是不能继承或者重写的。final 引用:引用不能修改,但是对象本身的属性可以修改;final class:不可继承;final MyObject o new MyObject();o.setValue…

java list 获取索引_java – 获取arrayList中元素的索引

我试图在arrayList minuteList中获得466的索引[288, 318, 346, 376, 406, 436, 466, 1006, 1036, 1066, 1096, 1126, 1156]但我收到这个错误:java.lang.IndexOutOfBoundsException: Index: 466, Size: 13at java.util.ArrayList.rangeCheck(ArrayList.java:635)at j…

java 如何调用static_java 关键字static详细介绍及如何使用

java 关键字static 详解一、 static代表着什么在Java中并不存在全局变量的概念,但是我们可以通过static来实现一个“伪全局”的概念,在Java中static表示“全局”或者“静态”的意思,用来修饰成员变量和成员方法,当然也可以修饰代码…

java xml setdoctype_如何在Java中使用DOM将自定义doctype标记添加到带有xhtml标记的xml中?...

我使用java中的DOM创建了一个XML文档,并将XHTML标记插入到XML文档中。现在我要添加如下doctype:]>我试着把它作为一个字符串追加,但没有成功。DocumentBuilderFactory docFactory DocumentBuilderFactory.newInstance();docFactory.setNamespaceAware(true);DocumentBuilde…

java 实验报告模板_java实验报告模板

java实验报告模板1 / 26java 实验报告模板河南工业大学实验报告课程 Java 程序设计 _ 实验名称 一、Java 程序流程控制 院 系____ ____ 专业班级__ _________ 姓 名_______________ 学 号____________ _ 指导老师: 日 期一.实验目的熟悉 Java 语言中的数据类型、变量…

Java写入磁盘阵列_磁盘阵列RAID介绍及计算公式

一、RAID介绍磁盘阵列(Redundant Arrays of Independent Disks,RAID),有“独立磁盘构成的具有冗余能力的阵列”之意。磁盘阵列是由很多块独立的磁盘,组合成一个容量巨大的磁盘组,利用个别磁盘提供数据所产生加成效果提升整个磁盘系…

dbm和mysql使用场景_mysql基本用法总结

1 下载安装官网下载:http://www.mysql.com/注意需要一个Oracle账号才能下载。2 启动mysql将mysql安装目录:设置为环境变量,并将:\bin目录加入环境变量中。启动命令行,输入:mysqld以启动mysql的守护进程。3 …

java disposable_rx-java – RxJava中的CompositeDisposable是什么

复合一次性使处理(认为提前取消更容易).假设您有一个活动同时发生多个api调用:var disposable api.call1(arg1,arg2).subscribe(...)var disposable2 api.call2(arg1).subscribe(...)var disposable3 api.call3().subscribe()如果您需要提前处置(例如,用户导航远…

Java中implies_boolean implies(Permission p)

boolean implies(Permission p)描述 (Description)java.util.PropertyPermission.implies(Permission p)方法检查此PropertyPermission是否隐含指定的Permission 。 这是通过检查p是PropertyPermission对象来完成的, p动作是该对象的动作的子集,并且该对…

java.rmi.server.port_java.rmi.server.ExportException: internal error: ObjID already in use报错处理...

由于在server.xml文件中使用配置了在catalina.sh中也指定了对应CATALINA_OPTS"$CATALINA_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port12340 -Dcom.sun.management.jmxremote.authenticatefalse -Dcom.sun.management.jmxremote.sslfalse -D…

java 易变变量_关于java:易变变量和其他变量

以下是经典Concurency in Practice的内容:When thread A writes to a volatile variable and subsequently thread Breads the same variable, the values of all variables that werevisible to A prior to writing to the volatile variable, become visibleto B …

java违反唯一约束异常_Caused by: java.sql.BatchUpdateException: ORA-00001: 违反唯一约束条件 (DSPACE.SYS_C007868)...

Caused by: java.sql.BatchUpdateException: ORA-00001: 违反唯一约束条件 (DSPACE.SYS_C007868).............................遇到这种问题解决方法1. 使用 约束条件查找包含的表明以及 表的字段select a.constraint_name,a.constraint_type,b.column_name,b.table_namefrom…

js中的if与Java中的if_JS直接if参数的用法JS中!和!!区别

经常在JS中见一些代码直接if(参数),然后参数调用的时候是将元素自己传下去。例如下面代码:functiontest1(obj){if(obj){alert($(obj).val());}else{alert("has not obj");}}我们分别点击上面的两个输入框显示如下:解释:实际上相当于java中的重载&#xff…

vs2019能写Java吗_Visual studio2019打包程序过程

要想打包visual studio中的程序我们需要用到setup用于自定义安装部署的项目方案。但是在VS2019中不见了,微软是有意废除安装项目的,合作了一个第三方的安装项目单独使用。我们可以从官网上把Visual Studio Installer 项目扩展下载下来。地址:…