基于Canal同步MySQL数据到Elasticsearch
基于 canal 同步 mysql 的数据到 elasticsearch 中。
1、canal-server
相关软件的安装请参考:《Canal实现数据同步》
1.1 pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>canal-to-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>canal-to-elasticsearch</name>
    <description>canal to elasticsearch</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- canal client used to connect to canal-server -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
1.2 SimpleCanalClientExample编写
package com.example.canatest.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;/*** 说明:用于测试canal是否已经连接上了mysql*/
public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.94.186",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {// 获取指定数量的数据Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) 
{printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
}
注意当后面canal-adapter
也连接上canal-server
后,程序就监听不到数据变化了。
这个类只是测试,下面不使用。
2、canal-adapter
由于目前canal-adapter
没有官方docker镜像,所以拉取一个非官方的。
canal-adapter安装:
搜索镜像
$ docker search canal-adapter
拉取镜像
$ docker pull slpcat/canal-adapter:v1.1.5
启动
$ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5
修改配置
$ docker exec -it 89ef714d3a0e /bin/bash
$ cd conf/
$ vi application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host needs to be changed
    canal.tcp.server.host: 192.168.94.186:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # url, username and password need to be changed
      url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      # name needs to be changed
      - name: es7
        # hosts needs to be changed
        hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 # only used for rest mode
          # cluster.name needs to be changed
          cluster.name: my-es
$ cd /opt/canal-adapter/conf/es7
$ cp -v mytest_user.yml canal_test_collect.yml
# Remove the other sample mappings that are not needed
$ rm -rf biz_order.yml customer.yml mytest_user.yml
$ vi canal_test_collect.yml
dataSourceKey: defaultDS
# needs to be changed
destination: example
# needs to be changed
groupId: g1
esMapping:
  # needs to be changed
  _index: canal_test
  _id: _id
  _type: _doc
  upsert: true
  # pk: id
  # needs to be changed
  sql: "SELECT
          c.id AS _id,
          c.user_id AS userId,
          c.title AS title,
          c.url AS url,
          c.note AS note,
          c.collected AS collected,
          c.created AS created,
          c.personal AS personal,
          u.username AS username,
          u.avatar AS userAvatar
        FROM m_collect c
        LEFT JOIN m_user u ON c.user_id = u.id"
  # objFields:
  # _labels: array:;
  # etlCondition: "where c.c_time>={}"
  commitBatch: 3000
也可以在外面编辑好,通过docker命令传输到docker容器中:
$ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
$ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml
重启容器
$ docker restart 89ef714d3a0e
验证是否启动成功
$ docker logs -f 89ef714d3a0e
注意对于时间类型,在后端一定要使用LocalDateTime
或者LocalDate
类型,如果是Date
类型,需要自己手动
设置格式。
3、测试
准备测试条件:
1、首先在数据库中生成表和字段
-- User table; joined into the ES document as username / userAvatar.
CREATE TABLE `m_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `avatar` varchar(255) DEFAULT NULL,
  `created` date DEFAULT NULL,
  `lasted` date DEFAULT NULL,
  `open_id` varchar(255) DEFAULT NULL,
  `statu` int(11) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

-- Collection table; references m_user through user_id.
CREATE TABLE `m_collect` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `collected` date DEFAULT NULL,
  `created` date DEFAULT NULL,
  `note` varchar(255) DEFAULT NULL,
  `personal` int(11) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `url` varchar(255) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
  CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;
2、然后在elasticsearch中生成索引
# 创建索引并添加映射字段
PUT /canal_test
{
  "mappings": {
    "properties": {
      "collected": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "created": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "note": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "personal": {
        "type": "integer"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "url": {
        "type": "text"
      },
      "userAvatar": {
        "type": "text"
      },
      "userId": {
        "type": "long"
      },
      "username": {
        "type": "keyword"
      }
    }
  }
}
3、插入数据
INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', 'MarkerHub');
-- The mapping SQL selects FROM m_collect and only LEFT JOINs m_user, so a
-- m_collect row is needed for a document to show up in the canal_test index.
-- Example row; column order: id, collected, created, note, personal, title, url, user_id.
INSERT INTO `m_collect` VALUES ('1', '2022-01-05', '2022-01-05', 'canal sync test', '0', 'MarkerHub', 'https://example.com', '1');
4、查看数据
GET /canal_test/_search
5、遇到的问题
如果看到canal-adapter一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysql、es、canal、adapter:
2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!