基于Canal同步MySQL数据到Elasticsearch

基于Canal同步MySQL数据到Elasticsearch

基于 canal 同步 mysql 的数据到 elasticsearch 中。

1、canal-server

相关软件的安装请参考:《Canal实现数据同步》

1.1 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>canal-to-elasticsearch</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>canal-to-elasticsearch</name><description>canal to elasticsearch</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version><relativePath/></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.2 SimpleCanalClientExample编写

package com.example.canatest.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;/*** 说明:用于测试canal是否已经连接上了mysql*/
public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.94.186",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {// 获取指定数量的数据Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

在这里插入图片描述

在这里插入图片描述

注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

这个类只是测试,下面不使用。

2、canal-adapter

由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的。

canal-adapter安装:

搜索镜像

$ docker search canal-adapter

在这里插入图片描述

拉取镜像

$ docker pull slpcat/canal-adapter:v1.1.5

在这里插入图片描述

启动

$ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5

在这里插入图片描述

修改配置

$ docker exec -it 89ef714d3a0e /bin/bash
$ cd conf/
$ vi application.yml
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: 0timeout:accessKey:secretKey:consumerProperties:# canal tcp consumer# canal.tcp.server.host需要修改canal.tcp.server.host: 192.168.94.186:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources:defaultDS:# url,username,password需要修改url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=trueusername: canalpassword: canalcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger# name需要修改- name: es7# hosts需要修改hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest# security.auth: test:123456 #  only used for rest mode# cluster.name需要修改cluster.name: my-es
$ cd conf/es7
$ cp -v mytest_user.yml canal_test_collect.yml
# 删除其他多余的
$ rm -rf biz_order.yml customer.yml mytest_user.yml
$ vi dailyhub_collect.yml
dataSourceKey: defaultDS
# 需要修改
destination: example
# 需要修改
groupId: g1
esMapping:# 需要修改_index: canal_test_id: _id_type: _docupsert: true
#  pk: id# 需要修改sql: "
SELECTc.id AS _id,c.user_id AS userId,c.title AS title,c.url AS url,c.note AS note,c.collected AS collected,c.created AS created,c.personal AS personal,u.username AS username,u.avatar AS userAvatar
FROMm_collect c
LEFT JOIN m_user u ON c.user_id = u.id"
#  objFields:
#    _labels: array:;
#   etlCondition: "where c.c_time>={}"commitBatch: 3000

也可以在外面编辑好,通过docker命令传输到docker容器中:

$ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
$ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml

重启容器

$ docker restart 89ef714d3a0e

验证是否启动成功

$ docker logs -f 89ef714d3a0e

在这里插入图片描述

注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动

设置格式。

3、测试

准备测试条件:

1、首先在数据库中生成表和字段

