elasticsearch date_MySQL数据实时增量同步到Elasticsearch

efd45e39aa6eac7f6fed18f9c34a5b90.png

Mysql到Elasticsearch的数据同步,一般用ETL来实现,但性能并不理想,目前大部分的ETL是定时查询Mysql数据库有没有新增数据或者修改数据,如果数据量小影响不大,但如果几百万上千万的数据量性能就明显的下降很多,本文是使用Go实现的go-mysql-transfer中间件来实时监控Mysql的Binlog日志,然后同步到Elasticsearch,从实时性、性能效果都不错。

一、go-mysql-transfer

go-mysql-transfer是使用Go语言实现的MySQL数据库实时增量同步工具。能够实时监听MySQL二进制日志(binlog)的变动,将变更内容形成指定格式的消息,发送到接收端。在数据库和接收端之间形成一个高性能、低延迟的增量数据(Binlog)同步管道, 具有如下特点:

1、不依赖其它组件,一键部署

2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再编写客户端,开箱即用

3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑

4、支持监控告警,集成Prometheus客户端

5、高可用集群部署

6、数据同步失败重试

7、全量数据初始化

详情及安装说明 请参见: MySQL Binlog 增量同步工具go-mysql-transfer实现详解

项目开源地址:go-mysql-transfer

二、配置

# app.ymltarget: elasticsearch #目标类型#elasticsearch连接配置es_addrs: 127.0.0.1:9200 #连接地址,多个用逗号分隔es_version: 7 # Elasticsearch版本,支持6和7、默认为7#es_password:  # 用户名#es_version:  # 密码

三、数据转换规则

相关配置如下:

rule:  -    schema: eseap #数据库名称    table: t_user #表名称    #order_by_column: id #排序字段,存量数据同步时不能为空    #column_lower_case: true #列名称转为小写,默认为false    #column_upper_case:false#列名称转为大写,默认为false    column_underscore_to_camel: true #列名称下划线转驼峰,默认为false    # 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列    #include_columns: ID,USER_NAME,PASSWORD    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  默认为空    #default_column_values: area_name=合肥  #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥    #date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss    #Elasticsearch相关    es_index: user_index #Index名称,可以为空,默认使用表(Table)名称    #es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导    #  -      #   column: REMARK #数据库列名称    #    field: remark #映射后的ES字段名称    #    type: text #ES字段类型    #    analyzer: ik_smart #ES分词器,type为text此项有意义    #    #format: #日期格式,type为date此项有意义    #   -      #    column: USER_NAME #数据库列名称    #    field: account #映射后的ES字段名称    #    type: keyword #ES字段类型

示例一

t_user表,数据如下:

0da77858803ffa446af255dcfb7dddeb.png

自动创建的Mapping,如下:

9d384525dd143372a816e33302c8633f.png

同步到Elasticsearch的数据如下:

f80bfb01662bfcc76aa6491e0d670baa.png

示例二

t_user表,同实例一

使用如下配置:

rule:  -    schema: eseap #数据库名称    table: t_user #表名称    order_by_column: id #排序字段,存量数据同步时不能为空    column_lower_case: true #列名称转为小写,默认为false    #column_upper_case:false#列名称转为大写,默认为false    #column_underscore_to_camel: true #列名称下划线转驼峰,默认为false    # 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列    #include_columns: ID,USER_NAME,PASSWORD    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  默认为空    default_column_values: area_name=合肥  #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥    #date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss    #Elasticsearch相关    es_index: user_index #Index名称,可以为空,默认使用表(Table)名称    es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导      -          column: REMARK #数据库列名称        field: remark #映射后的ES字段名称        type: text #ES字段类型        analyzer: ik_smart #ES分词器,type为text此项有意义        #format: #日期格式,type为date此项有意义      -          column: USER_NAME #数据库列名称        field: account #映射后的ES字段名称        type: keyword #ES字段类型

es_mappings 定义索引的mappings(映射关系),不定义es_mappings则使用列类型自动创建索引的mappings(映射关系)。

自动创建的Mapping,如下:

530be6436cfc893d3e76e6dfc280d032.png

同步到Elasticsearch的数据如下:

ec86f96137021460c4f2fc30c1860f37.png

四、Lua脚本

使用Lua脚本可以实现更复杂的数据处理逻辑,go-mysql-transfer支持Lua5.1语法。

示例一

t_user表,数据如下:

d6c93215b386f8b33ae8ecb4283115c0.png

引入Lua脚本:

#规则配置  rule:  -    schema: eseap #数据库名称    table: t_user #表名称    order_by_column: id #排序字段,存量数据同步时不能为空    lua_file_path: lua/t_user_es.lua   #lua脚本文件    es_index: user_index #Elasticsearch Index名称,可以为空,默认使用表(Table)名称    es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导      -          field: id #映射后的ES字段名称        type: keyword #ES字段类型      -          field: userName #映射后的ES字段名称        type: keyword #ES字段类型      -          field: password #映射后的ES字段名称        type: keyword #ES字段类型      -          field: createTime #映射后的ES字段名称        type: date #ES字段类型        format: yyyy-MM-dd HH:mm:ss #日期格式,type为date此项有意义      -          field: remark #映射后的ES字段名称        type: text #ES字段类型        analyzer: ik_smart #ES分词器,type为text此项有意义      -          field: source #映射后的ES字段名称        type: keyword #ES字段类型

