文章源码地址
文章目录
- JUC概述
- 什么是JUC
- 线程和进程的概念
- 进程和线程
- 线程的状态
- wait和sleep
- 并发和并行
- 管程monitor
- 用户线程与守护线程
- Lock接口
- 复习Synchronized
- syschronized
- 卖票案例
- 多线程编程步骤
- 什么是Lock接口
- Lock接口的介绍
- Lock实现可重入锁
- 实现卖票案例
- 线程间通信
- 案例
- 虚假唤醒问题
- 线程间的通信基于 Lock 实现
- 小结
- 线程间定制化通信
- 资源类
- 测试代码
- 结果
- 生产者与消费者模型
- 小结
- 集合的线程安全
- 集合线程不安全演示
- 解决方案-Vector
- 解决方案-Collections
- 解决方案-CopyOnWriteArrayList(常用 JUC)
- HashSet (无序,不可重复)线程不安全
- 使用ConcurrentSkipListSet 进行解决
- HashMap 线程不安全
- 使用 ConcurrentHashMap 进行解决
- 多线程锁
- 同步锁的8钟情况
- 公平锁与非公平锁
- 可重入锁
- 死锁
- 演示
- Callable接口
- 与 Runable 接口的比较
- 使用 FutureTask 实现 Callable 接口
- JUC辅助类
- 减少计数CountDownLatch
- 常规写法,有问题
- 引入 CountDownLatch
- 循环栅栏CyclicBarrier
- 信号灯Semphore
- ReentrantReadWriteLock(读写锁)
- 读写锁概述
- 乐观锁与悲观锁
- 悲观锁(Pessimistic Locking):
- 乐观锁(Optimistic Locking):
- 使用场景
- 原理图
- 表锁与行锁
- 读写锁案例
- 读写锁发生死锁的场景
- 编写资源类
- 编写多线程测试代码
- 结果
- JUC 解决读写锁死锁的发生 ReadWriteLock 类
- 修改资源类
- 运行测试看结果
- 读写锁的演变
- 锁降级
- BlockingQueue阻塞队列
- 常见的 BlockingQueue
- 小结
- 核心方法
- 核心方法演示
- ThreadPool线程池
- 线程池使用方式
- 线程池的七个参数
- 线程池底层工作流程与拒绝策略
- 自定义线程线程池
- Fork/Join分支合并框架
- CompletableFuture异步回调
JUC概述
什么是JUC
java.utils.concurrent
Java并发编程工具包的简称,是一个处理线程的工具包,JDK1.5开始出现
线程和进程的概念
进程和线程
- 进程(process)
- 进程是计算机中关于某数据集合的一次运行活动
- 是系统进行资源分配和调度的基本单位
- 进程是线程的容器
- 线程(thread)
- 是操作系统能够进行调度的最小单位
- 被包含于进程之中,是进程在实际运行的单位
- 一条线程指的是进程中一个单一顺序的控制流,
- 一个进程中可以并发多个线程,每条线程执行不同的任务
- 小结
- 进程是指在操作系统中正在运行的一个应用程序,程序一旦运行就是进程,进程是系统资源分配的最小单位
- 线程系统分配处理器时间资源的基本单元,换而言之是进程之内独立运行的一个单元执行流,是程序执行的最小单位
线程的状态
- 线程状态枚举类
public enum State{NEW,//新建RUNNABLE,//就绪BLOCKED,//阻塞WAITING,//一直等待TIMED_WAITTING,//超时等待TERMINATED;//终结
}
wait和sleep
- sleep是Thread的静态方法,wait是Object的方法,任何对象实例都能调用
- sleep不会释放锁,也不需要占用锁,wait会释放锁,调用它的前提是当前线程占有锁(代码要在synchronized中)
- 在哪睡,在哪醒,均可被interrupted方法中断
并发和并行
- 并发同一瞬间,多个线程同时抢占一个cpu
- 并行同一时段,多个线程同时执行(多cpu)
管程monitor
- 管程——OS中称为监视器,在Java中称为锁,是一种同步机制,
- 保证同一时刻中,只有一个线程访问被保护的数据或代码
- JVM同步基于进入和退出
- 使用管程对象实现,每个对象均有一个monitor对象(随着Java对象的创建和销毁)
- 管程对象的作用,对临界区资源进行加锁(进入时加锁,退出时解锁)
用户线程与守护线程
- 用户线程==>自定义的线程
- 守护线程==>example: 垃圾回收机制 GC
- code segment
public static void main(String[] args) {Thread threadName = new Thread(() -> {System.out.println(Thread.currentThread().getName() + "::"+Thread.currentThread().isDaemon());while (true) {}}, "threadName");threadName.start();//用户线程System.out.println(Thread.currentThread().getName()+"::"+"over");//主线程}
- 现象说明
- 主线程结束了,用户线程还在运行,jvm存活
- 没有用户线程了,都是守护线程,jvm 结束
- 设置守护线程
//设置守护线程threadName.setDaemon(true);threadName.start();
Lock接口
多线程编程过程(高内聚)
- 创建资源类,设置属性与方法
- 在资源类中
- 判断
- 干活
- 通知
- 创建多个线程,调用资源类的操作方法
- 防止虚假唤醒问题
复习Synchronized
syschronized
synchronized 是Java中的关键字,是一种同步锁
可修饰
- 代码块
- 方法
- synchronized 关键字
- 可修饰一个代码快,称为同步代码块,作用范围==>大括号之内,作用对象==>调用此代码块的对象
- 可修饰一个方法,称为同步方法,作用范围==>整个方法,作用对象==>调用这个方法的对象可修饰一个静态方法,作用范围==>整个静态方法,作用对象==>这个类的所有对象
- 可修饰一个类,其作用范围是synchronized后面括号之内,作用对象==>类的所有对象
卖票案例
3个售票员,卖出30张票
- 创建资源类,定义属性和操作方法
public class Ticket {/*** 票数*/private Integer number = 30;/*** 卖票方法 基础版本* 使用同步监视器 synchronized*/public synchronized void saleV1() {//判断 是否有票if (number > 0) {System.out.println(Thread.currentThread().getName() + ": 卖出 " + number-- + ",剩下" + number);}}
}
- 创建多个线程,调用资源类的操作方法
public class SaleTicket {//step2public static void main(String[] args) {//创建Ticket对象Ticket ticket = new Ticket();//创建线程Thread thread1 = new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 40; i++) {ticket.saleV1();}}},"sale01");Thread thread2 = new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 40; i++) {ticket.saleV1();}}},"sale02");Thread thread3 = new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i < 40; i++) {ticket.saleV2();}}},"sale03");thread1.setPriority(1);thread2.setPriority(5);thread1.start();thread2.start();thread3.start();}
}
多线程编程步骤
- 创建资源类,创建属性和操作方法
- 创建多线程调用资源类方法
什么是Lock接口
Lock接口的介绍
API->java.utils-concurrent.locks
- 为锁和等待条件提供一个框架的接口和类,不同于内置的同步和监视器
- Lock实现提供了比使用synchronized方法和语句可获得更广泛的锁定操作
- 接口 Lock所有已知实现类:
- ReentrantLock,
- ReentrantReadWriteLock.ReadLock,
- ReentrantReadWriteLock.WriteLock
- 与Synchronized关键字的区别(面试题)
- Lock不是Java语言内置的,是一个类,可以实现同步,synchronized是Java语言关键字
- synchronized无需用户手动释放锁,发生异常时,synchronized会自动释放线程所占用的锁,Lock需要手动释放锁,当发生异常时若没有主动通过unLock释放锁将导致死锁现象,为避免死锁通常在finally中进行unLock
- Lock可以让等待锁的线程响应中断,synchronized需要一直等待,不能够响应中断
- 通过Lock可以知道是否成功获取锁,Synchronized无从得知
- Lock可以提高多个线程进行读操作的效率
- 从性能上讲
- 竞争不激烈时,二者性能差不多
- 大量线程同时竞争时,Lock性能远超synchronized
Lock实现可重入锁
实现卖票案例
public class Ticket {/*** 可重入锁 对象*/private final ReentrantLock lock = new ReentrantLock();/*** 票数*/private Integer number = 30;/*** 卖票方法 手动锁管理版本* 使用可重入锁 对象 lock上锁,unlock解锁*/public void saleV2() {lock.lock();//上锁try {//判断if (number > 0) {System.out.println(Thread.currentThread().getName() + ": 卖出 " + number-- + ",剩下" + number);}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//解锁}}
}
线程间通信
案例
- 有两个线程
- 资源类, 定义一个初始值为0的变量
- 线程1,获取值,执行+1
- 线程2,获取值,执行-1
public class SyncThreadCommunicationShareResource {/*** 初始值*/private Integer number = 0;/*** 加1的方法*/public synchronized void increment() throws InterruptedException {//判断if (number != 0) {//判断number值是否是0,是0干活,不是0等待//对于某一个参数的版本,实现中断和虚假唤醒是可能的,// 此方法应存在于循环中,在非循环代码中存在虚假唤醒问题this.wait();}//干活this.number++;//number为0执行++System.out.println(Thread.currentThread().getName() + "::"+this.number);//通知this.notifyAll();}/*** 减1的方法*/public synchronized void decrement() throws InterruptedException {//判断if (number != 1) {this.wait();}//干活this.number--;System.out.println(Thread.currentThread().getName() + "::"+this.number);//通知this.notifyAll();}}
public class SyncThreadCommunicationDemo {public static void main(String[] args) {SyncThreadCommunicationShareResource shareResource = new SyncThreadCommunicationShareResource();/*** 创建线程*/new Thread(()->{for (int i = 0; i < 10; i++) {try {shareResource.increment();} catch (InterruptedException e) {e.printStackTrace();}}},"increment").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {shareResource.decrement();} catch (InterruptedException e) {e.printStackTrace();}}},"decrement").start();//-------------------------------------new Thread(()->{for (int i = 0; i < 10; i++) {try {shareResource.increment();} catch (InterruptedException e) {e.printStackTrace();}}},"increment02").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {shareResource.decrement();} catch (InterruptedException e) {e.printStackTrace();}}},"decrement02").start();}
}
虚假唤醒问题
- wait 在哪睡就在哪醒
- 解决方法将 if 判断改成 while 判断,不管 wait 什么时候醒都需要经过 while 判断
线程间的通信基于 Lock 实现
//创建资源类
public class LockThreadCommunicationShareResource {/*** 初始值*/private Integer number = 0;//创建lockprivate Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();//执行+1public void increment(){lock.lock();try {while(number!=0){condition.await();}number++;System.out.println(Thread.currentThread().getName() + "::"+this.number);condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}//执行-1public void decrement(){lock.lock();try {while(number!=1){condition.await();}number--;System.out.println(Thread.currentThread().getName() + "::"+this.number);condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}
public static void main(String[] args) {// demo1();LockThreadCommunicationShareResource shareResource= new LockThreadCommunicationShareResource();Thread thread0 = new Thread(() -> {for (int i = 0; i < 10; i++) {shareResource.increment();}}, "lockIncrement");Thread thread1 = new Thread(() -> {for (int i = 0; i < 10; i++) {shareResource.increment();}}, "lockIncrement01");Thread thread2 = new Thread(() -> {for (int i = 0; i < 10; i++) {shareResource.decrement();}}, "lockDecrement");Thread thread3 = new Thread(() -> {for (int i = 0; i < 10; i++) {shareResource.decrement();}}, "lockIDecrement01");thread0.start();thread1.start();thread2.start();thread3.start();
}
小结
线程间定制化通信
资源类
//创建 资源类
public class CustomThreadCommunityShareResource {//定义标志位private Integer flag =1; // 1 thread01 2 thread02 3 thread03//创建lock锁private Lock lock = new ReentrantLock();//创建3个conditionprivate Condition c1 = lock.newCondition();private Condition c2 = lock.newCondition();private Condition c3 = lock.newCondition();//打印5次,参数为第几轮public void printFive(Integer loop){//上锁lock.lock();try {//判断while(flag!=1){//等待c1.await();}//打印5次for (int i = 0; i < 5; i++) {System.out.println(Thread.currentThread().getName()+"::"+i+" :轮数: "+loop);}//通知flag = 2;//修改标志位c2.signal();//通知thread02线程} catch (Exception e) {e.printStackTrace();} finally {//释放锁lock.unlock();}}public void printTen(Integer loop){lock.lock();try {//判断while(flag!=2){c2.await();}for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName()+"::"+i+" :轮数: "+loop);}flag = 3;c3.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void printFifteen(Integer loop){lock.lock();try {//判断while(flag!=3){c3.await();}for (int i = 0; i < 15; i++) {System.out.println(Thread.currentThread().getName()+"::"+i+" :轮数: "+loop);}flag = 1;c1.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}
测试代码
public static void main(String[] args) {CustomThreadCommunityShareResource shareResource = new CustomThreadCommunityShareResource();new Thread(()->{for (int i = 0; i < 10; i++) {shareResource.printFive(i);}},"AA").start();new Thread(()->{for (int i = 0; i < 10; i++) {shareResource.printTen(i);}},"BB").start();new Thread(()->{for (int i = 0; i < 10; i++) {shareResource.printFifteen(i);}},"CC").start();}
结果
生产者与消费者模型
小结
集合的线程安全
集合线程不安全演示
public static void main(String[] args) {//创建ArrayListList<String> list = new ArrayList<>();//并发修改异常for (int i = 0; i < 100; i++) {new Thread(()->{//向集合添加内容list.add(UUID.randomUUID().toString().toString().substring(0,8));//从集合中获取内容System.out.println(list);},String.valueOf(i)).start();}}
解决方案-Vector
解决方案-Collections
解决方案-CopyOnWriteArrayList(常用 JUC)
HashSet (无序,不可重复)线程不安全
使用ConcurrentSkipListSet 进行解决
HashMap 线程不安全
使用 ConcurrentHashMap 进行解决
多线程锁
同步锁的8钟情况
public class Phone {public synchronized void sendSMS() throws InterruptedException {TimeUnit.SECONDS.sleep(4);System.out.println("----------sendSMS");}public synchronized void sendEmail() throws InterruptedException {System.out.println("----------sendEmail");}public void getHello() {System.out.println("----------getHello");}
}
public static void main(String[] args) throws InterruptedException {Phone phone = new Phone();
// Phone phone2 = new Phone();new Thread(()->{try {phone.sendSMS();} catch (InterruptedException e) {e.printStackTrace();} finally {}},"AA").start();Thread.sleep(100);new Thread(()->{try {phone.sendEmail();
// phone2.sendEmail();
// phone.getHello();} catch (InterruptedException e) {e.printStackTrace();} finally {}},"BB").start();}
/*** 1.标准访问,先打印短信还是邮件* ----------sendSMS* ----------sendEmail* 2.停4秒在短信方法内,先打印短信还是邮件* ----------sendSMS* ----------sendEmail** 对于1和2,synchronized锁的范围是当前对象this,执行时,其它方法只能等待* 3.新增普通的hello方法,先是打印短信还是邮件* ----------getHello 与锁无光* ----------sendSMS* 4.现在两部手机,先是打印短信还是邮件* ----------sendEmail* ----------sendSMS* 两个对象,synchronized锁的范围是各种对象,先执行Email,SMS在sleep* 5.两个静态同步方法,1部手机,先是打印短信还是邮件* ----------sendSMS* ----------sendEmail* 6.两个静态同步方法,2部手机,,先是打印短信还是邮件* ----------sendSMS* ----------sendEmail** 对于5和6 static synchronized 锁住的是当前类的Class对象* 7.1个静态同步方法,1个普通同步方法,1部手机,先是打印短信还是邮件* ----------sendEmail* ----------sendSMS* 8.1个静态同步方法,1个普通同步方法,2部手机,先是打印短信还是邮件* ----------sendEmail* ----------sendSMS* 对于7和8是两个不同范围的不同的锁,static是大门的锁,plain是房间的锁*/
公平锁与非公平锁
- 源码视图
private final ReentrantLock lock = new ReentrantLock(true);
//默认值为flase,非公平锁,将fair置为true此时lock就变成公平锁了
- 非公平锁
- 会出现线程饥饿现象
- 效率较高
- 公平锁
- 相对比较公平,每个线程都能分配到资源
- 效率先对较低
- 阅读源码
//ReentrantLock类源码/*** Creates an instance of {@code ReentrantLock}.* This is equivalent to using {@code ReentrantLock(false)}.*/public ReentrantLock() {sync = new NonfairSync();}/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*///传入的fair为true创建公平锁,否则使用默认无参构造public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
/*** Sync object for non-fair locks*///非公平锁进入之后直接执行static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*///公平锁进入之后先进行进行礼貌询问static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {//进行询问if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {//没人,执行操作setExclusiveOwnerThread(current);return true;}}//有人,进行排队else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
可重入锁
- synchronized(隐式上锁) 和 Lock(显示上锁) 都是可重入锁
- 什么是可重入锁?
public class ReentrantLockDemo {public synchronized void add(){add();}//synchronizedpublic static void main(String[] args) {new ReentrantLockDemo().add();}
}
- 现象解释
- 由于 synchronized 是一个可重入锁,因此调用 add 之后,任然可以递归调用
- 加入 synchronized 不是可重入锁则,add 调用之后其它方法进入等待,则无法出现递归栈溢出异常
- condiction.signal 的描述
死锁
死锁(Deadlock)是指在多线程或多进程的系统中,两个或多个进程或线程由于相互等待对方释放资源而无法继续执行的一种阻塞现象。在死锁状态下,每个进程或线程都在等待其他进程或线程释放资源,导致所有参与的进程或线程都无法继续执行下去。
- 死锁通常发生在多个进程或线程之间共享多个资源的情况下,它是一种常见但严重的并发问题。
- 死锁的发生需要满足四个必要条件,也称为死锁的四个必要条件:
- 互斥条件(Mutual Exclusion):
- 资源不能被共享,一次只能被一个进程或线程占用。
- 占有并等待条件(Hold and Wait):
- 一个进程或线程可以持有一个资源,并等待另一个资源。
- 不可剥夺条件(No Preemption):
- 资源只能在持有者释放资源后才能被其他进程或线程抢占。
- 循环等待条件(Circular Wait):
- 存在一个进程或线程的资源等待链,形成一个循环。
当以上四个条件同时满足时,就有可能导致死锁的发生。死锁是一种难以调试和解决的问题,因此在设计并发程序时需要特别小心,避免引入可能导致死锁的条件。
解决死锁的方法包括避免死锁、检测死锁和解除死锁。避免死锁的方法通常涉及对资源的动态分配和合理使用,以确保死锁的四个必要条件不同时满足。检测死锁通常涉及周期性地检查系统中是否存在死锁,一旦检测到死锁,可以采取相应的措施解除死锁。解除死锁的方法包括撤销进程、回滚事务或通过抢占资源等。
演示
public class DealLock {static Object a = new Object();static Object b = new Object();public static void main(String[] args) {new Thread(() -> {synchronized (a) {System.out.println(Thread.currentThread().getName() + "持有锁a试图获取锁b");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}synchronized (b) {System.out.println(Thread.currentThread().getName() + "获取到锁b");}}}, "A").start();new Thread(() -> {synchronized (b) {System.out.println(Thread.currentThread().getName() + "持有锁b试图获取锁a");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}synchronized (a) {System.out.println(Thread.currentThread().getName() + "获取到锁a");}}}, "B").start();}
}
- 现象
- 验证是否是死锁
- jps+jstack
PS D:\Desktop\JUC_workspace> jps -l
12084 top.ljzstudy.lock.DealLock
18260
20716 org.jetbrains.jps.cmdline.Launcher
5996 jdk.jcmd/sun.tools.jps.Jps
PS D:\Desktop\JUC_workspace> jstack 12084
2024-01-27 14:29:17
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):"DestroyJavaVM" #14 prio=5 os_prio=0 tid=0x0000000002f53800 nid=0x5c2c waiting on condition [0x0000000000000000]java.lang.Thread.State: RUNNABLE"B" #13 prio=5 os_prio=0 tid=0x0000000024b73000 nid=0x5718 waiting for monitor entry [0x00000000254cf000]java.lang.Thread.State: BLOCKED (on object monitor)at top.ljzstudy.lock.DealLock.lambda$main$1(DealLock.java:43)- waiting to lock <0x0000000740da2ee0> (a java.lang.Object)- locked <0x0000000740da2ef0> (a java.lang.Object)at top.ljzstudy.lock.DealLock$$Lambda$2/1831932724.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)"A" #12 prio=5 os_prio=0 tid=0x0000000024b16800 nid=0x5e9c waiting for monitor entry [0x00000000253cf000]java.lang.Thread.State: BLOCKED (on object monitor)at top.ljzstudy.lock.DealLock.lambda$main$0(DealLock.java:29)- waiting to lock <0x0000000740da2ef0> (a java.lang.Object)- locked <0x0000000740da2ee0> (a java.lang.Object)at top.ljzstudy.lock.DealLock$$Lambda$1/990368553.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)"Service Thread" #11 daemon prio=9 os_prio=0 tid=0x0000000022f73800 nid=0x2e44 runnable [0x0000000000000000]java.lang.Thread.State: RUNNABLE"C1 CompilerThread3" #10 daemon prio=9 os_prio=2 tid=0x0000000022ece800 nid=0x4810 waiting on condition [0x0000000000000000]java.lang.Thread.State: RUNNABLE"C2 CompilerThread2" #9 daemon prio=9 os_prio=2 tid=0x0000000022ece000 nid=0x38d0 waiting on condition [0x0000000000000000]java.lang.Thread.State: RUNNABLE"C2 CompilerThread1" #8 daemon prio=9 os_prio=2 tid=0x0000000022eca000 nid=0x58e8 waiting on condition [0x0000000000000000]java.lang.Thread.State: RUNNABLE"C2 CompilerThread0" #7 daemon prio=9 os_prio=2 tid=0x0000000022ec5000 nid=0x1060 waiting on condition [0x0000000000000000]java.lang.Thread.State: RUNNABLE"Monitor Ctrl-Break" #6 daemon prio=5 os_prio=0 tid=0x0000000022eb8000 nid=0x2f2c runnable [0x00000000244ce000]java.lang.Thread.State: RUNNABLEat java.net.SocketInputStream.socketRead0(Native Method)at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)at java.net.SocketInputStream.read(SocketInputStream.java:171)at java.net.SocketInputStream.read(SocketInputStream.java:141)at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)- locked <0x0000000740e2b040> (a java.io.InputStreamReader)at java.io.InputStreamReader.read(InputStreamReader.java:184)at java.io.BufferedReader.fill(BufferedReader.java:161)at java.io.BufferedReader.readLine(BufferedReader.java:324)- locked <0x0000000740e2b040> (a java.io.InputStreamReader)at java.io.BufferedReader.readLine(BufferedReader.java:389)at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:49)"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x0000000022e20000 nid=0x5598 waiting on condition [0x0000000000000000]java.lang.Thread.State: RUNNABLE"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x0000000022e1f000 nid=0x166c runnable [0x0000000000000000]java.lang.Thread.State: RUNNABLE"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x000000000304c000 nid=0x1d9c in Object.wait() [0x000000002415f000]java.lang.Thread.State: WAITING (on object monitor)at java.lang.Object.wait(Native Method)- waiting on <0x0000000740c08ec8> (a java.lang.ref.ReferenceQueue$Lock)at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)- locked <0x0000000740c08ec8> (a java.lang.ref.ReferenceQueue$Lock)at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x000000000304b000 nid=0x20e0 in Object.wait() [0x000000002405e000]java.lang.Thread.State: WAITING (on object monitor)at java.lang.Object.wait(Native Method)- waiting on <0x0000000740c06b68> (a java.lang.ref.Reference$Lock)at java.lang.Object.wait(Object.java:502)at java.lang.ref.Reference.tryHandlePending(Reference.java:191)- locked <0x0000000740c06b68> (a java.lang.ref.Reference$Lock)at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)"VM Thread" os_prio=2 tid=0x0000000021737000 nid=0x5efc runnable"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x0000000002f69000 nid=0x2240 runnable"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x0000000002f6a800 nid=0x4990 runnable"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x0000000002f6c000 nid=0x50b0 runnable"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x0000000002f6e800 nid=0x37cc runnable"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x0000000002f70800 nid=0x2d70 runnable"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x0000000002f72000 nid=0x51c0 runnable"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x0000000002f75000 nid=0x13d4 runnable"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x0000000002f76000 nid=0x4e9c runnable"GC task thread#8 (ParallelGC)" os_prio=0 tid=0x0000000002f77800 nid=0x5ab4 runnable"GC task thread#9 (ParallelGC)" os_prio=0 tid=0x0000000002f78800 nid=0x5760 runnable"VM Periodic Task Thread" os_prio=2 tid=0x0000000022fd8000 nid=0x694 waiting on conditionJNI global references: 319Found one Java-level deadlock:
=============================
"B":waiting to lock monitor 0x0000000021740be8 (object 0x0000000740da2ee0, a java.lang.Object),which is held by "A"
"A":waiting to lock monitor 0x0000000021743528 (object 0x0000000740da2ef0, a java.lang.Object),which is held by "B"Java stack information for the threads listed above:
===================================================
"B":at top.ljzstudy.lock.DealLock.lambda$main$1(DealLock.java:43)- waiting to lock <0x0000000740da2ee0> (a java.lang.Object)- locked <0x0000000740da2ef0> (a java.lang.Object)at top.ljzstudy.lock.DealLock$$Lambda$2/1831932724.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)
"A":at top.ljzstudy.lock.DealLock.lambda$main$0(DealLock.java:29)- waiting to lock <0x0000000740da2ef0> (a java.lang.Object)- locked <0x0000000740da2ee0> (a java.lang.Object)at top.ljzstudy.lock.DealLock$$Lambda$1/990368553.run(Unknown Source)at java.lang.Thread.run(Thread.java:748)Found 1 deadlock.
Callable接口
与 Runable 接口的比较
- Runable 接口
- 线程执行使用 run()方法没有返回值
- run()方法 只能捕获并处理异常
- Callable 接口
- 线程执行使用 call()方法有返回值
- call()方法可以抛出异常
使用 FutureTask 实现 Callable 接口
class MyThread1 implements Runnable{@Overridepublic void run() {}
}class MyThread2 implements Callable{@Overridepublic Integer call() throws Exception {System.out.println(Thread.currentThread().getName()+"come in callable");return 200;}
}
public class InterfaceComparison {public static void main(String[] args) throws ExecutionException, InterruptedException {//使用Runable创建一个线程new Thread(new MyThread1(),"runable").start();//使用Callable创建一个线程//new Thread(new MyThread2(),"callable").start();//futureTaskFutureTask<Integer> futureTask1 = new FutureTask<>(new MyThread2());//lamada表达式FutureTask<Integer> futureTask = new FutureTask<>(()->{System.out.println(Thread.currentThread().getName()+"come in callable");return 1024;});new Thread(futureTask,"future").start();while(!futureTask.isDone()){System.out.println("wait....");}System.out.println(futureTask.get());//get方法获取返回值System.out.println(Thread.currentThread().getName()+"come over");
- 结果
- 连续调用两次 get
System.out.println(futureTask.get());
System.out.println(futureTask.get());
- 开启两个线程
new Thread(futureTask,"future").start();
new Thread(futureTask1,"future1").start();System.out.println(futureTask.get());
System.out.println(futureTask1.get());
System.out.println(Thread.currentThread().getName()+"come over");
JUC辅助类
减少计数CountDownLatch
常规写法,有问题
//6个同学陆续离开教室之后,班长锁门public static void main(String[] args) {//6个同学离开教室之后for (int i = 1; i <= 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");},String.valueOf(i)).start();}System.out.println(Thread.currentThread().getName()+"班长锁门走人了");}
- 有的同学被锁在教室
引入 CountDownLatch
//6个同学陆续离开教室之后,班长锁门
public static void main(String[] args) throws InterruptedException {//6个同学离开教室之后CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+" 号同学离开了教室");countDownLatch.countDown();//每次让计数器-1},String.valueOf(i)).start();}//countDown不为0,阻塞countDownLatch.await();System.out.println(Thread.currentThread().getName()+"班长锁门走人了");
}
- 效果
循环栅栏CyclicBarrier
- 代码
//案例:集齐7科龙珠可以转换神龙//1.创建固定值private static final Integer NUMBER = 7;public static void main(String[] args) {//2.创建CyclicBarrierCyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, () -> {System.out.println("集齐7科龙珠可以转换神龙");});//3.收集龙珠for (int i = 1; i <= 6; i++) {//7个线程new Thread(()->{System.out.println(Thread.currentThread().getName()+" 星龙被收集到了");try {cyclicBarrier.await();} catch (Exception e) {e.printStackTrace();}},String.valueOf(i)).start();}}
- 效果
- 将循环次数修改为 6
- 未到达屏障点,一直处于 await()状态
信号灯Semphore
//6辆汽车,停3个停车位public static void main(String[] args) {//创建Semphore,设置许可数量Semaphore semaphore = new Semaphore(3);//模拟6辆汽车for (int i = 1; i <= 6; i++) {new Thread(()->{try {//抢占车位semaphore.acquire();System.out.println(Thread.currentThread().getName()+" 抢到了车位");TimeUnit.SECONDS.sleep(new Random().nextInt(5));//设置随机停车时间System.out.println(Thread.currentThread().getName()+" ----离开了车位");} catch (InterruptedException e) {e.printStackTrace();}finally {//释放车位semaphore.release();}},String.valueOf(i)).start();}}
- 效果
ReentrantReadWriteLock(读写锁)
读写锁概述
乐观锁与悲观锁
乐观锁和悲观锁是两种不同的并发控制策略,用于处理多个线程对共享资源的访问
悲观锁(Pessimistic Locking):
- 思想:
- 悲观锁的思想是,在整个访问期间,认为数据会被其他线程修改,因此在访问数据之前,会先对数据进行加锁,确保其他线程无法修改。
- 实现方式:
- 在数据库中,悲观锁通常通过数据库的锁机制(如行级锁、表级锁)来实现。
- 在Java编程中,synchronized 关键字和 ReentrantLock 类都是悲观锁的实现。
- 优点和缺点:
- 优点: 简单,易于理解和实现。
- 缺点: 性能相对较低,因为在整个访问期间,其他线程可能被阻塞。
乐观锁(Optimistic Locking):
- 思想
- 乐观锁的思想是,在整个访问期间,认为数据不会被其他线程修改。因此,不对数据进行加锁,而是在更新数据时检查是否有其他线程对数据进行了修改。
- 实现方式:
- 在数据库中,乐观锁通常通过版本号(Version Number)或时间戳(Timestamp)等机制实现。
- 在 Java 编程中,乐观锁常用于基于版本的控制,例如在使用 Hibernate 进行数据库访问时,可以通过 @Version 注解实现乐观锁。
- 优点和缺点:
- 优点: 性能相对较高,因为在大多数情况下,不需要加锁,只有在更新时才进行检查。
- 缺点: 实现相对复杂,需要解决冲突和处理失败更新的情况。
使用场景
- 悲观锁适用于:
- 数据更新频繁,冲突概率较高的情况。
- 读操作相对较少,写操作较多的情况。
- 乐观锁适用于:
- 数据更新相对较少,读操作频繁的情况。
- 冲突概率较低,因为大多数情况下不需要加锁。
- 在实际应用中,选择悲观锁还是乐观锁取决于具体业务场景和性能需求。在分布式环境中,乐观锁通常更受欢迎,因为它对系统的可伸缩性更友好。
原理图
表锁与行锁
- 行锁的死锁概率通常比表锁高。
- 原因是行锁是更细粒度的锁,它锁定的是数据表中的某一行,而表锁则锁定整个数据表。因此,当多个事务尝试访问同一行数据时,如果使用行锁,可能会出现一个事务在等待另一个事务释放锁的情况,这就可能导致死锁。
- 相比之下,表锁锁定整个表,只要有一个事务在使用表,其他事务就必须等待,这就减少了死锁的可能性。但是,表锁的缺点是它会阻止其他所有事务访问整个表,这可能会降低并发性能。
- 然而,这并不是说行锁就一定比表锁差,事实上,行锁由于其更细的粒度,通常可以提供更高的并发性能。只是在处理死锁问题时,需要更小心地设计事务,以避免出现死锁。
读写锁案例
读锁是共享锁,写锁是独占锁,两者都会发生死锁
- 读锁发生死锁的案例
- 写锁发生死锁的案例
读写锁发生死锁的场景
编写资源类
public class MyCache {//创建map集合private Map<String,Object> map = new HashMap<>();//放数据public void put(String key,Object value){System.out.println(Thread.currentThread().getName()+" 正在进行写操作 "+ key);//暂停一会try {TimeUnit.MICROSECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}//放数据map.put(key,value);System.out.println(Thread.currentThread().getName()+" 写完了 " + key);}//取数据public Object get(String key){Object result = null;System.out.println(Thread.currentThread().getName()+" 正在进行读取操作 "+ key);//暂停一会try {TimeUnit.MICROSECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}//放数据result = map.get(key);System.out.println(result!=null?Thread.currentThread().getName()+" 取完了------- " + key:Thread.currentThread().getName()+" 没取到--------- " + key);return result;}
}
编写多线程测试代码
public static void main(String[] args) {MyCache myCache = new MyCache();//创建多线程放数据for (int i = 1; i <=5 ; i++) {final Integer num = i;new Thread(()->{myCache.put(String.valueOf(num),String.valueOf(num));},String.valueOf(i)).start();}//创建多线程取数据for (int i = 1; i <=5 ; i++) {final Integer num = i;new Thread(()->{myCache.get(String.valueOf(num));},String.valueOf(i)).start();}}
结果
JUC 解决读写锁死锁的发生 ReadWriteLock 类
修改资源类
public class MyCache {//创建map集合private Map<String,Object> map = new HashMap<>();//创建读写锁对象private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();//放数据public void put(String key,Object value){//写操作,加写锁readWriteLock.writeLock().lock();System.out.println(Thread.currentThread().getName()+" 正在进行写操作 "+ key);//暂停一会try {TimeUnit.MICROSECONDS.sleep(300);//放数据map.put(key,value);System.out.println(Thread.currentThread().getName()+" 写完了 " + key);} catch (InterruptedException e) {e.printStackTrace();}finally {readWriteLock.writeLock().unlock();}}//取数据public Object get(String key){//读取数据,加读锁readWriteLock.readLock().lock();Object result = null;try {System.out.println(Thread.currentThread().getName()+" 正在进行读取操作 "+ key);//暂停一会TimeUnit.MICROSECONDS.sleep(300);//放数据result = map.get(key);System.out.println(result!=null?Thread.currentThread().getName()+" 取完了------- " + key:Thread.currentThread().getName()+" 没取到--------- " + key);} catch (InterruptedException e) {e.printStackTrace();}finally {//释放读锁readWriteLock.readLock().unlock();}return result;}
}
运行测试看结果
读写锁的演变
一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能够同时存在读写线程
读写互斥,读读共享
锁降级
将写入锁将为读锁
- jdk8 的降级过程
- 获取写锁->获取读锁->释放写锁…->释放读锁
- 读锁不能升级为写锁
- 读写锁降级演示
//写锁降级为读锁public static void main(String[] args) {//可重入读写锁对象ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();//创建读写锁ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();//演示锁降级writeLock.lock(); //获取写锁System.out.println("get write lock");readLock.lock(); //获取读锁System.out.println("get read lock");writeLock.unlock(); //释放写锁readLock.unlock(); //释放读锁}
- 读锁升级写锁,出现死锁
- 进入无限等待…
BlockingQueue阻塞队列
常见的 BlockingQueue
- ArrayBlockingQueue(常用)
- 由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue(常用)
- 由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
- ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
- DelayQueue
- 使用优先级队列实现的延迟无界阻塞队列。
- PriorityBlockingQueue
- 支持优先级排序的无界阻塞队列。
- SynchronousQueue
- 不存储元素的阻塞队列,也即单个元素的队列。
- LinkedTransferQueue
- 由链表组成的无界阻塞队列。
- LinkedBlockingDeque
- 由链表组成的双向阻塞队列
- 由链表组成的双向阻塞队列
小结
- 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
- 为什么需要BlockingQueue?
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。使用后我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了
核心方法
核心方法演示
public static void main(String[] args) throws InterruptedException {//创建阻塞对列BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);//group add()
// System.out.println(blockingQueue.add("a"));
// System.out.println(blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.add("d"));
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());//group offer(e)
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.peek());
// System.out.println(blockingQueue.poll());//group put(e)
// blockingQueue.put("a");
// System.out.println(blockingQueue.take());
// blockingQueue.put("b");
// blockingQueue.put("c");
// System.out.println(blockingQueue.take());
// blockingQueue.put("d");
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());//group offer-time_outSystem.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));System.out.println(blockingQueue.offer("d",3L, TimeUnit.SECONDS));}
ThreadPool线程池
线程池使用方式
- 一池 N 线程
Executors.newFixedThreadPool(int)
public static void main(String[] args) {//一池n线程//场景描述:银行5个窗口,处理10个顾客的业务ExecutorService threadPool = Executors.newFixedThreadPool(5);try {for (int i = 1; i <= 10; i++) {//执行threadPool.execute(()->{System.out.println(Thread.currentThread().getName()+" 办理业务");});}} catch (Exception e) {e.printStackTrace();} finally {//关闭资源threadPool.shutdown();}}
- 一个任务一个任务执行,一池一线程
Executors.newSingleThreadExecutor()
ExecutorService threadPool = Executors.newSingleThreadExecutor();
- 线程池根据需求创建线程,可扩容
Executors.newCachedThreadPool()
ExecutorService threadPool = Executors.newCachedThreadPool();
线程池的七个参数
public ThreadPoolExecutor(int corePoolSize, //常驻线程线程数量(核心)int maximumPoolSize, //最大线程数量long keepAliveTime, //存活时间TimeUnit unit, //存活时间单位BlockingQueue<Runnable> workQueue, //阻塞队列ThreadFactory threadFactory, //线程工厂RejectedExecutionHandler handler //拒绝策略) { if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
线程池底层工作流程与拒绝策略
- 当线程池对象执行 execute()方法之后才创建线程
- 常驻线程 corePool 数量为 2
- 阻塞队列的任务为 3
- 最大线程数是 5
- 线程优先使用线程池中的常驻线程,但常驻线程用完之后第三线程会进入阻塞队列中进行等待
- 单队列满了之后,第六个任务来时会在线程池中进行创建,直至线程池满
- 3,4,5 还在等待,第六个线程会优先执行
- 当线程池满,阻塞队列也满之后第九个任务来时,会执行拒绝策略
自定义线程线程池
- 自定义线程池
//自定义线程池创建public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());try {for (int i = 1; i <= 10; i++) {//执行threadPool.execute(()->{System.out.println(Thread.currentThread().getName()+" 办理业务");});}} catch (Exception e) {e.printStackTrace();} finally {//关闭资源threadPool.shutdown();}}
- 效果
Fork/Join分支合并框架
- 基于分支合并框架实现分治法求 1+…100
public class MyTask extends RecursiveTask<Integer> {//拆分的差值不能超过10private static final Integer DIFFERENCE = 10;private Integer begin; //拆分起始值private Integer end; //拆分结束值private int result; //结果值//创建有参构造public MyTask(Integer begin, Integer end) {this.begin = begin;this.end = end;}//拆分与合并过程@Overrideprotected Integer compute() {//判断相加的两个值是否大于10if ((end-begin)<=DIFFERENCE) {//相加for (int i = begin; i <=end; i++) {result += i;}}else{//进一步拆分int middle = (begin+end)/2;//拆分左边MyTask myTask01 = new MyTask(begin, middle);//拆分右边MyTask myTask02 = new MyTask(middle+1, end);//执行拆分myTask01.fork();myTask02.fork();//合并结果result = myTask01.join()+ myTask02.join();}return result;}
}
- 测试代码
public static void main(String[] args) throws ExecutionException, InterruptedException {//创建MyTask对象MyTask myTask = new MyTask(0, 100);//创建分支池对象ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);//获取合并结果Integer result = forkJoinTask.get();System.out.println(result);forkJoinPool.shutdown();}
- 结果
CompletableFuture异步回调
- 继承实现结构
- 没有返回值的异步调用
private static void nonReturnValue() throws InterruptedException, ExecutionException {//异步调用没有返回值CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{System.out.println(Thread.currentThread().getName()+" completableFuture1");});completableFuture.get();}
- 有返回值的异步调用
private static void returnValue() {//异步调用有返回值CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread().getName() + " completableFuture02");int i = 1/0; //模拟异常return 1024;});completableFuture.whenComplete((t,u)->{System.out.println("t = " + t);//u为异常信息System.out.println("u = " + u);});}