基于binlog实现数据加工处理

场景举例:

为了查询方便性,目前订单中存在好多冗余字段,例如用户昵称,但是当昵称对应表变化时候,好多同学可能就直接在修改昵称的地方手 动调用订单接口更新昵称,但这样不仅代码结构混乱而且耦合严重

使用说明:

下面举例过程只是基于单机简单示例,没有加任务线程池、并发考虑。例外个人建议如果是同步重要的数据不要用此种同步方式,用不好可 能存在丢数据风险,向我上面说的一些仅展示无业务用户的字段可以用此同步

具体使用

应用maven核心依赖,其他spring等依赖可以自己添加

 <!--binlog同步核心依赖-->
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.21.0</version>
</dependency><!--mysql链接-->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>6.0.6</version>
</dependency>
<!--mysql解析-->
<dependency><groupId>com.github.jsqlparser</groupId><artifactId>jsqlparser</artifactId><version>4.5</version>
</dependency>

下面是我举例说明具体实现代码,用户直接实现BizTableListener接口自定义自己想同步表逻辑即可,举例中我只放了些核心代码,具体代码已附件上传

代码

1.配置数据库信息

package com.data.binlog.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@Data
@Component
@ConfigurationProperties(prefix = "binlog.datasource"
)
public class DataSourceConfig {private String url;private int port;private String username;private String passwd;private String db;}
logging:config: classpath:log4j2/log4j2-dev.yml
binlog:datasource:url: mysql的地址db: 数据库port: 端口username: 用户名passwd: 密码

2.加载binlog配置信息

