文章目录
- Canal整合SpringBoot详解(一)
- 什么是canal
- 搭建Kafka3.2.1集群⭐
- Kafka集群机器规划
- 创建3台虚拟机(centos7系统)
- 必要的环境准备(3台虚拟机都要执行如下操作)⭐
- 分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
- 分别关闭每个服务器的防火墙
- 分别为每个服务器安装jdk8
- 分别为每个服务器安装Docker
- 为每个节点的Docker接入阿里云镜像加速器
- 为每个节点的docker设置开机自动启动
- 分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
- 分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
- 安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
- 案例1:Canal+Kafka实现mysql和redis的数据同步⭐
- 必要的环境
- 配置canal.deployer
- 启动canal.deployer⭐
- 配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
- 创建一个SpringBoot项目⭐
- 项目结构
- 准备需要同步的数据库表⭐
- pom.xml⭐
- application.yml⭐
- RedisTemplateConfig(配置类)
- Config.class⭐
- canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
- ConfigCanalBean.class⭐
- MysqlType.class⭐
- SqlType.class⭐
- ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
- 创建kafka的topic(我们指定的topic名称为canal-test-topic)
- 开始测试canal同步效果⭐
- 测试1:给t_config表插入数据
- 测试2:修改t_config表数据
- 测试3:删除t_config表数据
Canal整合SpringBoot详解(一)
什么是canal
- canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
- canal工作原理:
- canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
- canal能做什么:
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护
- 业务cache(缓存)刷新
- 带业务逻辑的增量数据处理
搭建Kafka3.2.1集群⭐
Kafka集群机器规划
IP地址 | 主机名 | 需要安装的资源 | 操作系统 |
---|---|---|---|
192.168.184.201 | kafka01 | jdk、Docker、zookeeper、Kafka | centos7.9 |
192.168.184.202 | kafka02 | jdk、Docker、zookeeper、Kafka | centos7.9 |
192.168.184.203 | kafka03 | jdk、Docker、zookeeper、Kafka | centos7.9 |
创建3台虚拟机(centos7系统)
必要的环境准备(3台虚拟机都要执行如下操作)⭐
分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
- 1:进入hosts文件:
vi /etc/hosts
在最后面追加内容如下:(这个需要根据你自己服务器的ip来配置)
192.168.184.201 kafka01
192.168.184.202 kafka02
192.168.184.203 kafka03
分别关闭每个服务器的防火墙
systemctl stop firewalld
systemctl disable firewalld
分别为每个服务器安装jdk8
- 1:进入oracle官网下载jdk8的tar.gz包:
-
2:将下载好的包上传到每个服务器上:
-
3:查看是否上传成功:
[root@kafka01 ~]# ls
anaconda-ks.cfg jdk-8u333-linux-x64.tar.gz
- 4:创建文件夹:
mkdir -p /usr/java/
- 5:解压刚刚下载好的包并输出到/usr/java目录下:
tar -zxvf jdk-8u333-linux-x64.tar.gz -C /usr/java/
[root@kafka02 ~]# ls /usr/java/
jdk1.8.0_333
- 6:配置java环境变量:
vi /etc/profile
在文件中末尾添加如下配置:(需要更改的是JAVA_HOME,根据自己的java目录名来更改)
JAVA_HOME=/usr/java/jdk1.8.0_333
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
- 7:让配置立即生效:
source /etc/profile
- 8:查看JDK是否安装成功:
[root@kafka01 ~]# java -version
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
分别为每个服务器安装Docker
- 1:切换镜像源
wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
- 2:查看当前镜像源中支持的docker版本
yum list docker-ce --showduplicates | sort -r
- 3:安装特定版本的docker-ce
yum -y install docker-ce-3:20.10.8-3.el7.x86_64 docker-ce-cli-3:20.10.8-3.el7.x86_64 containerd.io
为每个节点的Docker接入阿里云镜像加速器
配置镜像加速器方法。
- 准备工作:
- 1:首先进入阿里云容器镜像服务 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
- 2:点击镜像工具下面的镜像加速器
- 3:拿到你的加速器地址和下面第二步的registry-mirrors的值替换即可。
针对Docker客户端版本大于 1.10.0 的用户,可以通过修改daemon配置文件/etc/docker/daemon.json来使用加速器
- 第一步:
mkdir -p /etc/docker
- 第二步:
cat <<EOF> /etc/docker/daemon.json
{"exec-opts": ["native.cgroupdriver=systemd"], "registry-mirrors": ["https://u01jo9qv.mirror.aliyuncs.com","https://hub-mirror.c.163.com","https://mirror.baidubce.com"],"live-restore": true,"log-driver":"json-file","log-opts": {"max-size":"500m", "max-file":"3"},"max-concurrent-downloads": 10,"max-concurrent-uploads": 5,"storage-driver": "overlay2"
}
EOF
- 第三步:
sudo systemctl daemon-reload
- 第四步:
sudo systemctl restart docker
最后就接入阿里云容器镜像加速器成功啦。
为每个节点的docker设置开机自动启动
sudo systemctl enable docker
分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
- 1:在zookeeper官网上面下载zookeeper稳定版(当前为3.7.1)的tar.gz包,并上传到每个服务器上:
zookeeper官网
- 2:查看刚刚上传的zookeeper包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep zookeeper
apache-zookeeper-3.7.1-bin.tar.gz
- 3:解压我们的zookeeper包:
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local
mv /usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper
cd /usr/local/zookeeper
- 4:配置关于zookeeper的环境变量:
vi /etc/profile
在文件中末尾添加如下配置:(ZOOKEEPER_HOME需要根据你自己的zookeeper目录来配置)
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
- 5:让配置立即生效:
source /etc/profile
- 6:创建目录:
cd /usr/local/zookeeper
sudo mkdir data
- 7;添加配置:
cd conf
sudo vi zoo.cfg
内容如下:(dataDir修改成自己的目录,kafka01/02/03是我们在hosts配置的主机名映射,相当于ip)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
initLimit:ZooKeeper集群模式下包含多个zk进程,其中一个进程为leader,余下的进程为follower。
当follower最初与leader建立连接时,它们之间会传输相当多的数据,尤其是follower的数据落后leader很多。initLimit配置follower与leader之间建立连接后进行同步的最长时间。
syncLimit:配置follower和leader之间发送消息,请求和应答的最大时间长度。
tickTime:tickTime则是上述两个超时配置的基本单位,例如对于initLimit,其配置值为5,说明其超时时间为 2000ms * 5 = 10秒。
server.id=host:port1:port2 :其中id为一个数字,表示zk进程的id,这个id也是dataDir目录下myid文件的内容。host是该zk进程所在的IP地址,port1表示follower和leader交换消息所使用的端口,port2表示选举leader所使用的端口。
dataDir:其配置的含义跟单机模式下的含义类似,不同的是集群模式下还有一个myid文件。myid文件的内容只有一行,且内容只能为1 - 255之间的数字,这个数字亦即上面介绍server.id中的id,表示zk进程的id。
- 8:进入data目录:
cd /usr/local/zookeeper/data/
- 9:对每个服务器(kafka01、kafka02、kafka03)配置myid文件:
- 9(1):如果是kafka01服务器,则执行下面这个:(下面的1、2、3就是我们上面指定的server.id,每个zookeeper服务器都要有一个id,并且全局唯一)
echo "1" > myid
- 9(2):如果是kafka02服务器,则执行下面这个:
echo "2" > myid
- 9(3):如果是kafka03服务器,则执行下面这个
echo "3" > myid
- 10:启动zookeeper服务命令:(必须要把全部zookeeper服务器启动之后在执行下一步status命令)
cd /usr/local/zookeeper/bin/
[root@kafka01 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
- 11:对全部的zookeeper服务器执行查看zookeeper集群节点状态命令:(看看哪个是leader节点、哪个是follower节点)。Mode就是某一台zookeeper的角色⭐
[root@kafka01 bin]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@kafka02 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@kafka03 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
- 1:进入kafka官网:
Kafka官网
- 2(下载方式1):下载当前kafka的Binary稳定版(截止到2022-08-29,稳定版本为3.2.1),下载会十分缓慢,大约要1个小时的时间(假如你的网速很慢,那么这种方式就不推荐了。):
- 2(下载方式2):使用我上传kafka_2.13-3.2.1.zip包(注意这个不是tgz包,而是zip包)(推荐这种方式),下载速度很快:
kafka3.2.1快速下载地址
- 3:解压kafka_2.13-3.2.1.zip包,拿到kafka的tgz包:
- 4:将解压好的kafka的tgz包上传到每个服务器上。
- 5:查看每个服务器上是否都已经成功上传了kafka_2.13-3.2.1.tgz包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep kafka
kafka_2.13-3.2.1.tgz
- 6:解压kafka.tgz包到/usr/local下:
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/
- 7:修改kafka目录:
cd /usr/local/
mv kafka_2.13-3.2.1/ kafka
-
**8:修改每个服务器的kafka配置文件:(注意:对应的机器要执行对应的命令,不是都在一台服务器执行)**⭐
- 8(1):在kafka01服务器上修改的配置文件,将下面的内容粘贴上去:⭐
[root@kafka01 local]# rm -f /usr/local/kafka/config/server.properties [root@kafka01 local]# vi /usr/local/kafka/config/server.properties
内容如下:
注意下面3个地方:
①每一个kafka的broker.id都不可以一样,并且要为数字(比如0、1、2都是可以的)!
②log.dirs为你当前机器的kafka的日志数据存储目录
③zookeeper.connect:配置连接Zookeeper集群地址,下面的kafka01:2181(kafka01的意思是zk所在的服务器的ip地址,因为我们配置了hosts,所以就直接用主机名更方便;2181就是zk配置文件中的clientPort)
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=1 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理) zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
- 8(2):在kafka02服务器上修改的配置文件,将下面的内容粘贴上去:⭐
[root@kafka02 local]# rm -f /usr/local/kafka/config/server.properties [root@kafka02 local]# vi /usr/local/kafka/config/server.properties
内容如下:
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=2 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理) zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
- 8(3):在kafka03服务器上修改的配置文件,将下面的内容粘贴上去:⭐
[root@kafka03 local]# rm -f /usr/local/kafka/config/server.properties [root@kafka03 local]# vi /usr/local/kafka/config/server.properties
内容如下:
#broker 的全局唯一编号,不能重复,只能是数字。 broker.id=3 #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 log.dirs=/usr/local/kafka/datas #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个 topic 创建时的副本数,默认时 1 个副本 offsets.topic.replication.factor=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #每个 segment 文件的大小,默认最大 1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理) zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
-
9:给每个服务器都配置kafka的环境变量:
sudo vim /etc/profile
在最后面追加的内容如下:
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
- 10:让配置立即生效:
source /etc/profile
- 11:启动zk集群。依次在 kafka01、kafka02、kafka03节点上启动zookeeper。(zk要先启动,然后再启动kafka)⭐
/usr/local/zookeeper/bin/zkServer.sh start
- 12:后台模式启动kafka集群。依次在 kafka01、kafka02、kafka03节点上启动kafka。
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
- 13:查看kafka是否启动成功:
[root@kafka01 local]# jps
3603 Kafka
3166 QuorumPeerMain
4367 Jps
- 14:关闭kafka集群:(可以暂时不关闭,方便后面继续演示)
- 注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
集群。
- 注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
kafka-server-stop.sh
- 15:等kafka集群全部关闭之后再关闭zookeeper:(可以暂时不关闭,方便后面继续演示)
zkServer.sh stop
安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
- 1:创建my.cnf文件(也就是mysql的配置文件)
vim /my-sql/mysql-master/conf/my.cnf
将内容粘贴进my.cnf文件
[client]
# 指定编码格式为utf8,默认的MySQL会有中文乱码问题
default_character_set=utf8
[mysqld]
collation_server=utf8_general_ci
character_set_server=utf8# 全局唯一id(不允许有相同的)
server_id=201
binlog-ignore-db=mysql
# 指定MySQL二进制日志
log-bin=mysql-bin
# 二进制日志格式,因为要整合canal,所以这里必须要是row
binlog_format=row
#指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog(可以配置多个)
binlog-do-db=canal-test-db1
binlog-do-db=canal-test-db2
- 2:运行一个mysql容器实例。作为Master节点
docker run -p 3307:3306 \
-v /my-sql/mysql-master/log:/var/log/mysql \
-v /my-sql/mysql-master/data:/var/lib/mysql \
-v /my-sql/mysql-master/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--name mysql-master \
-d mysql:5.7
- 3:进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
- 4:创建canal的mysql帐号,使该canal帐号具有MySQL的Slave (从节点)的权限(也就是能够主从复制), 如果已有账户可直接 grant(这几步都是在mysql容器内部进行,也就是登录了mysql帐号后执行的命令)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;
- 5:退出mysql容器,并重启容器:
docker restart mysql-master
- 6:再次进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
- 7:查看是否成功开启binlog日志
show variables like '%log_bin%';
案例1:Canal+Kafka实现mysql和redis的数据同步⭐
案例目的:
1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。
2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到Redis中;
3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。
必要的环境
- 1:jdk8
- 2:zookeeper
- 3:kafka
- 4:canal.deployer
- 5:Redis
- 6:Lombok
配置canal.deployer
- 1:进入Canal的github仓库:
Canal的github仓库地址
-
2:选择canal.deployer的版本(我们选择的是最新版v1.1.6):
- 2(1)方式1:直接从GitHub上面下载。(下载速度十分慢,不推荐)
- 2(2)方式2:从我的csdn上面下载。(速度很快,推荐!⭐)
canal.deployer快速下载地址
-
3:上传到我们的服务器上(这里我们就拿kafka01服务器作为canal服务器),生产环境可以另外创建一个新的canal服务器。
-
4:查看canal是否上传到我们的服务器上:(只上传到kafka01服务器上)
[root@kafka01 ~]# ls | grep canal
canal.deployer-1.1.6.tar.gz
- 5:解压canal.deployer:
mkdir -p /usr/local/canal-deployer
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-deployer
cd /usr/local/canal-deployer/conf
- 6:修改canal.properties文件:
vim /usr/local/canal-deployer/conf/canal.properties
- 修改地方1(配置zookeeper集群地址):
- 修改地方2(修改成kafka模式):
- 修改地方3(修改canal数据库用户的账号密码):
- 修改地方4(在conf目录下要有example同名的目录,可以默认不改,意思就是instance.properties在/usr/local/canal-deployer/conf/example目录下。):
- 例如要将example改成abc1,则也要在conf目录下创建一个abc1的目录,并在里面创建instance.properties配置文件。
- 修改地方5(配置kafka集群地址):
- 7:配置instance.properties配置文件:(默认是在example目录下)
cd /usr/local/canal-deployer/conf/example
vim instance.properties
- 修改地方1。canal数据库的id,必须要全局唯一(和mysql的id不能设置一样):
- 修改地方2。我们MySQL的master数据库的ip+端口(我们上面设置mysql的是3307端口):
- 修改地方3。在MySQL的master数据库中canal的账号密码:
- 修改地方4。新增一个配置,设置默认同步的数据库名:⭐
- canal.instance.defaultDatabaseName =监控的数据库名
- 例如canal.instance.defaultDatabaseName=canal-test-db1
- canal.instance.defaultDatabaseName =监控的数据库名
- 修改地方5:匹配表名的正则表达式:(指定canal要监控的数据库.表名)很重要⭐
- canal.instance.filter.regex=canal-test-db1.t_config,canal-test-db1.t_user
- 修改地方6:指定用于canal传输消息的kafka的topic名称:(我们指定的topic名称为canal-test-topic)
启动canal.deployer⭐
- 1:跳转目录:
cd /usr/local/canal-deployer/bin/
- 2:执行sh:
./startup.sh
配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
- :修改C:\Windows\System32\drivers\etc路径下的hosts文件:
创建一个SpringBoot项目⭐
项目结构
准备需要同步的数据库表⭐
CREATE DATABASE `canal-test-db1`;USE `canal-test-db1`;CREATE TABLE `t_config` (`config_id` bigint(20) NOT NULL,`config_info` text,`datetime` datetime DEFAULT NULL,`desc` varchar(255) DEFAULT NULL,PRIMARY KEY (`config_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
pom.xml⭐
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>canal-demo</artifactId><version>1.0-SNAPSHOT</version><properties><!-- springboot版本--><spring-boot.version>2.5.9</spring-boot.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!-- spring整合kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Redis服务启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.70</version></dependency><!-- springboot-web依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies></project>
application.yml⭐
server:port: 8081
# spring整合kafka配置
spring:kafka:# kafka集群地址(可以多个)bootstrap-servers:- 192.168.184.201:9092- 192.168.184.202:9092- 192.168.184.203:9092#kafka消费者配置consumer:# 指定一个消费者组idgroup-id: canal-group1# key/value的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#手动提交第1步:开启手动提交offset(true的话就是消费完一条消息自动会提交)enable-auto-commit: false# kafka生产者配置producer:# key/value的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerlistener:#手动提交第2步:ack设置为手动(enable-auto-commit要设置为false)# manual_immediate:每处理完业务手动调用Acknowledgment.acknowledge()后立即提交ack-mode: manual_immediate#redis配置redis:host: 127.0.0.1# password:port: 6379database: 2
RedisTemplateConfig(配置类)
package com.boot.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisTemplateConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(jsonRedisSerializer);redisTemplate.setHashValueSerializer(jsonRedisSerializer);// 解决查询缓存转换异常的问题ObjectMapper om = new ObjectMapper();// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和publicom.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jsonRedisSerializer.setObjectMapper(om); //如果不设置,存储到redis的对象取出来将无法进行转换redisTemplate.setDefaultSerializer(jsonRedisSerializer);return redisTemplate;}
}
Config.class⭐
package com.boot.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serializable;/*** 该实体类对应着数据库表t_config字段* @author youzhengjie 2022-09-01 16:55:40*///lombok注解简化开发
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //开启链式编程
public class Config implements Serializable {private Long configId;private String configInfo;private String datetime;private String desc;
}
canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
ConfigCanalBean.class⭐
package com.boot.entity.config_canal;import com.boot.entity.Config;
import lombok.Data;import java.util.List;/*** 这个类是接收canal发送过来的消息所必须要的* @author youzhengjie 2022-09-01 16:55:18*/
@Data
public class ConfigCanalBean {//config实体类的数据private List<Config> data;//数据库名称private String database;private long es;//递增private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;//暂时没发现什么用,不过也要写上这个属性private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法
}
MysqlType.class⭐
package com.boot.entity.config_canal;import lombok.Data;/*** 和SqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成String类型即可)* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。* 注意:这个类的属性全部都要是String类型* @author youzhengjie 2022-09-01 16:55:25*/
@Data
public class MysqlType {private String configId;private String configInfo;private String datetime;private String desc;
}
SqlType.class⭐
package com.boot.entity.config_canal;import lombok.Data;/*** 和MysqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成int类型即可)* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。* 注意:这个类的属性全部都要是int类型* @author youzhengjie 2022-09-01 16:55:32*/
@Data
public class SqlType {private int configId;private int configInfo;private int datetime;private int desc;}
ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
package com.boot.comsumer;import com.alibaba.fastjson.JSONObject;
import com.boot.entity.Config;
import com.boot.entity.config_canal.ConfigCanalBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;/*** kafka消费者(监听名为canal-test-topic的topic),同步Redis* @author youzhengjie 2022-09-01 16:54:28*/
@Component
@Slf4j
public class ConfigCanalRedisConsumer {@Autowiredprivate RedisTemplate redisTemplate;//redis的key格式:(数据库.表名_字段的id)private static final String KEY_PREFIX = "canal-test-db1.t_config_";//过期时间(单位:小时)private static final int TIME_OUT = 24;/*** @param consumer 接收消费记录(消息)* @param ack 手动提交消息*/@KafkaListener(topics = "canal-test-topic")public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {try {//获取canal的消息String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);//转换为javaBeanConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);/*由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)如果canalBean.getTable()获取的表名是t_config,则同步到redis,如果不是则不管。*///System.out.println(canalBean);if("t_config".equals(canalBean.getTable())){//获取是否是DDL语句boolean isDdl = canalBean.isDdl();//获取当前sql语句的类型(比如INSERT、DELETE等等)String type = canalBean.getType();List<Config> configList = canalBean.getData();//如果不是DDL语句if (!isDdl) {//INSERT和UPDATE都是一样的操作if ("INSERT".equals(type) || "UPDATE".equals(type)) {//新增语句for (Config config : configList) {Long id = config.getConfigId();//新增到redis中,过期时间是10分钟redisTemplate.opsForValue().set(KEY_PREFIX + id, JSONObject.toJSONString(config), TIME_OUT, TimeUnit.HOURS);}}else if("DELETE".equals(type)){//删除语句for (Config config : configList) {Long id = config.getConfigId();//从redis中删除redisTemplate.delete(KEY_PREFIX+id);}}}}//最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)ack.acknowledge();}catch (Exception e){throw new RuntimeException();}}}
创建kafka的topic(我们指定的topic名称为canal-test-topic)
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server kafka01:9092 --topic canal-test-topic --create
开始测试canal同步效果⭐
测试1:给t_config表插入数据