es_mappings 定义索引的mappings(映射关系),不定义es_mappings则根据字段的值自动创建mappings(映射关系)。根据es_mappings 生成的mappings如下:

51e441839b4f34db7666dff8e47a72f6.png

user_index索引mappings

Lua脚本:

local ops = require("esOps") --加载elasticsearch操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称local action = ops.rawAction()  --当前数据库事件,包括:insert、update、deletelocal id = row["ID"] --获取ID列的值local userName = row["USER_NAME"] --获取USER_NAME列的值local password = row["PASSWORD"] --获取USER_NAME列的值local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local remark = row["REMARK"] --获取REMARK列的值local result = {}  -- 定义一个table,作为结果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 数据来源if action == "insert" then -- 只监听新增事件    ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串end 

同步到Elasticsearch的数据如下:

70692e2a7a28e81414005838287e8caf.png

示例二

t_user表,同实例一

引入Lua脚本:

    schema: eseap #数据库名称    table: t_user #表名称    lua_file_path: lua/t_user_es2.lua   #lua脚本文件

未明确定义index名称、mappings,es会根据值自动创建一个名为t_user的index。

使用如下脚本:

local ops = require("esOps") --加载elasticsearch操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称local action = ops.rawAction()  --当前数据库事件,包括:insert、update、deletelocal id = row["ID"] --获取ID列的值local userName = row["USER_NAME"] --获取USER_NAME列的值local password = row["PASSWORD"] --获取USER_NAME列的值local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result = {}  -- 定义一个table,作为结果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 数据来源if action == "insert" then -- 只监听新增事件    ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串end 

同步到Elasticsearch的数据如下:

2a5cc054c9dd7b1ae21cdd3ac9897f79.png

esOps模块提供的方法如下:

  1. INSERT: 插入操作,如:ops.INSERT(index,id,result)。参数index为索引名称,字符串类型;参数index为要插入数据的主键;参数result为要插入的数据,可以为table类型或者json字符串
  2. UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。参数index为索引名称,字符串类型;参数index为要修改数据的主键;参数result为要修改的数据,可以为table类型或者json字符串
  3. DELETE: 删除操作,如:ops.DELETE(index,id)。参数index为索引名称,字符串类型;参数id为要删除的数据主键,类型不限;


文章来源:https://www.jianshu.com/p/5a9b6c4f318c

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/567786.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

联想计算机不能进入系统桌面,联想笔记本进不去桌面的解决方法

联想笔记本进不去桌面的解决方法笔记本电脑开机后,电源指示灯亮,显示器屏如果有显示,但进不了系统,这种情况多数是系统故障导致的,可以尝试开机按F8键,进入安全模式,然后进入最后一次安全配置进…

win10 make命令的安装

1、下载MinGWMinGW官网下载:http://www.mingw.org ,点击右上角Downloads 或者网盘下载:链接:https://pan.baidu.com/s/1vQVKycK1TKVsnLV_OMgiCg 提取码:bbhl 点击下载 mingw-get-setup.exe 安装 mingw-get-setup.exe…

html中svg的css,HTML5 内联 SVG

什么是SVG?SVG 指可伸缩矢量图形 (Scalable Vector Graphics)SVG 用于定义用于网络的基于矢量的图形SVG 使用 XML 格式定义图形SVG 图像在放大或改变尺寸的情况下其图形质量不会有损失SVG 是万维网联盟的标准SVG 的优势与其他图像格式相比(比如 JPEG 和 GIF)&#x…

fast-rcnn win10 tensorflow部署

1、下载代码https://github.com/chde222/Faster-RCNN-TensorFlow-Python3 2、安装所依赖包 pip install -r requirements.txt 或者单独利用pip install cython pip install easydict 3、在 ./data/coco/pythonAPI 下打开cmd运行: python setup.py build_ext --in…

vue 获取url地址的参数_Vue之vuerouter的使用

1. 什么是vue-router?所谓的vue-router, 通俗的来讲 就是路由 但是这个和后端路由是不同的, 这是前端路由,是url和单页面组件的对应关系, 也就是SPA(单页应用)的路径管理器。再通俗的说,vue-router就是WebApp的链接路径管理系统。vue-router是Vue.js官方的路由插件…

win10下openpose1.5安装

历经一个星期的安装挫折,终于安装成功了。赶紧记录一下。 1、准备所需资料 (1)下载cuda和cudnn。版本最好都是cuda10和cudnn10.我下载的是下图所示版本。 如果不是这个版本可能会出错,而且出错几率很高。本人就因为安装的cuda10…

div展示html文本,html – 使文本适合div

