本地部署Canal笔记-实现MySQL与ElasticSearch7数据同步

背景

本地搭建canal实现mysql数据到es的简单的数据同步,仅供学习参考

建议首先熟悉一下canal同步方式:https://github.com/alibaba/canal/wiki

前提条件

  • 本地搭建MySQL数据库
  • 本地搭建ElasticSearch
  • 本地搭建canal-server
  • 本地搭建canal-adapter

操作步骤

本地环境为window11,大部分组件采用docker进行部署,MySQL采用8.0.27,

在这里插入图片描述

搭建MySQL数据库

推荐使用docker-desktop,可视化操作,首先找到需要使用的版本直接pull到本地,通过界面进行容器启动,指定环境变量并定义挂载卷

在这里插入图片描述

也可本地使用docker命令部署MySQL,需要指定环境变量,root账户密码和挂载本地配置文件

#命令根据自己本地路径和配置进行更改
docker run  
--env=MYSQL_ROOT_PASSWORD=123456 
--volume=D:\docker\mysql\data:/var/lib/mysql/ 
--volume=D:\docker\mysql\conf:/etc/mysql/conf.d 
--volume=/var/lib/mysql 
-p 3306:3306 
-p 33060:33060 --runtime=runc -d 
--privileged=true 
mysql:latest 

命令说明:

  • -p 3306:3306
    将宿主机的 3306 端口映射到 docker 容器的 3306 端口,格式为:主机(宿主)端口:容器端口
  • --name mysql
    运行服务的名字
  • -v
    挂载数据卷,格式为:宿主机目录或文件:容器内目录或文件
  • -e MYSQL_ROOT_PASSWORD=123456
    初始化 root 用户的密码为 123456
  • -d
    后台程序运行容器
  • --privileged=true 开启特殊权限
    Docker 挂载主机目录时(添加容器数据卷),如果 Docker 访问出现 cannot open directory:Permission denied,在挂载目录的命令后多加一个 --privileged=true 参数即可。
    因为出于安全原因,容器不允许访问任何设备,privileged 让 docker 应用容器获取宿主机 root 权限(特殊权限),允许我们的 Docker 容器访问连接到主机的所有设备。容器获得所有能力,可以访问主机的所有设备,例如,CD-ROM、闪存驱动器、连接到主机的硬盘驱动器等。

下一步在MySQL配置文件中my.cnf设置开启binlog,更改配置文件之后重启MySQL

# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA#
# The MySQL  Server configuration file.
#
# For explanations see
# http://dev.mysql.com/doc/mysql/en/server-system-variables.html[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
secure-file-priv= NULL--skip-host-cache
--skip-name-resolve[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复# Custom config should go here
!includedir /etc/mysql/conf.d/

更改完配置文件之后验证binlog是否打开,并创建canal用户,设置权限

-- 查询binlog配置
show variables like 'binlog_format';
show variables like 'log_bin';-- 创建canal账户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;show grants for 'canal';

准备工作

  • 新建数据库
CREATE SCHEMA `canal_manager` DEFAULT CHARACTER SET utf8mb4 ;
  • 新增测试表
create table test_canal
(id   int auto_incrementprimary key,name text null
);

搭建ELK

本地搭建elk推荐使用docker-compose进行搭建,单独搭建的话需要进行各种配置比较麻烦,我本地搭建的版本为7.16.1,如果需要更改版本或者更改其他配置可以直接更改下面docker-compose.yaml文件

docker-compose.yaml

services:elasticsearch:image: elasticsearch:7.16.1container_name: esenvironment:discovery.type: single-nodeES_JAVA_OPTS: "-Xms512m -Xmx512m"ports:- "9200:9200"- "9300:9300"healthcheck:test: ["CMD-SHELL", "curl --silent --fail localhost:9200/_cluster/health || exit 1"]interval: 10stimeout: 10sretries: 3networks:- elasticlogstash:image: logstash:7.16.1container_name: logenvironment:discovery.seed_hosts: logstashLS_JAVA_OPTS: "-Xms512m -Xmx512m"volumes:- ./logstash/pipeline/logstash-nginx.config:/usr/share/logstash/pipeline/logstash-nginx.config- ./logstash/nginx.log:/home/nginx.logports:- "5000:5000/tcp"- "5000:5000/udp"- "5044:5044"- "9600:9600"depends_on:- elasticsearchnetworks:- elasticcommand: logstash -f /usr/share/logstash/pipeline/logstash-nginx.configkibana:image: kibana:7.16.1container_name: kibports:- "5601:5601"depends_on:- elasticsearchnetworks:- elastic
networks:elastic:driver: bridge

Deploy with docker compose

此处不再详细讲解docker-compse如何部署了

$ docker compose up -d
Creating network "elasticsearch-logstash-kibana_elastic" with driver "bridge"
Creating es ... done
Creating log ... done
Creating kib ... done

Expected result

Listing containers must show three containers running and the port mapping as below:

$ docker ps
CONTAINER ID        IMAGE                 COMMAND                  CREATED             STATUS                    PORTS                                                                                            NAMES
173f0634ed33        logstash:7.8.0        "/usr/local/bin/dock…"   43 seconds ago      Up 41 seconds             0.0.0.0:5000->5000/tcp, 0.0.0.0:5044->5044/tcp, 0.0.0.0:9600->9600/tcp, 0.0.0.0:5000->5000/udp   log
b448fd3e9b30        kibana:7.8.0          "/usr/local/bin/dumb…"   43 seconds ago      Up 42 seconds             0.0.0.0:5601->5601/tcp                                                                           kib
366d358fb03d        elasticsearch:7.8.0   "/tini -- /usr/local…"   43 seconds ago      Up 42 seconds (healthy)   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp     

After the application starts, navigate to below links in your web browser:

  • Elasticsearch: http://localhost:9200
  • Logstash: http://localhost:9600
  • Kibana: http://localhost:5601/api/status

准备工作

  • 进入kibana,新增测试索引
PUT /test_canal
{"mappings":{"properties":{"id": {"type": "long"},"name": {"type": "text"}}
}
}

搭建canal-server

推荐使用官网下载release版本进行解压部署,官网地址:https://github.com/alibaba/canal/wiki/QuickStart,也可以使用docker进行部署,重点是部署完成后需要修改配置文件。

  • 更改配置
vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

具体配置文件介绍可以查看:https://github.com/alibaba/canal/wiki/AdminGuide

注意: 使用docker部署时,如果没有设置为同一个网络,数据库地址无法使用127.0.0.1

  • 启动
sh bin/startup.sh

搭建canal-adapter

基本功能

canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能, 目前支持功能:

  • 客户端启动器
  • 同步管理REST接口
  • 日志适配器, 作为DEMO
  • 关系型数据库的数据同步(表对表同步), ETL功能
  • HBase的数据同步(表对表同步), ETL功能
  • (后续支持) ElasticSearch多表数据同步,ETL功能

适配器整体结构

client-adapter分为适配器和启动器两部分, 适配器为多个fat jar, 每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载, 目前所有支持的适配器都放置在plugin目录下

启动器为 SpringBoot 项目, 支持canal-client启动的同时提供相关REST管理接口, 运行目录结构为:

- binrestart.shstartup.batstartup.shstop.sh
- lib...
- plugin client-adapter.logger-1.1.1-jar-with-dependencies.jarclient-adapter.hbase-1.1.1-jar-with-dependencies.jar...
- confapplication.yml- hbasemytest_person2.yml
- logs

以上目录结构最终会打包成 canal-adapter-*.tar.gz 压缩包

适配器配置介绍

总配置文件 application.yml

adapter定义配置部分

canal.conf:canalServerHost: 127.0.0.1:11111          # 对应单机模式下的canal server的ip:portzookeeperHosts: slave1:2181               # 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准mqServers: slave1:6667 #or rocketmq       # kafka或rocketMQ地址, 与canalServerHost不能并存flatMessage: true                         # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效batchSize: 50                             # 每次获取数据的批大小, 单位为KsyncBatchSize: 1000                       # 每次同步的批数量retries: 0                                # 重试次数, -1为无限重试timeout:                                  # 同步超时时间, 单位毫秒mode: tcp # kafka rocketMQ                # canal client的模式: tcp kafka rocketMQsrcDataSources:                           # 源数据库defaultDS:                              # 自定义名称url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url username: root                                            # jdbc 账号password: 121212                                          # jdbc 密码canalAdapters:                            # 适配器列表- instance: example                       # canal 实例名或者 MQ topic 名groups:                                 # 分组列表- groupId: g1                           # 分组id, 如果是MQ模式将用到该值outerAdapters:                        # 分组内适配器列表- name: logger                        # 日志打印适配器
......           

说明:

  1. 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
  2. 目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息

操作流程

从官网下载relaes版本之后进行解压,解压之后修改cnf目录下得application.yml文件,根据自己得需求选择下面不同的adapters进行配置,我本地安装的es是es7.16,选择es7的配置,具体配置的含义请参考上方适配器配置介绍

  • 配置application.yaml
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:srcDataSources:defaultDS:url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true # 配置数据库地址username: rootpassword: 123456canalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase- name: es7hosts:  172.22.80.1:9300 # 127.0.0.1:9200 for rest modeproperties:mode: transport # or rest# security.auth: test:123456 #  only used for rest modecluster.name: docker-cluster
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:
  • 新增测试yml文件,再es7目录下新建testcanal.yml配置,更加复杂的映射处理可以参考canal官网示例:

    https://github.com/alibaba/canal/wiki/Sync-ES

在这里插入图片描述

  • 启动canal-adapter启动器
bin/startup.sh

windows启动.bat文件

  • 验证

新增mysql mytest.test_canal表的数据, 将会自动同步到es的test_canal索引下面, 并会打出DML的log

番外:adapter管理REST接口

注意:部分操作需要指定cnf下的yml配置

查询所有订阅同步的canal instance或MQ topic
curl http://127.0.0.1:8081/destinations
数据同步开关
curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT

针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

数据同步开关状态
curl http://127.0.0.1:8081/syncSwitch/example

查看指定 canal instance/MQ topic 的数据同步开关状态

手动ETL
  • 不带参数
curl http://127.0.0.1:8081/etl/es7/testcanal.yml -X POST
  • 带参数
curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST -d "params=2018-10-21 00:00:00"

导入数据到指定类型的库, 如果params参数为空则全表导入, 参数对应的查询条件在配置中的etlCondition指定

查看相关库总数据
curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml

常见问题

not part of the cluster Cluster [elasticsearch], ignoring…

查看canal-adapter配置中的application.yml配置是否和es中的cluster_name配置相同
在这里插入图片描述
在这里插入图片描述

连接问题

如果本地使用docker进行部署,未指定网桥,切记不能使用127.0.0.1进行es连接或者mysql数据库连接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615938.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

24-1-9 bilibilic++音视频

下午两点面试,面试官迟到了一会,面试官人很好,整体面试经历很不错,但是我人太紧张了,基础知识掌握的深度不够,没有深挖, 是做音视频的底层相关的, 实习要求只要每天打卡够九个小时就…

禁用code server docker容器中的工作区信任提示

VSCode 添加受限模式,主要是防止自动运行代码的,比如在vscode配置的task和launch参数是可以运行自定义代码的。如果用VScode打开未知的工程文件就有可能直接运行恶意代码。 但是当我们的实验基础模板文件可控的情况下,要想禁用code server do…

vue前端开发自学,组件的嵌套关系demo

vue前端开发自学,组件的嵌套关系demo!今天开始分享的,前端开发经常用到的,组件的嵌套关系案例代码。下面先给大家看看,代码执行效果。 如图,这个是代码执行后,的效果布局! 下面给大家贴出来源码。方便大家…

认识字面量

字面量:数据在程序中的书写格式 整数小数写法无变化 字符:在程序中必须使用单引号,有且只能有一个字符 字符串:程序中必须使用双引号,内容可有可无 布尔值:true、false 空值:null 示例和结…

功效产品如何做好营销?媒介盒子解答

功能性产品目前的营销痛点就在于宣传夸张导致用户信任度降低,尤其是健康类产品,作为消费者,对此类产品大多持观望态度,但媒介盒子作为提供品牌宣传服务的团队,想和大家聊聊:功能性产品除了在功能上进行宣传…

【Java】解决Servlet编程中出现的中文乱码问题

1、引言 前面两篇文章我们讲述了编写Servlet程序的基本步骤和修改一个Servlet程序 【Java】编写一个简单的Servlet程序​​​​​​ 【Java】SmartTomcat的配置及使用 上面两篇文章的示例代码都是使用的全英文,当我们编写中文,发现似乎出了一点点问题…

【100个 Unity实用技能】☀️ | UGUI中 判断屏幕中某个坐标点的位置是否在指定UI区域内

🎬 博客主页:https://xiaoy.blog.csdn.net 🎥 本文由 呆呆敲代码的小Y 原创,首发于 CSDN🙉 🎄 学习专栏推荐:Unity系统学习专栏 🌲 游戏制作专栏推荐:游戏制作 &…

【一文搞懂JVM的内存屏障】

要命的问题: 什么是线程的安全性?怎么保证?jvm什么是的内存屏障?他有什么作用? **线程的安全性是指:**指在多线程环境下,多个线程同时访问同一资源时不会产生意外结果或导致数据出错的状态。其…

在线ai扩图是什么?有什么工具?分享3个好用的工具。

在线ai扩图是什么?有什么工具?分享3个好用的工具。 在当今数字化的时代,图像处理成为了我们日常生活和工作中不可或缺的一部分。有时候,我们需要将图像放大以获取更多的细节,但传统的方法往往会导致图像质量的损失。幸…

Invalid bound statement(只有调用IService接口这一层会报错的)

问题描述:controller直接调用实现类可以,但是一旦调用IService这个接口这一层就报错. 找遍了大家都说是xml没对应好,但是我确实都可以一路往下跳,真的对应好了.结果发现是 MapperScan写错了,如下才是对的. MapperScan的作用是不需要在mapper上一直写注解了,只要启动类上写好就放…

无货源电商哪个平台比较适合新手?

我是电商珠珠 近年来电商平台层出不穷,无论是传统平台像是拼多多、淘宝、京东,还是短视频电商平台:快手、抖音小店、视频号小店。 都成为了兼职乃至全职人群心中的香饽饽,有人选择去做拼多多、有人选择去做抖音小店,…

高级分布式系统-第6讲 分布式系统的容错性--故障/错误/失效/异常

分布式系统容错性的概念 分布式系统的容错性: 当发生故障时, 分布式系统应当在进行恢复的同时继续以可接受的方式进行操作, 并且可以从部分失效中自动恢复, 且不会严重影响整体性能。 具体包括以下4个方面的内容: 可…

如何将后端带过来的字符串通过‘,’号作为判断依据,分割字符串然后生成数组

在实际开发工程中我们会遇到我们调用后端接口获取图片、文件、视频甚至选择的对象时,如果是这样的: 这种数据类型如果想渲染在html中的话就会很麻烦,我们可以通过","号为切割点将它放入数组中,通过列表进行渲染 由于实…

STM32入门教程-2023版【3-4】总结GPIO使用方法

三、总结GPIO使用方法 总体上来说是比较简单的 首先初始化时钟,然后定义结构体,赋值结构体 GPIO_Mode可以选择那8种输入输出模式,GPIO_Pin选择引脚,可以用按位或的方式同时选中多个引脚,GPIO_Speed选择输出速度,最后使…

全网最全持续集成接口自动化-jmeter+ant+jenkins

ant 批量执行Jmeter 一、环境准备 1、JDK环境:Java Downloads | Oracle 2、ANT环境:Apache Ant - Binary Distributions 3、Jmeter:Apache JMeter - Download Apache JMeter 4、将 jmeter的extras目录中ant-jmeter-1.1.1.jar包拷贝至ant…

【开发篇】一、内存泄漏的分析工具

文章目录 1、内存泄漏2、解决内存泄漏3、工具一:Top4、工具二:VisualVM5、工具三:阿尔萨斯Arthas6、工具四:Promethus Grafana7、图像分析 1、内存泄漏 一个对象不再使用后,(因其从GC Root仍有引用链可达…

2023下半年软考证书什么时候发放?怎么领取?

已经确定领取时间的地区: 广东: 电子版:2024年1月8日上线 纸质版:预计24年2月开始 重庆: 邮寄申领:2024年1月15日0:00-3月1日23:00 现场领取:2024年1月15日-2月7日 贵州: 邮…

vue Element Plus Cascader级联选择器点击标签选中复选框

element-plus原功能 element-plus的Cascader级联选择器点击标签时是不会选中复选框的,我们想要实现点击标签时也能选中复选框这个效果,那么就要用到一些原生的方法 实现效果 mounted() {// Cascader 级联选择器: 点击文本就让它自动点击前面的input就可…

PPT自动化处理

python-pptx模块 可以创建、修改PPT(.pptx)文件非Python标准模块,需要单独安装 在线安装方式 pip install python-pptx 读取slide幻灯片 .slides 获取shape形状 slide.shapes 判断一个shape中是否存在文字 shape.has_text_frame 获取文字框 shape.text_f…