Springboot整合Canal -- Canal 多客户端

文章目录

  • 前言
  • 一、批量注册bean定义:
    • 1.1 定义Canal注解:
    • 1.2 canal bean定义注册:
    • 1.3 canal bean 生成:
  • 二、canal客户端获取mysql数据变动
    • 2.1 canal客户端
    • 2.2 消息处理
  • 总结
  • 参考


前言

在项目中如果想要多个Canal 客户端通过tcp直连接入Canal 服务端,显然需要定义多个连接不同实例的客户端,而每个客户端除了连接到的实例不同其它配置几乎都相同,如果定义多个客户端显然会造成很多重复代码,那么spring 中有什么办法可以批量定义canal客户端?


一、批量注册bean定义:

我们知道spring中bean 的生成是依靠bean 定义,所以如果我们可以批量定义canal客户端 BeanDefinition ,然后将其注册到spring ,这样spring 就可以来生成我们需要的bean。而在spring 中我们可以 使用ImportBeanDefinitionRegistrar来自定义bean;

1.1 定义Canal注解:

CanalConfig.java

import org.springframework.context.annotation.Import;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(CanalConnectorRegistry.class)
public @interface CanalConfig {// 定义需要连接的canal 实例数组String[] destinations() default "";
}

然后在 spring 启动类 就可以增加改注解:

import com.example.spring_canal.config.CanalConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@CanalConfig(destinations = {"test","aabbcc"})
public class SpringCanalApplication {public static void main(String[] args) {SpringApplication.run(SpringCanalApplication.class, args);}}

1.2 canal bean定义注册:

CanalConnectorRegistry.java

