Canal整合SpringBoot详解(一)

文章目录

    • Canal整合SpringBoot详解(一)
      • 什么是canal
      • 搭建Kafka3.2.1集群⭐
        • Kafka集群机器规划
        • 创建3台虚拟机(centos7系统)
        • 必要的环境准备(3台虚拟机都要执行如下操作)⭐
          • 分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
          • 分别关闭每个服务器的防火墙
          • 分别为每个服务器安装jdk8
          • 分别为每个服务器安装Docker
            • 为每个节点的Docker接入阿里云镜像加速器
            • 为每个节点的docker设置开机自动启动
          • 分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
          • 分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
          • 安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
      • 案例1:Canal+Kafka实现mysql和redis的数据同步⭐
        • 必要的环境
        • 配置canal.deployer
        • 启动canal.deployer⭐
        • 配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
        • 创建一个SpringBoot项目⭐
          • 项目结构
          • 准备需要同步的数据库表⭐
          • pom.xml⭐
          • application.yml⭐
          • RedisTemplateConfig(配置类)
          • Config.class⭐
          • canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
            • ConfigCanalBean.class⭐
            • MysqlType.class⭐
            • SqlType.class⭐
          • ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
        • 创建kafka的topic(我们指定的topic名称为canal-test-topic)
        • 开始测试canal同步效果⭐
          • 测试1:给t_config表插入数据
          • 测试2:修改t_config表数据
          • 测试3:删除t_config表数据

Canal整合SpringBoot详解(一)

什么是canal

  • canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
  • canal工作原理:
    • canal的工作原理就是把自己伪装成MySQL slave从节点,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如RocketMQ、Kafka、ElasticSearch等等。
  • canal能做什么:
    • 数据库镜像
    • 数据库实时备份
    • 索引构建和实时维护
    • 业务cache(缓存)刷新
    • 带业务逻辑的增量数据处理

搭建Kafka3.2.1集群⭐

Kafka集群机器规划
IP地址主机名需要安装的资源操作系统
192.168.184.201kafka01jdk、Docker、zookeeper、Kafkacentos7.9
192.168.184.202kafka02jdk、Docker、zookeeper、Kafkacentos7.9
192.168.184.203kafka03jdk、Docker、zookeeper、Kafkacentos7.9
创建3台虚拟机(centos7系统)

在这里插入图片描述在这里插入图片描述
在这里插入图片描述

必要的环境准备(3台虚拟机都要执行如下操作)⭐
分别修改每个服务器的hosts文件(将上面的ip和主机名配置上去)
  • 1:进入hosts文件:
vi /etc/hosts

在最后面追加内容如下:(这个需要根据你自己服务器的ip来配置)

192.168.184.201 kafka01
192.168.184.202 kafka02
192.168.184.203 kafka03
分别关闭每个服务器的防火墙
systemctl stop firewalld
systemctl disable firewalld
分别为每个服务器安装jdk8
  • 1:进入oracle官网下载jdk8的tar.gz包:

  • 2:将下载好的包上传到每个服务器上:

  • 3:查看是否上传成功:

[root@kafka01 ~]# ls
anaconda-ks.cfg  jdk-8u333-linux-x64.tar.gz
  • 4:创建文件夹:
mkdir -p /usr/java/
  • 5:解压刚刚下载好的包并输出到/usr/java目录下:
tar -zxvf jdk-8u333-linux-x64.tar.gz -C /usr/java/
[root@kafka02 ~]# ls /usr/java/
jdk1.8.0_333
  • 6:配置java环境变量:
vi /etc/profile

在文件中末尾添加如下配置:(需要更改的是JAVA_HOME,根据自己的java目录名来更改)

JAVA_HOME=/usr/java/jdk1.8.0_333
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
  • 7:让配置立即生效:
source /etc/profile
  • 8:查看JDK是否安装成功:
[root@kafka01 ~]# java -version
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
分别为每个服务器安装Docker
  • 1:切换镜像源
wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
  • 2:查看当前镜像源中支持的docker版本
yum list docker-ce --showduplicates | sort -r
  • 3:安装特定版本的docker-ce
yum -y install docker-ce-3:20.10.8-3.el7.x86_64 docker-ce-cli-3:20.10.8-3.el7.x86_64 containerd.io
为每个节点的Docker接入阿里云镜像加速器

配置镜像加速器方法。

  • 准备工作:
  • 1:首先进入阿里云容器镜像服务 https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors
  • 2:点击镜像工具下面的镜像加速器
  • 3:拿到你的加速器地址和下面第二步的registry-mirrors的值替换即可。

针对Docker客户端版本大于 1.10.0 的用户,可以通过修改daemon配置文件/etc/docker/daemon.json来使用加速器

  • 第一步:
mkdir -p /etc/docker
  • 第二步:
cat <<EOF> /etc/docker/daemon.json
{"exec-opts": ["native.cgroupdriver=systemd"],	"registry-mirrors": ["https://u01jo9qv.mirror.aliyuncs.com","https://hub-mirror.c.163.com","https://mirror.baidubce.com"],"live-restore": true,"log-driver":"json-file","log-opts": {"max-size":"500m", "max-file":"3"},"max-concurrent-downloads": 10,"max-concurrent-uploads": 5,"storage-driver": "overlay2"
}
EOF
  • 第三步:
sudo systemctl daemon-reload
  • 第四步:
sudo systemctl restart docker

最后就接入阿里云容器镜像加速器成功啦。

为每个节点的docker设置开机自动启动
sudo systemctl enable docker
分别为每个服务器安装zookeeper3.7.1(搭建zookeeper集群)⭐
  • 1:在zookeeper官网上面下载zookeeper稳定版(当前为3.7.1)的tar.gz包,并上传到每个服务器上:

zookeeper官网

在这里插入图片描述在这里插入图片描述

  • 2:查看刚刚上传的zookeeper包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep zookeeper
apache-zookeeper-3.7.1-bin.tar.gz
  • 3:解压我们的zookeeper包:
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local
mv /usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper
cd /usr/local/zookeeper
  • 4:配置关于zookeeper的环境变量:
vi /etc/profile

在文件中末尾添加如下配置:(ZOOKEEPER_HOME需要根据你自己的zookeeper目录来配置)

export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
  • 5:让配置立即生效:
source /etc/profile
  • 6:创建目录:
cd /usr/local/zookeeper
sudo mkdir data
  • 7;添加配置:
cd conf
sudo vi zoo.cfg

内容如下:(dataDir修改成自己的目录,kafka01/02/03是我们在hosts配置的主机名映射,相当于ip)

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888

initLimit:ZooKeeper集群模式下包含多个zk进程,其中一个进程为leader,余下的进程为follower。
当follower最初与leader建立连接时,它们之间会传输相当多的数据,尤其是follower的数据落后leader很多。initLimit配置follower与leader之间建立连接后进行同步的最长时间。

syncLimit:配置follower和leader之间发送消息,请求和应答的最大时间长度。

tickTime:tickTime则是上述两个超时配置的基本单位,例如对于initLimit,其配置值为5,说明其超时时间为 2000ms * 5 = 10秒。

server.id=host:port1:port2其中id为一个数字,表示zk进程的id,这个id也是dataDir目录下myid文件的内容。host是该zk进程所在的IP地址,port1表示follower和leader交换消息所使用的端口,port2表示选举leader所使用的端口。

dataDir:其配置的含义跟单机模式下的含义类似,不同的是集群模式下还有一个myid文件。myid文件的内容只有一行,且内容只能为1 - 255之间的数字,这个数字亦即上面介绍server.id中的id,表示zk进程的id。

  • 8:进入data目录:
cd /usr/local/zookeeper/data/
  • 9:对每个服务器(kafka01、kafka02、kafka03)配置myid文件:
  • 9(1):如果是kafka01服务器,则执行下面这个:(下面的1、2、3就是我们上面指定的server.id,每个zookeeper服务器都要有一个id,并且全局唯一)
