数据同步后数据总条数对不上的问题解决

文章目录

    • @[toc]
  • 1.问题
  • 2.解决办法
    • 2.1)设置合理的线程池参数
    • 2.2)设置url连接参数
    • 2.3) 优化msql的系统参数
    • 2.4)使用CountDownLatch减法计数器和数据插入的公共方法新开一个事务
    • 2.5)sql批量注入器执行成功后,当前线程sleep(1000)睡1s
  • 3.业务代码套路如下
  • 4.总结

1.问题

  使用上一篇文章的思路来实现数据库表全量数据同步,遇到了一个奇葩的问题,在本地跑代码数据条数对上了,但是生产上线的时候跑数据居然条数对不上,于是乎,我进行了思考,最大的问题有以下几个原因:
  1.1)数据量太大,线程池的参数设置不合理,开的线程太多会导致数据库最大连接数不够而报一个最大连接数不够的异常,从而多出来的处理不了的连接超时就会被数据库丢弃了。
  1.2)数据库的参数性能不一样导致,发送过去插入的数据处理不过来,发送给数据库插入的批次数据越多,由于数据库性能不高处理不过来,导致数据库端阻塞,参数设置(最大连接数据,缓冲区大小等等参数)不合适,在大批量插入场景需要优化,优化数据库系统参数需要重启,所以该方案不适合生产环境。

  1.3)线程提交到线程池执行是异步都在一个主线程中,都是默认springBoot的事务隔离级别,就会导致,主线程执行完毕,但是子线程没有执行完毕,会导致所有的子线的程事务在同一个事务主线程事务里面,这个主线程的事务提交的数据量就会很大,主线程执行完后子线程还在执行,不断的往mysql数据库发送数据,导致mysql端阻塞处理不过来,而丢了一些批次的数据。

2.解决办法

2.1)设置合理的线程池参数

  核心线程数设置为4个,最大线程数设置为8个足够了,设置太多的话已经超过了mysql数据库机器性能了,一般机器就8核cpu,所以设置4-8个线程处理数据足够了,设置了太多的线程,一个是消耗资源不说,二一个是增大mysql的压力(使用过多的线程给mysql端发送大量的数据)。

package xxxxx;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class EventListenerExecutor {public static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();static {executor.setCorePoolSize(4);// 配置最大线程数executor.setMaxPoolSize(8);// 配置缓存队列大小executor.setQueueCapacity(10000);// 空闲线程存活时间executor.setKeepAliveSeconds(15);executor.setThreadNamePrefix("event-listener");// 线程池拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//自定义数据策略executor.setRejectedExecutionHandler((r, executor) -> {try {executor.getQueue().put(r);} catch (InterruptedException e) {log.error("线程处理拒绝策略失败:{}",e.getMessage());e.printStackTrace();}});// 等待所有任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);// 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞executor.setAwaitTerminationSeconds(60);executor.initialize();}/*** 直接执行 不给返回值* @param task*/public static void execute(Runnable task) {executor.execute(task);}/*** 执行一哈 给返回值* @param task 定时处理* @return* @param <T>*/public static <T> Future<T> submit(Callable<T> task){return executor.submit(task);}}

2.2)设置url连接参数

​    mysql的驱动连接的url需要加:rewriteBatchedStatements=true参数

​    关于rewriteBatchedStatements这个参数介绍:

​    MySQL的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。
MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。
只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL
另外这个选项对INSERT/UPDATE/DELETE都有效

​    多数据源配置参看上一篇文章:

spring:datasource:p6spy: truedynamic:datasource:master:username: xxxpassword: xxxxxdriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://xxxx:3306/xxxxxx?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=trueold_db:url: jdbc:mysql://xxxxxx:3306/xxxxx?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghaiusername: xxxxxxxpassword: xxxxdriver-class-name: com.mysql.cj.jdbc.Driver

2.3) 优化msql的系统参数

