【缓存与数据库结合最终方案】伪从技术

实现伪从技术:基于Binlog的Following表变更监听与缓存更新

技术方案概述

要实现一个专门消费者服务作为Following表的伪从,订阅binlog并在数据变更时更新缓存,可以采用以下技术方案:

主要组件

  1. MySQL Binlog监听:使用开源工具监听MySQL的binlog
  2. 消息队列:将变更事件发布到消息队列(可选)
  3. 消费者服务:处理变更事件并更新缓存
  4. 缓存系统:Redis或其他缓存解决方案

具体实现步骤

1. 配置MySQL Binlog

首先确保MySQL已开启binlog并配置为ROW模式:

-- 检查当前binlog配置
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';-- 修改my.cnf/my.ini文件
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

2. 使用Java实现Binlog监听

可以使用开源的mysql-binlog-connector-java库:

<!-- pom.xml 依赖 -->
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.4</version>
</dependency>

3. 消费者服务实现

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;public class FollowingTableBinlogConsumer {private final BinaryLogClient client;private final CacheService cacheService;public FollowingTableBinlogConsumer(String hostname, int port, String username, String password, CacheService cacheService) {this.cacheService = cacheService;this.client = new BinaryLogClient(hostname, port, username, password);client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof TableMapEventData) {// 表映射事件TableMapEventData tableMapEvent = (TableMapEventData) data;if ("your_database".equals(tableMapEvent.getDatabase()) && "Following".equals(tableMapEvent.getTable())) {// 处理Following表的事件}} else if (data instanceof WriteRowsEventData) {// 插入操作processWriteEvent((WriteRowsEventData) data);} else if (data instanceof UpdateRowsEventData) {// 更新操作processUpdateEvent((UpdateRowsEventData) data);} else if (data instanceof DeleteRowsEventData) {// 删除操作processDeleteEvent((DeleteRowsEventData) data);}});}private void processWriteEvent(WriteRowsEventData data) {// 处理新增关注事件for (Serializable[] row : data.getRows()) {Long followerId = (Long) row[0]; // 假设第一列是follower_idLong followeeId = (Long) row[1]; // 假设第二列是followee_idcacheService.addFollowing(followerId, followeeId);}}private void processUpdateEvent(UpdateRowsEventData data) {// 处理更新事件(如果Following表有更新操作)for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {Serializable[] before = row.getKey();Serializable[] after = row.getValue();// 根据业务逻辑处理更新}}private void processDeleteEvent(DeleteRowsEventData data) {// 处理取消关注事件for (Serializable[] row : data.getRows()) {Long followerId = (Long) row[0];Long followeeId = (Long) row[1];cacheService.removeFollowing(followerId, followeeId);}}public void start() {try {client.connect();} catch (IOException e) {throw new RuntimeException("Failed to connect to MySQL binlog", e);}}public void stop() {try {client.disconnect();} catch (IOException e) {// 处理异常}}
}

4. 缓存服务实现

public interface CacheService {void addFollowing(Long followerId, Long followeeId);void removeFollowing(Long followerId, Long followeeId);Set<Long> getFollowings(Long followerId);Set<Long> getFollowers(Long followeeId);
}public class RedisCacheService implements CacheService {private final JedisPool jedisPool;public RedisCacheService(JedisPool jedisPool) {this.jedisPool = jedisPool;}@Overridepublic void addFollowing(Long followerId, Long followeeId) {try (Jedis jedis = jedisPool.getResource()) {// 用户关注列表jedis.sadd("user:" + followerId + ":followings", followeeId.toString());// 用户粉丝列表jedis.sadd("user:" + followeeId + ":followers", followerId.toString());}}@Overridepublic void removeFollowing(Long followerId, Long followeeId) {try (Jedis jedis = jedisPool.getResource()) {// 用户关注列表jedis.srem("user:" + followerId + ":followings", followeeId.toString());// 用户粉丝列表jedis.srem("user:" + followeeId + ":followers", followerId.toString());}}@Overridepublic Set<Long> getFollowings(Long followerId) {try (Jedis jedis = jedisPool.getResource()) {Set<String> followings = jedis.smembers("user:" + followerId + ":followings");return followings.stream().map(Long::valueOf).collect(Collectors.toSet());}}@Overridepublic Set<Long> getFollowers(Long followeeId) {try (Jedis jedis = jedisPool.getResource()) {Set<String> followers = jedis.smembers("user:" + followeeId + ":followers");return followers.stream().map(Long::valueOf).collect(Collectors.toSet());}}
}

5. 服务启动

public class Application {public static void main(String[] args) {// 配置Redis连接池JedisPool jedisPool = new JedisPool("localhost", 6379);CacheService cacheService = new RedisCacheService(jedisPool);// 启动binlog消费者FollowingTableBinlogConsumer consumer = new FollowingTableBinlogConsumer("localhost", 3306, "username", "password", cacheService);consumer.start();// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {consumer.stop();jedisPool.close();}));}
}

高级优化方案

1. 引入消息队列(如Kafka)

// 在Binlog消费者中,将事件发布到Kafka
public class KafkaEventPublisher {private final Producer<String, String> producer;public KafkaEventPublisher(String bootstrapServers) {Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");this.producer = new KafkaProducer<>(props);}public void publishFollowingEvent(String eventType, Long followerId, Long followeeId) {String key = followerId + ":" + followeeId;String value = String.format("{\"eventType\":\"%s\",\"followerId\":%d,\"followeeId\":%d}", eventType, followerId, followeeId);producer.send(new ProducerRecord<>("following-events", key, value));}public void close() {producer.close();}
}// 然后有独立的消费者服务从Kafka消费并更新缓存

2. 处理初始数据同步

// 在服务启动时,先全量同步Following表数据到缓存
public void initialSync() {// 从数据库读取所有Following关系List<Following> allFollowings = followingRepository.findAll();// 批量写入缓存try (Jedis jedis = jedisPool.getResource()) {Pipeline pipeline = jedis.pipelined();for (Following following : allFollowings) {pipeline.sadd("user:" + following.getFollowerId() + ":followings", following.getFolloweeId().toString());pipeline.sadd("user:" + following.getFolloweeId() + ":followers", following.getFollowerId().toString());}pipeline.sync();}
}

3. 监控与容错

  • 记录binlog位置,以便重启后从正确位置继续
  • 实现重试机制处理缓存更新失败
  • 添加监控指标跟踪事件处理延迟和错误率

总结

这个方案实现了Following表的伪从技术,通过监听MySQL binlog实时捕获数据变更,并更新Redis缓存。这种架构具有以下优点:

  1. 低延迟:几乎实时同步数据库变更
  2. 解耦:消费者服务独立于主业务服务
  3. 可扩展:可以轻松添加更多消费者处理不同业务逻辑
  4. 高性能:Redis提供了高效的关系数据存储和查询

根据业务规模,可以选择简单的直接更新缓存方案,或者引入消息队列的更复杂架构。

经过对数据库设计、缓存设计的详细论证,总结并提炼出缓存与数据库结合的最终方案。

伪从方案应用场景如:用户关系服务,关注与取消关注的接口。

  • 即接口直接更新数据库Following表即响应用户,后续流程对用户来说是完全异步的。
  • Follower表、计数服务、Redis缓存会依赖Following表产生的binlog日志分别更新数据。

关于Binlog监听在服务重启/暂停时的数据丢失问题

Binlog监听在服务重启或暂停时是否会导致数据丢失,取决于具体的实现方式和配置。下面我将详细分析这个问题及解决方案。

关键影响因素

1. Binlog位置记录

  • 不记录位置:如果服务没有记录已处理的binlog位置,重启后会从当前最新的binlog位置开始,导致中间变更丢失
  • 记录位置:正确记录binlog位置可以确保重启后从断点继续

2. MySQL binlog保留策略

  • expire_logs_days参数决定binlog保留天数
  • 如果binlog被过早清除,而服务长时间停机,可能导致无法恢复

3. 事务完整性

  • 部分处理的事务在重启后可能导致不一致

解决方案

1. 持久化binlog位置

修改之前的消费者服务,增加位置记录功能:

public class FollowingTableBinlogConsumer {// 增加binlog位置存储接口private final BinlogPositionStore positionStore;public FollowingTableBinlogConsumer(..., BinlogPositionStore positionStore) {this.positionStore = positionStore;// 设置binlog文件名和位置BinlogPosition position = positionStore.getPosition();if (position != null) {client.setBinlogFilename(position.getFilename());client.setBinlogPosition(position.getPosition());}client.registerEventListener(event -> {// 处理事件...// 记录位置if (event.getHeader().getEventType() == EventType.ROTATE) {RotateEventData rotateEvent = (RotateEventData) event.getData();positionStore.savePosition(new BinlogPosition(rotateEvent.getBinlogFilename(), rotateEvent.getBinlogPosition()));} else if (event.getHeader().getEventType() != EventType.FORMAT_DESCRIPTION) {positionStore.savePosition(new BinlogPosition(client.getBinlogFilename(), event.getHeader().getNextPosition()));}});}
}// Binlog位置存储接口
public interface BinlogPositionStore {void savePosition(BinlogPosition position);BinlogPosition getPosition();
}// 简单的文件存储实现
public class FileBinlogPositionStore implements BinlogPositionStore {private final File positionFile;public FileBinlogPositionStore(String filePath) {this.positionFile = new File(filePath);}@Overridepublic void savePosition(BinlogPosition position) {try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(positionFile))) {out.writeObject(position);} catch (IOException e) {throw new RuntimeException("Failed to save binlog position", e);}}@Overridepublic BinlogPosition getPosition() {if (!positionFile.exists()) return null;try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(positionFile))) {return (BinlogPosition) in.readObject();} catch (Exception e) {throw new RuntimeException("Failed to read binlog position", e);}}
}// Binlog位置对象
public class BinlogPosition implements Serializable {private final String filename;private final long position;// constructor, getters...
}

