Canal Mysql binlog 同步至 ElasticSearch 详细介绍

文章目录

  • 数据同步ElasticSearch
    • 单表基本配置
    • 适配器映射文件详细介绍(单表、多表映射介绍)
      • 单表映射索引示例sql
      • 单表映射索引示例sql带函数或运算操作
      • 多表映射(一对一, 多对一)索引示例sql
      • 多表映射(一对多)索引示例sql
      • 其它类型的sql示例
      • 注意事项

本文详细介绍Canal 配置保存 ElasticSearch

Canal从零配置使用参考:https://blog.csdn.net/zhangshenghang/article/details/120361721

数据同步ElasticSearch

我们接着在之前配置Hbase基础上直接修改配置,实现同时同步ElasticSearch

单表基本配置

  • 1.修改启动器配置 {canal-apapter}/conf/application.yml
server:port: 8081
logging:level:com.alibaba.otter.canal.client.adapter: DEBUGcom.alibaba.otter.canal.client.adapter.hbase: DEBUG
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
canal.conf:# tcp kafka rocketMQ rabbitMQ canal-server运行的模式,TCP模式就是直连客户端,不经过中间件。kafka和mq是消息队列的模式mode: tcp 
#  flatMessage: truezookeeperHosts: syncBatchSize: 1retries: 0timeout: 1000accessKey:secretKey:consumerProperties:# canal tcp consumer 指定canal-server的地址和端口canal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts: 127.0.0.1:2181canal.tcp.batch.size: 1canal.tcp.username:canal.tcp.password:srcDataSources: # 数据源配置,从哪里获取数据defaultDS: # 指定一个名字,在ES的配置中会用到,唯一url: jdbc:mysql://127.0.0.1:3306/test2?useUnicode=trueusername: rootpassword: *****canalAdapters:- instance: example # canal instance Name or mq topic name 指定在canal配置的实例名称groups:- groupId: g1 outerAdapters:- name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000- name: hbaseproperties:hbase.zookeeper.quorum: sangfor.abdi.node3,sangfor.abdi.node2,sangfor.abdi.node1hbase.zookeeper.property.clientPort: 2181zookeeper.znode.parent: /hbase-unsecure- name: es7 # config目录下的子目录名称hosts: 192.168.168.2:9300 # 127.0.0.1:9200 for rest modeproperties:mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest modecluster.name: my_application
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address
  • 2.ElasticSearch 表映射文件
# 指定数据源,这个值和adapter的application.yml文件中配置的srcDataSources值对应。
dataSourceKey: defaultDS
# 指定canal-server中配置的某个实例的名字,不同实例对应不同业务
destination: example
# 组ID ,tcp方式这里填写空,不要填写值,不然可能会接收不到数据
groupId: 
# ES的mapping(映射)
esMapping:# ES索引名称_index: testsync2# ES标示文档的唯一标示,通常对应数据表中的主键ID字段_id: _id
#  upsert: true
#  pk: id
# 数据表每个字段映射到表中的具体名称,不能重复sql: "select a.id as _id, a.name,a.age,a.age_2,a.message,a.insert_time from testsync as a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"commitBatch: 10
  • 3 重启服务
bin/restart.sh

写入数据

INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());

查看adapter日志

2021-09-20 13:53:07.279 [pool-1-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"}
2021-09-20 13:53:07.286 [pool-1-thread-1] DEBUG c.a.o.c.client.adapter.hbase.service.HbaseSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"}
2021-09-20 13:53:07.287 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"} 
Affected indexes: testsync2 

查看ElasticSearch数据
在这里插入图片描述
至此写入ElasticSearch、Hbase成功

适配器映射文件详细介绍(单表、多表映射介绍)

${adapter}/conf/es7/xxx.yml

dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: exampleKey     # 对应application.yml中es配置的key 
destination: example            # cannal的instance或者MQ的topic
groupId:                        # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: mytest_user           # es 的索引名称_type: _doc                   # es 的type名称, es7下无需配置此项_id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
#  pk: id                       # 如果不需要_id, 则需要指定一个属性为主键属性# sql映射sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,a.c_time as _c_time, c.labels as _labels from user aleft join role b on b.id=a.role_idleft join (select user_id, group_concat(label order by id desc separator ';') as labels from labelgroup by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
#    _obj: object               # json对象etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数commitBatch: 3000                         # 提交批大小

sql映射说明:

sql支持多表关联自由组合, 但是有一定的限制:

  1. 主表不能为子查询语句
  2. 只能使用left outer join即最左表一定要是主表
  3. 关联从表如果是子查询不能有多张表
  4. 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
  5. 关联条件只允许主外键的’='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
  6. 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射.

单表映射索引示例sql

select a.id as _id, a.name, a.role_id, a.c_time from user a

该sql对应的es mapping示例:

{"mytest_user": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"c_time": {"type": "date"}}}}}
}

单表映射索引示例sql带函数或运算操作