package com.data.binlog.config;import com.data.binlog.listener.BinLogEventListener;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;@Configuration
public class BinaryLogClientConfig {@Resourceprivate BinLogEventListener binLogEventListener;@Resourceprivate DataSourceConfig dataSourceConfig;@Beanpublic void BinaryLog() throws Exception{BinaryLogClient client = new BinaryLogClient(dataSourceConfig.getUrl(), dataSourceConfig.getPort(), dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());EventDeserializer eventDeserializer = new EventDeserializer();client.setEventDeserializer(eventDeserializer);client.registerEventListener(binLogEventListener);client.connect();}}
package com.data.binlog.listener;import cn.hutool.core.util.ObjectUtil;
import com.data.binlog.config.DataSourceConfig;
import com.data.binlog.context.TableContext;
import com.data.binlog.listener.biz.BizTableRouteProcess;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.util.TablesNamesFinder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.Serializable;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** binlog监听主类*/
@Component
@Slf4j
public class BinLogEventListener implements BinaryLogClient.EventListener {//调用用户自定义表处理实现类@Resourceprivate BizTableRouteProcess bizTableRouteProcess;@Resourceprivate DataSourceConfig dataSourceConfig;@Overridepublic void onEvent(Event event) {EventHeader header = event.getHeader();EventType eventType = header.getEventType();System.out.println("监听的事件类型:" + eventType);Map<String, Map<String, ColumnInfo>> tableIdColumnMap = TableContext.getTableIdColumnMap();Map<String, String> tableNameMap = TableContext.getTableNameMap();Map<String, String> tableIdMap = TableContext.getTableIdMap();if (eventType == EventType.TABLE_MAP) {TableMapEventData tableData = event.getData();String db = tableData.getDatabase();String table = tableData.getTable();String tableId = String.valueOf(tableData.getTableId());try {tableNameMap.put(table, tableId);tableIdMap.put(tableId, table);//如果缓存中有数据结构则直接掉过if (tableIdColumnMap.get(tableId) == null) {Map<String, ColumnInfo> columnInfoMap = getColMap(db, table);tableIdColumnMap.put(tableId, columnInfoMap);}} catch (Exception e) {throw new RuntimeException(e);}}if (eventType == EventType.QUERY) {//更新表结构会进此逻辑QueryEventData queryEventData = event.getData();String execSql = queryEventData.getSql();Statement statement = null;try {statement = CCJSqlParserUtil.parse(execSql);} catch (JSQLParserException e) {log.error("{} sql语句格式错误", execSql);return;}//如果字段发生更新需要删除重新获取if (statement instanceof Alter) {Alter alterStatement = (Alter) statement;String tableName = alterStatement.getTable().getName();//数据结构有变化清除tableIdColumnMap.remove(tableNameMap.get(tableName));log.error("数据结构变化", tableName);}}if (EventType.isWrite(eventType)) {//获取事件体WriteRowsEventData data = event.getData();} else if (EventType.isUpdate(eventType)) {UpdateRowsEventData data = (UpdateRowsEventData) event.getData();for (Map.Entry<Serializable[], Serializable[]> mapEntry : data.getRows()) {Map<String, ColumnInfo> columnInfoMap = tableIdColumnMap.get(String.valueOf(data.getTableId()));Map<String, Serializable> before = Maps.newHashMap();Map<String, Serializable> after = Maps.newHashMap();Map<String, Object[]> change = Maps.newHashMap();BinLogItem binLogItem = new BinLogItem();binLogItem.setTableName(tableIdMap.get(String.valueOf(data.getTableId())));columnInfoMap.entrySet().forEach(entry -> {String column = entry.getKey();ColumnInfo columnInfo = entry.getValue();Serializable beforeValue = mapEntry.getKey()[columnInfo.getIdx()];Serializable afterValue = mapEntry.getValue()[columnInfo.getIdx()];before.put(column, beforeValue);after.put(column, afterValue);if (!ObjectUtil.equals(beforeValue, afterValue)) {change.put(column, Lists.newArrayList(beforeValue, afterValue).toArray());}});binLogItem.setEventType(eventType);binLogItem.setTimestamp(event.getHeader().getTimestamp());binLogItem.setBefore(before);binLogItem.setAfter(after);binLogItem.setColumnChangeMap(change);binLogItem.setColumnInfoMap(columnInfoMap);bizTableRouteProcess.route(binLogItem);}System.out.println(data);} else if (EventType.isDelete(eventType)) {DeleteRowsEventData data = event.getData();BinLogItem binLogItem = new BinLogItem();binLogItem.setTableName(tableIdMap.get(String.valueOf(data.getTableId())));binLogItem.setEventType(eventType);binLogItem.setTimestamp(event.getHeader().getTimestamp());bizTableRouteProcess.route(binLogItem);System.out.println(data);}}/*** 获取表中所有的字段信息** @param db* @param table* @return* @throws Exception*/public Map<String, ColumnInfo> getColMap(String db, String table) throws Exception {Map<String, ColumnInfo> map = new HashMap<>();try {Class.forName("com.mysql.jdbc.Driver");// 保存当前注册的表的column信息Connection connection = DriverManager.getConnection("jdbc:mysql://" + dataSourceConfig.getUrl() + ":"+ dataSourceConfig.getPort(), dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());// 执行sqlString preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +"DATA_TYPE, ORDINAL_POSITION -1 as ORDINAL_POSITION , case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +" FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ? order by ordinal_position";String sql = "SELECT * FROM TENANT_DOMAIN_REL";PreparedStatement ps = connection.prepareStatement(preSql);ps.setString(1, db);ps.setString(2, table);ResultSet rs = ps.executeQuery();while (rs.next()) {String schema = rs.getString("TABLE_SCHEMA");String tableName = rs.getString("TABLE_NAME");String column = rs.getString("COLUMN_NAME");int idx = rs.getInt("ORDINAL_POSITION");String dataType = rs.getString("DATA_TYPE");String isPKC = rs.getString("IS_PKC");ColumnInfo columnInfo = new ColumnInfo(idx, schema, tableName, column, dataType, "Y".equals(isPKC));map.put(column, columnInfo);}ps.close();rs.close();} catch (SQLException e) {}return map;}
}
package com.data.binlog.listener.biz;import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.param.BinLogItem;
import com.data.binlog.param.ColumnInfo;
import com.jdl.edu.core.common.utils.SpringContextHolder;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;/*** 用户自定义实现路由*/
@Component
public class BizTableRouteProcess {public static Map<String, List<BizTableListener>> tableIdColumnMap = new HashMap<>();@Resourceprivate SpringContextHolder springContextHolder;@PostConstructpublic void initRoute() {// 使用ServiceLoader加载UserService接口的所有实现List<BizTableListener> serviceImpl = SpringContextHolder.getBeanList(BizTableListener.class);// 遍历加载的实现并获取它们的Class对象if(serviceImpl != null){for (BizTableListener implementation : serviceImpl) {BinLogTable table =  implementation.getClass().getAnnotation(BinLogTable.class);if(table !=null && ObjectUtil.isNotEmpty(table.tableName())){String tableName = table.tableName();BizTableListener bean =  SpringContextHolder.getBean(StrUtil.lowerFirst(implementation.getClass().getSimpleName()),BizTableListener.class);List<BizTableListener> tableImplList = tableIdColumnMap.getOrDefault(tableName,new ArrayList<>());tableImplList.add(bean);tableIdColumnMap.put(tableName,tableImplList);}}}}public void route(BinLogItem binLogItem) {List<BizTableListener> tableImplList =  tableIdColumnMap.get(binLogItem.getTableName());if(ObjectUtil.isEmpty(tableImplList)){return;}tableImplList.forEach(impl->{impl.listener(binLogItem);});}}

下面是对user表变化的处理加工类,可以热插拔,直接打注解即可

package com.data.binlog.listener.biz;import com.data.binlog.param.BinLogItem;/*** 自定义处理类统一实现接口*/
public interface BizTableListener {public void listener(BinLogItem binLogItem);
}
package com.data.binlog.listener.biz.user;import com.data.binlog.annotation.BinLogTable;
import com.data.binlog.listener.biz.BizTableListener;
import com.data.binlog.param.BinLogItem;
import com.github.shyiko.mysql.binlog.event.EventType;
import org.springframework.stereotype.Component;/*** 用户想监听哪些表直接实现BizTableListener接口打BinLogTable注解上表明对应的表名*/
@BinLogTable(tableName = "user")
@Component
public class UserTableListener implements BizTableListener {private static Long currentTs = null;@Overridepublic void listener(BinLogItem binLogItem) {//全量更新才需判断if (currentTs != null && binLogItem.getTimestamp() != null && currentTs >= binLogItem.getTimestamp()) {System.out.println("当前消息不是最新数据");return;}if (EventType.isUpdate(binLogItem.getEventType())) {System.out.println("自己的逻辑");} else {///}currentTs = binLogItem.getTimestamp();}
}

总结

代码附件: https://download.csdn.net/download/zhaoyonghenghcl/89221807
上面是关于binlog同步基本代码实现,代码已分享链接,有问题随时沟通

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/3282.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何设置微信自动回复?教你快速上手!

自动回复对于需要在微信上洽谈业务的人来说&#xff0c;无疑是非常实用的一个功能。 下面就一起来看看微信管理系统的机器人自动回复都有哪些设置吧&#xff01; 1、自动通过好友 只要有新的好友请求发送到你的微信账号&#xff0c;系统会自动通过该请求&#xff0c;无需手动…

vue flvjs 播放视频

写在前面&#xff1a; 之前使用过vodiejs插件播放过mp4视频格式的视频&#xff1b; 此次需要使用flvjs插件播放rtsp视频格式的视频&#xff1b; 因为视频的数据格式不同&#xff0c;所以对应的插件不同。 思维导图&#xff1a; 参考链接&#xff1a;rtmp、rtsp、flv、m3u8、 …

【活动汇总】纽约大学AI Meets Science;第十六届生物信息学和生物医学技术国际会议;Bio-IT World2024

HyperAI超神经将网罗海内外 AI for Science 垂类会议&#xff0c;并整理会议信息与链接&#xff0c;一键直达官方主页&#xff0c;不错过任何一场重量级活动&#xff01; 未来活动预告&#xff1a; 4 月 26 日&#xff0c;纽约大学「AI Meets Science」会议 5 月 16 日&#…

Linux制作docker镜像

一、制作镜像 1.在/home/data/images目录下编写Dockerfile文件 Dockerfile&#xff1a;是制作镜像的文件 vi Dockerfile FROM java:8 ENV JAVA_HOME/usr/lib/jvm/jdk1.8.0_181 ENV PATH$PATH:$JAVA_HOME/bin ENV LC_ALLen_US.utf8 ENV LANGen_US.utf8 ENV LANGUAGEen_US.utf…

锁 synchronized和lock

Synchronized 原理&#xff1a; 方法级的同步是隐式&#xff0c; 即无需通过字节码指令来控制的&#xff0c; 它实现在方法调用和返回操 作之中。JVM 可以从方法常量池中的方法表结构(method_info Structure) 中的 ACC_SYNCHRONIZED 访问标志区分一个方法是否同步方法 。当方法…

uiautomation 监控 Discord客户端的聊天记录 附python代码

一个Python脚本,用于监控和抓取Discord客户端的聊天记录。它使用了`uiautomation`库来模拟用户界面操作, 定义了一个名为`discord`的类,它初始化了几个变量,包括一个用于控制UI自动化的`UiaAPI`对象,以及一个用于存储会话项目的列表`SessionItemList`。 通过UI自动化获取名…

深度学习与目标检测:从卷积神经网络到YOLOv8概念介绍

深度学习与目标检测&#xff1a;从卷积神经网络到YOLOv8的深入探索 随着人工智能技术的迅猛发展&#xff0c;深度学习和计算机视觉领域取得了举世瞩目的成果。在目标检测这一关键任务中&#xff0c;卷积神经网络&#xff08;CNN&#xff09;和YOLO系列模型发挥着至关重要的作用…

Redis中的Lua脚本(六)

Lua脚本 清空repl_scriptcache_dict字典 每当主服务器添加一个新的从服务器时&#xff0c;主服务器都会清空自己的repl_scriptcache_dict字典&#xff0c;这是因为随着新从服务器的出现&#xff0c;repl_scriptcache_字典里面记录的脚本已经不再被所有从服务器载入过&#xf…

使用 pytorch训练自己的图片分类模型

如何自己训练一个图片分类模型&#xff0c;如果一切从头开始&#xff0c;对于一般公司或个人基本是难以实现的。其实&#xff0c;我们可以利用一个现有的图片分类模型&#xff0c;加上新的分类&#xff0c;这种方式叫做迁移学习&#xff0c;就是把现有的模式知识&#xff0c;转…

leetcode77--组合

1. 题意 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 2. 题解 1. 回溯减枝 class Solution { public:vector<int> temp;vector<vector<int>> ans;void dfs(int cur, int n, int k) {// 剪…

流量分析利器arkime的学习之路(二)---API接口

前文回忆 《流量分析利器arkime的学习之路(一)---安装部署》 概述 注意点 Arkime对所有API调用都使用摘要身份验证,因此请确保在库或curl命令中启用摘要身份验证。学习如何进行API调用的最简单方法是打开浏览器的javascript控制台,观察Arkime UI正在进行的调用,它使用…

向表内INSERT数据出现ORA-00600 ktspgfb-inc2错误的分析处理

业务高峰期&#xff0c;业务维护人员反馈某业务卡主&#xff0c;发来报错一看&#xff0c;是ORA-00600...的&#xff0c;心理一下就紧张起来&#xff1b;当前版本的ORA-00600错误&#xff0c;基本分为了2类&#xff0c;要么没啥影响&#xff1b;如果对业务有影响了&#xff0c;…

强化学习的重要概念:环境、模型、策略和它们的关系

在强化学习中&#xff0c;环境&#xff08;Environment&#xff09;、模型&#xff08;Model&#xff09;和策略&#xff08;Policy&#xff09;是三个核心概念&#xff0c;它们之间的关系可以描述如下&#xff1a; 环境&#xff08;Environment&#xff09;&#xff1a; 环境是…

<component> <slot> <template>三者之间的区别与使用

学习目标&#xff1a; 目标 1、了解组件的含义 2、了解 的含义及用法 3、了解 的含义及用法 4、了解 的含义及用法 学习内容&#xff1a; 内容&#xff1a; 什么是组件&#xff1f; 组件的出现&#xff0c;就是为了拆分Vue实例的代码量&#xff0c;能够让我们以不同的组件&am…

deque的插入和删除

函数原型 两端插入操作 push_back(elem) //向容器尾部添加一个数据push_front(elem) //向容器头部插入一个数据pop_back() //删除容器最后一个数据 pop_front() //删除第一个容器第一个数据 …

点云数据处理的库

PCL、Open3D和OpenGL都是用于点云数据处理的常用库&#xff0c;它们各有优劣&#xff0c;具体如下&#xff1a; PCL&#xff08;Point Cloud Library&#xff09; PCL是一个非常流行的开源点云数据处理库&#xff0c;它支持从各种传感器&#xff08;如激光雷达、Kinect&#xf…

Python实战 | 只需“4步”入门网络爬虫(小白也会)

文章目录 Python实战 | 只需“4步”入门网络爬虫&#xff08;小白也会&#xff09;1&#xff1a;确定目标网站和数据2&#xff1a;安装必要的库3&#xff1a;编写爬虫代码4.目标网站的URL5.发送HTTP请求并获取响应内容6.使用BeautifulSoup解析HTML内容7.查找包含新闻标题和链接…

【golang学习之旅】Go 的基本数据类型

系列文章 【golang学习之旅】报错&#xff1a;a declared but not used 目录 系列文章总览布尔型&#xff08;bool&#xff09;字符串型&#xff08;string&#xff09;整数型&#xff08;int、uint、byte、rune&#xff09;浮点型&#xff08;float32、float64&#xff09;复…

【C++】——类与对象引入和认识

创作不易&#xff0c;多多支持&#xff01; 前言 有了上一篇博客的基础以后&#xff0c;就正式进入C类和对象的领域了&#xff0c;如果看完本篇文章对你有用&#xff0c;还请多多支持&#xff01;&#xff01;&#x1f618;&#x1f618; 一 面向过程和面向对象 1.面向过程 …

js的includes函数

在JavaScript中&#xff0c;includes() 是一个数组&#xff08;Array&#xff09;和字符串&#xff08;String&#xff09;对象的方法&#xff0c;用于确定一个数组是否包含一个特定的值&#xff0c;或者一个字符串是否包含一个特定的子串。如果找到该值或子串&#xff0c;则返…