解决Java并发问题的常见思路

写在文章开头

近期对一些比较老的项目进行代码走查,碰到一些极端的并发编程恶习,所以笔者就基于此文演示这类问题以及面对并发编程时我们应该需要了解一些常见套路。

在这里插入图片描述

Hi,我是sharkChili,是个不断在硬核技术上作死的java coder,是CSDN的博客专家,也是开源项目Java Guide的维护者之一,熟悉Java也会一点Go,偶尔也会在C源码边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号:写代码的SharkChili,实时获取笔者最新的技术推文同时还能和笔者进行深入交流。

在这里插入图片描述

提出一个需求

基于笔者近期走查的案例笔者以一个类似的需求进行演示,这个需求是通过一个定时的任务调度线程从任务表中获取任务项,通过这个任务项得到要到data表查询对应任务的数据集并进行数据推送。
此时如果用户通过页面点击暂停,这些正在发送的数据在数据库中的状态就会被更新为暂停,完成后再将这个定时调度的线程暂停。

整体流程如下图所示,理想情况下,两个线程的工作过程为:

  1. 线程1从数据库找到任务,并通过这个任务找到数据表找到要发送的数据集,存入内存中。
  2. 线程1更新数据集状态为待发送,不断发送数据。
  3. 系统收到用户页面的暂停操作,创建一个线程2,从内存中找到要发送的数据,将这些数据集的状态更新为已暂停。
  4. 线程完成数据暂停后将线程1的执行打断。

在这里插入图片描述

问题复现

基于这个需求,笔者给出下面这样一个错误的例子,首先我们定义一下要发送的数据类,可以看到这个类包含id、数据和数据发送状态:

@Data
@AllArgsConstructor
public class SendData {private int id;private String data;/*** 0 未开始* 1 发送中* 2 已完成* 3 暂停*/private int status;
}

然后我们再给出任务的封装,如下所示,我们通过任务表可以查到任务的id和名称,通过id就可以到数据表定位到当前任务的数据集,并将其添加到sendDataLinkedList中:

@Data
@AllArgsConstructor
public class TaskInfo {private int taskId;private String taskName;//数据集private LinkedList<SendData> sendDataLinkedList;//若sendDataLinkedList不为空则弹出第一个元素public SendData popSendData() {if (CollUtil.isNotEmpty(sendDataLinkedList)) {return sendDataLinkedList.pop();}return null;}//将数据添加到sendDataLinkedList中public void addSendData(SendData sendData) {sendDataLinkedList.add(sendData);}
}

然后我们给出模拟数据,可以看到笔者用taskInfoMap 模拟任务表中的数据,用mysqlSendDataList 模拟数据库中对应task要发送的数据集:

private static List<SendData> mysqlSendDataList = new ArrayList<>();private static Map<Integer, TaskInfo> taskInfoMap = new HashMap<>();static {//模拟其他线程查到要执行的任务,并存入内存taskInfoMap.put(1, new TaskInfo(1, "任务1", new LinkedList<>()));//模拟任务1在mysql表中要发送的电话号码mysqlSendDataList.add(new SendData(1, "数据1", 0));mysqlSendDataList.add(new SendData(2, "数据2", 0));mysqlSendDataList.add(new SendData(3, "数据3", 0));mysqlSendDataList.add(new SendData(4, "数据4", 0));mysqlSendDataList.add(new SendData(5, "数据5", 0));mysqlSendDataList.add(new SendData(6, "数据6", 0));mysqlSendDataList.add(new SendData(7, "数据7", 0));mysqlSendDataList.add(new SendData(8, "数据8", 0));mysqlSendDataList.add(new SendData(9, "数据9", 0));mysqlSendDataList.add(new SendData(10, "数据10", 0));}

对应的线程代码如下,可以看到线程1会从数据库中读取数据并更新为发送中然后进行发送,并在完成后更新数据库状态。
而线程2则是模拟收到用户状态请求后,从内存中的任务集找到任务1,然后定位到正在发送的数据集将其数据库状态更新为暂停,然后将线程1暂停(这里用stop模拟打断定时任务)

 public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(() -> {//模拟查任务TaskInfo taskInfo = taskInfoMap.get(1);//模拟从数据库中取出待发送的数据log.info("线程1更新状态为发送中");List<SendData> dataList = mysqlSendDataList.stream().filter(s -> s.getStatus() == 0).collect(Collectors.toList());//更新状态为发送中mysqlSendDataList.stream().forEach(d -> d.setStatus(1));//将数据存入链表中dataList.forEach(taskInfo::addSendData);while (true) {SendData sendData = taskInfo.popSendData();if (sendData == null) {break;}log.info("发送数据:{} 成功", JSONUtil.toJsonStr(sendData));}//更新状态为发送完成mysqlSendDataList.stream().forEach(d -> d.setStatus(2));});Thread t2 = new Thread(() -> {//模拟从内存中找到任务,然后从内存中找到正在发送的号码,并将其数据库状态更新为待发送TaskInfo taskInfo = taskInfoMap.get(1);for (SendData sendData : taskInfo.getSendDataLinkedList()) {SendData mysqlSendData = mysqlSendDataList.stream().filter(s -> s.getId() == sendData.getId()).findFirst().get();mysqlSendData.setStatus(3);log.info("暂停任务:{}", JSONUtil.toJsonStr(mysqlSendData));}//打断正在工作的线程try {t1.wait();t1.interrupt();} catch (InterruptedException e) {e.printStackTrace();}log.info("打断t1线程,暂停发送任务");});t1.setName("t1");t1.start();t2.setName("t2");t2.start();System.out.println("执行结束");}

正常情况下,这种代码因为多线程操作单一数据集进行动态迭代删除时是会抛出ConcurrentModificationException的,但是笔者在走查类似上文这种例子时并为发现这个问题,经过对于流程和场景梳理时得出了答案。
笔者发现这个启动和暂停任务的场景执行的数据量非常大,因为庞大的数据量,被暂停了任务基本都会在排队或者刚刚完成数据集状态更新为发送中就被类似于线程2的代码完美暂停掉。

在这里插入图片描述

但是也不免出现一些比较极端的场景:

  1. 任务1正好被执行。
  2. 执行过程中收到暂停信号,线程2读取内存中任务1的数据集,更新数据库状态。
  3. 任务2正准备打断任务1,CPU又切回线程1,因为线程2暂停数据时并没有将内存中的数据集删除,导致这些在数据库中已经被暂停的数据集仍然被发送了。

最终很可能导致同样的一批数据被重复发送两次。

在这里插入图片描述

对应的现象也就像下面这段代码一样,

00:17:43.052 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 线程1更新状态为发送中
00:17:49.093 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":1,"data":"数据1","status":3}
00:17:49.716 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":2,"data":"数据2","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":3,"data":"数据3","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":4,"data":"数据4","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":5,"data":"数据5","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":6,"data":"数据6","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":7,"data":"数据7","status":3}
00:17:50.421 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":8,"data":"数据8","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":9,"data":"数据9","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":10,"data":"数据10","status":3}
00:17:50.422 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 打断t1线程,暂停发送任务

解决方案

对于此类并发问题的重构并解决的套路考虑的基本要考虑如下两个点:

  1. 保持原有的业务逻辑
  2. 线程互斥保持在一个维度。
  3. 选用合适的并发容器。

我们都知道重构代码对于测试的回归,逻辑的扭转变化都存在很大的风险点,所以笔者在对这段代码重构时非常明确的梳理的任务执行的数据流,明确了业务逻辑,这位作者意图是想在任务暂停时及时更新任务状态且让线程1不执行被暂停的任务,所以为了保证暂停的数据集不被线程1发送,首先就需要保证两个线程操作的集合处于一个维度,而不是像上面的代码一样线程1用pop方法,线程2用get加遍历的方式。

所以笔者改动的第一步,就是像容器安全化,将数据集存储容器改为ConcurrentLinkedDeque,然后弹出元素的函数改为pollFirst

 //数据集private ConcurrentLinkedDeque<SendData> sendDataLinkedList;//若sendDataLinkedList不为空则弹出第一个元素public SendData popSendData() {return sendDataLinkedList.pollFirst();}

这里我们也给出pollFirst的源码,可以看到它进行元素弹出时会通过CAS确定弹出的item是否和操作直线得到的一致,只有compare and set成功之后才能弹出。

public E pollFirst() {for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;//只有cas成功才能弹出元素if (item != null && p.casItem(item, null)) {unlink(p);return item;}}//若为空直接返回nullreturn null;}

其次为了保证两个线程操作处于一个维度,笔者将getter容器方法私有化,确保两者操作都是用同一个pop方法操作:

private ConcurrentLinkedDeque<SendData> getSendDataLinkedList() {return sendDataLinkedList;}

这样线程2的暂停逻辑就改为实时pop出线程1正在发送的数据再暂停,保证了暂停的数据线程1不会发送:

Thread t2 = new Thread(() -> {//模拟从内存中找到任务,然后从内存中找到正在发送的号码,并将其数据库状态更新为待发送TaskInfo taskInfo = taskInfoMap.get(1);SendData sendData = null;while ((sendData = taskInfo.popSendData()) != null) {SendData finalSendData = sendData;SendData mysqlSendData = mysqlSendDataList.stream().filter(s -> s.getId() == finalSendData.getId()).findFirst().get();mysqlSendData.setStatus(3);log.info("暂停任务:{}", JSONUtil.toJsonStr(mysqlSendData));}//打断正在工作的线程try {log.info("打断t1线程,暂停发送任务");t1.stop();} catch (Exception e) {e.printStackTrace();}});

此时再看输出结果,可以看到线程1发送了一个数据之后,线程2暂停了其余的数据,调度回到线程1,线程1停止了发送,问题解决:

00:50:18.336 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 线程1更新状态为发送中
00:50:23.090 [t1] INFO com.sharkChili.LinkListThreadSafeApplication - 发送数据:{"id":1,"data":"数据1","status":1} 成功
00:50:26.242 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":2,"data":"数据2","status":3}
00:50:28.200 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":3,"data":"数据3","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":4,"data":"数据4","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":5,"data":"数据5","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":6,"data":"数据6","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":7,"data":"数据7","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":8,"data":"数据8","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":9,"data":"数据9","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 暂停任务:{"id":10,"data":"数据10","status":3}
00:50:28.201 [t2] INFO com.sharkChili.LinkListThreadSafeApplication - 打断t1线程,暂停发送任务

小结

总的来说这是一段比较基础的并发编程问题,本篇文章更着重的是让读者了解并发编程时如何复现以及考虑问题的维度,不难看出笔者进行并发编程问题的解决思路就是三步:

  1. 理清数据流和并发代码逻辑。
  2. 确定合适的容器。
  3. 确保多线程操作互斥在同一个维度。

我是sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号:
写代码的SharkChili,同时我的公众号也有我精心整理的并发编程JVMMySQL数据库个人专栏导航。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/716301.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于 Amazon EKS 的 Stable Diffusion ComfyUI 部署方案

01 背景介绍 Stable Diffusion 作为当下最流行的开源 AI 图像生成模型在游戏行业有着广泛的应用实践&#xff0c;无论是 ToC 面向玩家的游戏社区场景&#xff0c;还是 ToB 面向游戏工作室的美术制作场景&#xff0c;都可以发挥很大的价值&#xff0c;如何更好地使用 Stable Dif…

scanf和cin的利弊

scanf和cin的利弊&#xff1a; scanf: 利&#xff1a;耗时短&#xff0c;写法方便输入固定格式&#xff0c;比如scanf(“%*d%d”,&a)&#xff0c;可以直接忽略第一个输入&#xff0c;不用创建新对象&#xff0c;再比如scanf(“%1d”,&x[i])&#xff0c;输入3214&#x…

卡牌——二分

卡牌 题目分析 想一下前面题的特点&#xff0c;是不是都出现了“最大边长”&#xff0c;“最小的数”这种字眼&#xff0c;那么这里出现了“最多能凑出多少套牌”&#xff0c;我们可以考虑用二分。接下来我们要看一下他是否符合二段性&#xff0c;二分的关键在于二段性。 第…

续Java的执行语句、方法--学习JavaEE的day07

day07 一、特殊的流程控制语句 break(day06) continue 1.理解&#xff1a; 作用于循环中&#xff0c;表示跳过循环体剩余的部分&#xff0c;进入到下一次循环 做实验&#xff1a; while(true){ System.out.println(“111”); System.out.println(“222”); if(true){ conti…

编译链接实战(25)gcc ASAN、MSAN检测内存越界、泄露、使用未初始化内存等内存相关错误

文章目录 1 ASAN1.1 介绍1.2 原理编译时插桩模块运行时库2 检测示例2.1 内存越界2.2 内存泄露内存泄露检测原理作用域外访问2.3 使用已经释放的内存2.4 将漏洞信息输出文件3 MSAN1 ASAN 1.1 介绍 -fsanitize=address是一个编译器选项,用于启用AddressSanitizer(地址

基于SpringBoot的教师考勤管理系统(赠源码)

作者主页&#xff1a;易学蔚来-技术互助文末获取源码 简介&#xff1a;Java领域优质创作者 Java项目、简历模板、学习资料、面试题库 教师考勤管理系统是基于JavaVueSpringBootMySQL实现的&#xff0c;包含了管理员、学生、教师三类用户。该系统实现了班级管理、课程安排、考勤…

基于springboot的足球俱乐部管理系统的设计与实现

** &#x1f345;点赞收藏关注 → 私信领取本源代码、数据库&#x1f345; 本人在Java毕业设计领域有多年的经验&#xff0c;陆续会更新更多优质的Java实战项目希望你能有所收获&#xff0c;少走一些弯路。&#x1f345;关注我不迷路&#x1f345;** 一 、设计说明 1.1 课题…

2024.3.3每日一题

LeetCode 用队列实现栈 题目链接&#xff1a;225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09; 题目描述 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0c;并支持普通栈的全部四种操作&#xff08;push、top、pop 和 empty&…

如何取消ChatGPT 4.0的自动续费和会员订阅(chatgpt4.0自動續費嗎)

如何取消ChatGPT 4.0的自动续费和会员订阅 ChatGPT 4.0自动续费是否存在 是的&#xff0c;ChatGPT 4.0 Plus会员服务存在自动续费功能。 ChatGPT 4.0 Plus会员服务自动续费 ChatGPT Plus会员服务的自动续费机制用户在购买ChatGPT 4.0 Plus会员服务后&#xff0c;系统会自动…

npm ERR! code ERESOLVE

1、问题概述&#xff1f; 执行npm install命令的时候报错如下&#xff1a; tangxiaochuntangxiaochundeMacBook-Pro stf % npm install npm ERR! code ERESOLVE npm ERR! ERESOLVE unable to resolve dependency tree npm ERR! npm ERR! While resol…

LeetCode102.二叉树的层序遍历

题目 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[[3],[9,20],[15,7]]输入&#xff1a;root [1] 输出&am…

SpringCloud-MQ消息队列

一、消息队列介绍 MQ (MessageQueue) &#xff0c;中文是消息队列&#xff0c;字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。消息队列是一种基于生产者-消费者模型的通信方式&#xff0c;通过在消息队列中存放和传递消息&#xff0c;实现了不同组件、服务或系统…

2024全新手机软件下载应用排行、平台和最新发布网站,采用响应式织梦模板

这是一款简洁蓝色的手机软件下载应用排行、平台和最新发布网站&#xff0c;采用响应式织梦模板。 主要包括主页、APP列表页、APP详情介绍页、新闻资讯列表、新闻详情页、关于我们等模块页面。 地 址 &#xff1a; runruncode.com/php/19703.html 软件程序演示图&#xff1a;…

最小高度树-力扣(Leetcode)

题目链接 最小高度树 思路&#xff1a;本质上是找到树中的最长路径。当最长路径上中间点&#xff08;若路经长为偶数&#xff0c;则中间点仅有一个&#xff0c;否者中间点有两个&#xff09;作为根时&#xff0c;此时树高最小。 Code: class Solution { public://拓扑排序int…

【深度优先搜索】【树】【C++算法】2003. 每棵子树内缺失的最小基因值

作者推荐 动态规划的时间复杂度优化 本文涉及知识点 深度优先搜索 LeetCode2003. 每棵子树内缺失的最小基因值 有一棵根节点为 0 的 家族树 &#xff0c;总共包含 n 个节点&#xff0c;节点编号为 0 到 n - 1 。给你一个下标从 0 开始的整数数组 parents &#xff0c;其中…

第二讲:用geth和以太坊交互

一&#xff1a;安装geth brew install ethereum geth github网址&#xff1a; https://github.com/ethereum/go-ethereum 二&#xff1a; 用geth连接以太坊 以太坊有主网络&#xff08;Ethereum Mainnet&#xff09;&#xff0c;有测试网络&#xff08;Sepolia、Goerli 等等…

设计模式学习笔记 - 设计原则 - 5.依赖反转原则(控制反转、依赖反转、依赖注入)

前言 今天学习 SOLID 中的最后一个原则&#xff0c;依赖反转原则。 本章内容&#xff0c;可以带着如下几个问题&#xff1a; “依赖反转” 这个概念指的是 “谁跟谁” 的 “什么依赖” 被反转了&#xff1f; “反转” 这两个字该如何理解。我们还经常听到另外两个概念&#…

【分块三维重建】【slam】LocalRF:逐步优化的局部辐射场鲁棒视图合成(CVPR 2023)

项目地址&#xff1a;https://localrf.github.io/ 题目&#xff1a;Progressively Optimized Local Radiance Fields for Robust View Synthesis 来源&#xff1a;KAIST、National Taiwan University、Meta 、University of Maryland, College Park 提示&#xff1a;文章用了s…

【Spring】20 解析Spring注解驱动的容器配置

文章目录 注解 vs. XMLJavaConfig选项注解配置注解注入顺序注解处理器实际运用总结 Spring 框架一直以 XML 配置为主导&#xff0c;然而随着注解驱动配置的引入&#xff0c;我们不禁思考&#xff1a;是注解配置优于 XML 呢&#xff0c;还是反之&#xff1f;本篇博客将介绍 Spri…

如何将一个远程git的所有分支推到另一个远程分支上

如何将一个远程git的所有分支推到另一个远程分支上 最初有 12 个分支 执行 git remote add 远程名 远程git地址 git push 远程名 --tags "refs/remotes/origin/*:refs/heads/*"之后就变成 26个分支