使用双异步后,如何保证数据一致性?

在这里插入图片描述

目录

    • 一、前情提要
    • 二、通过Future获取异步返回值
      • 1、FutureTask 是基于 AbstractQueuedSynchronizer实现的
      • 2、FutureTask执行流程
      • 3、get()方法执行流程
    • 三、FutureTask源码具体分析
      • 1、FutureTask源码
        • 2、将异步方法的返回值改为```Future<Integer>```,将返回值放到```new AsyncResult<>();```中;
      • 3、通过```Future<Integer>.get()```获取返回值:
      • 4、这里也可以通过新线程+Future获取Future返回值
      • 在BUG中磨砺,在优化中成长

大家好,我是哪吒。

一、前情提要

在上一篇文章中,我们通过双异步的方式导入了10万行的Excel,有个小伙伴在评论区问我,如何保证插入后数据的一致性呢?

很简单,通过对比Excel文件行数和入库数量是否相等即可。

那么,如何获取异步线程的返回值呢?

在这里插入图片描述

二、通过Future获取异步返回值

我们可以通过给异步方法添加Future返回值的方式获取结果。

FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer实现的

AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。
基于 AQS 实现的同步器包括: ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS实现的同步器包含两种操作:

  1. acquire,阻塞调用线程,直到AQS的状态允许这个线程继续执行,在FutureTask中,get()就是这个方法;
  2. release,改变AQS的状态,使state变为非阻塞状态,在FutureTask中,可以通过run()和cancel()实现。

2、FutureTask执行流程

在这里插入图片描述

  1. 执行@Async异步方法;
  2. 建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable);
  3. 判断状态state;
    • 如果未新建或者不处于AQS,直接返回;
    • 否则进入COMPLETING状态,执行异步线程代码;
  4. 如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1;
  5. 线程async-executor-1被唤醒后
    • 将自己从AQS队列中移除;
    • 然后唤醒next线程async-executor-2;
    • 改变线程async-executor-1的state;
    • 等待get()线程取值。
  6. next等待线程被唤醒后,循环线程async-executor-1的步骤
    • 被唤醒
    • 从AQS队列中移除
    • 唤醒next线程
    • 改变异步线程状态
  7. 新建线程async-executor-N,监听异步方法的state
    • 如果处于EXCEPTIONAL以上状态,抛出异常;
    • 如果处于COMPLETING状态,加入AQS队列等待;
    • 如果处于NORMAL状态,返回结果;

3、get()方法执行流程

get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。

自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。

在这里插入图片描述

  1. 如果state小于等于COMPLETING,表示任务还在执行中;
    • 如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常;
    • 如果state大于COMPLETING;
      • 如果已有等待节点WaitNode,将线程置空;
      • 返回当前状态;
    • 如果任务正在执行,让出时间片;
    • 如果还未构造等待节点,则new一个新的等待节点;
    • 如果未入队列,CAS尝试入队;
    • 如果有超时时间参数;
      • 计算超时时间;
      • 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state;
      • 阻塞队列nanos毫秒。
    • 否则阻塞队列;
  2. 如果state大于COMPLETING;
    • 如果执行完毕,返回结果;
    • 如果大于等于取消状态,则抛出异常。

很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~

其实,只要把源码的执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~

简而言之:

1. 如果异步线程还没执行完,则进入CAS自旋;
2. 其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程;
3. 再通过get()判断状态state;
4. 直至返回结果或(取消、超时、异常)为止。

三、FutureTask源码具体分析

1、FutureTask源码

通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。