我一直在努力重新创建我在90年代创建的父亲网站(呃),我一直无法让文本适合div内部并水平对齐.我需要将文本放在一起,以便它们适合div.这是jsfiddle中页面的代码示例HTMLHomeInside StaffOur Mission示例CSSdiv img#header{width: 50%;height: 15%;margin-left: 125px;margin-ri…

ImportError: cannot import name 'pyopenpose' from 'openpose'错误解决方法

前提条件:openpose1.5配置过程前面都成功,c api成功运行,但是python api配置中,cmake也添加了build_python_path.运行中仍出现 ImportError: cannot import name pyopenpose from openpose 这个错误。 解决方法: 将你…

python语句join_详解Python中的join()函数的用法

原博文 2017-08-07 20:51 − 函数:string.join() Python中有join()和os.path.join()两个函数,具体作用如下: join(): 连接字符串数组。将字符串、元组、列表中的元素以指定的字符(分隔符)连接生成一个新的字符串 &n...0584 相…

python glob.glob使用

函数功能:匹配所有的符合条件的文件,并将其以list的形式返回 示例: 当前文件夹下有如下文件 import globlist glob.glob(‘*g’)print(list) 结果: [dog.1012.jpg, dog.1013.jpg, dog.1014.jpg, dog.1015.jpg, dog.1016.jpg]

nohup启动jar_nohup命令详解

nohup命令详解在我们想要把SpringBoot微服务工程部署到远程服务器时,会通过java -jar springboot.jar的方式启动SpringBoot微服务。但是当我们把运行这个命令的SSH客户端退出登录就会导致SpringBoot进程也一起停止了,然后当然就没法访问我们启动的项目了…

python用pip安装pygame_安装pygame和pip的问题以及过程

1. 先安装pip(一个重要的工具cnqqtd) 2. 安装与python版和系统本相匹配的pygame 详细安装过程 Pip请到这里安装 https://pypi.python.org/pypi/pip#download 下载完成后,会获得一个叫git-pip.py的文件 • 打开git-pip.py文件存在的目录,按下shift rightClick • 打开windows Po…

.net fileupload批量上传可删除_【JavaWeb基础】文件上传和下载(修订版)

前言只有光头才能变强。文本已收录至我的GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3y什么是文件上传?文件上传就是把用户的信息保存起来。为什么需要文件上传?在用户注册的时候,可能需要用户提交照片。…

object detection之Win10配置

1、下载models。 https://github.com/tensorflow/models 并文件解压。 2、下载protos文件 https://github.com/protocolbuffers/protobuf/releases?afterv3.9.1 我这里下载的3.7.0版本。注意一定要下载protoc-xxx-win64.zip版本。必须是带有win64的压缩包,否则可…

idea卸载不干净怎么办_fxfactory卸载不干净?Fxfactory及插件卸载教程

fxfactory卸载不干净怎么办?fxfactory是一款非常受欢迎的视频特效插件合集,能应用到FCPX、AE、PR、motion等软件中。过多特效插件下载会导致这些软件运行打开速度慢,那么如何卸载fxfactory这款软件或者删除那些特效插件呢?跟随小编…

矩阵标准型的系数是特征值吗_「线性代数」根据特征值,将二次型化为标准形、规范形...

今天我们来聊一聊线性代数中的二次型化为规范形、标准形的内容,这块知识相当重要,我看了看,几乎每一年的考研数学中都会涉及到一道关于这个知识点的题目,这次的整理,不仅帮助大家整理清楚思路,也是为自己整…

计算机丢失UxTheme无法修复,Win7系统启动程序失败提示“计算机中丢失UxTheme.dll”怎么办...

win7系统启动程序失败出错提示”无法启动此程序,因为计算机中丢失UxTheme.dll。尝试重新安装该程序以解决此问题“怎么办呢?UxTheme.dll是什么?其实UxTheme.dll是支持win7主题的核心文件,丢失UxTheme.dll就无法使用第三方主题了&a…

docker 设置 jvm 内存_是否值得付费?Oracle,Open JDK等四大JVM性能全面对比

市面上可供选择的JVM发行版还是有不少的。选择合适的JVM需要考虑不同的因素。性能是其中一个重要的因素。靠谱的性能研究是很困难的。在本文中,我创建了一个测试,在不同的JVM上执行对比测试。测试程序包括Spring Boot REST应用,使用Prometheu…

计算机考研初试复试比例,考研初试400多分,16人都被刷,计算机专业报考人太多,报应来了...

在目前大家都一味地挤着报考计算机专业,其他工科专业都被抛弃,以至于很多考研分数400的依然是被刷掉,这就是近期天津大学计算机专业考研复试的情况,在以前,考400以上都被称之为神人,但现在报考计算机专业40…

怎样考计算机教师资格证书,非师专生怎么考取计算机教师资格证书?

满意答案ff8410012013.04.08采纳率:41% 等级:12已帮助:22396人先要到户口所在地的教育局报名,报名的时间各地都不一样的,不统一,可以咨询当地教育局。。。如果你没有考教育学心理学和普通话就会组织你考…