场景举例:
为了查询方便,目前订单表中存在很多冗余字段,例如用户昵称。但是当昵称所在的表发生变化时,很多同学可能会直接在修改昵称的地方手动调用订单接口去更新昵称,这样不仅代码结构混乱,而且耦合严重。
使用说明:
下面的示例只是基于单机的简单实现,没有加任务线程池,也没有考虑并发。另外个人建议:如果同步的是重要数据,不要用这种同步方式,用不好可能存在丢数据的风险;像我上面说的那些仅用于展示、无业务用途的字段,可以用这种方式同步。
具体使用
引入maven核心依赖,其他spring等依赖可以自己添加
<!--binlog同步核心依赖-->
<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.21.0</version>
</dependency>
<!--mysql连接-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>6.0.6</version>
</dependency>
<!--sql解析-->
<dependency>
    <groupId>com.github.jsqlparser</groupId>
    <artifactId>jsqlparser</artifactId>
    <version>4.5</version>
</dependency>
下面举例说明具体的实现代码:用户只需实现BizTableListener接口,即可自定义想要同步的表的处理逻辑。示例中只放了一些核心代码,完整代码已作为附件上传。
代码
1.配置数据库信息
package com.data.binlog.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Connection settings of the MySQL instance whose binlog is consumed.
 *
 * <p>Values are bound from configuration properties under the {@code binlog.datasource} prefix.
 * Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "binlog.datasource")
public class DataSourceConfig {

    /** Address of the MySQL server; despite the name it is used as the host part of the connection. */
    private String url;

    /** Port of the MySQL server. */
    private int port;

    /** Account used to connect to MySQL. */
    private String username;

    /** Password of {@link #username}. */
    private String passwd;

    /** Database (schema) name. */
    private String db;
}
logging:
  config: classpath:log4j2/log4j2-dev.yml
binlog:
  datasource:
    url: mysql的地址
    db: 数据库
    port: 端口
    username: 用户名
    passwd: 密码
2.加载binlog配置信息
package com.data.binlog.config;import com.data.binlog.listener.BinLogEventListener;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * Builds the {@link BinaryLogClient} that streams binlog events from MySQL and hands them to
 * {@link BinLogEventListener}.
 */
@Configuration
public class BinaryLogClientConfig {

    /** Maximum time, in milliseconds, to wait for the initial connection to the MySQL server. */
    private static final long CONNECT_TIMEOUT_MS = 10_000L;

    @Resource
    private BinLogEventListener binLogEventListener;

    @Resource
    private DataSourceConfig dataSourceConfig;

