SaaS 电商设计 (五) 私有化部署-实现 binlog 中间件适配

一、 背景

  具体的中间件私有化背景在上文 SaaS` 电商设计 (二) 私有化部署-缓存中间件适配 已有做相关介绍.这里具体讨论的场景是通过解析mysql binlog 来实现mysql到其他数据源的同步.具体比如:在电商的解决方案业务流中经常有 ES 的使用场景,用以解决一些复杂的查询和搜索商品的支持以及某些数据分析的场景.那就需要做到 mysql 数据库到 ES 的数据同步.在支持 mysqlES 数据同步的过程中,常用的技术方案有这样几种.

二、 设计主体

2.1 N种方案

方案1: 业务代码成功应答后操作目标数据源写入(本文用ES举例)
在这里插入图片描述

如上第一种方案在业务代码操作数据库, 异步执行 ES 数据同步写入.如:完成商品后写入数据,异步线程开启执行写入 ES 索引录入.

方案2:业务代码成功应答后,发送MQ,利用MQ来保证 ES 写入的最终一致
在这里插入图片描述

在第一种的方案中写入 ES 步骤中可能出现ES 写入失败case. 在方案一基础上为了保证可靠性引入 MQ ,保证在ES操作时出现异常抖动能够通过重试来保证数据的最终一致性.在业务代码中实际操作数据库后发送 MQ ,这边消费 MQ 执行 ES 数据同步.如:完成商品写入数据,发送消息 MQ , MQ consumer 消费写入 ES 索引录入.

方案 3.通过binlog 来实现数据库监听,保证数据同步脱离业务代码控制
在这里插入图片描述

  • 在大部分的场景下方案二完全能够满足业务诉求. 这样的一个方案在具体实施过程中存在两个点.

  • 业务开发的同时需要同步关心数据的同步
    在某种意义上来说,数据的同步并不是业务代码需要去关心的.业务代码永远关心的只是自身的逻辑实现,关注的是产品迭代过程中如何保证业务模型的可持续演进和领域资产沉淀.基于这个原则我的理解是需要把数据的同步从业务代码里进行剥离的.

    • 散落在各个业务代码角落的维护成本
      方案二的场景在很长时间的迭代过程中很可能就将出现这样的情况.商品添加进行商品的 ES 数据更新,门店添加进行门店的 ES 数据更新.诸如此类,长期迭代将得到大量的脚本代码,随着开发人员的更替,不断的迭代和开发.最终可能变成一座岌岌可危的高楼,开发人员小心翼翼的在原来的代码上继续裹上自己这版的裹脚布.维护性和成本指数上升.
      基于此我们尝试着借助 binlog 的这样一个工具来完善第二个方案适应更多索引更新,更加复杂的同步场景.首先 binlog 的形式能够通过仅监听数据库的 binlog 的消息来做到不同数据表数据更新的收口,我们可以在消费消息的入口来定义一个处理的接口,通过表名来进行不同表消费逻辑的实现.很简单就可以做到.一石二鸟做到数据处理的收口以及逻辑代码关于数据同步逻辑的抽离.

    方案4:完美终极方案(抽离技术细节的实现,做到binlog解析的接口和数据同步的接口化.)

在这里插入图片描述

对于第三种方式来说的话,接下来引入了第二个讨论的点.

  • 私有化支持
    就是在去做一些 SaaS 场景的私有化时,咱们再去做数据同步的时候不得不依赖 binlog ,那对于 binlog 的解析常见的工具也比较多.常见的开源的 canal ,各大厂里也有相应的工具,东厂的 DRC (前身binlake),福包厂的精卫.基于此在项目中不得不在这些不同的实现之上完成抽象.这样我们就能够在既支持到内部项目的数据监听,也能够完成项目实施私有化的场景部署.
  • 同步目标逻辑的不同支持
    在上文中我们提到的最多也就是关于 ES 数据的同步,那其实在实际的开发场景可能面临的更多,比如在数据库更新后的准实时缓存刷新,数据库写入商品成功后关于商品新建成功的三方消息同步.等等.同样我们在这个基础实现了一个接口,用来方便具体的使用方来进行具体消息处理.完美.

2.2 方案4 coding落地

2.2.1 类图

在这里插入图片描述
核心步骤:

step1:抽象MessageListener 实现 BinlogListener 完成 binlog 中间件解析发送的 MQ msg 得到反序列化的表数据.内含本次选取的反序列化类型.如:是canal 还是 DRC .
step2:抽象 BinlogClientAdapter 完成反序列化和处理msg接口定义.具体可以有 CanalBinlogAdapter,DrcBinlogClientAdapter实现.
step3:抽象BinlogDataHandler 完成具体表具体操作**(insert,delete,update,query)** 接口定义.具体在接入方进行实现MultiCloundBinLogDataHandler,这样在进行注入时得到具体的实现类,进行具体的实现操作.如:CategoryBinlogDataHandler.

2.2.2 核心实现

BinlogHandlerAdapter 完成 binlog client 接口定义.

package com.baixiu.middleware.binlog.adapter;import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.mq.model.CommonMessage;/*** binlog 适配器接口* 适配中间件list:canal,jingwei,drc等。* function1:完成不同中间件解析能力* function2:完成不同中间件handlerMsg能力* @author baixiu* @date 2023年12月11日*/
public interface BinlogHandlerAdapter {/*** 反序列MQMsg To binlogMsg* @param mqMsg mqMsg* @return*/BinlogData deserializationMQMsg(CommonMessage mqMsg);/*** 反序列MQMsg To binlogMsg* @param mqMsg mqMsg* @return*/void handleBinLogData(BinlogData binLogData) throws Exception;}

CanalBinlogHandlerAdapter 完成 canal 解析

package com.baixiu.middleware.binlog.adapter;import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.baixiu.middleware.binlog.consts.CommonConsts;
import com.baixiu.middleware.binlog.core.AbstractBinlogHandler;
import com.baixiu.middleware.binlog.core.BinlogTableHandlerRouter;
import com.baixiu.middleware.binlog.enums.CommonRowTypeEnum;
import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.binlog.model.BinlogDataToDiffModel;
import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import com.baixiu.middleware.mq.model.CommonMessage;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** canal binlog handler adapter* 当property配置的clientType=canal时进行注入bean* canal client 用以解析 mq -starter 发送过来的消费消息* @author baixiu* @date 创建时间 2023/12/11 8:39 PM*/
@Slf4j
public class CanalBinlogHandlerAdapter implements BinlogHandlerAdapter{@Autowiredprivate BinlogTableHandlerRouter binlogTableHandlerRouter;@Overridepublic BinlogData deserializationMQMsg(CommonMessage mqMsg) {FlatMessage flatMessage = JSON.parseObject(mqMsg.getText(),FlatMessage.class);BinlogData binLogData=new BinlogData ();if(flatMessage!=null){binLogData.setBinlogDataObject(flatMessage);}return binLogData;}@Overridepublic void handleBinLogData(BinlogData binLogData) throws Exception {if(binLogData==null || binLogData.getBinlogDataObject()==null){return;}FlatMessage flatMessage= (FlatMessage) binLogData.getBinlogDataObject ();List<Map<String, String>> rowDatas = flatMessage.getData();List<Map<String, String>> oldDatas = flatMessage.getOld();String tableName = flatMessage.getTable();AbstractBinlogHandler handler = binlogTableHandlerRouter.ALL_TABLE_HANDLERS.get(tableName);for (int i = 0; i < rowDatas.size(); i++) {Map<String, String> rowData = rowDatas.get(i);Map<String, String> oldData = new HashMap<>(i,0.75f);if (oldDatas != null && oldDatas.size() == rowDatas.size()) {oldData = oldDatas.get(i);}Map<String, String> fieldsMaps = Maps.newHashMapWithExpectedSize(20);BinlogDataToDiffModel binlogDataToDiffModel = transRowDataToAllBinlogData(handler, rowData, oldData, fieldsMaps, flatMessage.getType());switch (binlogDataToDiffModel.getCommonRowTypeEnum()) {case INSERT:log.info("Canal.handleMessage.binlogTransConfigToMap.INSERT.{}", JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.insert(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case UPDATE:log.info("Canal.handleMessage.binlogTransConfigToMap.UPDATE.{}",JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.update(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case DELETE:Map<String, String> delMap = getBeforeColumnsFromBinlogData(handler, oldData);log.info("Canal.handleMessage.binlogTransConfigToMap.DELETE");handler.delete(delMap);break;default:log.info("CanalBinlogClientAdapter.handleMessage.binlogTransConfigToMap.default.{}",JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));break;}}}public static BinlogDataToDiffModel transRowDataToAllBinlogData(AbstractBinlogHandler binlogData, Map<String, String> afterColumns, Map<String, String> beforeColumns, Map<String, String> fieldsMap, String type) {try {String[] updateFields = binlogData.getUpdateFields();String[] keyFields = binlogData.getFields();List<BinlogTableRowDiffModel> changeList = new ArrayList<> ();for (String key : afterColumns.keySet()) {if (keyFields.length == 1 && ArrayUtils.contains(keyFields, CommonConsts.BINLOG_ALL_FIELDS)) {fieldsMap.put(key, afterColumns.get(key));} else if (ArrayUtils.contains(keyFields, key)) {fieldsMap.put(key, afterColumns.get(key));}if (beforeColumns != null && !beforeColumns.isEmpty() && beforeColumns.get(key) != null) {BinlogTableRowDiffModel bean = new BinlogTableRowDiffModel();bean.setField(key);bean.setAfter(afterColumns.get(key));bean.setBefore(beforeColumns.get(key));if (updateFields.length == 1 && ArrayUtils.contains(updateFields,CommonConsts.BINLOG_ALL_FIELDS)) {changeList.add(bean);} else if (ArrayUtils.contains(updateFields, key)) {changeList.add(bean);}}}BinlogDataToDiffModel data = new BinlogDataToDiffModel(changeList, fieldsMap, CommonRowTypeEnum.transType(type));log.info("transRowDataToAllBinlogData.changeList:{}.fieldsMap{}.data{}",JSON.toJSONString(changeList), JSON.toJSONString(fieldsMap), JSON.toJSONString(data));return data;} catch (Exception e) {log.error("handleMessage.transRowDataToAllBinlogData.handleMessage.error.{}", JSON.toJSONString(binlogData), e);}return null;}/*** 删除操作* 不同的表需要从binlogData中获取的信息不同,这里抽取** @return*/private Map<String, String> getBeforeColumnsFromBinlogData(AbstractBinlogHandler binlogData, Map<String, String> beforeColumns) {Map<String, String> keys = new HashMap<>();if (beforeColumns != null && !beforeColumns.isEmpty()) {String[] keyFields = binlogData.getFields();for (String key : beforeColumns.keySet()) {// 找出关心的字段值if (ArrayUtils.contains(keyFields, key)) {keys.put(key, beforeColumns.get(key));}}}return keys;}
}

AbstractBinlogHandler 抽象binloghandler 处理类.

package com.baixiu.middleware.binlog.core;import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import java.util.List;
import java.util.Map;/*** @author baixiu* @date 创建时间 2023/12/12 11:31 AM*/
public interface AbstractBinlogHandler {/*** 需要关心的字段。实现后将仅实现的字段值放置于 fieldValues 中* @return 监控字段*/String[] getFields();/*** 需要关心的变更字段。实现后将仅实现的字段值放置于 changeList 中* @return 更新字段*/String[] getUpdateFields();/*** 新增时触发* @param fieldValues 唯一字段,用于确定一条数据* @param changeList 字段的值发生变化的* @throws Exception 业务exception*/void insert(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;/*** 数据修改时触发* @param fieldValues 实现了getFields接口里得到的字段里的字段以及字段的值* @param changeList  字段的值发生变化的* @throws Exception 业务exception*/void update(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;/*** 删除时触发* @param fieldValues 唯一字段,用于确定一条数据* @throws Exception 业务exception*/void delete(Map<String, String> fieldValues) throws Exception;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/224419.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32 寄存器配置笔记——I2C 读写AT24C02 EEPROM

一、简介 本文主要介绍STM32F10xx系列如何使用软件模拟I2C总线读写AT24C02的EEPROM数据。 二、概述 I2C协议是一种用于同步、半双工、串行总线(由单片机时钟线、单数据交换器数据线组成)上的协议。规定了总线空闲状态、起始条件、停止条件、数据有效性、字节格式、响应确认信号…

MES系统物料计划协同:全面解析与应用

一、MES系统物料计划协同概述 MES系统物料计划协同是指通过MES系统对物料计划进行统一管理和协调&#xff0c;确保生产计划的顺利进行。通过将物料需求、采购、库存、生产和配送等环节进行有效集成&#xff0c;实现供应链的优化。这种协同方式有助于提高供应链的透明度和协同性…

焊盘:十字连接VS全覆盖 铺铜

在铺铜规则中&#xff0c;焊盘连接方式有两种&#xff1a; 十字连接 优点&#xff1a;较好焊接&#xff1a;因铺铜面积减少&#xff0c;温度下降速度降低&#xff0c;较好焊接&#xff0c;不易虚焊。 缺点&#xff1a;载流能力较弱&#xff1a;铺铜面积↓ → 载流能力↓全连接…

Leetcode—118.杨辉三角【简单】

2023每日刷题&#xff08;六十&#xff09; Leetcode—118.杨辉三角 实现代码 class Solution { public:vector<vector<int>> generate(int numRows) {vector<vector<int>> ans(numRows);for(int i 0; i < numRows; i) {ans[i].resize(i 1);ans…

算法训练营Day14

#Java #二叉树层次遍历 #反转二叉树 开源学习资料 二叉树的层次遍历&#xff1a;力扣题目链接 二叉树的层次遍历很好理解&#xff1a; 就是从根结点一层一层地往下遍历&#xff08;同一层&#xff0c;从左到右&#xff09;&#xff1a; 迭代的方式很好理解&#xff1a;就是…

用实例域代替序数

在Java中&#xff0c;枚举类型的ordinal()方法返回枚举常量的序数&#xff08;即其在枚举声明中的位置&#xff09;。在某些情况下&#xff0c;使用实例域&#xff08;instance field&#xff09;代替序数可能更加安全和易读。以下是一个示例&#xff0c;演示如何使用实例域代替…

mysql CREATE DATABASE

DROP DATABASE IF EXISTS zengwenfeng;CREATE DATABASE zengwenfeng DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS 0;USE zengwenfeng; 脚本天天少这些&#xff0c;天天找这段&#xff01;

computed 和 watch 的奇妙世界:让数据驱动你的 Vue 应用(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

TestSSLServer4.exe工具使用方法简单介绍(查SSL的加密版本SSL3或是TLS1.2)

一、工具使用方法介绍 工具使用方法参照&#xff1a;http://www.bolet.org/TestSSLServer/ 全篇英文看不懂&#xff0c;翻译了下&#xff0c;能用到的简单介绍如下&#xff1a; 将下载的TestSSLServer4.exe工具放到桌面上&#xff0c;CMD命令行进入到桌面目录&#xff0c;执…

从 0 开始创建 SpringBoot 项目

从 0 开始创建 SpringBoot 项目 从 0 开始创建 SpringBoot 项目环境准备创建项目项目目录结构及说明编写代码参考 从 0 开始创建 SpringBoot 项目 环境准备 操作系统&#xff1a;Windows 10IDE&#xff1a;IntelliJ IDEA 2023.3.1Java 版本&#xff1a;jdk1.8 工具网盘链接&…

俄罗斯军方计划用 Astra Linux 取代 Windows!

网络安全正在改变全球化的面貌&#xff0c;各国政府为了防范外国的间谍和破坏活动&#xff0c;正积极发展自己的技术。在这一趋势下&#xff0c;俄罗斯军方已经开始用 Linux 发行版 Astra Linux 替换 Windows 系统。 如何提高Linux系统安全性&#xff1f;提升Linux安全的关键策…

垃圾收集器及内存分配

目录 垃圾收集器种类 HotSpot虚拟机所包含的收集器 垃圾收集器部分源码 垃圾收集器后台日志参数说明与配对关系 1、串行垃圾收集器 串行垃圾收集器运行示意图 1&#xff09;、编写测试代码 2&#xff09;、设置垃圾回收为串行收集器 3&#xff09;、启动程序&#xff…

Flink 数据集类型

现实世界中&#xff0c;所有的数据都是以流式的形态产生的&#xff0c;不管是哪里产生的数据&#xff0c;在产生的过程中都是一条条地生成&#xff0c;最后经过了存储和转换处理&#xff0c;形成了各种类型的数据集。如下图所示&#xff0c;根据现实的数据产生方式和数据产生是…

基于JavaWeb+SSM+Vue微信小程序的移动学习平台系统的设计和实现

基于JavaWebSSMVue微信小程序的移动学习平台系统的设计和实现 源码获取入口Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 Lun文目录 第1章 绪论 1 1.1 课题背景 1 1.2 课题意义 1 1.3 研究内容 2 第2章 开发环…

【基础篇】1.1 认识STM32(二)

3.3 VREF/VREF-引脚 VREF和VREF-是STM32中用于提供参考电压的引脚。如下图&#xff1a; VREF引脚可以连接一个单独的外部参考电压&#xff0c;范围在2.0V&#xff5e;VDDA&#xff0c;但不能超过VDDA&#xff0c;否则就超过了模拟器件的最大供电电压。在100引脚的封装中&#…

文件上传自动化测试方案(超详细)

一、概述 【测试地址】&#xff1a;https://pan.baidu.com 【测试工具】&#xff1a;selenium、requests 【脚本语言】&#xff1a;Python 【运行环境】&#xff1a;Windows 百度网盘作为文件存储及分享的平台&#xff0c;核心功能大部分是对文件的操作&#xff0c;如果要…

如何一键打开系统属性,编辑环境变量

常规方法&#xff1a; ①右键此电脑→打开属性 ②在控制面版中→系统与安全→系统 对于以上方法&#xff0c;我的电脑都不行&#xff0c;右键属性没反应&#xff1b;点击系统也没反应&#xff0c;这时打开运行窗口&#xff08;winR&#xff09;→输入sysdm.cpl →就可以直接到…

Linux--Docker容器(最新)

这里写目录标题 安装Docker安装指令配置加速器 Docker简介名词解释作用run命令解读 操作常见命令命令的别名 数据卷简介数据卷命令使用 本地目录挂载问题发现问题解决二级目录二级目录 安装Docker 安装指令 如下文档 https://b11et3un53m.feishu.cn/wiki/Rfocw7ctXij2RBkShcu…

【教3妹学编程-算法题】反转二叉树的奇数层

插&#xff1a; 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 坚持不懈&#xff0c;越努力越幸运&#xff0c;大家一起学习鸭~~~ 3妹&#xff1a;“你不是真正的快乐&#xff0c; 你的…

开具实习证明:在线实习项目介绍

大数据在线实习项目&#xff0c;是在线上为学生提供实习经验的项目。我们希望能够帮助想要在毕业后从事数据科学类工作的学生更加顺利地适应从教室到职场的转换&#xff1b;也帮助那些在工作中需要处理数据、实现数据价值的其他职能的从业者高效快速地掌握每天都能用起来的数据…