echo "1" > myid
  • 9(2):如果是kafka02服务器,则执行下面这个:
echo "2" > myid
  • 9(3):如果是kafka03服务器,则执行下面这个
echo "3" > myid
  • 10:启动zookeeper服务命令:(必须要把全部zookeeper服务器启动之后在执行下一步status命令)
cd /usr/local/zookeeper/bin/
[root@kafka01 bin]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
  • 11:对全部的zookeeper服务器执行查看zookeeper集群节点状态命令:(看看哪个是leader节点、哪个是follower节点)。Mode就是某一台zookeeper的角色
[root@kafka01 bin]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@kafka02 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@kafka03 data]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
分别为每个服务器安装Kafka3.2.1(搭建Kafka集群)⭐
  • 1:进入kafka官网:

Kafka官网

  • 2(下载方式1):下载当前kafka的Binary稳定版(截止到2022-08-29,稳定版本为3.2.1),下载会十分缓慢,大约要1个小时的时间(假如你的网速很慢,那么这种方式就不推荐了。):

在这里插入图片描述

  • 2(下载方式2):使用我上传kafka_2.13-3.2.1.zip包(注意这个不是tgz包,而是zip包)(推荐这种方式),下载速度很快:

kafka3.2.1快速下载地址

在这里插入图片描述

  • 3:解压kafka_2.13-3.2.1.zip包,拿到kafka的tgz包:

在这里插入图片描述

  • 4:将解压好的kafka的tgz包上传到每个服务器上。
  • 5:查看每个服务器上是否都已经成功上传了kafka_2.13-3.2.1.tgz包:
[root@kafka01 ~]# pwd
/root
[root@kafka01 ~]# ls | grep kafka
kafka_2.13-3.2.1.tgz
  • 6:解压kafka.tgz包到/usr/local下:
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local/
  • 7:修改kafka目录:
cd /usr/local/
mv kafka_2.13-3.2.1/ kafka
  • **8:修改每个服务器的kafka配置文件:(注意:对应的机器要执行对应的命令,不是都在一台服务器执行)**⭐

    • 8(1):在kafka01服务器上修改的配置文件,将下面的内容粘贴上去:⭐
    [root@kafka01 local]# rm -f /usr/local/kafka/config/server.properties
    [root@kafka01 local]# vi /usr/local/kafka/config/server.properties
    

    内容如下:

    注意下面3个地方:

    ①每一个kafka的broker.id都不可以一样,并且要为数字(比如0、1、2都是可以的)!

    ②log.dirs为你当前机器的kafka的日志数据存储目录

    ③zookeeper.connect:配置连接Zookeeper集群地址,下面的kafka01:2181(kafka01的意思是zk所在的服务器的ip地址,因为我们配置了hosts,所以就直接用主机名更方便;2181就是zk配置文件中的clientPort)

    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=1
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/usr/local/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
    zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
    
    • 8(2):在kafka02服务器上修改的配置文件,将下面的内容粘贴上去:⭐
    [root@kafka02 local]# rm -f /usr/local/kafka/config/server.properties
    [root@kafka02 local]# vi /usr/local/kafka/config/server.properties
    

    内容如下:

    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=2
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/usr/local/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
    zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
    
    • 8(3):在kafka03服务器上修改的配置文件,将下面的内容粘贴上去:⭐
    [root@kafka03 local]# rm -f /usr/local/kafka/config/server.properties
    [root@kafka03 local]# vi /usr/local/kafka/config/server.properties
    

    内容如下:

    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=3
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/usr/local/kafka/datas
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便我们管理)
    zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181/kafka
    
  • 9:给每个服务器都配置kafka的环境变量:

sudo vim /etc/profile

在最后面追加的内容如下:

export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  • 10:让配置立即生效:
source /etc/profile
  • 11:启动zk集群。依次在 kafka01、kafka02、kafka03节点上启动zookeeper。(zk要先启动,然后再启动kafka)⭐
