SpringBoot快速整合canal1.1.5(TCP模式)
安装并配置MySQL主从⭐
- 1:Docker安装MySQL8.0.28
docker pull mysql:8.0.28
- 2:创建目录:
mkdir -p /usr/local/mysql8/data
mkdir -p /usr/local/mysql8/log
mkdir -p /usr/local/mysql8/my.conf.d
chmod -R 777 /usr/local/mysql8/
- 3:编写my.cnf文件:
vi /usr/local/mysql8/my.conf.d/my.cnf
内容如下:(注意:把binlog-do-db的值修改成你需要canal监听的数据库名称,如果需要监听多个数据库,一定要在下面写多个binlog-do-db,而不是用“,”分隔)
[client]
# 默认字符集
# default_character_set=utf8
[mysqld]
server-id=138
# 开启二进制日志功能
log-bin=mysql-slave-bin
# binlog 记录内容的方式,记录被操作的每一行
binlog_format = ROW
# ------- >>>>指定监听的数据库(防止监听所有数据库)<<<<<----------
binlog-do-db=security-jwt-db
# 忽略大小写
lower_case_table_names=1
pid-file= /var/run/mysqld/mysqld.pid
socket= /var/run/mysqld/mysqld.sock
# 数据库数据存放目录
datadir= /var/lib/mysql
secure-file-priv= NULL
skip-symbolic-links=0
# 最大链接数
max_connections=200
# 最大失败次数
max_connect_errors=10
# 默认时区
default-time_zone='+8:00'
character-set-client-handshake=FALSE
character_set_server=utf8mb4
# default-character-set=utf8
collation-server=utf8mb4_unicode_ci
init_connect='SET NAMES utf8mb4 COLLATE utf8mb4_unicode_ci'
# 默认使用‘mysql_native_password’插件认证
default_authentication_plugin=mysql_native_password
- 4:启动MySQL容器:
docker run \--name mysql8.0.28 \--privileged=true \--restart=always \-it -p 3308:3306 \-v /usr/local/mysql8/data:/var/lib/mysql \-v /usr/local/mysql8/log:/var/log/mysql \-v /usr/local/mysql8/my.conf.d/my.cnf:/etc/mysql/my.cnf \-e MYSQL_ROOT_PASSWORD=123456 \-d mysql:8.0.28
- 5:查看mysql容器是否启动成功:
[root@centos7-sql my.conf.d]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ca41de7447ab mysql:8.0.28 "docker-entrypoint.s…" 12 seconds ago Up 11 seconds 33060/tcp, 0.0.0.0:3308->3306/tcp, :::3308->3306/tcp mysql8.0.28
- 6:修改密码连接模式(mysql8.0版本都要进行修改):
docker exec -it mysql /bin/bash
mysql -uroot -p'123456'
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
- 7:在MySQL中创建一个canal用户,专门用作数据同步:
create user 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';GRANT all privileges ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;
- 8:退出并重启MySQL容器:
docker restart mysql
docker安装canal1.1.5(版本要对应)⭐
- 1:拉取canal1.1.5镜像:
docker pull canal/canal-server:v1.1.5
- 2:运行canal容器:
docker run -p 11111:11111 --name canal \
-e canal.destinations=example \
-e canal.instance.master.address=192.168.184.123:3308 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=security-jwt-db\\..* \
-d canal/canal-server:v1.1.5
核心说明:(只有下面的6个参数才需要我们手动根据自己情况配置,其他可以不用变)
-p 11111:11111
:这是canal的默认监听端口-e canal.instance.master.address=192.168.184.123:3308
:数据库地址和端口(一定要设置成你的MySQL对外暴露的IP和端口号才行)-e canal.instance.dbUsername=canal
:数据库中canal用户的用户名(也就是我们之前单独创建的用户)-e canal.instance.dbPassword=canal
:数据库中canal用户的密码(也就是我们之前单独创建的用户)-e canal.instance.filter.regex=security-jwt-db\\..*
:要监听的表名称(我们这个配置的意思是:监听security-jwt-db数据库下的所有表)--network
:输入我们刚刚创建好的自定义网络,让canal也加入到和MySQL同一个网络中去。
表名称监听支持的语法:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2
整合SpringBoot项目⭐
- 1:导入依赖:(这个依赖并不是alibaba官方的,而是由其他人帮我们整合了SpringBoot项目,我们只需要去导入这个依赖,再做一些简单的配置即可,非常方便!)
- 注意:canal-spring-boot-starter的1.2.1-RELEASE版本匹配的是canal的1.1.5版本(canal的新版本没有去试过,不知道有没有问题)
<!-- springboot整合canal1.1.5(因为canal-spring-boot-starter的1.2.1-RELEASE版本匹配的是canal的1.1.5版本) --><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency>
- 2:配置canal:(application.yml)
- destination:也就是我们安装canal时执行的docker run … canal中配置的
-e canal.destinations
的参数内容。 - server:配置canal所在服务器ip+端口号(默认是11111),记得修改成自己的ip地址。
- destination:也就是我们安装canal时执行的docker run … canal中配置的
#配置alibaba-canal
canal:destination: example # canal数据同步的目的地。也就是我们安装canal时配置的exampleserver: 192.168.184.123:11111 # canal的地址(canal所在服务器ip+端口号(默认是11111))
# 解决canal-spring-boot-starter一直输出日志
logging:level:top.javatool.canal.client: warn
- 3:编写实体类(这个类也就是我们要监听操作的实体类)
@TableName(value="sys_oper_log")
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@Builder
public class OperationLog implements Serializable {private static final long serialVersionUID = 1L;@TableId("id")@JsonSerialize(using = ToStringSerializer.class) //解决雪花算法生成的id过长导致前端js精度丢失问题(也就是js拿到的数据和后端不一致问题)@ApiModelProperty(name = "id",value = "主键")@ExcelProperty("id")private Long id;@TableField("username")@ApiModelProperty("执行操作的用户名")@ExcelProperty("执行操作的用户名")private String username;@TableField("type")@ApiModelProperty("操作类型")@ExcelProperty("操作类型")private String type;@TableField("uri")@ApiModelProperty("访问的接口uri")@ExcelProperty("访问的接口uri")private String uri;@TableField("time")@ApiModelProperty("访问接口耗时")@ExcelProperty("访问接口耗时")private String time;@TableField("ip")@ApiModelProperty("执行操作的用户的ip")@ExcelProperty("执行操作的用户的ip")private String ip;@TableField("address")@ApiModelProperty("执行操作的用户的ip对应的地址")@ExcelProperty("执行操作的用户的ip对应的地址")private String address;@TableField("browser")@ApiModelProperty("执行操作的用户所使用的浏览器")@ExcelProperty("执行操作的用户所使用的浏览器")private String browser;@TableField("os")@ApiModelProperty("执行操作的用户所使用的操作系统")@ExcelProperty("执行操作的用户所使用的操作系统")private String os;@TableField("oper_time")@ApiModelProperty("操作时间")@ExcelProperty(value = "操作时间",converter = LocalDateTimeConverter.class)private LocalDateTime operTime;@TableLogic//逻辑删除@TableField("del_flag")@ApiModelProperty("删除标志(0代表未删除,1代表已删除)")@ExcelProperty(value = "删除标志",converter = DelFlagConverter.class)private Integer delFlag;}
- 4:创建一个中转类(也就是说我们报错的原因是OperationLog类中的LocalDateTime属性,这个时候我们可以创建一个新的类,其他字段不变,把这个LocalDateTime日期类变成String就可以解决这个bug)
- 原因:(这是由于canal+SpringBoot中的StringConvertUtil类的源码的问题):
- 可以看到下面的源码没有对LocalDateTime转LocalDatetime进行处理,而是把LocalDateTime转为String类型进行返回(问题出在
type.equals(java.sql.Date.class) ? parseDate(columnValue) : columnValue
),而columnValue是String类型,所以我们可以用String类型去接收LocalDateTime类型。
- 可以看到下面的源码没有对LocalDateTime转LocalDatetime进行处理,而是把LocalDateTime转为String类型进行返回(问题出在
- 原因:(这是由于canal+SpringBoot中的StringConvertUtil类的源码的问题):
package com.boot.entity;import com.alibaba.excel.annotation.ExcelProperty;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.baomidou.mybatisplus.annotation.TableName;
import com.boot.converter.DelFlagConverter;
import com.boot.converter.LocalDateTimeConverter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import javax.persistence.Column;
import java.io.Serializable;
import java.time.LocalDateTime;/*** 操作日志canal中转类。解决OperationLog类中的LocalDatetime类型的字段无法被canal接收导致报错** @author youzhengjie* @date 2022/10/30 21:16:54*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@Builder
public class OperationLogCanal implements Serializable {private static final long serialVersionUID = 1L;private Long id;private String username;private String type;private String uri;private String time;private String ip;private String address;private String browser;private String os;//canal+springboot当属性名和数据库字段不一致时,要用@Column去指定数据库字段名,否则会接收不到canal数据@Column(name = "oper_time")private String operTime;//canal+springboot当属性名和数据库字段不一致时,要用@Column去指定数据库字段名,否则会接收不到canal数据@Column(name = "del_flag")private Integer delFlag;}
- 5:OperationLogService
package com.boot.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.boot.entity.OperationLog;import java.util.List;/*** 操作日志服务** @author youzhengjie* @date 2022/10/21 23:32:14*/
public interface OperationLogService extends IService<OperationLog> {long selectAllOperationLogCount();/*** 添加操作日志到elasticsearch** @param operationLog 操作日志* @return boolean*/boolean addOperationLogToEs(OperationLog operationLog);/*** 根据id删除elasticsearch中的操作日志** @param id id* @return boolean*/boolean deleteOperationLogToEs(Long id);/*** 更新elasticsearch中的操作日志** @param operationLog 操作日志* @return boolean*/boolean updateOperationLogToEs(OperationLog operationLog);}
- 6:OperationLogServiceImpl
package com.boot.service.impl;import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.boot.entity.OperationLog;
import com.boot.mapper.OperationLogMapper;
import com.boot.service.OperationLogService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 操作日志服务impl** @author youzhengjie* @date 2022/10/21 23:42:49*/
@Service
@Slf4j
public class OperationLogServiceImpl extends ServiceImpl<OperationLogMapper, OperationLog> implements OperationLogService {@Autowiredprivate OperationLogMapper operationLogMapper;@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 操作日志的es索引*/private static final String OPER_LOG_INDEX="operation-log-index";@Overridepublic boolean addOperationLogToEs(OperationLog operationLog) {try {IndexRequest indexRequest = new IndexRequest(OPER_LOG_INDEX);indexRequest.id(operationLog.getId().toString());Map<String, Object> sources = new ConcurrentHashMap<>();sources.put("username", operationLog.getUsername());sources.put("type", operationLog.getType());sources.put("uri", operationLog.getUri());sources.put("time", operationLog.getTime());sources.put("ip", operationLog.getIp());sources.put("address",operationLog.getAddress());sources.put("browser", operationLog.getBrowser());sources.put("os", operationLog.getOs());sources.put("operTime", operationLog.getOperTime());sources.put("delFlag", operationLog.getDelFlag());indexRequest.source(sources);restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT);return true;}catch (Exception e){e.printStackTrace();return false;}}@Overridepublic boolean deleteOperationLogToEs(Long id) {try {DeleteRequest deleteRequest = new DeleteRequest(OPER_LOG_INDEX);deleteRequest.id(id.toString());restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);return true;}catch (Exception e) {e.printStackTrace();return false;}}@Overridepublic boolean updateOperationLogToEs(OperationLog operationLog) {try {//将operationLog封装成MapMap<String,Object> operationLogMap=new ConcurrentHashMap<>();//将operationLog拷贝到Map中BeanUtil.copyProperties(operationLog,operationLogMap);//把map中的id去掉operationLogMap.remove("id");String idStr = operationLog.getId().toString();UpdateRequest updateRequest = new UpdateRequest(OPER_LOG_INDEX,idStr);updateRequest.doc(operationLogMap);restHighLevelClient.update(updateRequest,RequestOptions.DEFAULT);return true;}catch (Exception e){e.printStackTrace();return false;}}}
- 7:创建一个canal监听类(非常核心),指定监听一个表(这里我监听的是sys_oper_log表):
package com.boot.canal;import cn.hutool.core.bean.BeanUtil;
import com.boot.entity.OperationLog;
import com.boot.entity.OperationLogCanal;
import com.boot.service.OperationLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;/*** 操作日志canal处理器** @author youzhengjie* @date 2022/10/30 15:19:09*/
@CanalTable("sys_oper_log") //@CanalTable("sys_oper_log"):指定canal监听的表名为sys_oper_log
@Component
@Slf4j
public class OperationLogCanalHandle implements EntryHandler<OperationLogCanal> {@Autowiredprivate OperationLogService operationLogService;@Overridepublic void insert(OperationLogCanal operationLogCanal) {//编写mysql和缓存同步的逻辑(例如JVM本地缓存、Redis分布式缓存、es等)OperationLog operationLog = new OperationLog();//bean拷贝BeanUtil.copyProperties(operationLogCanal,operationLog);//同步到es中operationLogService.addOperationLogToEs(operationLog);log.warn("OperationLogCanalHandle->insert->开始同步->"+operationLog);}/*** 更新** @param before 之前* @param after 之后*/@Overridepublic void update(OperationLogCanal before, OperationLogCanal after) {//编写mysql和缓存同步的逻辑(例如JVM本地缓存、Redis分布式缓存、es等)OperationLog operationLog = new OperationLog();//注意:要拷贝after对象,这个对象是修改之后的对象BeanUtil.copyProperties(after,operationLog);//同步esoperationLogService.updateOperationLogToEs(operationLog);log.warn("OperationLogCanalHandle->update->开始同步->"+operationLog);}@Overridepublic void delete(OperationLogCanal operationLogCanal) {//编写mysql和缓存同步的逻辑(例如JVM本地缓存、Redis分布式缓存、es等)Long id = operationLogCanal.getId();//同步esoperationLogService.deleteOperationLogToEs(id);log.warn("OperationLogCanalHandle->delete->开始同步->"+id);}
}