import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanNameGenerator;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;import java.util.Map;public class CanalConnectorRegistry implements ImportBeanDefinitionRegistrar {@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {// 获取CanalConfig  的注解内容Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(CanalConfig.class.getName());// 获取destinations 要连接的实例String[] destinations = (String[]) annotationAttributes.get("destinations");for (int i = 0; i < destinations.length; i++) {GenericBeanDefinition beanDefinition = new GenericBeanDefinition();// 为CanalConnectorFactory 设置 destinationRegistry属性参数MutablePropertyValues properties = new MutablePropertyValues();properties.add("destinationRegistry", destinations[i]);beanDefinition.setPropertyValues(properties);// 定义 使用CanalConnectorFactory 来生成bean 对象beanDefinition.setBeanClass(CanalConnectorFactory.class);// 因为要生成的canalConnector bean对象都是CanalConnector 类型所以bean 的名称不能重复// 本文生成bean 的名称为canalConnector0,canalConnector1,,,registry.registerBeanDefinition("canalConnector" + i, beanDefinition);}}@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {ImportBeanDefinitionRegistrar.super.registerBeanDefinitions(importingClassMetadata, registry);}
}

1.3 canal bean 生成:

CanalConnectorFactory.java

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;@Component
public class CanalConnectorFactory implements FactoryBean {private String  destinationRegistry;// 定义canal 服务段的地址和端口@Value("${canal.server.host}")private String canalServerHost;@Value("${canal.server.port}")private int canalServerPort;public void setDestinationRegistry(String destinationRegistry) {this.destinationRegistry = destinationRegistry;}public CanalConnector createConnector(String destination, String username, String password) {return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerHost, canalServerPort),destination, username, password);}public CanalConnector createConnector(String destination) {return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerHost, canalServerPort),destination, "", "");}@Overridepublic Object getObject() throws Exception {// 生成 canal 客户端的beanreturn createConnector(destinationRegistry);}@Overridepublic Class<?> getObjectType() {return CanalConnector.class;}
}

canal 服务端ip 端口定义:

canal.server.host=localhost
canal.server.port=11111

这样当在项目中 去获取 canalConnector0,canalConnector1,,,这样的bean 时就会通过 CanalConnectorFactory 的getObject() 去生成bean;

二、canal客户端获取mysql数据变动

2.1 canal客户端

CanalService2.java


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
// 根据canal.enable 属性值取加载 这个bean 如果 canal.enable 为false 则不加载bean
@ConditionalOnProperty(name = "canal.enable", havingValue = "true")
public class CanalService2 implements DisposableBean {@Autowiredprivate ApplicationContext context;@Autowiredprivate RedisTemplate redisTemplate;// 定义要连接的实例数组,// 注意顺序和 @CanalConfig(destinations = {"test","aabbcc"}) 保持一致@Value("#{'${canal.destination.values}'.split(',')}")private List<String> destinations;// 定义每个实例中要监听的表,注意顺序和canal.destination.values 保持一致@Value("#{'${canal.client.subscribe.filters}'.split(',')}")private List<String> canalFilters;// 定义每个实例中每次要获取表动条数,注意顺序和canal.destination.values 保持一致@Value("#{'${canal.client.batch.sizes}'.split(',')}")private List<Integer> batchSizes;@Autowiredprivate CanalListener canalListener;private List<CanalConnector> connectors = new ArrayList<>(1 << 3);@PostConstructpublic void run() {// 开启线程进行数据消费for (int i = 0; i < destinations.size(); i++) {int finalI = i;new Thread(() -> toConsumeMessage(finalI, destinations.get(finalI))).start();}}private void toConsumeMessage(int i, String destination) {// 获取spring 容器中的 CanalConnector beanCanalConnector canalConnector = (CanalConnector) context.getBean("canalConnector" + i);// 收集项目中使用到的CanalConnector  bean 便于后续项目终止进行端口连接使用connectors.add(canalConnector);// 定义最后消费的位点long lastOffset = fetchFromPosition(canalConnector, i, destination);while (true) {// 获取消息,并且不进行ack 确认Message message = canalConnector.getWithoutAck(batchSizes.get(i));long batchId = message.getId();List<CanalEntry.Entry> entryList = message.getEntries();int size = message.getEntries().size();// 如果没有获取到消息则2s 后在次进行获取if (batchId == -1 || entryList.isEmpty()) {try {// 线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}continue;}// 比对消费位点,如果项目中已经消费过该条数据则继续进行下一次数据拉取long nowOffset = entryList.get(0).getHeader().getLogfileOffset();if (nowOffset <= lastOffset) {continue;}try {// 消费数据canalListener.onMessage(message);// 向服务端提交ack 确认canalConnector.ack(batchId);// 保存最后消费的位点,防止项目重启后 重复消费消息lastOffset = message.getEntries().get(size - 1).getHeader().getLogfileOffset();savePositionState(lastOffset, destination);} catch (Exception ex) {log.error("consume error:{}", ex.getMessage());// 回滚到未进行 ack 的地方,指定回滚具体的batchIdcanalConnector.rollback(batchId);}}}// 获取并设置消费的起始位点private long fetchFromPosition(CanalConnector canalConnector, int i, String key) {// Canal 连接器连接canalConnector.connect();// 订阅数据变更canalConnector.subscribe(canalFilters.get(i));// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿canalConnector.rollback();// 从存储中获取上次消费的位点long position = getPositionState(key);return position;}// 获取位点状态private long getPositionState(String key) {// TODO: 从存储中获取上次消费的位点Object slot = redisTemplate.opsForValue().get("canal:" + key);if (null != slot) {if (slot instanceof Long) {return (long) slot;} else {return ((Integer) slot).longValue();}}return -1;}// 保存位点状态private void savePositionState(long position, String key) {// TODO: 将 position 保存到存储中redisTemplate.opsForValue().set("canal:" + key, position);}@Overridepublic void destroy() throws Exception {// 项目关闭断开连接if (null != connectors && !CollectionUtils.isEmpty(connectors)) {connectors.stream().forEach(oneConnect -> {if (null != oneConnect) {oneConnect.disconnect();}});}}
}

参数配置:

canal.enable=true
canal.destination.values=test,aabbcc
canal.client.subscribe.filters=test.test_user|test.user,biglog.about_us
canal.client.batch.sizes=10,10

2.2 消息处理

CanalListener.java

public interface CanalListener {void onMessage(Message msg);
}

MyCanalListener.java

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
@Component
public class MyCanalListener implements CanalListener {@Overridepublic void onMessage(Message msg) {List<CanalEntry.Entry> entries = msg.getEntries();for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {CanalEntry.RowChange rowChange = null;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {throw new RuntimeException("parse error", e);}String tableName = entry.getHeader().getTableName();CanalEntry.EventType eventType = rowChange.getEventType();List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();String schemaName = entry.getHeader().getSchemaName();// 处理数据变更事件for (CanalEntry.RowData rowData : rowDataList) {switch (eventType) {case INSERT:// 处理插入事件dealInsert(schemaName, tableName, rowData.getAfterColumnsList());break;case UPDATE:// 处理更新事件dealUpdate(schemaName, tableName, rowData.getAfterColumnsList());break;case DELETE:// 处理删除事件dealDelate(schemaName, tableName, rowData.getBeforeColumnsList());break;default:break;}}}}}private void dealDelate(String schemaName, String tableName, List<CanalEntry.Column> afterColumnsList) {Map<String, Object> dataMap = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {dataMap.put(column.getName(), column.getValue());}
//        log.debug("delate data:{}", afterColumnsList);log.debug("delate map data:{}", dataMap);}private void dealUpdate(String schemaName, String tableName, List<CanalEntry.Column> columns) {Map<String, Object> dataMap = new HashMap<>();for (CanalEntry.Column column : columns) {dataMap.put(column.getName(), column.getValue());}
//        log.debug("update data:{}", columns);log.debug("update map data:{}", dataMap);}private void dealInsert(String schemaName, String tableName, List<CanalEntry.Column> columns) {Map<String, Object> dataMap = new HashMap<>();for (CanalEntry.Column column : columns) {dataMap.put(column.getName(), column.getValue());}
//        log.debug("insert data:{}", columns);log.debug("insert map data:{}", dataMap);}
}

总结

本文通过ImportBeanDefinitionRegistrar 进行canal客户端bean 定义的注册,通过FactoryBean ,注意canal 客户端的默认的id 为1001,目前canal server上的一个instance只能有一个client消费。

参考

Canal ClientAPI 参考;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/630421.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 消息队列和发布订阅

文章目录 基本模式生产者消费者原理&模型redis实现java实现 发布者订阅者原理&模型redis实现java实现 stream模式原理&模型工作原理redis实现Java实现 选型外传 基本模式 采用redis 三种方案&#xff1a; ● 生产者消费者&#xff1a;一个消息只能有一个消费者 ●…

Smart Tomcat

Smart Tomcat插件可以让idea图形化界面让代码部署到tomcat上达成一键打包部署的过程 下面是idea安装使用Smart Tomcat的过程 我们直接在plugins(插件)里搜索Tomcat 然后下载第一个 然后点击Apply(应用) 在一个项目中 第一次使用时要进行配置Smart Tomcat Name 可以不配置…

vscode安装和基本设置

目录 vscode安装和基本设置1.HTML标签2.标签属性3.HTML基本结构4.安装vscode5.安装Live Server插件6.HTML注释7.文档说明8.HTML字符编码9.HTML设置语言10.HTML标准结构 vscode安装和基本设置 1.HTML标签 标签 又称 元素&#xff0c;是HTML的基本组成单位。标签分为&#xff1…

JSP简单学习

jsp是在html中嵌入java代码 jsp也是在j2ee服务端中的java组件 第一次运行 在第一次运行jsp代码时会经历以下步骤&#xff0c;将jsp转为java代码&#xff0c;将java代码转为class文件。 所以通常会比较慢&#xff0c;编译后就好多了。 四大作用域 requestsessionpageapplica…

使用pdfbox 为 PDF 增加水印

使用pdfbox 为 PDF增加水印https://www.jylt.cc/#/detail?activityIndex2&idbd410851b0a72dad3105f9d50787f914 引入依赖 <dependency><groupId>org.apache.pdfbox</groupId><artifactId>pdfbox</artifactId><version>3.0.1</ve…

cesiumlab切片通过arcgisjs加载

cesiumlab切片通过arcgisjs加载 需要注意2个地方&#xff0c;一个是tileInfo&#xff0c;一个是getTileUrl&#xff0c; 在tileInfo中定义好cesiumlab切片的相关信息。 getTileUrl 格式化url的格式。 注意设置编辑&#xff0c;避免超出范围报404。 <html lang"en"…

【栈】【字符串和int类型转化】Leetcode 150 逆波兰表达式求值

【栈】【字符串和int类型转化】Leetcode 150 逆波兰表达式求值 解法1 栈 ---------------&#x1f388;&#x1f388;题目链接 Leetcode 150 逆波兰表达式求值 &#x1f388;&#x1f388;------------------- 解法1 栈 字符串转化为int类型数据: Integer.parseInt(s) Long.p…

java多线程传参数

package com.myThread;public class AdminThread extends Thread{private String name;public void AdminThread(String name){this.name name;}Overridepublic void run() {//线程开始之后执行的代码for (int i 0; i < 100; i) {System.out.print(getName()"线程…

Web前端-移动web开发——flex布局

移动web开发——flex布局 1.0传统布局和flex布局对比 1.1传统布局 兼容性好布局繁琐局限性&#xff0c;不能再移动端很好的布局 1.2 flex布局 操作方便&#xff0c;布局极其简单&#xff0c;移动端使用比较广泛pc端浏览器支持情况比较差IE11或更低版本不支持flex或仅支持部…

ChatGPT 与生成式 AI 的崛起:第二十六章到第三十三章

原文&#xff1a;Rise of Generative AI and ChatGPT 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 第二十五章&#xff1a;ChatGPT 用于国家网络安全和技术政策 介绍 恐怖分子、罪犯、警察、国防、执法机构、工程师、作家和学生等都在使用 ChatGPT&#xff0c;这是…

如何进行产品的人机交互设计?

产品的人机交互设计是指通过用户界面和用户体验设计来优化产品与用户之间的交互过程&#xff0c;从而提高产品的易用性、可用性和用户满意度。人机交互设计需要考虑用户的需求、行为模式、心理感受以及技术实现&#xff0c;下面我将介绍如何进行产品的人机交互设计。 首先&…

蓝桥备战 每日一题 (1)

二分法 代码如下&#xff1a; #include<iostream> #define ll long long using namespace std;ll query(ll n) {ll ret 0;while (n > 0) {ret n / 5; // 要计算 从 1 到 n 中 是 5 的倍数的个数&#xff0c;但是要注意 如果是 25 &#xff0c;要计算两次n / 5;}r…

kubeadm常用命令

一 kubeadm token 命令 "kubectl和kubeadm命令执行的时候" 默认情况下,kubectl 在 $HOME/.kube 目录下查找名为config的文件可以通过设置KUBECONFIG环境变量或设置--kubeconfig参数来指定其他Kubeconfig文件 ​ 解决1&#xff1a; k8s token 过期 的解决方法 ① 总…

【学习笔记】[AGC063E] Child to Parent

提供一个多项式做法。 分别设 f u , i , g u , i f_{u,i},g_{u,i} fu,i​,gu,i​表示以 u u u为根时&#xff0c; a u i a_ui au​i和 a u ≥ i a_u\ge i au​≥i的方案数&#xff0c;合并子树 v v v时&#xff0c;转移如下&#xff1a; f u , i ∑ f u , i − k r g v . k…

达梦报错:无效IP,设置达梦访问IP白名单

管理工具连接远程库&#xff0c;报错&#xff1a;无效IP 1.:关闭防火墙 systemctl status firewalld service iptables status 2.查看用户和用户状态 select username,ACCOUNT_STATUS from dba_users;3.查看用户 访问ip限制和资源限制 select id,failed_num,allow_addr fro…

倒排索引介绍

1.简介 倒排索引&#xff08;Inverted Index&#xff09;是信息检索系统中最常用的数据结构之一&#xff0c;它用来存储在全文搜索下某个单词存在于哪些文档或数据记录中。与传统的正排索引&#xff08;文档到关键词的映射&#xff09;不同&#xff0c;倒排索引建立了从关键词…

目标检测开源数据集——道路坑洼

一、危害 对车辆的影响&#xff1a;道路坑洼会导致车辆行驶不稳&#xff0c;增加车辆的颠簸&#xff0c;不仅影响乘坐舒适度&#xff0c;还可能对车辆的悬挂系统、轮胎等造成损害。长期在坑洼路面上行驶&#xff0c;车辆的减震系统、悬挂系统等关键部件容易受损&#xff0c;进…

Redis和MySQL如何保持数据一致性

前言 在高并发的场景下&#xff0c;大量的请求直接访问Mysql很容易造成性能问题。所以&#xff0c;我们都会用Redis来做数据的缓存&#xff0c;削减对数据库的请求。但是&#xff0c;Mysql和Redis是两种不同的数据库&#xff0c;如何保证不同数据库之间数据的一致性就非常关键…

【封装一个日志库(linux)】【转载】

原文链接&#xff1a; 工具库1&#xff1a;封装一个日志库&#xff08;linux)-CSDN博客 一、&#xff0c;需求 &#xff08;1&#xff09;封装一个c/c日志库&#xff0c;提供格式打印接口&#xff0c;编写程序代码时方便使用&#xff1b; &#xff08;2&#xff09; 格式打印…

Mybatis----面向接口

让mybatis自动生成dao层接口的实现类 这是dao层接口的实现类&#xff0c;在mybatis中我们可以省略这种实现接口的方式&#xff0c;直接面向接口操作数据库&#xff0c;mybatis可以帮我们自动生成接口的实现类&#xff0c;也就是下面这个实现类mybatis帮我们生成了。 1、修改se…