public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}
public class FutureTask<V> implements RunnableFuture<V> {// 最初始的状态是new 新建状态private volatile int state;private static final int NEW          = 0; // 新建状态private static final int COMPLETING   = 1; // 完成中private static final int NORMAL       = 2; // 正常执行完private static final int EXCEPTIONAL  = 3; // 异常private static final int CANCELLED    = 4; // 取消private static final int INTERRUPTING = 5; // 正在中断private static final int INTERRUPTED  = 6; // 已中断public V get() throws InterruptedException, ExecutionException {int s = state;// 任务还在执行中if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {// 线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 任务已执行完毕或取消if (s > COMPLETING) {// 如果已有等待节点WaitNode,将线程置空if (q != null)q.thread = null;return s;}// 任务正在执行,让出时间片else if (s == COMPLETING) // cannot time out yetThread.yield();// 还未构造等待节点,则new一个新的等待节点else if (q == null)q = new WaitNode();// 未入队列,CAS尝试入队else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 如果有超时时间参数else if (timed) {// 计算超时时间nanos = deadline - System.nanoTime();// 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态stateif (nanos <= 0L) {removeWaiter(q);return state;}// 阻塞队列nanos毫秒LockSupport.parkNanos(this, nanos);}else// 阻塞队列LockSupport.park(this);}}private V report(int s) throws ExecutionException {// 获取outcome中记录的返回结果Object x = outcome;// 如果执行完毕,返回结果if (s == NORMAL)return (V)x;// 如果大于等于取消状态,则抛出异常if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
}
2、将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中;
@Async("async-executor")
public void readXls(String filePath, String filename) {try {// 此代码为简化关键性代码List<Future<Integer>> futureList = new ArrayList<>();for (int time = 0; time < times; time++) {Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();futureList.add(sumFuture);}}catch (Exception e){logger.error("readXlsCacheAsync---插入数据异常:",e);}
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {try {// 此代码为简化关键性代码return new AsyncResult<>(sum);}catch (Exception e){return new AsyncResult<>(0);}
}

3、通过Future<Integer>.get()获取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) throws Exception{int[] futureSumArr = new int[futureList.size()];for (int i = 0;i<futureList.size();i++) {try {Future<Integer> future = futureList.get(i);while (true) {if (future.isDone() && !future.isCancelled()) {Integer futureSum = future.get();logger.info("获取Future返回值成功"+"----Future:" + future+ ",Result:" + futureSum);futureSumArr[i] += futureSum;break;} else {logger.info("Future正在执行---获取Future返回值中---等待3秒");Thread.sleep(3000);}}} catch (Exception e) {logger.error("获取Future返回值异常: ", e);}}boolean insertFlag = getInsertSum(futureSumArr, excelRow);logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag);return insertFlag;
}

4、这里也可以通过新线程+Future获取Future返回值

不过感觉多此一举了,就当练习Future异步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) throws Exception {ExecutorService service = Executors.newSingleThreadExecutor();final boolean[] insertFlag = {false};service.execute(new Runnable() {public void run() {try {insertFlag[0] = getFutureResult(futureList, excelRow);} catch (Exception e) {logger.error("新线程+Future获取Future返回值异常: ", e);insertFlag[0] = false;}}});service.shutdown();return new AsyncResult<>(insertFlag[0]);
}

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。

但Future会造成主线程的阻塞,这个就很不友好了,有没有更优解呢?


在BUG中磨砺,在优化中成长

使用双异步后,从 191s 优化到 2s

增加索引 + 异步 + 不落地后,从 12h 优化到 15 min

使用懒加载 + 零拷贝后,程序的秒开率提升至99.99%

性能优化2.0,新增缓存后,程序的秒开率不升反降


🏆文章收录于:100天精通Java从入门到就业

全网最细Java零基础手把手入门教程,系列课程包括:Java基础、Java8新特性、Java集合、高并发、性能优化等,适合零基础和进阶提升的同学。

🏆哪吒多年工作总结:Java学习路线总结,搬砖工逆袭Java架构师

华为OD机试 2023B卷题库疯狂收录中,刷题点这里

刷的越多,抽中的概率越大,每一题都有详细的答题思路、详细的代码注释、样例测试,发现新题目,随时更新,全天CSDN在线答疑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/640816.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.21 day6 IO网络编程