select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a

函数字段后必须跟上别名, 该sql对应的es mapping示例:

{"mytest_user": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"c_time": {"type": "date"}}}}}
}

多表映射(一对一, 多对一)索引示例sql

select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a 
left join role b on b.id = a.role_id

注:这里join操作只能是left outer join, 第一张表必须为主表!!
该sql对应的es mapping示例:

{"mytest_user": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"role_name": {"type": "text"},"c_time": {"type": "date"}}}}}
}

多表映射(一对多)索引示例sql

select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a 
left join (select user_id, group_concat(label order by id desc separator ';') as labels from labelgroup by user_id) c on c.user_id=a.id

注:left join 后的子查询只允许一张表, 即子查询中不能再包含子查询或者关联!!

该sql对应的es mapping示例:

{"mytest_user": {"mappings": {"_doc": {"properties": {"name": {"type": "text"},"role_id": {"type": "long"},"c_time": {"type": "date"},"labels": {"type": "text"}}}}}
}

其它类型的sql示例

  • geo type
select ... concat(IFNULL(a.latitude, 0), ',', IFNULL(a.longitude, 0)) AS location, ...
  • 复合主键
select concat(a.id,'_',b.type) as _id, ... from user a left join role b on b.id=a.role_id
  • 数组字段
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a 
left join (select user_id, group_concat(label order by id desc separator ';') as labels from labelgroup by user_id) c on c.user_id=a.id

配置中使用:

objFields:labels: array:;
  • 对象字段
select a.id as _id, a.name, a.role_id, c.labels, a.c_time, a.description from user a

配置中使用:

objFields:description: object

其中a.description字段内容为json字符串

  • 父子文档索引
    es/customer.yml
......
esMapping:_index: customer_type: _doc_id: idrelations:customer_order:name: customersql: "select t.id, t.name, t.email from customer t"

es/order.yml

esMapping:_index: customer_type: _doc_id: _idrelations:customer_order:name: orderparent: customer_idsql: "select concat('oid_', t.id) as _id,t.customer_id,t.id as order_id,t.serial_code as order_serial,t.c_time as order_timefrom biz_order t"skips:- customer_id

mapping示例:

{"mappings":{"_doc":{"properties":{"id": {"type": "long"},"name": {"type": "text"},"email": {"type": "text"},"order_id": {"type": "long"},"order_serial": {"type": "text"},"order_time": {"type": "date"},"customer_order":{"type":"join","relations":{"customer":"order"}}}}}
}

注意事项

  • 多表映射时,主表数据必须插入,如果只插入子表不插入主表,数据无法同步到ElasticSearch;相反只插入主表,子表不进行插入,数据是可以同步到ElasticSearch的
  • 多表映射时,如果主表关联id写入后,子表再进行修改之前的关联的id为我们主表写入的id,数据是无法同步到ElasticSearch中的。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509627.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于C++11的线程池

背景 在传统的收到任务即创建线程的情况下,我们每收到一个任务,就创建一个线程,执行任务,销毁线程, 我们把这三个过程所用的时间分别记做T1,T2,T3 任务本身所用的时间仅占T2/(T1T2T3),这在任务本身所用时间很短的情况下…

集合的工具类

集合操作的工具类: 1):Arrays类: 2):Collections类. Arrays类: 在Collection接口中有一个方法叫toArray把集合转换为Object数组. 把集合转换为数组: Object[] arr 集合对象.toArray(); 数组也可以转换为集合(List集合): public static List asList(T… a) 等价于public …

Docker入门到精通开发指南(一文搞懂)

文章目录安装官方安装文档具体安装步骤1.卸载之前的版本(如果之前未用过忽略该步骤)2.安装相关依赖3.设置docker镜像4.安装docker安装latest版本指定版本安装5.启动docker6.查看docker版本7.运行一个docker hello world8.卸载docker9.配置阿里云镜像加速地址docker常用命令dock…

如何向Maven中央仓库提交自己的Jar包(发布自己的Jar包到中央仓库)

文章目录注册账号GPG 安装安装生成密钥上传公钥Maven配置上传到Maven仓库修改项目的配置,填写基本信息执行编译命令登录网站配置发布项目中应用遇到的问题解决方法本文将介绍如何将自己的jar包发布至公共的中央仓库,通过maven方式进行引用 注册账号 注册…

List和Set以及Map的选用

选用哪一种容器取决于每一种容器的存储特点以及当前业务的需求: List: 单一元素集合. 允许元素重复/记录元素的添加顺序. Set:单一元素集合. 不允许元素重复/不记录元素的添加顺序. 既要不重复,又要保证先后顺序:LinkedHashSet. Map: 双元素集合. 如果存储数据的时候,还得…

Map集合类

映射的数学解释: 设A、B是两个非空集合,如果存在一个法则f,使得对A中的每个元素a,按法则f,在B中有唯一确定的元素b与之对应,则称f为从A到B的映射,记作f:A→B。 映射关系(两个集合):A集合和B集…

