文章目录
- 前言
- 一、批量注册bean定义:
- 1.1 定义Canal注解:
- 1.2 canal bean定义注册:
- 1.3 canal bean 生成:
- 二、canal客户端获取mysql数据变动
- 2.1 canal客户端
- 2.2 消息处理
- 总结
- 参考
前言
在项目中如果想要多个Canal 客户端通过tcp直连接入Canal 服务端,显然需要定义多个连接不同实例的客户端,而每个客户端除了连接到的实例不同其它配置几乎都相同,如果定义多个客户端显然会造成很多重复代码,那么spring 中有什么办法可以批量定义canal客户端?
一、批量注册bean定义:
我们知道spring中bean 的生成是依靠bean 定义,所以如果我们可以批量定义canal客户端 BeanDefinition ,然后将其注册到spring ,这样spring 就可以来生成我们需要的bean
。而在spring 中我们可以 使用ImportBeanDefinitionRegistrar来自定义bean;
1.1 定义Canal注解:
CanalConfig.java
import org.springframework.context.annotation.Import;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(CanalConnectorRegistry.class)
public @interface CanalConfig {// 定义需要连接的canal 实例数组String[] destinations() default "";
}
然后在 spring 启动类 就可以增加改注解:
import com.example.spring_canal.config.CanalConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@CanalConfig(destinations = {"test","aabbcc"})
public class SpringCanalApplication {public static void main(String[] args) {SpringApplication.run(SpringCanalApplication.class, args);}}
1.2 canal bean定义注册:
CanalConnectorRegistry.java
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanNameGenerator;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;import java.util.Map;public class CanalConnectorRegistry implements ImportBeanDefinitionRegistrar {@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {// 获取CanalConfig 的注解内容Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(CanalConfig.class.getName());// 获取destinations 要连接的实例String[] destinations = (String[]) annotationAttributes.get("destinations");for (int i = 0; i < destinations.length; i++) {GenericBeanDefinition beanDefinition = new GenericBeanDefinition();// 为CanalConnectorFactory 设置 destinationRegistry属性参数MutablePropertyValues properties = new MutablePropertyValues();properties.add("destinationRegistry", destinations[i]);beanDefinition.setPropertyValues(properties);// 定义 使用CanalConnectorFactory 来生成bean 对象beanDefinition.setBeanClass(CanalConnectorFactory.class);// 因为要生成的canalConnector bean对象都是CanalConnector 类型所以bean 的名称不能重复// 本文生成bean 的名称为canalConnector0,canalConnector1,,,registry.registerBeanDefinition("canalConnector" + i, beanDefinition);}}@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {ImportBeanDefinitionRegistrar.super.registerBeanDefinitions(importingClassMetadata, registry);}
}
1.3 canal bean 生成:
CanalConnectorFactory.java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;@Component
public class CanalConnectorFactory implements FactoryBean {private String destinationRegistry;// 定义canal 服务段的地址和端口@Value("${canal.server.host}")private String canalServerHost;@Value("${canal.server.port}")private int canalServerPort;public void setDestinationRegistry(String destinationRegistry) {this.destinationRegistry = destinationRegistry;}public CanalConnector createConnector(String destination, String username, String password) {return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerHost, canalServerPort),destination, username, password);}public CanalConnector createConnector(String destination) {return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerHost, canalServerPort),destination, "", "");}@Overridepublic Object getObject() throws Exception {// 生成 canal 客户端的beanreturn createConnector(destinationRegistry);}@Overridepublic Class<?> getObjectType() {return CanalConnector.class;}
}
canal 服务端ip 端口定义:
canal.server.host=localhost
canal.server.port=11111
这样当在项目中 去获取 canalConnector0,canalConnector1,,,这样的bean 时就会通过 CanalConnectorFactory 的getObject() 去生成bean;
二、canal客户端获取mysql数据变动
2.1 canal客户端
CanalService2.java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
// 根据canal.enable 属性值取加载 这个bean 如果 canal.enable 为false 则不加载bean
@ConditionalOnProperty(name = "canal.enable", havingValue = "true")
public class CanalService2 implements DisposableBean {@Autowiredprivate ApplicationContext context;@Autowiredprivate RedisTemplate redisTemplate;// 定义要连接的实例数组,// 注意顺序和 @CanalConfig(destinations = {"test","aabbcc"}) 保持一致@Value("#{'${canal.destination.values}'.split(',')}")private List<String> destinations;// 定义每个实例中要监听的表,注意顺序和canal.destination.values 保持一致@Value("#{'${canal.client.subscribe.filters}'.split(',')}")private List<String> canalFilters;// 定义每个实例中每次要获取表动条数,注意顺序和canal.destination.values 保持一致@Value("#{'${canal.client.batch.sizes}'.split(',')}")private List<Integer> batchSizes;@Autowiredprivate CanalListener canalListener;private List<CanalConnector> connectors = new ArrayList<>(1 << 3);@PostConstructpublic void run() {// 开启线程进行数据消费for (int i = 0; i < destinations.size(); i++) {int finalI = i;new Thread(() -> toConsumeMessage(finalI, destinations.get(finalI))).start();}}private void toConsumeMessage(int i, String destination) {// 获取spring 容器中的 CanalConnector beanCanalConnector canalConnector = (CanalConnector) context.getBean("canalConnector" + i);// 收集项目中使用到的CanalConnector bean 便于后续项目终止进行端口连接使用connectors.add(canalConnector);// 定义最后消费的位点long lastOffset = fetchFromPosition(canalConnector, i, destination);while (true) {// 获取消息,并且不进行ack 确认Message message = canalConnector.getWithoutAck(batchSizes.get(i));long batchId = message.getId();List<CanalEntry.Entry> entryList = message.getEntries();int size = message.getEntries().size();// 如果没有获取到消息则2s 后在次进行获取if (batchId == -1 || entryList.isEmpty()) {try {// 线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}continue;}// 比对消费位点,如果项目中已经消费过该条数据则继续进行下一次数据拉取long nowOffset = entryList.get(0).getHeader().getLogfileOffset();if (nowOffset <= lastOffset) {continue;}try {// 消费数据canalListener.onMessage(message);// 向服务端提交ack 确认canalConnector.ack(batchId);// 保存最后消费的位点,防止项目重启后 重复消费消息lastOffset = message.getEntries().get(size - 1).getHeader().getLogfileOffset();savePositionState(lastOffset, destination);} catch (Exception ex) {log.error("consume error:{}", ex.getMessage());// 回滚到未进行 ack 的地方,指定回滚具体的batchIdcanalConnector.rollback(batchId);}}}// 获取并设置消费的起始位点private long fetchFromPosition(CanalConnector canalConnector, int i, String key) {// Canal 连接器连接canalConnector.connect();// 订阅数据变更canalConnector.subscribe(canalFilters.get(i));// 回滚到未进行 ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿canalConnector.rollback();// 从存储中获取上次消费的位点long position = getPositionState(key);return position;}// 获取位点状态private long getPositionState(String key) {// TODO: 从存储中获取上次消费的位点Object slot = redisTemplate.opsForValue().get("canal:" + key);if (null != slot) {if (slot instanceof Long) {return (long) slot;} else {return ((Integer) slot).longValue();}}return -1;}// 保存位点状态private void savePositionState(long position, String key) {// TODO: 将 position 保存到存储中redisTemplate.opsForValue().set("canal:" + key, position);}@Overridepublic void destroy() throws Exception {// 项目关闭断开连接if (null != connectors && !CollectionUtils.isEmpty(connectors)) {connectors.stream().forEach(oneConnect -> {if (null != oneConnect) {oneConnect.disconnect();}});}}
}
参数配置:
canal.enable=true
canal.destination.values=test,aabbcc
canal.client.subscribe.filters=test.test_user|test.user,biglog.about_us
canal.client.batch.sizes=10,10
2.2 消息处理
CanalListener.java
public interface CanalListener {void onMessage(Message msg);
}
MyCanalListener.java
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
@Component
public class MyCanalListener implements CanalListener {@Overridepublic void onMessage(Message msg) {List<CanalEntry.Entry> entries = msg.getEntries();for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {CanalEntry.RowChange rowChange = null;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {throw new RuntimeException("parse error", e);}String tableName = entry.getHeader().getTableName();CanalEntry.EventType eventType = rowChange.getEventType();List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();String schemaName = entry.getHeader().getSchemaName();// 处理数据变更事件for (CanalEntry.RowData rowData : rowDataList) {switch (eventType) {case INSERT:// 处理插入事件dealInsert(schemaName, tableName, rowData.getAfterColumnsList());break;case UPDATE:// 处理更新事件dealUpdate(schemaName, tableName, rowData.getAfterColumnsList());break;case DELETE:// 处理删除事件dealDelate(schemaName, tableName, rowData.getBeforeColumnsList());break;default:break;}}}}}private void dealDelate(String schemaName, String tableName, List<CanalEntry.Column> afterColumnsList) {Map<String, Object> dataMap = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {dataMap.put(column.getName(), column.getValue());}
// log.debug("delate data:{}", afterColumnsList);log.debug("delate map data:{}", dataMap);}private void dealUpdate(String schemaName, String tableName, List<CanalEntry.Column> columns) {Map<String, Object> dataMap = new HashMap<>();for (CanalEntry.Column column : columns) {dataMap.put(column.getName(), column.getValue());}
// log.debug("update data:{}", columns);log.debug("update map data:{}", dataMap);}private void dealInsert(String schemaName, String tableName, List<CanalEntry.Column> columns) {Map<String, Object> dataMap = new HashMap<>();for (CanalEntry.Column column : columns) {dataMap.put(column.getName(), column.getValue());}
// log.debug("insert data:{}", columns);log.debug("insert map data:{}", dataMap);}
}
总结
本文通过ImportBeanDefinitionRegistrar 进行canal客户端bean 定义的注册,通过FactoryBean ,注意canal 客户端的默认的id 为1001,目前canal server上的一个instance只能有一个client消费。
参考
Canal ClientAPI 参考;