MySQL向Es数据同步策略

哈喽,大家好!目前有有一个小项目功能落到自己手中,也是一个面试必考点。如何保证MySQL与Es、Redis等之间的数据一致性,带着大家的问题,我给提供一种解决方案(最终一致性)

代码如下:

@Component
@Slf4j
@EnableAsync
@AllArgsConstructor
public class EsSynchronizeTasksJob {private final  ISysUserAllESService sysUserAllESService;private final SysUserAllESDocMapper userAllESDocMapper;private final SysPositRequestDocMapper sysPositRequestDocMapper;private final ISysPositRequestService sysPositRequestService;//    @Scheduled(fixedRate = 10000)
//    @Scheduled(cron = "0 0 3 * * ?")@Async("pushMysqlToEsExecutor")@Transactional(rollbackFor = Exception.class)public void syncDataToEsBySysUserAllES() throws Exception {log.info("<----异步执行数据同步开始---->");// 获取系统时间Long startTime = System.currentTimeMillis();process(sysPositRequestService, sysPositRequestDocMapper);process(sysUserAllESService, userAllESDocMapper);Long endTime = System.currentTimeMillis();log.info("<----异步执行数据同步结束---->");log.info("<----异步执行数据同步耗时---->" + (endTime - startTime) + "ms");}private <D extends BaseEntity, S extends IService<D>, M extends BaseEsMapper<D>> void process(IService service, BaseEsMapper mapper) throws Exception {Class<D> docClass = mapper.getEntityClass();String indexName = docClass.getAnnotation(Document.class).indexName();if (!mapper.existsIndex(indexName)) {boolean success = mapper.createIndex(indexName);if (success) {System.out.println("索引创建成功!indexName===>" + indexName);}} else {List<D> sysUserAllESDocs = mapper.selectList(new LambdaEsQueryWrapper<>());System.out.println("es中有" + sysUserAllESDocs.size());// TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据Integer delete = mapper.delete(new LambdaEsQueryWrapper<>());System.out.println("删除ES之前的数据" + delete);}// TODO 现在是全量同步,后面待定时任务开启后就是每天新增的数据List<D> list = service.lambdaQuery().getBaseMapper().selectList(new LambdaQueryWrapper<>());List<D> document = new ArrayList<>();for (D o : list) {D docInstance = docClass.getDeclaredConstructor().newInstance();BeanUtils.copyProperties(o, docInstance);Method setIdMethod = docClass.getMethod("setMysqlId", Long.class);Long idValue = (Long) o.getClass().getMethod("getId").invoke(o);setIdMethod.invoke(docInstance, idValue);document.add(docInstance);}int successCount = mapper.insertBatch(document);System.out.println("在MySQL表中有:" + list.size() + "条数据");System.out.println("向ES中成功添加:" + successCount + "条数据");if (list.size() == successCount) {System.out.println("同步成功");} else {int i = list.size() - successCount;System.out.println("同步失败,有" + i + "条数据未写入ES,请联系管理员!");}}
}

自定义线程池类:

package com.ruoyi.web.es.conifg;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 七大参数:* 1 corePoolSize:线程池核心线程数量,核心线程不会被回收,即使没有任务执行,也会保持空闲状态。如果线程池中的线程少于此数目,则在执行任务时创建。* 2 maximumPoolSize:池允许最大的线程数,当线程数量达到corePoolSize,且workQueue队列塞满任务了之后,继续创建线程。* 3 keepAliveTime:超过corePoolSize之后的“临时线程”的存活时间。* 4 unit:keepAliveTime的单位。* 5 workQueue:当前线程数超过corePoolSize时,新的任务会处在等待状态,并存在workQueue中,BlockingQueue是一个先进先出的阻塞式队列实现,底层实现会涉及Java并发的AQS机制,有关于AQS的相关知识,我会单独写一篇,敬请期待。* 6 threadFactory:创建线程的工厂类,通常我们会自顶一个threadFactory设置线程的名称,这样我们就可以知道线程是由哪个工厂类创建的,可以快速定位。* 7 handler:线程池执行拒绝策略,当线数量达到maximumPoolSize大小,并且workQueue也已经塞满了任务的情况下,线程池会调用handler拒绝策略来处理请求。* 四大拒绝策略:* 1 AbortPolicy:为线程池默认的拒绝策略,该策略直接抛异常处理。* 2 DiscardPolicy:直接抛弃不处理。* 3 DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务* 4 CallerRunsPolicy:使用当前调用的线程来执行此任务** @author gao*/
@Configuration
public class ThreadPoolTaskExecutorConfig {// 核心线程数private static final int corePoolSize = 4;// 最大线程数private static final int maxPoolSize = 4;// 允许线程空闲时间(单位:默认为秒)private static final int keepAliveTime = 10;// 缓冲队列数private static final int queueCapacity = 15;// 线程池名前缀private static final String threadNamePrefix = "Thread-PushMessage-Service-";@Bean("pushMysqlToEsExecutor")public ThreadPoolTaskExecutor pushMessageExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(corePoolSize);executor.setMaxPoolSize(maxPoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveTime);executor.setThreadNamePrefix(threadNamePrefix);// setWaitForTasksToCompleteOnShutdown(true): 该方法用来设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,这样这些 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。executor.setWaitForTasksToCompleteOnShutdown(true);// setAwaitTerminationSeconds(60): 该方法用来设置线程池中 任务的等待时间,如果超过这个时间还没有销毁就 强制销毁,以确保应用最后能够被关闭,而不是阻塞住。executor.setAwaitTerminationSeconds(60);// 线程池对拒绝任务的处理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());// 初始化executor.initialize();return executor;}@Bean(name = "pushMysqlToEsScheduler")public ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build());scheduler.setPoolSize(customThreadPool.getMaximumPoolSize());scheduler.setThreadNamePrefix(threadNamePrefix);return scheduler;}}

大家知道有几种保证数据一致性的解决方案呢?

①通过读写锁来实现

②通过中间件canal伪装成MySQL的从节点来实现(参考地址:超详细的Canal入门,看这篇就够了!-CSDN博客)

③通过MQ异步消息推送实现,MySQL数据发生变化时向MQ发送消息

④通过定时任务获取MySQL中更新的数据

..........

最后:①②是强一致性的解决方案,③④是最终一致性的解决方案。项目实战是一个漫长的过程,山高路远,道阻且长!加油💪🏻!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29398.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Financial Statement Analysis with Large Language Models论文精读

Financial Statement Analysis with Large Language Models 论文精读 文章目录 Financial Statement Analysis with Large Language Models 论文精读Abstract 核心速览研究细节baselineGPT与分析师对比人类分析师与 GPT 的互补性错误预测的来源增量信息增益 分析师出现偏差或分…

Python实现连连看10

4.2.2 取消图片的标识 当玩家第一次选中图片时,会通过红色方框标识出该图片。当玩家发现选错了图片时,可以再次点击该图片,取消标识。 (1)记录第一次选中的图片 在“(2)标识选中的图片”中提到的代码中,FIRSTCLICK的值是True,也就是第一次点击图片时,记录该图片的…

【YOLOv10改进[注意力]】在YOLOv10中使用注意力MLCA的实践+ 含全部代码和详细修改方式 + 手撕结构图 + 全网首发

本文将进行在YOLOv10中添加注意力MLCA的实践,助力YOLOv10目标检测效果的实践,文中含全部代码、详细修改方式以及手撕结构图。助您轻松理解改进的方法。 改进前和改进后的参数对比: 目录 一 MLCA 二 在YOLOv10中使用注意力MLCA的实践 1 整体修改

编程男送什么礼物:探秘最佳选择与创意点子

编程男送什么礼物&#xff1a;探秘最佳选择与创意点子 在寻找送给编程男的礼物时&#xff0c;我们不仅要考虑礼物的实用性&#xff0c;还要兼顾其个人兴趣与特点。编程男通常对技术、创新和实用性有着极高的追求&#xff0c;因此选择一份既符合他们口味又能表达心意的礼物&…

【CS.AL】算法核心之分治算法:从入门到进阶

文章目录 1. 概述2. 适用场景3. 设计步骤4. 优缺点5. 典型应用6. 题目和代码示例6.1 简单题目&#xff1a;归并排序6.2 中等题目&#xff1a;最近点对问题6.3 困难题目&#xff1a;分数背包问题 7. 题目和思路表格8. 总结References 1000.01.CS.AL.1.4-核心-DivedeToConquerAlg…

JavaScript 数组操作和数学计算

计算数组平均值 使用 reduce 方法 const numbers [1, 2, 3, 4, 5]; const sum numbers.reduce((accumulator, currentValue) > accumulator currentValue, 0); const average sum / numbers.length; console.log(average); // 输出: 3使用 forEach 方法 const number…

字符串格式化的精度控制

我们可以用辅助符号m.n来控制宽度和精度&#xff0c;m控制宽度&#xff0c;若是宽度小于数字本身&#xff0c;不生效。 n控制小数点精度&#xff0c;要求是数字&#xff0c;会进行小数的四舍五入例如%5d&#xff08;数字宽度为5空格空格111&#xff09;&#xff0c;%2.3f&…

汇编语言程序设计 - 输入两个字数据(16位的数)X,Y,计算Z=X+Y,并把Z的结果显示出来

80x86汇编习题 题目描述&#xff1a;输入两个字数据&#xff08;16位的数&#xff09;X,Y&#xff0c;计算ZXY&#xff0c;并把Z的结果显示出来。提示&#xff1a;X&#xff0c;Y的输入可以是任何进制。 思路&#xff1a; 1&#xff0c;总共输入两个数 2&#xff0c;每个数…

Python兴趣编程百例:手把手带你开发一个图片转字符图的小工具

在数字世界的无尽探索中&#xff0c;我们时常被那些看似平凡的技术所启发&#xff0c;它们如同星辰般点缀着我们的创意天空。今天&#xff0c;我突发奇想&#xff0c;想要用Python开发一个将图片转化为字符画的小工具。这不仅是一次技术的实践&#xff0c;更是一场艺术与科技的…

多客陪玩系统源码支持二次开发陪玩预约系统搭建,打造专业游戏陪玩平台

简述 随着电竞行业的快速发展&#xff0c;电竞陪玩APP正在逐渐成为用户在休闲娱乐时的首选。为了吸引用户和提高用户体验&#xff0c;电竞陪玩APP开发需要定制一些特色功能&#xff0c;并通过合适的盈利模式来获得收益。本文将为您介绍电竞陪玩APP开发需要定制的特色功能以及常…

LiveCharts2:简单灵活交互式且功能强大的.NET图表库

前言 之前的文章中提到过ScottPlot、与oxyplot&#xff0c;这两个是比较常用的.NET图表库&#xff0c;今天介绍一款新的.NET图表库&#xff1a;LiveCharts2。 LiveCharts2介绍 LiveCharts2 是一个现代化的数据可视化库&#xff0c;用于创建动态和交互式图表&#xff0c;支持…

编程输出中间变量:深度解析与实战应用

编程输出中间变量&#xff1a;深度解析与实战应用 在编程过程中&#xff0c;中间变量是一个至关重要的概念。它们不仅有助于我们更好地理解和组织代码&#xff0c;还能提高程序的效率和可读性。那么&#xff0c;编程输出中间变量究竟是什么呢&#xff1f;本文将从四个方面、五…

一小时搞定JavaScript(2)——DOM与BOM的应用

前言,本篇文章是依据bilibili博主(波波酱老师)的学习笔记,波波酱老师讲的很好,很适合速成!!! 本篇文章会与java进行对比学习,因为JS中很多语法和java是相同的,所以大家最好熟悉Java语言后再来进行学习,效果更佳,见效更快. 文章目录 5.DOM和BOM5.1 DOM5.1.1传统元素获取5.1.2 C…

upload.dragger组件 拖拽上传到上传区域外时,浏览器自带下载、预览文件,如何禁止该默认事件【一步解决】

只需在非上传区域加下面两个事件&#xff0c;阻止默认行为 onDragOver{(e: any) > e.preventDefault()}//阻止浏览器默认拖拽下载事件onDrop{(e: any) > e.preventDefault()}>例如&#xff0c;我的拖拽上传区域在Modal内部&#xff0c;在modal外部加一个div 并指定上…

高考志愿填报,是选好专业,还是选好学校?过来人给你说说

分数限制下&#xff0c;选好专业还是选好学校&#xff1f; 到底是先选专业还是先选学校&#xff0c;是让考生及家长一直拿不准、辨不清的问题&#xff0c;是优先考虑学校还是专业&#xff0c;上了好学校&#xff0c;专业不喜欢就业前景不理想&#xff0c;怎么办&#xff1f;为…

【未来已来】AI大模型革命:向量数据库如何重塑智能世界?

在人工智能的浪潮中,向量数据库正成为推动AI大模型发展的幕后英雄。这不是简单的技术升级,而是一场关于智能未来的革命。本文将带您深入了解向量数据库如何成为AI大模型的核心竞争力,以及它如何助力我们在智能化的道路上加速前进。 向量数据库:AI大模型的心脏 想象一下…

vue echarts画多柱状图+多折线图

<!--多柱状图折线图--> <div class"echarts-box" id"multiBarPlusLine"></div>import * as echarts from echarts;mounted() {this.getMultiBarPlusLine() },getMultiBarPlusLine() {const container document.getElementById(multiBar…

Vue 3 的各生命周期使用场景

Vue 3 的生命周期相比于 Vue 2 有所不同&#xff0c;主要是因为 Vue 3 引入了 Composition API&#xff0c;改变了组件的书写方式和生命周期的调用顺序。以下是 Vue 3 中常用的生命周期钩子及其使用场景的通俗解释&#xff1a; ** 更新阶段 ** onBeforeUpdate 什么时候调用&a…

图书管理系统代码(Java)

1、运行演示 QQ2024528-205028-HD 详细讲解在这篇博客&#xff1a;JavaSE&#xff1a;图书管理系统-CSDN博客 2、所建的包 3、Java代码 3.1 book包 3.1.1 Book类代码 package book;/*** Created with IntelliJ IDEA.* Description:* User: dings* Date: 2024-05-13* Time:…

MyBatis 自定义映射 ResultMap:字段与属性的映射详解

在 MyBatis 框架中&#xff0c;ResultMap是一个非常强大的功能&#xff0c;它允许我们自定义SQL查询结果与Java对象之间的映射关系。特别是在数据库字段名和Java对象属性名不一致时&#xff0c;ResultMap能够帮助我们精确地映射数据。 ResultMap 的基本使用 若字段名和实体类…