Socket select模型

Windows socket select模型开发。 套接字select模型是一种比较常用的IO模型。利用该模型可以使Windows socket应用程序可以同时管理多个套接字。 使用select模型,可以使当执行操作的套接字满足可读可写条件时,给应用程序发送通知。收到这个通知后&#x…

Set实现类性能对比

Set接口的实现类: 共同的特点: 1):都不允许元素重复. 2):都不是线程安全的类. 解决方案:Set s Collections.synchronizedSet(Set对象); HashSet: 不保证元素的先后添加顺序. 底层才有的是哈希表算法,查询效率极高. 判断两个对象是否相等的规则: 1):equals比较为true. …

HugeGraph Server/Hubble安装使用

文章目录HugeGraph Server1 概述2 依赖2.1 安装JDK-1.83 部署3.1 下载tar包4 安装启动4.1 解压4.2 配置Hbase5 访问Server5.1 服务启动状态校验6 停止Server7 多图配置HugeGraph-Hubble 基于Web的可视化图形界面1.概述2.安装3 使用3.1创建图HugeGraph Server 1 概述 HugeGrap…

Mysql 集群双主双从安装使用详细讲解

文章目录下载Mysql安装单机Mysql配置Mysql集群双Master配置master1配置master2配置配置说明双Slave配置Slave1配置Slave2配置双 Master 机上创建账号,并授权远程复制查询Master1的状态查询Master2的状态双Slave机上执行 change master 同步Master数据Slave1 复制 Ma…

ElasticSearch 新增节点,横向扩容

文章目录查看当前ES状态新增节点配置遇到的问题查看当前ES状态 这里默认都是在Kibana进行操作 GET _cluster/health{"cluster_name" : "bjga-gz","status" : "yellow","timed_out" : false,"number_of_nodes" :…

输入和输出(IO)概述

什么是IO:(Input/Output):输入和输出. IO设备: 和电脑通信的设备. 输入设备:麦克风,扫描器,键盘,鼠标等. 输出设备:显示器,打印机,投影仪,耳机,音响等. 为什么程序需要IO呢? 案例1:打游戏操作,得分比较高,存储游戏的信息(XXX-888分). 此时需要把游戏中的数据存储起来,只能…

java中有关文件流的操作

文件流: 顾名思义,程序和文件打交道. 此时我们谈及的文件,值得是纯文本文件(txt的,不要使用Word,Excel), 在字节流中,暂时不要使用中文. FileInputStream: 文件的字节输入流 FileOutputStream: 文件的字节输出流 FileReader:文件的字符输入流 FileWriter:文件的字符输出流…

数据结构实验之二叉树一:树的同构

题目描述 给定两棵树T1和T2。如果T1可以通过若干次左右孩子互换就变成T2,则我们称两棵树是“同构”的。例如图1给出的两棵树就是同构的,因为我们把其中一棵树的结点A、B、G的左右孩子互换后,就得到另外一棵树。而图2就不是同构的。 图1 …

java中字符编码详解

字符编码的发展历程: 阶段1: 计算机只认识数字,我们在计算机里一切数据都是以数字来表示,因为英文符号有限, 所以规定使用的字节的最高位是0.每一个字节都是以0~127之间的数字来表示,比如A对应65,a对应97. 这就是美国标准信息交换码-ASCII. 阶段2: 随着计算机在全球的普及…

java中的包装流和缓冲流概述

处理流/包装流(相对于节点流更高级)装饰设计模式/包装模式: 1:隐藏了底层的节点流的差异,并对外提供了更方便的输入/输出功能,让我们只关心高级流的操作. 2:使用处理流包装了节点流,程序直接操作处理流,让节点流与底层的设备做IO操作. 3:只需要关闭处理流即可. 包装流如何区…

转换流和内存流

转换流:把字节流转成字符流: InputStreamReader:把字节输入流转成字符输入流. OutputStreamWriter:把字节输出流转成字符输出流. 为什么有字节转字符流,没有字符转字节流. 字节流可以操作一切文件(纯文本文件/二进制文件).字符流是用来操作中文纯文本使用的,本身是对字节流的…

windows配置gvim高效率编程(cc++)带自动补全代码

对vim的配置足以处理一般的比赛获其他编程项目要求,如自动缩进,自动补全等等。先上几张截图,看看效果: 可以看见vim简洁高效的界面和不错的缩进功能。 debug功能 一、安装gvim 下载资源并安装 百度云下载网址http://pan.baid…

Flink 1.12 CDH 6.3 集成

之前记录的:Flink 1.9 CDH 6.3 集成 有些下载链接可能被官方关闭了,这里介绍1.12版本集成,并把安装包下载地址换为百度网盘链接 下载安装包 链接: https://pan.baidu.com/s/112fiaaMAMMXMsyiTDh3qjg 提取码: ar5f 安装包内容 FLINK-1.12…