CREATE TABLE `m_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`avatar` varchar(255) DEFAULT NULL,`created` date DEFAULT NULL,`lasted` date DEFAULT NULL,`open_id` varchar(255) DEFAULT NULL,`statu` int(11) DEFAULT NULL,`username` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;CREATE TABLE `m_collect` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`collected` date DEFAULT NULL,`created` date DEFAULT NULL,`note` varchar(255) DEFAULT NULL,`personal` int(11) DEFAULT NULL,`title` varchar(255) DEFAULT NULL,`url` varchar(255) DEFAULT NULL,`user_id` bigint(20) DEFAULT NULL,PRIMARY KEY (`id`),KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

在这里插入图片描述

2、然后在elasticsearch中生成索引

# 创建索引并添加映射字段
PUT /canal_test
{"mappings": {"properties": {"collected": {"type": "date","format": "date_optional_time||epoch_millis"},"created": {"type": "date","format": "date_optional_time||epoch_millis"},"note": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"personal": {"type": "integer"},"title": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"url": {"type": "text"},"userAvatar": {"type": "text"},"userId": {"type": "long"},"username": {"type": "keyword"}}}
}

在这里插入图片描述

3、插入数据

INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', 'MarkerHub');

在这里插入图片描述

4、查看数据

GET /canal_test/_search

5、遇到的问题

如果看到canal-adapter一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysqlescanal

adapar

2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120971.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1. 两数之和、Leetcode的Python实现

博客主页&#xff1a;&#x1f3c6;看看是李XX还是李歘歘 &#x1f3c6; &#x1f33a;每天分享一些包括但不限于计算机基础、算法等相关的知识点&#x1f33a; &#x1f497;点关注不迷路&#xff0c;总有一些&#x1f4d6;知识点&#x1f4d6;是你想要的&#x1f497; ⛽️今…

虚拟机构建部署单体项目及前后端分离项目

目录 一.部署单体项目 1.远程数据库 1.1远程连接数据库 1.2 新建数据库运行sql文件 2.部署项目到服务器中 3.启动服务器运行 二.部署前后端分离项目 1.远程数据库和部署到服务器 2.利用node环境启动前端项目 3.解决主机无法解析服务器localhost问题 方法一 ​编辑 方法二 一.部…

不需要报班学课程,也能制作手办创业的新方法!

近些年&#xff0c;我们已经习惯只要走进商场就一定会路过泡泡玛特一类的潮流玩具店&#xff0c;甚至还会在美食城、自助饮料机旁边看到盲盒手办的售卖机器里摆放着诱人的近期热卖盲盒。一线城市如此&#xff0c;县城也是一样&#xff0c;不同的只可能是盲盒里的内容。 盲盒到底…

公司如何禁止拷贝文件

公司如何禁止拷贝文件 安企神U盘管理系统下载使用 禁止拷贝文件是一种数据安全措施&#xff0c;通常在企业中用于保护重要信息和知识产权。禁止拷贝文件的方法需要根据公司的实际情况来选择和实施&#xff0c;以下是一些常见的方法&#xff0c;可用于防止文件拷贝&#xff1a…

【pwn入门】使用python打二进制

声明 本文是B站你想有多PWN学习的笔记&#xff0c;包含一些视频外的扩展知识。 程序网络交互初体验 将程序部署成可以远程访问的 socat tcp-l:8877,fork exec:./question_1_plus_x64,reuseaddr通过网络访问程序 nc 127.0.0.1 8877攻击脚本 import socket import telnetli…

【C语言】字符函数与字符串函数

简单不先于复杂&#xff0c;而是在复杂之后。 目录 0. 前言 1. 函数介绍 1.1 strlen 1.1.1 介绍 1.1.2 strlen 函数模拟实现 1.1.2.1 计数器方法 1.1.2.2 递归方法 1.1.2.3 指针 - 指针方法 1.2 strcpy 1.2.1 介绍 1.2.2 strcpy 函数模拟实现 1.3 strcat 1…

RuoYi-Vue-SqlServer配置

项目链接 https://gitee.com/linkxs/RuoYi-Vue-SqlServerhttps://gitee.com/linkxs/RuoYi-Vue-SqlServer 服务端Eclipse编译 需要在 /ruoyi-common/pom.xml 中注释掉这些exclusion才能在Eclipse编译。实际maven编译&#xff0c;可以把这一块打开。 客户端ruoyi-ui编译 使用…

【Unity实战】手戳一个自定义角色换装系统——2d3d通用

文章目录 每篇一句前言素材开始切换头型添加更改颜色随机控制头型和颜色新增眼睛同样的方法配置人物的其他部位设置相同颜色部位全部部位随机绘制UI并添加点击事件通过代码控制点击事件添加颜色修改的事件其他部位效果UI切换添加随机按钮保存角色变更数据跳转场景显示角色数据 …

Python-自动化绘制股票价格通道线

常规方案 通过将高点/低点与其 2 个或 3 个相邻点进行比较来检测枢轴点,并检查它是否是其中的最高/最低点。对所有枢轴点进行线性回归以获得上方和下方趋势线。价格离开通道后建仓。通过这样做,我们得到如下所示的价格通道。我认为我们可以利用给定的数据取得更好的结果。

【算法|动态规划No30】leetcode5. 最长回文子串

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…

C语言之预处理

目录 前言 宏定义define的用法 文件包含include的用法 条件编译的用法 其他预处理命令 练习题 练习一 练习二 练习三 前言 预处理命令可以改变程序设计环境&#xff0c;提高编程效率&#xff0c;它们并不是C语言本身的组成部分&#xff0c;不能直接对它们进行编译&am…

redis高可用

文章目录 redis高可用概述哨兵模式原理配置流程使用缺点 cluster集群原理特征流程缺点故障转移故障检测故障转移 集群配置和管理主要命令搭建集群创建集群查看集群配置信息测试集群主从切换扩容缩容 redis高可用概述 1、高可用是分布式的概念。 Redis的高可用性是指在Redis集群…

springsecurity学习笔记-未完

目录 前言 一、概念 1.什么是springsecurity 2.对比shiro 二、开始项目 1.建立一个空项目&#xff0c;建立module&#xff0c;引入相关依赖 2.启动项目&#xff0c;访问项目 3.自定义密码 总结 前言 记录一下学习springsecurity的过程 开发环境&#xff1a;IDEA 一、概念 1.…

解决提交到App Store时的ITMS-90478和ITMS-90062错误

解决提交到App Store时的ITMS-90478和ITMS-90062错误 目录 引言 正文 1. 什么是ITMS-90478和ITMS-90062错误&#xff1f; 2. 解决方法 2.1 确定当前的版本号和构建号 2.2 递增版本号和构建号 2.3 再次尝试提交应用 总结 参考资料 错误记录 摘要&#xff1a;本文为iOS…

鼎鑫鸿鄴引入“能源互联网+”理念 打造共赢

近年来&#xff0c;随着全球能源消耗的不断增长和环境问题的日益突出&#xff0c;清洁能源转型成为全球共同关注的话题。中国作为全球最大的能源消费国&#xff0c;也在积极推动能源结构的优化和清洁能源的发展。鼎鑫鸿鄴新能源科技有限公司在推动清洁能源转型方面制定了一系列…

北太天元安装教程 及使用方法

北太天元是面向科学计算与工程计算的国产通用型科学计算软件。提供科学计算、可视化、交互式程序设计&#xff0c;具备丰富的底层数学函数库&#xff0c;支持数值计算、数据分析、数据可视化、数据优化、算法开发等工作&#xff0c;并通过SDK与API接口&#xff0c;扩展支持各类…

Vite介绍及实现原理

Vite介绍及实现原理 一、Vite简介1.1、什么是Vite1.2 、Vite的主要特性1.3、 为什么要使用Vite 二、Vite的实现原理2.1、依赖处理2.2、静态资源加载2.3、vue文件缓存2.4、 js/ts处理 三、热更新原理四、vite基本使用4.1、安装4.2、搭建项目 一、Vite简介 1.1、什么是Vite Vite…

计算机网络——理论知识总结(上)

开新番&#xff0c;因为博主备考的学校计网只考察1/6的分值&#xff0c;而且定位偏向于送分题&#xff0c;因此在备考时并没有很高强度的复习。本帖基于王道考研的教辅总结归纳&#xff0c;虽然是408的教材&#xff0c;但忽略其中有难度的部分&#xff0c;如计算题、画图题等&a…

如何通过员工工时管理降低企业成本?

作为当今快节奏商业环境的领导者或管理者&#xff0c;掌握员工的工作时间对于控制企业成本和确保每个人都各尽其责至关重要。 员工工时表软件就是这样一款工时跟踪管理解决方案&#xff1a;数字化的工时表有助于保护企业的财务不会被无节制的开支冲垮。然而&#xff0c;引入此…

Spark SQL概述与基本操作

目录 一、Spark SQL概述 &#xff08;1&#xff09;概念 &#xff08;2&#xff09;特点 &#xff08;3&#xff09;Spark SQL与Hive异同 &#xff08;4&#xff09;Spark的数据抽象 二、Spark Session对象执行环境构建 (1)Spark Session对象 &#xff08;2&#xff09;代码演…