2. MySQL配置优化

确保MySQL配置合理:

-- 设置足够的binlog保留时间(根据业务需求调整)
SET GLOBAL expire_logs_days = 7;-- 或使用新的变量(MySQL 8.0+)
SET GLOBAL binlog_expire_logs_seconds = 604800;  -- 7天

3. 启动时数据校验和修复

服务启动时增加校验逻辑:

public void start() {// 检查binlog位置是否有效BinlogPosition position = positionStore.getPosition();if (position != null) {if (!isBinlogFileExists(position.getFilename())) {// 执行全量同步initialSync();positionStore.clearPosition();}}client.connect();
}private boolean isBinlogFileExists(String filename) {// 实现检查binlog文件是否存在的逻辑// 可以通过SHOW BINARY LOGS命令获取当前存在的binlog文件列表
}

4. 优雅停机处理

确保服务停止时正确处理:

public void stop() {try {// 等待当前事件处理完成client.disconnect();// 确保最后的位置已保存positionStore.flush();} catch (IOException e) {// 处理异常}
}

高级保障方案

1. 引入事务表记录处理状态

创建一张事务记录表:

CREATE TABLE binlog_consumer_state (consumer_id VARCHAR(100) PRIMARY KEY,binlog_filename VARCHAR(100) NOT NULL,binlog_position BIGINT NOT NULL,last_heartbeat TIMESTAMP NOT NULL,processed_checksum VARCHAR(100)
);

2. 定期检查点(checkpoint)

// 每处理N个事件或每隔M秒记录一次完整状态
private void checkpoint(Event event) {// 计算当前已处理数据的校验和String checksum = computeChecksum();// 更新数据库状态jdbcTemplate.update("INSERT INTO binlog_consumer_state VALUES (?, ?, ?, NOW(), ?) " +"ON DUPLICATE KEY UPDATE binlog_filename=?, binlog_position=?, last_heartbeat=NOW(), processed_checksum=?",consumerId, client.getBinlogFilename(), event.getHeader().getNextPosition(), checksum,client.getBinlogFilename(), event.getHeader().getNextPosition(), checksum);
}

3. 数据修复机制

当检测到不一致时:

public void repairIfNeeded() {// 从数据库获取最后处理的状态StateRecord state = getLastStateFromDB();// 从缓存获取最后处理的状态StateRecord cacheState = getLastStateFromCache();if (!state.equals(cacheState)) {// 执行修复逻辑executeRepair(state);}
}

总结

正确实现的Binlog监听服务在重启/暂停时不会丢失数据,但需要:

  1. 持久化记录binlog位置(文件名+偏移量)
  2. 配置足够的binlog保留时间
  3. 实现优雅的停机和恢复机制
  4. 考虑增加校验和修复逻辑(针对关键业务)

建议的完整方案:

  • 使用混合位置存储(本地文件+数据库)
  • 定期检查点
  • 启动时数据校验
  • 足够的binlog保留期
  • 监控binlog消费延迟

这样即使在服务重启、暂停甚至长时间停机后,也能保证数据不会丢失,并能从正确的位置恢复处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/77385.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《100天精通Python——基础篇 2025 第3天:变量与数据类型全面解析,掌握Python核心语法》

目录 一、Python变量的定义和使用二、Python整数类型&#xff08;int&#xff09;详解三、Python小数/浮点数&#xff08;float&#xff09;类型详解四、Python复数类型(complex)详解---了解五、Python字符串详解(包含长字符串和原始字符串)5.1 处理字符串中的引号5.2 字符串的…

【前后端分离项目】Vue+Springboot+MySQL

文章目录 1.安装 Node.js2.配置 Node.js 环境3.安装 Node.js 国内镜像4.创建 Vue 项目5.运行 Vue 项目6.访问 Vue 项目7.创建 Spring Boot 项目8.运行 Spring Boot 项目9.访问 Spring Boot 项目10.实现 Vue 与 Spring Boot 联动11.安装 axios12.编写请求13.调用函数请求接口14.…

线性代数(一些别的应该关注的点)

一、矩阵 矩阵运算&#xff1a;线性变换 缩放、平移、旋转 无所不能的矩阵 - 三维图形变换_哔哩哔哩_bilibili

01Redis快速入门(nosql、安装redis、客户端、命令及类型、java客户端、序列化)

Redis的常见命令和客户端使用 1.初识Redis Redis是一种键值型的NoSql数据库&#xff0c;这里有两个关键字&#xff1a; 键值型 NoSql 其中键值型&#xff0c;是指Redis中存储的数据都是以key、value对的形式存储&#xff0c;而value的形式多种多样&#xff0c;可以是字符串…

AI编程:[体验]从 0 到 1 开发一个项目的初体验

一、开发信息 开发时间&#xff1a;1.5-2天工具使用&#xff1a; 不熟练&#xff0c;开发本项目前1天&#xff0c;才简单使用了Cursor的功能 功能复杂度&#xff1a; 开发的功能相对简单。页面&#xff1a;2个&#xff0c;登录页面&#xff0c;个人中心页面功能&#xff1a;5个…

LeetCode-392 判断子序列

给定字符串 s 和 t &#xff0c;判断 s 是否为 t 的子序列。 字符串的一个子序列是原始字符串删除一些&#xff08;也可以不删除&#xff09;字符而不改变剩余字符相对位置形成的新字符串。&#xff08;例如&#xff0c;"ace"是"abcde"的一个子序列&#…

Linux 系统监控大师:Glances 工具详解助力自动化

看图猜诗&#xff0c;你有任何想法都可以在评论区留言哦~ 摘要 Glances 是一款基于 Python 开发的跨平台系统监控工具&#xff0c;集成了 CPU、内存、磁盘、网络、进程等核心指标的实时监控能力&#xff0c;并支持命令行、Web界面、客户端-服务器模式等多种使用场景。其轻量级…

Spring Boot 3.4.5 运行环境需求

&#x1f4dd; Spring Boot 3.4.5 运行环境要求 &#x1f33f; 1️⃣ 基本需求 ☑️ JDK版本&#xff1a;最低 Java 17 &#x1f517; https://www.java.com/ 最高兼容至 Java 24 ☑️ 依赖框架&#xff1a;需搭配 Spring Framework 6.2.6 &#x1f517; https://docs.sprin…

在KEIL里C51和MDK兼容以及添加ARM compiler5 version编译器

前言 我们想在一个keil里面可以打开32和51的文件&#xff0c;这样就不需要两个keil了 还有就是现在的keil&#xff0c;比如我用的是5.41的&#xff0c;就没有5版本的处理器&#xff0c;所以要安装 本篇文章我们来详细讲解如何实现上面说的两个内容 准备的东西 1.ARM5编译器 …

Flutter 弹窗队列管理:支持优先级的线程安全通用弹窗队列系统

在复杂的 Flutter 应用开发中&#xff0c;弹窗管理是一个常见难题。手动管理弹窗的显示顺序和条件判断不仅繁琐&#xff0c;还容易出错。为此&#xff0c;我们实现了一个支持优先级的线程安全通用弹窗队列管理系统。它能够自动管理弹窗的显示顺序&#xff0c;支持条件判断&…

鸿蒙NEXT开发剪贴板工具类(ArkTs)

import { pasteboard } from kit.BasicServicesKit; import { StrUtil } from ./StrUtil;/*** 剪贴板工具类* 需要权限&#xff1a;* ohos.permission.READ_PASTEBOARD // 允许应用读取剪贴板。* author CSDN-鸿蒙布道师* since 2025/04/25*/ export class PasteboardUtil {…

FastAPI 零基础入门指南:10 分钟搭建高性能 API

一、为什么选择 FastAPI&#xff1f; 想象一下&#xff0c;用 Python 写 API 可以像搭积木一样简单&#xff0c;同时还能拥有媲美 Go 语言的性能&#xff0c;这个框架凭借三大核心优势迅速风靡全球&#xff1a; 开发效率提升 3 倍&#xff1a;类型注解 自动文档&#xff0c;…

【算法】BFS-解决FloodFill问题

目录 FloodFill问题 图像渲染 岛屿数量 岛屿的最大面积 被围绕的区域 FloodFill问题 FloodFill就是洪水灌溉的意思&#xff0c;假设有下面的一块田地&#xff0c;负数代表是凹地&#xff0c;正数代表是凸地&#xff0c;数字的大小表示凹或者凸的程度。现在下一场大雨&…

代码随想录算法训练营第三十七天|动态规划part4

1049. 最后一块石头的重量 II 题目链接&#xff1a; 1049. 最后一块石头的重量 II - 力扣&#xff08;LeetCode&#xff09; 文章讲解&#xff1a; 代码随想录 思路&#xff1a; 理解为把石头分成两堆 使得两堆的差值尽可能小 求这个最小值1 理解为往背包里装物品 每个物品的…

(八)深入了解AVFoundation-采集:拍照功能的实现

引言 在上一篇文章中&#xff0c;我们初步完成了使用 AVFoundation 采集视频数据的流程&#xff0c;掌握了 AVCaptureSession 的搭建与视频流的预览显示。 本篇将继续深入 AVFoundation&#xff0c;聚焦于静态图片采集的实现。通过 AVCapturePhotoOutput&#xff0c;我们可以…

git tag使用场景和实践

背景 每次上线一个迭代&#xff0c;为了区分本次代码的分支是哪个迭代的commit&#xff0c;可以给分支打上tag&#xff0c;这样利于追踪分支所属迭代&#xff0c;如果devops没有自动给分支打tag&#xff0c;需要自己来打 操作 1.查看当前tag git tag2.给分支打tag git tag…

从零开始掌握Linux数据流:管道与重定向完全指南

全文目录 1 知识背景与核心概念1.1 操作系统的输入输出模型1.2 Shell 的中间人角色 2 重定向技术深度解析2.1 输出重定向2.1.1 覆盖写2.1.2 追加写2.1.3 错误重定向2.1.4 同时重定向 stdout 和 stderr 2.2 输入重定向2.2.1 文件作为输入源2.2.2 Here Document&#xff08;多行输…

aws(学习笔记第三十九课) iot-core

文章目录 aws(学习笔记第三十九课) iotcore(Internet Of Thing)学习内容:1. 整体架构1.1 代码链接1.2 整体架构(概要)1.3 整体架构(详细 )2. 代码解析2.1 创建`IOT thing`2.2 创建`AWS IOT certificate`证书2.2.1 创建`lambda`需要的`role`2.2.2 创建`lambda`2.2.3 `lambd…

国家新政鼓励游戏出海,全球化安全威胁如何解

本文作者&#xff1a;腾讯宙斯盾DDoS防护团队 01 政策红利释放&#xff1a;游戏出海升级为“国家战略工程” 01 4月21日&#xff0c;国务院新闻办公室发布《加快推进服务业扩大开放综合试点工作方案》&#xff0c;释放了一个信号&#xff1a;首次将“游戏出海”列为战略级工程&…

MobX 在 React 中的使用:状态管理的新选择

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》、《前端求职突破计划》 &#x1f35a; 蓝桥云课签约作者、…