elasticsearch date_MySQL数据实时增量同步到Elasticsearch

efd45e39aa6eac7f6fed18f9c34a5b90.png

Mysql到Elasticsearch的数据同步,一般用ETL来实现,但性能并不理想,目前大部分的ETL是定时查询Mysql数据库有没有新增数据或者修改数据,如果数据量小影响不大,但如果几百万上千万的数据量性能就明显的下降很多,本文是使用Go实现的go-mysql-transfer中间件来实时监控Mysql的Binlog日志,然后同步到Elasticsearch,从实时性、性能效果都不错。

一、go-mysql-transfer

go-mysql-transfer是使用Go语言实现的MySQL数据库实时增量同步工具。能够实时监听MySQL二进制日志(binlog)的变动,将变更内容形成指定格式的消息,发送到接收端。在数据库和接收端之间形成一个高性能、低延迟的增量数据(Binlog)同步管道, 具有如下特点:

1、不依赖其它组件,一键部署

2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再编写客户端,开箱即用

3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑

4、支持监控告警,集成Prometheus客户端

5、高可用集群部署

6、数据同步失败重试

7、全量数据初始化

详情及安装说明 请参见: MySQL Binlog 增量同步工具go-mysql-transfer实现详解

项目开源地址:go-mysql-transfer

二、配置

# app.ymltarget: elasticsearch #目标类型#elasticsearch连接配置es_addrs: 127.0.0.1:9200 #连接地址,多个用逗号分隔es_version: 7 # Elasticsearch版本,支持6和7、默认为7#es_password:  # 用户名#es_version:  # 密码

三、数据转换规则

相关配置如下:

rule:  -    schema: eseap #数据库名称    table: t_user #表名称    #order_by_column: id #排序字段,存量数据同步时不能为空    #column_lower_case: true #列名称转为小写,默认为false    #column_upper_case:false#列名称转为大写,默认为false    column_underscore_to_camel: true #列名称下划线转驼峰,默认为false    # 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列    #include_columns: ID,USER_NAME,PASSWORD    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  默认为空    #default_column_values: area_name=合肥  #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥    #date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss    #Elasticsearch相关    es_index: user_index #Index名称,可以为空,默认使用表(Table)名称    #es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导    #  -      #   column: REMARK #数据库列名称    #    field: remark #映射后的ES字段名称    #    type: text #ES字段类型    #    analyzer: ik_smart #ES分词器,type为text此项有意义    #    #format: #日期格式,type为date此项有意义    #   -      #    column: USER_NAME #数据库列名称    #    field: account #映射后的ES字段名称    #    type: keyword #ES字段类型

示例一

t_user表,数据如下:

0da77858803ffa446af255dcfb7dddeb.png

自动创建的Mapping,如下:

9d384525dd143372a816e33302c8633f.png

同步到Elasticsearch的数据如下:

f80bfb01662bfcc76aa6491e0d670baa.png

示例二

t_user表,同实例一

使用如下配置:

rule:  -    schema: eseap #数据库名称    table: t_user #表名称    order_by_column: id #排序字段,存量数据同步时不能为空    column_lower_case: true #列名称转为小写,默认为false    #column_upper_case:false#列名称转为大写,默认为false    #column_underscore_to_camel: true #列名称下划线转驼峰,默认为false    # 包含的列,多值逗号分隔,如:id,name,age,area_id  为空时表示包含全部列    #include_columns: ID,USER_NAME,PASSWORD    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗号分隔,如:id,name,age,area_id  默认为空    default_column_values: area_name=合肥  #默认的列-值,多个用逗号分隔,如:source=binlog,area_name=合肥    #date_formatter: yyyy-MM-dd #date类型格式化, 不填写默认yyyy-MM-dd    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp类型格式化,不填写默认yyyy-MM-dd HH:mm:ss    #Elasticsearch相关    es_index: user_index #Index名称,可以为空,默认使用表(Table)名称    es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导      -          column: REMARK #数据库列名称        field: remark #映射后的ES字段名称        type: text #ES字段类型        analyzer: ik_smart #ES分词器,type为text此项有意义        #format: #日期格式,type为date此项有意义      -          column: USER_NAME #数据库列名称        field: account #映射后的ES字段名称        type: keyword #ES字段类型