set global max_allowed_packet=1024 *1024 * 512; # 单个packet可以允许的最大值
set global max_connections = 60000; # 并发连接请求量比较大,建议调高此值,以增加并行连接数量
set global innodb_lock_wait_timeout=16 * 1024; # 事务锁超时时间,默认50s,超过就报错
set global bulk_insert_buffer_size=512 * 1024 * 1024; # 加快insert插入效率
set global wsrep_max_ws_size=1024*1024*1024*4; # 避免事务大小超过限制,最大4G

  以上参数如果是在Navicat里面本次会话设置的,使用Navicat执行批量插入sql脚本只对本次会话有效,mysql重启后或会话关闭后失效,永久配置需要在my.cnf配置文件中修改,然后重启数据库即可,永久配置可以参考网上教程。

2.4)使用CountDownLatch减法计数器和数据插入的公共方法新开一个事务

  使用CountDownLatch减法计数器是为了让主线程等待每个子线程都执行完在往下执行,数据插入的公共方法上新开一个事务,一批数据开启一个新的事务,就相当于一个线程来执行数据提交,隔离性好,不会相互影响,特别是id不唯一的情况下会在一个事务中会相互影响的,小批次提高了效率。

 @Transactional(propagation = Propagation.REQUIRES_NEW)public void dealData(CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVOS, CountDownLatch countDownLatch) {if (CollectionUtil.isNotEmpty(creditsRecordVOS)) {EventListenerExecutor.execute(() -> {doWork(creditsRecordVOS);countDownLatch.countDown();});}}

2.5)sql批量注入器执行成功后,当前线程sleep(1000)睡1s

  执行批量插入成功后,睡1s是为了给数据库一个缓冲的时间,让已经提交到mysql的数据执行完后在发送下一批数据过去,不至于发送大量数据导致mysq那边处理不过来而交通拥挤被挤出了赛道。

Integer cr = creditRecordMapper.insertBatchSomeColumn(result);
if (cr > 0) {log.info("插入积分记录ok");Thread.sleep(1000);
}

3.业务代码套路如下

McMembersServiceImpl类如下:

package xxxxx;import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
@DS("old_db")
@Service
public class McMembersServiceImpl extends ServiceImpl<McMembersMapper, McMembers> implements McMembersService {@Autowiredprivate SqlSessionFactory sqlSessionFactory;@Autowiredprivate CreditRecordService creditRecordService;@Overridepublic void dealData() {int pageSize = 20000;AtomicInteger index = new AtomicInteger(1);AtomicInteger totalIndex = new AtomicInteger(0);Long total2 = this.baseMapper.getTotal2();log.info("total2:{}",total2);Long pageCount2 = (total2 + pageSize - 1) / pageSize; //推荐写法log.info("pageCount2:{}", pageCount2);CountDownLatch countDownLatch = new CountDownLatch(Math.toIntExact(pageCount2));try (SqlSession sqlSession = sqlSessionFactory.openSession();Cursor<CreditsRecordVO> pageCursor = sqlSession.getMapper(McMembersMapper.class).getCursorData()) {List<CreditsRecordVO> creditsRecordVOS = new ArrayList<>();for (CreditsRecordVO creditsRecordVO : pageCursor) {creditsRecordVOS.add(creditsRecordVO);totalIndex.getAndIncrement();log.info("total:{}", totalIndex.get());if (index.getAndIncrement() == pageSize) {CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVO2 = new CopyOnWriteArrayList<>(creditsRecordVOS);log.info("creditsRecordVO2.size:{}", creditsRecordVO2.size());creditRecordService.dealData(creditsRecordVO2, countDownLatch);creditsRecordVOS.clear();log.info("清空list:{}", creditsRecordVOS.size());index.set(0);}}if (CollectionUtil.isNotEmpty(creditsRecordVOS)) {CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVO3 = new CopyOnWriteArrayList<>(creditsRecordVOS);log.info("creditsRecordVO3.size:{}", creditsRecordVO3.size());creditRecordService.dealData(creditsRecordVO3, countDownLatch);creditsRecordVOS.clear();log.info("清空list2:{}", creditsRecordVOS.size());index.set(0);}countDownLatch.await();} catch (Exception e) {log.error("游标查询异常:{}", e.getMessage());}log.info("total:{}", totalIndex.get());}@Overridepublic Long getTotal2() {return baseMapper.getTotal2();}}

CreditRecordServiceImpl类如下:

package xxxxxxx;import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeanUtils;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;@Slf4j
@Service
@RequiredArgsConstructor
public class CreditRecordServiceImpl extends ServiceImpl<CreditRecordMapper, CreditRecord> implements CreditRecordService {private final TransactionDefinition transactionDefinition;private final DataSourceTransactionManager transactionManager;private final CreditRecordMapper creditRecordMapper;private final MemberMapper memberMapper;@Override@Transactional(propagation = Propagation.REQUIRES_NEW)public void dealData(CopyOnWriteArrayList<CreditsRecordVO> creditsRecordVOS, CountDownLatch countDownLatch) {if (CollectionUtil.isNotEmpty(creditsRecordVOS)) {EventListenerExecutor.execute(() -> {doWork(creditsRecordVOS);countDownLatch.countDown();});}}private void doWork(CopyOnWriteArrayList<CreditsRecordVO> creditLogVOS) {try {List<CreditRecord> result = new ArrayList<>();for (CreditsRecordVO creditLogVO : creditLogVOS) {CreditRecord creditRecord = new CreditRecord();creditRecord.setId(Long.valueOf(creditLogVO.getId()));creditRecord.setContent(creditLogVO.getRemark() != null ? creditLogVO.getRemark() : "");if (Objects.nonNull(creditLogVO.getNum())) {creditRecord.setNum(BigDecimal.valueOf(creditLogVO.getNum()));if (creditLogVO.getNum() > 0) {creditRecord.setFlowType(1);} else if (creditLogVO.getNum() < 0) {creditRecord.setFlowType(1);}} else {continue;}creditRecord.setCreditType(creditLogVO.getCredittype());creditRecord.setStatus(1);if (Objects.nonNull(creditLogVO.getUid())) {creditRecord.setMemberId(Long.valueOf(creditLogVO.getUid()));} else {continue;}Integer createtime = creditLogVO.getCreatetime();if (Objects.nonNull(createtime)) {LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(createtime), ZoneOffset.ofHours(8));creditRecord.setCreateTime(localDateTime);}creditRecord.setBalanceSnapshot(BigDecimal.valueOf(-1));creditRecord.setDeleted(0);creditRecord.setIsExp(0);creditRecord.setFromId(0L);creditRecord.setOpType(0);creditRecord.setOrderNo("");creditRecord.setUpdateTime(LocalDateTime.now());result.add(creditRecord);}log.info("插入数据条数:{}", result.size());Integer cr = creditRecordMapper.insertBatchSomeColumn(result);if (cr > 0) {log.info("插入积分记录ok");Thread.sleep(1000);}} catch (Exception e) {e.printStackTrace();log.error("插入积分记录异常:{}", ExceptionUtils.getMessage(e));}}}

4.总结

  这个是一个经过实战的套路,最近重构了一个会员模块涉及到会员表数据(4百多万)、粉丝表数据(3百多万)、关联映射表数据(4百多万)还有积分日志记录数据(1千多万),需要清洗同步到新的表中,前三个表是有关联的白表总的数据量有1千多万,后面的1千多万,总的2千多万数据同步时间只需要20分钟就可以搞定的,性能和效率还是杠杠的,执行时间如下图所示:

在这里插入图片描述

