文章目录
- 并发容器
- BlockingQueue
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlocking
- DelayQueue
- SynchronousQueue
- BlockingDeque
- CopyOnWrite
- CopyOnWriteArrayList
- CopyOnWriteArraySet
- ConcurrentLinkedQueue/Deque
- ConcurrentHashMap
- ConcurrentSkipListMap/Set
- 同步工具类
- AQS实现类
- CountDownLatch
- CyclicBarrier
- Semaphore
- ReentrantLock
- ReentrantLockReadWriteLock
- ThreadPoolExecutor
- Exchanger
- Phaser
- Atomic类
- AtomicInteger和AtomicLong
- AtomicBoolean和AtomicReference
- AtomicStampedReference和AtomicMarkableReference
- AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater
- AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray
- Striped64和LongAdder
- Lock和Condition
- 互斥锁
- 读写锁
- Condition
- StampedLock
并发容器
BlockingQueue
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlocking
DelayQueue
SynchronousQueue
BlockingDeque
CopyOnWrite
CopyOnWriteArrayList
CopyOnWriteArraySet
ConcurrentLinkedQueue/Deque
ConcurrentHashMap
ConcurrentSkipListMap/Set
同步工具类
AQS实现类
AQS是一个用于构建锁、同步器等线程协作工具类的框架,有了AQS后,很多用于线程协作的工具类都可以很方便的被写出来
内部使用了AQS(AbstractQueuedSynchronizer)下的工具类
- CountDownLatch(CyclicBarrier->ReentrantLock->AQS)
- Semaphore
- ThreadPoolExecutor
- ReentrantLock
- ReentrantLockReadWriteLock
一下介绍各个工具类的使用(源码部分,等看完后,后续补充详细说明)
CountDownLatch
作用:计数器
源码
实战
作用一:一个线程等待其他多个线程都执行完毕,再继续自己的工作:
@Testpublic void countdownlatchTest() throws InterruptedException {final CountDownLatch countDownLatch = new CountDownLatch(5);ExecutorService threadPool = Executors.newFixedThreadPool(5);for (int i = 0; i < 5; i++) {final int finalI = i;threadPool.submit(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(finalI + "开始跑步!");//把倒数值-1,那么之前awiat等待的线程会被唤醒countDownLatch.countDown();}});}System.out.println("等待5个运动员都跑完");//调用await方法的线程开始等待,直到倒数结束countDownLatch.await();System.out.println("所有人都跑完了,比赛结束");}
作用二:多个线程等待某一个线程的信号,同时开始执行
@Testpublic void countdownlatch2() throws InterruptedException {ExecutorService threadPool = Executors.newFixedThreadPool(5);final CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 5; i++) {final int finalI = i;threadPool.submit(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(finalI + "ready");try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(finalI + "run");}});}Thread.sleep(5000);System.out.println("start run!");countDownLatch.countDown();}
CyclicBarrier
源码
从源码中可以看出,CyclicBarrier利用可重入锁ReentrantLock和Condition
实战
CyclicBarrier构造出一个集结点,当某一个线程执行await()当时候,它就会到这个集结点开始等待,等待达到预定值等待到线程就会统一出发
public void cyclicBarrierTest() throws InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {@Overridepublic void run() {System.out.println("集结完毕,准备出发");}});//重复利用for (int i = 0; i < 6; i++) {new Thread(new Task(i, cyclicBarrier)).start();}Thread.sleep(1000 * 100000);}class Task implements Runnable {private int id;private CyclicBarrier cyclicBarrier;public Task(int i, CyclicBarrier cyclicBarrier) {this.id = i;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {System.out.println("同学" + id + "现在从大门出发");try {Thread.sleep((long) (Math.random() * 1000));System.out.println("同学" + id + "到了自行车驿站,开始等待其他人到达");cyclicBarrier.await();System.out.println("同学" + id + "开始骑车");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}
Semaphore
信号量用来控制需要限制并发访问量的资源
源码
实战
@Testpublic void semaphoreTest() throws Exception {final Semaphore semaphore = new Semaphore(2);ExecutorService executorService = Executors.newFixedThreadPool(1000);for (int i = 0; i < 1000; i++) {final int finalI = i;executorService.submit(new Runnable() {@Overridepublic void run() {try {semaphore.acquire();Thread.sleep(3000);System.out.println(finalI + "run");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}System.out.println("--------");}});}Thread.sleep(1000 * 100000);}
ReentrantLock
源码
实战
未使用锁情况:出现线程安全问题
@Testpublic void ReentrantLockTest() {ReentrantLock lock = new ReentrantLock();CountDownLatch countDownLatch = new CountDownLatch(1);//模拟10个并发for (int i = 0; i < 10; i++) {final int n = i;new Thread(new Runnable() {@Overridepublic void run() {System.out.println(n + " 线程变成运行状态,等待命令");try {countDownLatch.await();/*** tryLock的方法就是试一下,如果能得到锁,就返回真,如果当时得不到,马上就返回假,绝不等。tryLock(时间)的用法就是 在规定的时间内设法得到锁。如果在规定的时间内最终不能得到锁,就返回假。* tryLock()方法只有在成功获取了锁的情况下才会返回true,如果别的线程当前正持有锁,则会立即返回false*/
// while (!lock.tryLock()) ;compute(n);} catch (InterruptedException e) {e.printStackTrace();} finally {
// lock.unlock();}}}).start();}try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程创建完毕,准备计数!");countDownLatch.countDown();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("计数结果:" + k);}private void compute(int n) {for (int j = 0; j < 10000; j++) {if (n % 2 == 0) {k--;} else {k++;}}}
采用锁情况:返回结果为0,没有线程安全问题
@Testpublic void ReentrantLockTest() {ReentrantLock lock = new ReentrantLock();CountDownLatch countDownLatch = new CountDownLatch(1);//模拟10个并发for (int i = 0; i < 10; i++) {final int n = i;new Thread(new Runnable() {@Overridepublic void run() {System.out.println(n + " 线程变成运行状态,等待命令");try {countDownLatch.await();/*** tryLock的方法就是试一下,如果能得到锁,就返回真,如果当时得不到,马上就返回假,绝不等。tryLock(时间)的用法就是 在规定的时间内设法得到锁。如果在规定的时间内最终不能得到锁,就返回假。* tryLock()方法只有在成功获取了锁的情况下才会返回true,如果别的线程当前正持有锁,则会立即返回false*/while (!lock.tryLock()) ;compute(n);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}).start();}try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程创建完毕,准备计数!");countDownLatch.countDown();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("计数结果:" + k);}private void compute(int n) {for (int j = 0; j < 10000; j++) {if (n % 2 == 0) {k--;} else {k++;}}}
ReentrantLockReadWriteLock
多线程情况下:读-写互斥、写-读互斥、写-写互斥、读-读共享
使用场景:
对于数据比较敏感的场景,
读锁:在读取数据时是不能出现多次读取不一致的情况的,这点有点像可重复读和幻读,
写锁:写数据时,又不能同时读取数据
参考博客:https://www.cnblogs.com/zxporz/p/10874853.html
源码
实战
private final Map<String, Integer> map = new TreeMap<>();private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private final Lock readLock = lock.readLock();private final Lock writeLock = lock.writeLock();public Integer get(String key) {readLock.lock();try {return map.get(key);} finally {readLock.unlock();}}public Set<String> allKeys() {readLock.lock();try {return map.keySet();} finally {readLock.unlock();}}public Integer put(String key, Integer value) {writeLock.lock();try {return map.put(key, value);} finally {writeLock.unlock();}}public void clear() {writeLock.lock();try {map.clear();} finally {writeLock.unlock();}}
ThreadPoolExecutor
用来构建线程池
源码
实战
@Testpublic void threadpoolExcutorTest() throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 5, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),new ThreadPoolExecutor.DiscardOldestPolicy());/*** 模拟20个并发,3个核心线程数、最大线程数5个,阻塞队列容量为10,* 此时处理流程?** 取决于用何种拒绝策略* 1.如果用到是默认拒绝策略:AbortPolicy 此时程序直接抛出RejectedExecutionException到RuntimeException异常,* 为unchecked异常,如果不手动捕获程序就会异常退出* java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@64a294a6* rejected from java.util.concurrent.ThreadPoolExecutor@7e0b37bc[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 0]* 2.如果是DiscardPolicy:新任务提交后直接被丢弃了,也不会给你任何到通知* 0-thread run* 1-thread run* 2-thread run* 3-thread run* 4-thread run* 5-thread run* 6-thread run* 7-thread run* 8-thread run* 9-thread run* 10-thread run* 11-thread run* 12-thread run* 13-thread run* 14-thread run* 3.如果是DiscardOldestPolicy,丢弃任务队列中到头节点,通常是存活时间最长到任务,存在数据丢失风险* 0-thread run* 1-thread run* 2-thread run* 4.如果是CallerRunsPolicy,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交给提交任务到线程去执行* 0-thread run* 1-thread run* 2-thread run* 3-thread run* 4-thread run* 5-thread run* 6-thread run* 7-thread run* 8-thread run* 9-thread run* 10-thread run* 11-thread run* 12-thread run* 13-thread run* 14-thread run* 15-thread run* 16-thread run* 17-thread run* 18-thread run* 19-thread run*/ArrayList<Future> futureList = new ArrayList<>(20);for (int i = 0; i < 20; i++){final int finalI = i;Future<String> future = threadPoolExecutor.submit(new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(1000);return finalI + "-thread run";}});futureList.add(future);}for (int i = 0; i < 20; i++){try {System.out.println(futureList.get(i).get(1000, TimeUnit.SECONDS));} catch (TimeoutException e) {e.printStackTrace();}}try {Thread.sleep(1000 * 10000);} catch (InterruptedException e) {e.printStackTrace();}}