/usr/local/zookeeper/bin/zkServer.sh start
  • 12:后台模式启动kafka集群。依次在 kafka01、kafka02、kafka03节点上启动kafka。
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
  • 13:查看kafka是否启动成功:
[root@kafka01 local]# jps
3603 Kafka
3166 QuorumPeerMain
4367 Jps
  • 14:关闭kafka集群:(可以暂时不关闭,方便后面继续演示)
    • 注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper
      集群。
kafka-server-stop.sh
  • 15:等kafka集群全部关闭之后再关闭zookeeper:(可以暂时不关闭,方便后面继续演示)
zkServer.sh stop
安装MySQL5.7,配置canal+mysql(本次采用Docker的方式)⭐
  • 1:创建my.cnf文件(也就是mysql的配置文件)
vim /my-sql/mysql-master/conf/my.cnf

将内容粘贴进my.cnf文件

[client]
# 指定编码格式为utf8,默认的MySQL会有中文乱码问题
default_character_set=utf8
[mysqld]
collation_server=utf8_general_ci
character_set_server=utf8# 全局唯一id(不允许有相同的)
server_id=201
binlog-ignore-db=mysql
# 指定MySQL二进制日志
log-bin=mysql-bin
# 二进制日志格式,因为要整合canal,所以这里必须要是row
binlog_format=row
#指定具体要同步的数据库,如果不配置则表示所有数据库均开启 Binlog(可以配置多个)
binlog-do-db=canal-test-db1
binlog-do-db=canal-test-db2
  • 2:运行一个mysql容器实例。作为Master节点
docker run -p 3307:3306 \
-v /my-sql/mysql-master/log:/var/log/mysql \
-v /my-sql/mysql-master/data:/var/lib/mysql \
-v /my-sql/mysql-master/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--name mysql-master \
-d mysql:5.7
  • 3:进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
  • 4:创建canal的mysql帐号,使该canal帐号具有MySQL的Slave (从节点)的权限(也就是能够主从复制), 如果已有账户可直接 grant(这几步都是在mysql容器内部进行,也就是登录了mysql帐号后执行的命令)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;
  • 5:退出mysql容器,并重启容器:
docker restart mysql-master
  • 6:再次进入容器内部,并登陆mysql
docker exec -it mysql-master /bin/bash
mysql -uroot -p
  • 7:查看是否成功开启binlog日志
show variables like '%log_bin%';

案例1:Canal+Kafka实现mysql和redis的数据同步⭐

案例目的:

1:实现canal只监控canal-test-db1数据库下的t_config(主要同步这个表)和.t_user表。

2:当我们修改canal-test-db1数据库下的t_config表的内容会自动同步到Redis中;

3:当我们修改canal-test-db1数据库下的t_user表则不会同步。(虽然t_user表也被canal监控,但是这个案例就要做到在被监控的情况下,而不被同步),说白了就是只同步t_config表。

必要的环境
  • 1:jdk8
  • 2:zookeeper
  • 3:kafka
  • 4:canal.deployer
  • 5:Redis
  • 6:Lombok
配置canal.deployer
  • 1:进入Canal的github仓库:

Canal的github仓库地址

  • 2:选择canal.deployer的版本(我们选择的是最新版v1.1.6):

    • 2(1)方式1:直接从GitHub上面下载。(下载速度十分慢,不推荐)

在这里插入图片描述在这里插入图片描述
在这里插入图片描述

  • 2(2)方式2:从我的csdn上面下载。(速度很快,推荐!⭐)

canal.deployer快速下载地址

在这里插入图片描述

  • 3:上传到我们的服务器上(这里我们就拿kafka01服务器作为canal服务器),生产环境可以另外创建一个新的canal服务器。

  • 4:查看canal是否上传到我们的服务器上:(只上传到kafka01服务器上)

[root@kafka01 ~]# ls | grep canal
canal.deployer-1.1.6.tar.gz
  • 5:解压canal.deployer:
mkdir -p /usr/local/canal-deployer
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-deployer
cd /usr/local/canal-deployer/conf
  • 6:修改canal.properties文件:
vim /usr/local/canal-deployer/conf/canal.properties
  • 修改地方1(配置zookeeper集群地址):

在这里插入图片描述

  • 修改地方2(修改成kafka模式):

在这里插入图片描述

  • 修改地方3(修改canal数据库用户的账号密码):

在这里插入图片描述

  • 修改地方4(在conf目录下要有example同名的目录,可以默认不改,意思就是instance.properties在/usr/local/canal-deployer/conf/example目录下。):
    • 例如要将example改成abc1,则也要在conf目录下创建一个abc1的目录,并在里面创建instance.properties配置文件。

在这里插入图片描述

  • 修改地方5(配置kafka集群地址):

在这里插入图片描述

  • 7:配置instance.properties配置文件:(默认是在example目录下)
cd /usr/local/canal-deployer/conf/example
vim instance.properties
  • 修改地方1。canal数据库的id,必须要全局唯一(和mysql的id不能设置一样):

在这里插入图片描述

  • 修改地方2。我们MySQL的master数据库的ip+端口(我们上面设置mysql的是3307端口):

在这里插入图片描述

  • 修改地方3。在MySQL的master数据库中canal的账号密码:

在这里插入图片描述

  • 修改地方4。新增一个配置,设置默认同步的数据库名:⭐
    • canal.instance.defaultDatabaseName =监控的数据库名
      • 例如canal.instance.defaultDatabaseName=canal-test-db1

在这里插入图片描述

  • 修改地方5:匹配表名的正则表达式:(指定canal要监控的数据库.表名)很重要⭐
    • canal.instance.filter.regex=canal-test-db1.t_config,canal-test-db1.t_user

在这里插入图片描述

  • 修改地方6:指定用于canal传输消息的kafka的topic名称:(我们指定的topic名称为canal-test-topic)

在这里插入图片描述

启动canal.deployer⭐
  • 1:跳转目录:
cd /usr/local/canal-deployer/bin/
  • 2:执行sh:
./startup.sh
配置hosts(由于我是Windows运行,没有配置hosts导致无法识别kafka01主机名)⭐
  • :修改C:\Windows\System32\drivers\etc路径下的hosts文件:

在这里插入图片描述

创建一个SpringBoot项目⭐
项目结构

在这里插入图片描述