  如果这个问题没有处理好,生产少的的会员数据就需要重新去反查,使用sql的join查出少了的数据重新补上去,由于用户数据同步少了,老用户登录系统就会变成一个新用户,就会带来一系列的问题,之前用老账号下的单子,登录后是一个新用户,关联不到订单了,所以需要去修复各种业务数据,数据补完后需要把新用户相关删除,然后清除redis的用户相关的数据,什么token/access_token/用户信息了,数据量大,那生产redis的用户相关的缓存数据也大的,这些由于数据同步的问题带来的后续这种蛋疼的问题,也是让人头疼,所以写了一篇避坑提效的文章,希望对你有帮助,请一键三连,么么哒,如何批量删除redis中的用户信息,请看下一篇文章!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/46301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++11 智能指针

文章目录 什么是智能指针为什么用智能指针智能指针的类型和各自的功能auto_ptr&#xff08;c98使用的&#xff0c;现已废弃&#xff09;unique_ptrshared_ptrweak_ptr unique_ptr和shared_ptr的简单模拟实现 什么是智能指针 智能指针是一种在编程中用于管理动态分配内存的指针…

DC电源模块如何调节电源输出电压和电流

BOSHIDA DC电源模块如何调节电源输出电压和电流 DC电源模块是一种电源转换器&#xff0c;在电子设备中广泛使用。它可以将交流电转换为直流电&#xff0c;或者将低电压直流电转换为高电压直流电。 DC电源模块通常可以调节输出电压和电流&#xff0c;以满足各种电子设备的不同需…

【C++】 使用红黑树模拟实现STL中的map与set

文章目录 前言1. 对之前实现的红黑树进行一些补充和完善1.1 析构1.2 查找 2. STL源码中map和set的实现3. 改造红黑树封装map和set3.1 红黑树结构修改3.2 map、set的结构定义3.3 insert的封装3.4 insert测试3.5 发现问题并解决3.6 红黑树迭代器实现3.7 封装set和map的迭代器并测…

RK3399平台开发系列讲解(内核调试篇)Valgrind使用案例

🚀返回专栏总目录 文章目录 一、使用未初始化的内存案例二、内存泄露三、在内存被释放后进行读/写案例四、从已分配内存块的尾部进行读/写案例五、两次释放内存案例沉淀、分享、成长,让自己和他人都能有所收获!😄 📢Valgrind 是一个开源的内存调试和性能分析工具,用于…

Electron 报gpu_process_host.cc(951)] GPU process launch faile错误

解决方法&#xff0c;在入口js文件中&#xff0c;添加如下代码: app.commandLine.appendSwitch(no-sandbox)

Eltima USB Network Gate 10.0 Crack

USB Network Gate -通过网络共享USB 设备 USB Network Gate (前身为以太网USB控制器USB) 轻松的通过网络(Internet/LAN/WAN)分享您的一个或者多个连接到您计算机的USB设备。 无论您身处异国还是近在隔壁办公室&#xff0c;您都可以轻松使用远程扫描仪、打印机、摄像头、调制解…

CVE-2015-5254漏洞复现

1.漏洞介绍。 Apache ActiveMQ 是美国阿帕奇&#xff08;Apache&#xff09;软件基金会所研发的一套开源的消息中间件&#xff0c;它支持 Java 消息服务&#xff0c;集群&#xff0c;Spring Framework 等。Apache ActiveMQ 5.13.0之前 5.x 版本中存在安全漏洞&#xff0c;该漏…

《HeadFirst设计模式(第二版)》第十一章代码——代理模式

代码文件目录&#xff1a; RMI&#xff1a; MyRemote package Chapter11_ProxyPattern.RMI;import java.rmi.Remote; import java.rmi.RemoteException;public interface MyRemote extends Remote {public String sayHello() throws RemoteException; }MyRemoteClient packa…

SpringBoot基于Zookeeper实现分布式锁

文章目录 问题背景前言实现搭建Zookeeper容器引入依赖ZK客户端的配置类ZK客户端的工厂类注入bean构建测试类 问题背景 研究分布式锁&#xff0c;基于ZK实现&#xff0c;需要整合到SpringBoot使用 前言 参考自SpringBoot集成Curator实现Zookeeper基本操作&#xff0c;Zookeeper入…

ssm+vue校园美食交流系统源码

ssmvue校园美食交流系统源码和论文026 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 摘 要 随着现在网络的快速发展&#xff0c;网上管理系统也逐渐快速发展起来&#xff0c;网上管理模式很快融入到了许多商…

el-table 实现动态表头 静态内容 根据数据显示动态输入框