    /**
     * Creates, wires and starts the binlog client.
     *
     * <p>The client is returned so that Spring manages it as a real bean and disconnects it on
     * context shutdown. The connection is opened with {@code connect(timeout)}, which runs the
     * replication stream on a separate thread; the no-argument {@code connect()} blocks the calling
     * thread for as long as the stream is open and would therefore stall application startup.
     *
     * @return the connected client
     * @throws Exception if the connection cannot be established within {@link #CONNECT_TIMEOUT_MS}
     */
    @Bean(destroyMethod = "disconnect")
    public BinaryLogClient BinaryLog() throws Exception {
        BinaryLogClient client = new BinaryLogClient(
                dataSourceConfig.getUrl(),
                dataSourceConfig.getPort(),
                dataSourceConfig.getUsername(),
                dataSourceConfig.getPasswd());
        client.setEventDeserializer(new EventDeserializer());
        client.registerEventListener(binLogEventListener);
        client.connect(CONNECT_TIMEOUT_MS);
        return client;
    }
}
package com.data.binlog.listener;import cn.hutool.core.util.ObjectUtil;
import com.data.binlog.config.DataSourceConfig;
import com.data.binlog.context.TableContext;
import com.data.binlog.listener.biz.BizTableRouteProcess;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.util.TablesNamesFinder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.Serializable;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** binlog监听主类*/
@Component
@Slf4j
public class BinLogEventListener implements BinaryLogClient.EventListener {//调用用户自定义表处理实现类@Resourceprivate BizTableRouteProcess bizTableRouteProcess;@Resourceprivate DataSourceConfig dataSourceConfig;@Overridepublic void onEvent(Event event) {EventHeader header = event.getHeader();EventType eventType = header.getEventType();System.out.println("监听的事件类型:" + eventType);Map<String, Map<String, ColumnInfo>> tableIdColumnMap = TableContext.getTableIdColumnMap();Map<String, String> tableNameMap = TableContext.getTableNameMap();Map<String, String> tableIdMap = TableContext.getTableIdMap();if (eventType == EventType.TABLE_MAP) {TableMapEventData tableData = event.getData();String db = tableData.getDatabase();String table = tableData.getTable();String tableId = String.valueOf(tableData.getTableId());try {tableNameMap.put(table, tableId);tableIdMap.put(tableId, table);//如果缓存中有数据结构则直接掉过if (tableIdColumnMap.get(tableId) == null) {Map<String, ColumnInfo> columnInfoMap = getColMap(db, table);tableIdColumnMap.put(tableId, columnInfoMap);}} catch (Exception e) {throw new RuntimeException(e);}}if (eventType == EventType.QUERY) {//更新表结构会进此逻辑QueryEventData queryEventData = event.getData();String execSql = queryEventData.getSql();Statement statement = null;try {statement = CCJSqlParserUtil.parse(execSql);} catch (JSQLParserException e) {log.error("{} sql语句格式错误", execSql);return;}//如果字段发生更新需要删除重新获取if (statement instanceof Alter) {Alter alterStatement = (Alter) statement;String tableName = alterStatement.getTable().getName();//数据结构有变化清除tableIdColumnMap.remove(tableNameMap.get(tableName));log.error("数据结构变化", tableName);}}if (EventType.isWrite(eventType)) {//获取事件体WriteRowsEventData data = event.getData();} else if (EventType.isUpdate(eventType)) {UpdateRowsEventData data = (UpdateRowsEventData) event.getData();for (Map.Entry<Serializable[], Serializable[]> mapEntry : data.getRows()) {Map<String, ColumnInfo> columnInfoMap = tableIdColumnMap.get(String.valueOf(data.getTableId()));Map<String, 
Serializable> before = Maps.newHashMap();Map<String, Serializable> after = Maps.newHashMap();Map<String, Object[]> change = Maps.newHashMap();BinLogItem binLogItem = new BinLogItem();binLogItem.setTableName(tableIdMap.get(String.valueOf(data.getTableId())));columnInfoMap.entrySet().forEach(entry -> {String column = entry.getKey();ColumnInfo columnInfo = entry.getValue();Serializable beforeValue = mapEntry.getKey()[columnInfo.getIdx()];Serializable afterValue = mapEntry.getValue()[columnInfo.getIdx()];before.put(column, beforeValue);after.put(column, afterValue);if (!ObjectUtil.equals(beforeValue, afterValue)) {change.put(column, Lists.newArrayList(beforeValue, afterValue).toArray());}});binLogItem.setEventType(eventType);binLogItem.setTimestamp(event.getHeader().getTimestamp());binLogItem.setBefore(before);binLogItem.setAfter(after);binLogItem.setColumnChangeMap(change);binLogItem.setColumnInfoMap(columnInfoMap);bizTableRouteProcess.route(binLogItem);}System.out.println(data);} else if (EventType.isDelete(eventType)) {DeleteRowsEventData data = event.getData();BinLogItem binLogItem = new BinLogItem();binLogItem.setTableName(tableIdMap.get(String.valueOf(data.getTableId())));binLogItem.setEventType(eventType);binLogItem.setTimestamp(event.getHeader().getTimestamp());bizTableRouteProcess.route(binLogItem);System.out.println(data);}}/*** 获取表中所有的字段信息** @param db* @param table* @return* @throws Exception*/public Map<String, ColumnInfo> getColMap(String db, String table) throws Exception {Map<String, ColumnInfo> map = new HashMap<>();try {Class.forName("com.mysql.jdbc.Driver");// 保存当前注册的表的column信息Connection connection = DriverManager.getConnection("jdbc:mysql://" + dataSourceConfig.getUrl() + ":"+ dataSourceConfig.getPort(), dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());// 执行sqlString preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +"DATA_TYPE, ORDINAL_POSITION -1 as ORDINAL_POSITION , case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" 
+" FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ? order by ordinal_position";String sql = "SELECT * FROM TENANT_DOMAIN_REL";PreparedStatement ps = connection.prepareStatement(preSql);ps.setString(1, db);ps.setString(2, table);ResultSet rs = ps.executeQuery();while (rs.next()) {String schema = rs.getString("TABLE_SCHEMA");String tableName = rs.getString("TABLE_NAME");String column = rs.getString("COLUMN_NAME");int idx = rs.getInt("ORDINAL_POSITION");String dataType = rs.getString("DATA_TYPE");String isPKC = rs.getString("IS_PKC");ColumnInfo columnInfo = new ColumnInfo(idx, schema, tableName, column, dataType, "Y".equals(isPKC));map.put(column, columnInfo);}ps.close();rs.close();} catch (SQLException e) {}return map;}
}
package com.data.binlog.listener.biz;import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.jdl.edu.core.common.utils.SpringContextHolder;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;/*** 用户自定义实现路由*/
@Component
public class BizTableRouteProcess {public static Map<String, List<BizTableListener>> tableIdColumnMap = new HashMap<>();@Resourceprivate SpringContextHolder springContextHolder;@PostConstructpublic void initRoute() {// 使用ServiceLoader加载UserService接口的所有实现List<BizTableListener> serviceImpl = SpringContextHolder.getBeanList(BizTableListener.class);// 遍历加载的实现并获取它们的Class对象if(serviceImpl != null){for (BizTableListener implementation : serviceImpl) {BinLogTable table = implementation.getClass().getAnnotation(BinLogTable.class);if(table !=null && ObjectUtil.isNotEmpty(table.tableName())){String tableName = table.tableName();BizTableListener bean = SpringContextHolder.getBean(StrUtil.lowerFirst(implementation.getClass().getSimpleName()),BizTableListener.class);List<BizTableListener> tableImplList = tableIdColumnMap.getOrDefault(tableName,new ArrayList<>());tableImplList.add(bean);tableIdColumnMap.put(tableName,tableImplList);}}}}public void route(BinLogItem binLogItem) {List<BizTableListener> tableImplList = tableIdColumnMap.get(binLogItem.getTableName());if(ObjectUtil.isEmpty(tableImplList)){return;}tableImplList.forEach(impl->{impl.listener(binLogItem);});}}
下面是对user表变化的处理加工类,可以热插拔,直接打注解即可
package com.data.binlog.listener.biz;

import com.data.binlog.param.BinLogItem;

/**
 * Common contract implemented by every custom table handler.
 */
public interface BizTableListener {

    /**
     * Called with a change item of the table this handler is registered for.
     *
     * @param binLogItem the change item
     */
    void listener(BinLogItem binLogItem);
}
package com.data.binlog.listener.biz.user;import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.listener.biz.BizTableListener;
import com.data.binlog.param.BinLogItem;
import com.github.shyiko.mysql.binlog.event.EventType;
import org.springframework.stereotype.Component;/*** 用户想监听哪些表直接实现BizTableListener接口打BinLogTable注解上表明对应的表名*/
@BinLogTable(tableName = "user")
@Component
public class UserTableListener implements BizTableListener {private static Long currentTs = null;@Overridepublic void listener(BinLogItem binLogItem) {//全量更新才需判断if (currentTs != null && binLogItem.getTimestamp() != null && currentTs >= binLogItem.getTimestamp()) {System.out.println("当前消息不是最新数据");return;}if (EventType.isUpdate(binLogItem.getEventType())) {System.out.println("自己的逻辑");} else {///}currentTs = binLogItem.getTimestamp();}
}
总结
代码附件: https://download.csdn.net/download/zhaoyonghenghcl/89221807
上面是关于binlog同步基本代码实现,代码已分享链接,有问题随时沟通