es_mappings 定义索引的mappings(映射关系),不定义es_mappings则使用列类型自动创建索引的mappings(映射关系)。

自动创建的Mapping,如下:

530be6436cfc893d3e76e6dfc280d032.png

同步到Elasticsearch的数据如下:

ec86f96137021460c4f2fc30c1860f37.png

四、Lua脚本

使用Lua脚本可以实现更复杂的数据处理逻辑,go-mysql-transfer支持Lua5.1语法。

示例一

t_user表,数据如下:

d6c93215b386f8b33ae8ecb4283115c0.png

引入Lua脚本:

#规则配置  rule:  -    schema: eseap #数据库名称    table: t_user #表名称    order_by_column: id #排序字段,存量数据同步时不能为空    lua_file_path: lua/t_user_es.lua   #lua脚本文件    es_index: user_index #Elasticsearch Index名称,可以为空,默认使用表(Table)名称    es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导      -          field: id #映射后的ES字段名称        type: keyword #ES字段类型      -          field: userName #映射后的ES字段名称        type: keyword #ES字段类型      -          field: password #映射后的ES字段名称        type: keyword #ES字段类型      -          field: createTime #映射后的ES字段名称        type: date #ES字段类型        format: yyyy-MM-dd HH:mm:ss #日期格式,type为date此项有意义      -          field: remark #映射后的ES字段名称        type: text #ES字段类型        analyzer: ik_smart #ES分词器,type为text此项有意义      -          field: source #映射后的ES字段名称        type: keyword #ES字段类型

es_mappings 定义索引的mappings(映射关系),不定义es_mappings则根据字段的值自动创建mappings(映射关系)。根据es_mappings 生成的mappings如下:

51e441839b4f34db7666dff8e47a72f6.png

user_index索引mappings

Lua脚本:

local ops = require("esOps") --加载elasticsearch操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称local action = ops.rawAction()  --当前数据库事件,包括:insert、update、deletelocal id = row["ID"] --获取ID列的值local userName = row["USER_NAME"] --获取USER_NAME列的值local password = row["PASSWORD"] --获取USER_NAME列的值local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local remark = row["REMARK"] --获取REMARK列的值local result = {}  -- 定义一个table,作为结果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 数据来源if action == "insert" then -- 只监听新增事件    ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串end 

同步到Elasticsearch的数据如下:

70692e2a7a28e81414005838287e8caf.png

示例二

t_user表,同实例一

引入Lua脚本:

    schema: eseap #数据库名称    table: t_user #表名称    lua_file_path: lua/t_user_es2.lua   #lua脚本文件

未明确定义index名称、mappings,es会根据值自动创建一个名为t_user的index。

使用如下脚本:

local ops = require("esOps") --加载elasticsearch操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称local action = ops.rawAction()  --当前数据库事件,包括:insert、update、deletelocal id = row["ID"] --获取ID列的值local userName = row["USER_NAME"] --获取USER_NAME列的值local password = row["PASSWORD"] --获取USER_NAME列的值local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result = {}  -- 定义一个table,作为结果集result["id"] = idresult["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["remark"] = remarkresult["source"] = "binlog" -- 数据来源if action == "insert" then -- 只监听新增事件    ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串end 

同步到Elasticsearch的数据如下:

2a5cc054c9dd7b1ae21cdd3ac9897f79.png

esOps模块提供的方法如下:

  1. INSERT: 插入操作,如:ops.INSERT(index,id,result)。参数index为索引名称,字符串类型;参数index为要插入数据的主键;参数result为要插入的数据,可以为table类型或者json字符串
  2. UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。参数index为索引名称,字符串类型;参数index为要修改数据的主键;参数result为要修改的数据,可以为table类型或者json字符串
  3. DELETE: 删除操作,如:ops.DELETE(index,id)。参数index为索引名称,字符串类型;参数id为要删除的数据主键,类型不限;


文章来源:https://www.jianshu.com/p/5a9b6c4f318c

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/567786.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

联想计算机不能进入系统桌面,联想笔记本进不去桌面的解决方法

联想笔记本进不去桌面的解决方法笔记本电脑开机后,电源指示灯亮,显示器屏如果有显示,但进不了系统,这种情况多数是系统故障导致的,可以尝试开机按F8键,进入安全模式,然后进入最后一次安全配置进…

springboot jpa sql打印_SpringBoot集成Spring Data JPA以及读写分离

相关代码:github OSCchinaJPA是什么JPA(Java Persistence API)是Sun官方提出的Java持久化规范,它为Java开发人员提供了一种对象/关联映射工具 来管理Java应用中的关系数据.它包括以下几方面的内容:1.ORM映射 支持xml和注解方式建立实体与表之间的映射.2.Java持久化API 定义了一…

win10 make命令的安装

1、下载MinGWMinGW官网下载:http://www.mingw.org ,点击右上角Downloads 或者网盘下载:链接:https://pan.baidu.com/s/1vQVKycK1TKVsnLV_OMgiCg 提取码:bbhl 点击下载 mingw-get-setup.exe 安装 mingw-get-setup.exe…

html中svg的css,HTML5 内联 SVG

什么是SVG?SVG 指可伸缩矢量图形 (Scalable Vector Graphics)SVG 用于定义用于网络的基于矢量的图形SVG 使用 XML 格式定义图形SVG 图像在放大或改变尺寸的情况下其图形质量不会有损失SVG 是万维网联盟的标准SVG 的优势与其他图像格式相比(比如 JPEG 和 GIF)&#x…

fast-rcnn win10 tensorflow部署

1、下载代码https://github.com/chde222/Faster-RCNN-TensorFlow-Python3 2、安装所依赖包 pip install -r requirements.txt 或者单独利用pip install cython pip install easydict 3、在 ./data/coco/pythonAPI 下打开cmd运行: python setup.py build_ext --in…

如何找到python的安装路径_如何查看python的安装路径

展开全部 官方文档上有写的,sys.executable是当前Python解释器(或者其他Python实现)的路径。 1、安装mysql 首先到mysql官网e68a843231313335323631343130323136353331333365643662下载文件:mysql-installer-community.msi 安装过…

两个html之间传递对象,解决微信警告:该链接含有无法解析的地址链接-两个html之间的传值(JSON数据)...

最近微信公众号开发进入二期了,增添关于汽车租赁的商城模块。遇到界面传值数据问题。1.首先我用的方式是:location.href"sales-detail.html?id"escape(JSON.stringify(htmlObj));另外,在第二界面用:var obj JSON.parse…

vue 获取url地址的参数_Vue之vuerouter的使用

1. 什么是vue-router?所谓的vue-router, 通俗的来讲 就是路由 但是这个和后端路由是不同的, 这是前端路由,是url和单页面组件的对应关系, 也就是SPA(单页应用)的路径管理器。再通俗的说,vue-router就是WebApp的链接路径管理系统。vue-router是Vue.js官方的路由插件…

win10下openpose1.5安装

历经一个星期的安装挫折,终于安装成功了。赶紧记录一下。 1、准备所需资料 (1)下载cuda和cudnn。版本最好都是cuda10和cudnn10.我下载的是下图所示版本。 如果不是这个版本可能会出错,而且出错几率很高。本人就因为安装的cuda10…

div展示html文本,html – 使文本适合div

我一直在努力重新创建我在90年代创建的父亲网站(呃),我一直无法让文本适合div内部并水平对齐.我需要将文本放在一起,以便它们适合div.这是jsfiddle中页面的代码示例HTMLHomeInside StaffOur Mission示例CSSdiv img#header{width: 50%;height: 15%;margin-left: 125px;margin-ri…

ImportError: cannot import name 'pyopenpose' from 'openpose'错误解决方法

前提条件:openpose1.5配置过程前面都成功,c api成功运行,但是python api配置中,cmake也添加了build_python_path.运行中仍出现 ImportError: cannot import name pyopenpose from openpose 这个错误。 解决方法: 将你…

python语句join_详解Python中的join()函数的用法

原博文 2017-08-07 20:51 − 函数:string.join() Python中有join()和os.path.join()两个函数,具体作用如下: join(): 连接字符串数组。将字符串、元组、列表中的元素以指定的字符(分隔符)连接生成一个新的字符串 &n...0584 相…

计算机网络期末考长沙学院,校内用-第二套计算机网络试卷A..doc

《计算机网络》课程考试 A卷题号一二三四五六总分合分人复核人满分得分一、单选题(在本题的每一小题的备选答案中,只有一个答案是正确的,请把你认为正确答案的题号,填入题干的括号内。多选不给分。每小题1分,共20分)得分评卷人复核…

python glob.glob使用

函数功能:匹配所有的符合条件的文件,并将其以list的形式返回 示例: 当前文件夹下有如下文件 import globlist glob.glob(‘*g’)print(list) 结果: [dog.1012.jpg, dog.1013.jpg, dog.1014.jpg, dog.1015.jpg, dog.1016.jpg]

nohup启动jar_nohup命令详解

nohup命令详解在我们想要把SpringBoot微服务工程部署到远程服务器时,会通过java -jar springboot.jar的方式启动SpringBoot微服务。但是当我们把运行这个命令的SSH客户端退出登录就会导致SpringBoot进程也一起停止了,然后当然就没法访问我们启动的项目了…

教师进修学校计算机教学反思,优秀教学反思

十一月优秀教学反思教学反思在上周我们学校迎来了鞍山市教师进修学校第七届“薪火计划”八年组英语的展示课。鞍山市教师进修学校之所以搞这样一个活动是为了给80后老师创造更多的学习和交流的机会,为了帮助80后更快更好地成长。我们学校之所以提供了这样的场所最为…

用Python批量更改图片大小

#提取目录下所有图片,更改尺寸后保存到另一目录 from PIL import Image import os.path import glob def convertjpg(jpgfile,outdir,width128,height128):imgImage.open(jpgfile)try:new_imgimg.resize((width,height),Image.BILINEAR) new_img.save(os.path.join(outdir,os…

python用pip安装pygame_安装pygame和pip的问题以及过程

1. 先安装pip(一个重要的工具cnqqtd) 2. 安装与python版和系统本相匹配的pygame 详细安装过程 Pip请到这里安装 https://pypi.python.org/pypi/pip#download 下载完成后,会获得一个叫git-pip.py的文件 • 打开git-pip.py文件存在的目录,按下shift rightClick • 打开windows Po…

计算机水平考试改革,浅析全国计算机等级考试改革及应对策略

[摘 要]针对吉林省全国计算机等级考试部分科目即将实行无纸化改革进行分析,结合近几年笔者指导学生参加全国计算机等级考试的教学实践,对新形式下的考试内容和方式进行分析,及时调整教与学的侧重点,以期很好地应对新形式下的考核…

plotloss记录

1、保存所有的loss all_loss[] all_loss.append(loss) fileopen(data.txt,w) file.write(all_loss) file.close() 2、画图 list [] with open(data.txt, r) as f:for line in f.readlines():arr line.split(,) print(arr.__len__()) arr[0]arr[0][1:] arr[-1]arr[-1][0:-2…