直接放代码了 <el-table:data"form.tableDataA"borderstripestyle"width: 100%; margin-top: 20px"><el-table-columnv-for"(category, categoryIndex) in form.tableDataA":key"categoryIndex":label"category.name&qu…

Java虚拟机(JVM):垃圾收集算法

目录 一、分代收集理论 二、标记-清除算法 三、标记-复制算法 四、标记-整理算法 一、分代收集理论 分代收集理论建立在两个分代假说之上&#xff1a; 1、弱分代假说&#xff1a;绝大多数对象都是朝生夕灭的。 2、强分代假说&#xff1a;熬过越多次垃圾收集过程的对象就…

5.8.webrtc事件处理基础知识

在之前的课程中呢&#xff0c;我向你介绍了大量web rtc线程相关内容&#xff0c;今天呢&#xff0c;我们来看一下线程事件处理的基本知识。首先&#xff0c;我们要清楚啊&#xff0c;不同的平台处理事件的API是不一样的&#xff0c;这就如同我们当时创建线程是类似的&#xff0…

K8s实战4-使用Helm在Azure上部署Ingress-Nginx和Tokengateway

手动发布Ingress-Nginx 1 登录到aks(dfinder-gw-aks) az login az account set --subscription ${sub ID} az aks get-credentials --resource-group ${groupname} --name ${aks name} 2 下载 ingress-nginx-4.2.5.tgz curl -LO https://github.com/kubernetes/ingress-ngi…

“开发和运维”只是一个开始,最终目标是构建高质量的软件工程

随着技术的飞速发展&#xff0c;软件行业不断寻求改进和创新的方法来提供更高质量的产品。在这方面&#xff0c;DevOps已经展现出了巨大的潜力。通过打破开发和运维之间的壁垒&#xff0c;DevOps将持续集成、持续交付和自动化流程引入到软件开发中&#xff0c;使团队能够更快地…

数字孪生助力智慧水务:科技创新赋能水资源保护

智慧水务中&#xff0c;数字孪生有着深远的作用&#xff0c;正引领着水资源管理和环境保护的创新变革。随着城市化和工业化的不断推进&#xff0c;水资源的可持续利用和管理愈发显得重要&#xff0c;而数字孪生技术为解决这一挑战提供了独特的解决方案。 数字孪生技术&#xf…

Docker容器无法启动 Cannot find /usr/local/tomcat/bin/setclasspath.sh

报错信息如下 解决办法 权限不够 加上--privileged 获取最大权限 docker run --privileged --name lenglianerqi -p 9266:8080 -v /opt/docker/lenglianerqi/webapps:/usr/local/tomcat/webapps/ -v /opt/docker/lenglianerqi/webapps/userfile:/usr/local/tomcat/webapps/u…

Qt安卓开发经验技巧总结V202308

01&#xff1a;01-05 pro中引入安卓拓展模块 QT androidextras 。pro中指定安卓打包目录 ANDROID_PACKAGE_SOURCE_DIR $$PWD/android 指定引入安卓特定目录比如程序图标、变量、颜色、java代码文件、jar库文件等。 AndroidManifest.xml 每个程序唯一的一个全局配置文件&…

【Redis】Redis中的布隆过滤器

【Redis】Redis中的布隆过滤器 前言 在实际开发中&#xff0c;会遇到很多要判断一个元素是否在某个集合中的业务场景&#xff0c;类似于垃圾邮件的识别&#xff0c;恶意IP地址的访问&#xff0c;缓存穿透等情况。类似于缓存穿透这种情况&#xff0c;有许多的解决方法&#xf…

基于MATLAB开发AUTOSAR软件应用层Code mapping专题-part 2 Inport和Outports 标签页介绍

上篇我们介绍了Function页的内容,这篇我们介绍Inports和Outports页的内容,这里我们再次强调一个概念,code mapping是以simulink的角度去看的,就是先要在模型中建立simulink模块,在code mapping里映射他要对应的autosar的元素,之后生成代码时的c语言的名字是以Autosar的元…