网络聊天室 服务端 #include <myhead.h> #define PORT 8888 #define IP "192.168.122.48" struct MSG {char tyep;char name[20];char buf[128]; }; typedef struct Node {struct sockaddr_in cin;struct Node*next; }*node;int main(int argc, const char *…

提升认知,推荐15个面向开发者的中文播客

前言 对于科技从业者而言&#xff0c;无论是自学成才的程序员&#xff0c;还是行业资深人士&#xff0c;终身学习是很有必要的&#xff0c;尤其是在这样一个技术快速迭代更新的时代。 作为一个摆脱了时间和空间限制的资讯分享平台&#xff0c;播客&#xff08;Podcast&#x…

42 漏洞发现-操作系统之漏洞探针类型利用修复

目录 系统漏洞发现意义?漏洞类型危害情况?如何做好未卜先知?相关名词解释:漏洞扫描工具演示-Goby&#xff0c;Nmap&#xff0c;Nessus(操作)漏洞类型区分讲解-权限提升,远程执行等(思路)漏洞利用框架演示-Metasploit,Searchsploit等(操作)漏洞修复方案讲解说明-补丁,防护软件…

go和swoole性能比较

开发效率 Go语言是本质上是静态语言&#xff0c;开发效率稍差&#xff0c;但性能更强&#xff0c;更适合底层软件的开发 Swoole使用PHP语言&#xff0c;动态脚本语言&#xff0c;开发效率最佳&#xff0c;更适合应用软件的开发 IO模型 go语言使用单线程eventloop处理IO事件&…

GO 的那些 IDE

文章目录 支持哪些功能快捷键代码高亮代码格式化代码提示导航跳转代码调试构建编译其他功能 GO有哪些IDEGolandVS CodeVim GOSublime TextAtomLiteIDEEclipse 总结 “程序员为什么要使用 IDE”&#xff0c;在一些社区论坛&#xff0c;经常可以看到这样的提问。关于是否应该使用…

Tomcat目录和文件

打开tomcat的解压之后的目录可以看到如下的目录结构&#xff1a; Bin bin目录主要是用来存放tomcat的命令&#xff0c;主要有两大类&#xff0c;一类是以.sh结尾的&#xff08;linux命令&#xff09;&#xff0c;另一类是以.bat结尾的&#xff08;windows命令&#xff09;。 …

sqlmap使用教程(2)-连接目标

目录 连接目标 1.1 设置认证信息 1.2 配置代理 1.3 Tor匿名网络 1.4 检测WAF/IPS 1.5 调整连接选项 1.6 处理连接错误 连接目标 场景1&#xff1a;通过代理网络上网&#xff0c;需要进行相应配置才可以成功访问目标主机 场景2&#xff1a;目标网站需要进行身份认证后才…

微信小程序-03

小程序官方把 API 分为了如下 3 大类&#xff1a; 事件监听 API 特点&#xff1a;以 on 开头&#xff0c;用来监听某些事件的触发 举例&#xff1a;wx.onWindowResize(function callback) 监听窗口尺寸变化的事件 同步 API 特点1&#xff1a;以 Sync 结尾的 API 都是同步 API 特…

扫地机器人(二分算法+贪心算法)

1. if(robot[i]-len<sweep)这个代码的意思是——如果机器人向左移动len个长度后&#xff0c;比现在sweep的位置&#xff08;现在已经覆盖的范围&#xff09;还要靠左&#xff0c;就是覆盖连续不起来&#xff0c;呢么这个len就是有问题的&#xff0c;退出函数&#xff0c;再…

黑马axios案例之地区查询

查询某个省内某个城市的所有地区 接口&#xff1a;http://hmajax.itheima.net/api/area 参数名: pname:省份名字或直辖市名字&#xff0c;比如北京、福建省、辽宁省… cname:城市名字&#xff0c;比如北京市、厦门市、大连市… <!DOCTYPE html> <html lang"en&q…

嵌入式软件工程师面试题——2025校招社招通用(计算机网络篇)(三十二)