准备需要同步的数据库表⭐
CREATE DATABASE `canal-test-db1`;USE `canal-test-db1`;CREATE TABLE `t_config` (`config_id` bigint(20) NOT NULL,`config_info` text,`datetime` datetime DEFAULT NULL,`desc` varchar(255) DEFAULT NULL,PRIMARY KEY (`config_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
pom.xml⭐
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>canal-demo</artifactId><version>1.0-SNAPSHOT</version><properties><!--        springboot版本--><spring-boot.version>2.5.9</spring-boot.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><!--        spring整合kafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Redis服务启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.70</version></dependency><!--        springboot-web依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--        lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies></project>
application.yml⭐
server:port: 8081
# spring整合kafka配置
spring:kafka:# kafka集群地址(可以多个)bootstrap-servers:- 192.168.184.201:9092- 192.168.184.202:9092- 192.168.184.203:9092#kafka消费者配置consumer:# 指定一个消费者组idgroup-id: canal-group1# key/value的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#手动提交第1步:开启手动提交offset(true的话就是消费完一条消息自动会提交)enable-auto-commit: false# kafka生产者配置producer:# key/value的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerlistener:#手动提交第2步:ack设置为手动(enable-auto-commit要设置为false)# manual_immediate:每处理完业务手动调用Acknowledgment.acknowledge()后立即提交ack-mode: manual_immediate#redis配置redis:host: 127.0.0.1#    password:port: 6379database: 2
RedisTemplateConfig(配置类)
package com.boot.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisTemplateConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();Jackson2JsonRedisSerializer jsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setValueSerializer(jsonRedisSerializer);redisTemplate.setHashValueSerializer(jsonRedisSerializer);// 解决查询缓存转换异常的问题ObjectMapper om = new ObjectMapper();// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和publicom.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jsonRedisSerializer.setObjectMapper(om); //如果不设置,存储到redis的对象取出来将无法进行转换redisTemplate.setDefaultSerializer(jsonRedisSerializer);return redisTemplate;}
}
Config.class⭐
package com.boot.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.io.Serializable;/*** 该实体类对应着数据库表t_config字段* @author youzhengjie 2022-09-01 16:55:40*///lombok注解简化开发
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true) //开启链式编程
public class Config implements Serializable {private Long configId;private String configInfo;private String datetime;private String desc;
}
canal要求必须要的实体类(用于接收canal发送到kafka的同步消息)⭐
ConfigCanalBean.class⭐
package com.boot.entity.config_canal;import com.boot.entity.Config;
import lombok.Data;import java.util.List;/*** 这个类是接收canal发送过来的消息所必须要的* @author youzhengjie 2022-09-01 16:55:18*/
@Data
public class ConfigCanalBean {//config实体类的数据private List<Config> data;//数据库名称private String database;private long es;//递增private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;//暂时没发现什么用,不过也要写上这个属性private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法
}
MysqlType.class⭐
package com.boot.entity.config_canal;import lombok.Data;/*** 和SqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成String类型即可)* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。* 注意:这个类的属性全部都要是String类型* @author youzhengjie 2022-09-01 16:55:25*/
@Data
public class MysqlType {private String configId;private String configInfo;private String datetime;private String desc;
}
SqlType.class⭐
package com.boot.entity.config_canal;import lombok.Data;/*** 和MysqlType类差不多。(就是把我们要同步的t_config数据库表的字段全部复制到这里,然后全部改成int类型即可)* 下面的属性名支持驼峰法。比如下面的configId属性可以和t_config表的config_id一一对应,而无需更改。* 注意:这个类的属性全部都要是int类型* @author youzhengjie 2022-09-01 16:55:32*/
@Data
public class SqlType {private int configId;private int configInfo;private int datetime;private int desc;}
ConfigCanalRedisConsumer(kafka消费者类,监听指定topic,把canal发送的消息同步到Redis中)⭐
package com.boot.comsumer;import com.alibaba.fastjson.JSONObject;
import com.boot.entity.Config;
import com.boot.entity.config_canal.ConfigCanalBean;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;/*** kafka消费者(监听名为canal-test-topic的topic),同步Redis* @author youzhengjie 2022-09-01 16:54:28*/
@Component
@Slf4j
public class ConfigCanalRedisConsumer {@Autowiredprivate RedisTemplate redisTemplate;//redis的key格式:(数据库.表名_字段的id)private static final String KEY_PREFIX = "canal-test-db1.t_config_";//过期时间(单位:小时)private static final int TIME_OUT = 24;/*** @param consumer 接收消费记录(消息)* @param ack 手动提交消息*/@KafkaListener(topics = "canal-test-topic")public void receive(ConsumerRecord<String, String> consumer, Acknowledgment ack) {try {//获取canal的消息String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), value);//转换为javaBeanConfigCanalBean canalBean = JSONObject.parseObject(value, ConfigCanalBean.class);/*由于我们canal.instance配置了监控canal-test-db1.t_config表和canal-test-db1.t_user表(生产环境下可以启动多个canal,每一个canal监听一张需要同步的表)所以我们要对这两张表分开处理。(可以通过他们的表名(canalBean.getTable())来区分)如果canalBean.getTable()获取的表名是t_config,则同步到redis,如果不是则不管。*///System.out.println(canalBean);if("t_config".equals(canalBean.getTable())){//获取是否是DDL语句boolean isDdl = canalBean.isDdl();//获取当前sql语句的类型(比如INSERT、DELETE等等)String type = canalBean.getType();List<Config> configList = canalBean.getData();//如果不是DDL语句if (!isDdl) {//INSERT和UPDATE都是一样的操作if ("INSERT".equals(type) || "UPDATE".equals(type)) {//新增语句for (Config config : configList) {Long id = config.getConfigId();//新增到redis中,过期时间是10分钟redisTemplate.opsForValue().set(KEY_PREFIX + id, JSONObject.toJSONString(config), TIME_OUT, TimeUnit.HOURS);}}else if("DELETE".equals(type)){//删除语句for (Config config : configList) {Long id = config.getConfigId();//从redis中删除redisTemplate.delete(KEY_PREFIX+id);}}}}//最后,如果上面的代码没有报错的情况下,可以确认消息了。(很重要)ack.acknowledge();}catch (Exception e){throw new RuntimeException();}}}
创建kafka的topic(我们指定的topic名称为canal-test-topic)
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server kafka01:9092 --topic canal-test-topic --create
开始测试canal同步效果⭐
测试1:给t_config表插入数据

在这里插入图片描述
在这里插入图片描述

测试2:修改t_config表数据

在这里插入图片描述在这里插入图片描述

测试3:删除t_config表数据

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/120997.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker - window Docker Desktop升级

文章目录 前言docker - window Docker Desktop升级 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0c;实在白嫖的话&#xff0c;那欢迎常来…

【python画画】蘑菇云爱心

来源于网上短视频 数学原理不懂&#xff0c;图个乐 import math from turtle import *def x(i):return 15 * math.sin(i) ** 3 * 20def y(i):return 20 * (12 * math.cos(i) - 5 * math.cos(2 * i) - 2 * math.cos(4 * i))speed(0) color(red) pensize(10) for i in range(51…

C++设计模式_14_Facade门面模式

本篇介绍的Facade门面模式属于“接口隔离”模式的一种&#xff0c;以下进行详细介绍。 文章目录 1. “接口隔离”模式1. 1 典型模式 2. 系统间耦合的复杂度3. 动机(Motivation)4. 模式定义5. Facade门面模式的代码实现6. 结构7. 要点总结8. 其他参考 1. “接口隔离”模式 在组…

笔记44:Batch_Normlization 过程详解

笔记本地地址&#xff1a;D:\work_file\DeepLearning_Learning\03_个人笔记\2.图像处理任务\BN a a a a a a a a a a a a a a a a a

抖音上怎么挂小程序?制作小程序挂载抖音视频

公司企业商家现在已经把抖音作为营销的渠道之一&#xff0c;目前抖音支持短视频挂载小程序&#xff0c;可方便做营销。以下给大家分享这一操作流程。 一、申请自主挂载能力 首先需要在抖音开放平台官网注册一个抖音小程序账号&#xff0c;然后申请短视频自主挂载能力。 二、搭…

Liunx两台服务器实现相互SSH免密登录

一、首先准备两台Linux虚拟机当作此次实验的两台服务器 服务器1&#xff1a;server IPV4&#xff1a;192.168.110.136 服务器2&#xff1a;client IPV4&#xff1a; 192.168.110.134 二、准备阶段 [rootserver ~]# systemctl disable firewalld #关…

Umijs创建一个antd+Ts项目环境

上文搭建Umijs环境并创建一个项目 介绍基本操作中 我们构建了一个Umijs环境的环境 但也只创建了一个页面 真正开发来讲 也不可能只创建几个界面这么简单 这里面的创建 还是非常完整的 这里 我创建一个文件夹 主要是做我们的项目目录 然后 我们在终端输入命令 然后 打开目录终…

C#版字节跳动SDK - SKIT.FlurlHttpClient.ByteDance

前言 在我们日常开发工作中对接第三方开放平台&#xff0c;找一款封装完善且全面的SDK能够大大的简化我们的开发难度和提高工作效率。今天给大家推荐一款C#开源、功能完善的字节跳动SDK&#xff1a;SKIT.FlurlHttpClient.ByteDance。 项目官方介绍 可能是全网唯一的 C# 版字节…

【C++进阶】set和map的基本使用(灰常详细)

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;C航路 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x1…

基于springboot实现网上图书商城管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现网上图书商城管理系统演示 摘要 在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括网上图书商城的网络应用&#xff0c;在外国网上图书商城已经是很普遍的方式&#xff0c;不过国内的管理网站可能还处于起步…

基于nodejs+vue全国公考岗位及报考人数分析

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

【数据结构】数组和字符串(七):特殊矩阵的压缩存储:三元组表的转置、加法、乘法操作

文章目录 4.2.1 矩阵的数组表示4.2.2 特殊矩阵的压缩存储a. 对角矩阵的压缩存储b~c. 三角、对称矩阵的压缩存储d. 稀疏矩阵的压缩存储——三元组表4.2.3三元组表的转置、加法、乘法、操作转置加法乘法算法测试实验结果代码整合 4.2.1 矩阵的数组表示 【数据结构】数组和字符串…

竞赛 深度学习实现行人重识别 - python opencv yolo Reid

文章目录 0 前言1 课题背景2 效果展示3 行人检测4 行人重识别5 其他工具6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习的行人重识别算法研究与实现 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c…

论文阅读——BART

Arxiv: https://arxiv.org/abs/1910.13461 一个去噪自编码器的预训练序列到序列的模型。是一个结合了双向和自回归transformers的模型。 预训练分为两个阶段&#xff1a;任意噪声函数破坏文本和序列模型重建原始文本 一、模型 input&#xff1a;被破坏的文本-->bidirecti…

基于Canal同步MySQL数据到Elasticsearch

基于Canal同步MySQL数据到Elasticsearch 基于 canal 同步 mysql 的数据到 elasticsearch 中。 1、canal-server 相关软件的安装请参考&#xff1a;《Canal实现数据同步》 1.1 pom依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmln…

1. 两数之和、Leetcode的Python实现

博客主页&#xff1a;&#x1f3c6;看看是李XX还是李歘歘 &#x1f3c6; &#x1f33a;每天分享一些包括但不限于计算机基础、算法等相关的知识点&#x1f33a; &#x1f497;点关注不迷路&#xff0c;总有一些&#x1f4d6;知识点&#x1f4d6;是你想要的&#x1f497; ⛽️今…

虚拟机构建部署单体项目及前后端分离项目

目录 一.部署单体项目 1.远程数据库 1.1远程连接数据库 1.2 新建数据库运行sql文件 2.部署项目到服务器中 3.启动服务器运行 二.部署前后端分离项目 1.远程数据库和部署到服务器 2.利用node环境启动前端项目 3.解决主机无法解析服务器localhost问题 方法一 ​编辑 方法二 一.部…

不需要报班学课程,也能制作手办创业的新方法!

近些年&#xff0c;我们已经习惯只要走进商场就一定会路过泡泡玛特一类的潮流玩具店&#xff0c;甚至还会在美食城、自助饮料机旁边看到盲盒手办的售卖机器里摆放着诱人的近期热卖盲盒。一线城市如此&#xff0c;县城也是一样&#xff0c;不同的只可能是盲盒里的内容。 盲盒到底…

公司如何禁止拷贝文件

公司如何禁止拷贝文件 安企神U盘管理系统下载使用 禁止拷贝文件是一种数据安全措施&#xff0c;通常在企业中用于保护重要信息和知识产权。禁止拷贝文件的方法需要根据公司的实际情况来选择和实施&#xff0c;以下是一些常见的方法&#xff0c;可用于防止文件拷贝&#xff1a…

【pwn入门】使用python打二进制

声明 本文是B站你想有多PWN学习的笔记&#xff0c;包含一些视频外的扩展知识。 程序网络交互初体验 将程序部署成可以远程访问的 socat tcp-l:8877,fork exec:./question_1_plus_x64,reuseaddr通过网络访问程序 nc 127.0.0.1 8877攻击脚本 import socket import telnetli…