说明&#xff1a; 面试群&#xff0c;群号&#xff1a; 228447240面试题来源于网络书籍&#xff0c;公司题目以及博主原创或修改&#xff08;题目大部分来源于各种公司&#xff09;&#xff1b;文中很多题目&#xff0c;或许大家直接编译器写完&#xff0c;1分钟就出结果了。但…

[足式机器人]Part2 Dr. CAN学习笔记- 最优控制Optimal Control Ch07-4 轨迹追踪

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记 - 最优控制Optimal Control Ch07-4 轨迹追踪 1. 目标误差控制-误差的调节2. 稳态非零值控制3. 输入增量控制 1. 目标误差控制-误差的调节 2. 稳态非零值控制 3. 输入增量控制

fiddler流量捕获之winconfig

windows 使用了一种叫做“AppContainer”的隔离技术&#xff0c;使得一些流量无法正常捕获&#xff0c;在 fiddler中点击 WinConfig 按钮可以解除&#xff0c;这个与菜单栏 Tools→Win8 Loopback Exemptions 功能是相同的&#xff0c;流量捕获&#xff1a;指拦截和记录通过计算…

GPT-4 的决策在股市中进行量化投资

论文题目:Can Large Language Models Beat Wall Street? Unveiling the Potential of AI in Stock Selection 论文链接:https://arxiv.org/abs/2401.03737 博客地址:https://www.marketsense-ai.com/ 从本质上来说&#xff0c;股票选择是个价格发现机制&#xff0c;在股票投…

Web 开发 1: Flask 框架介绍和使用

在 Web 开发中&#xff0c;Flask 是一个流行且灵活的 Python Web 框架&#xff0c;用于构建 Web 应用程序。它简洁而易于上手&#xff0c;适用于小型到中型的项目。在本篇博客中&#xff0c;我将为你介绍 Flask 框架的基础知识和常用技巧&#xff0c;帮助你更好地掌握 Web 开发…

opencv#28 图像卷积

图像卷积 图像卷积是图像处理中最为基础的操作之一&#xff0c;其常用在图像的边缘检测&#xff0c;图像的去噪声以及图像压缩等领域。 图像卷积主要步骤: Step1:将卷积模板旋转180。 Step2:卷积模板移动到对应位置。 Step3:模板内求和&#xff0c;保存求和结果。 Step4:滑…

容器技术2-镜像与容器储存

目录 一、镜像制作 1、ddocker build 2、docker commit 二、镜像存储 1、公共仓库 2、私有仓库 三、镜像使用 四、容器存储 1、镜像元数据 2、存储驱动 3、数据卷 一、镜像制作 1、ddocker build 基于 Dockerfile 自动构建镜像 其机制为&#xff1a;每一行都会基于…

<C++>STL->string

string类的由来 这是string的定义&#xff1a; string类是模板实例化后的别名&#xff0c;basic_string是字符串类模板&#xff0c;常见的字符串类型有wchar_t char char16_t char32_t &#xff0c;basic_string类针对的是所有字符串类型设计出来的一个模板&#xff0c;而我…

Elasticsearch基础篇(八):常用查询以及使用Java Api Client进行检索

ES常用查询以及使用Java Api Client进行检索 1. 检索需求 参照豆瓣阅读的列表页面 需求&#xff1a; 检索词需要在数据库中的题名、作者和摘要字段进行检索并进行高亮标红返回的检索结果需要根据综合、热度最高、最近更新、销量最高、好评最多进行排序分页数量为10&#xf…

flutter设置windows是否显示标题栏和状态栏和全屏显示

想要让桌面软件实现全屏和不显示状态栏或者自定义状态栏&#xff0c;就可以使用window_manager这个依赖库&#xff0c;使用起来还是非常方便的&#xff0c;可以自定义显示窗口大小和位置&#xff0c;还有设置标题栏是否展示等内容&#xff0c;也可以设置可拖动区域。官方仓库地…