集合类不安全
list不安全
//报错 java.util.ConcurrentModificationException
public class ListTest {public static void main(String[] args) {List<String> list= new CopyOnWriteArrayList<>();//并发下Arrayist边读边写会不安全的/*** 解决方案:* 1.List<String> list= new Vector<>();* 2.List<String> list= Collections.synchronizedList(new ArrayList<>());* 3.List<String> list= new CopyOnWriteArrayList<>();*/for(int i=0;i<50;i++){new Thread(()->{list.add(UUID.randomUUID().toString().substring(0,5));System.out.println(list);},String.valueOf(i)).start();}}
}
上面这个多个线程边读边写时会出现如下报错
java.util.ConcurrentModificationException
在CopyOnWriteArrayList的底层用一个这样的数据实现
volatile内存,模型中规定保证线程间的可见性,但不保证原子性
CopyOnWrite使用的是lock锁,Vertor使用的是synchronized,有sync都会很慢。
list的解决方案有使用vector这个安全类和工具类和 juc.
工具类是将其变成synchronized,但很慢,juc是用写入时复制。
Set不安全
set和List是一个同级的关系,都是Collection的子类。
所以set在边读边写时也会有java.util.ConcurrentModificationException报错。
但是set没有vector,只有工具类和juc的解决方案。
public class SetList {public static void main(String[] args) {Set<String> set = new HashSet<>();
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
// Set<String> set = new CopyOnWriteArraySet<>();for(int i=0;i<50;i++){new Thread(()->{set.add(UUID.randomUUID().toString().substring(0,5));System.out.println(set);},String.valueOf(i)).start();}}
}
hashset底层就是hashmap。
hashset的add方法就是hashmap的put方法。
map不安全
这个也有" java.util.ConcurrentModificationException报错
这里的解决方案是juc下的ConcurrentHashMap。
public class MapList {public static void main(String[] args) {//加载因子,初始化容量 0.75和16
// Map<String,String> map=new HashMap<>();Map<String,String> map=new ConcurrentHashMap<>();for(int i=0;i<50;i++){new Thread(()->{map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));System.out.println(map);},String.valueOf(i)).start();}}
}
走进Callable
callable接口就类似于runnable接口,然而runnable接口不会返回结果也不会抛出异常,callable就可以。一个是call()方法,一个是run()方法.
callable接口需要提供一个泛型,泛型的参数等于方法的返回值。
如何用new Thread().start接收callable接口并开启线程
Thread()只能接收runnable参数,不认识callable()参数,所以callable要通过runnable去做一个桥梁,在runnable里面有如下的一些实现。
其中FutureTask这个实现类与Callable有联系,如下所示,有一个构造参数就是Callable<V>
这里用到的应该就是适配器模式,这里的futuretask就是一个适配器。
1.两个同样的callable实现类开启的线程内的输出结果会被缓存。
2.结果可能会等待,会阻塞。
public class CallavkeTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// new Thread(new Runnable()).start(); //传统方式
// new Thread(new FutureTask<V>()).start();
// new Thread(new FutureTask<V>(Callable)).start();mythread mythread=new mythread(); //callable接口的实现类FutureTask futureTask = new FutureTask(mythread); //callable接口的适配类new Thread(futureTask,"A").start();new Thread(futureTask,"B").start(); //只会输出一个call,结果被缓存了,达到提高效率的目的String str=(String)futureTask.get(); //获取callable的返回结果,get方法会等待结果,可能产生阻塞,要将其放在最后//或者通过异步通信来处理!System.out.println(str);}
}class mythread implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("call方法被调用");//耗时操作return "1230";}
}//class mythread implements Runnable{
//
// @Override
// public void run() {
//
// }
//}
常用的辅助类
CountDownLatch
原理:
countDownLatch.countDown(); //-1
countDownLatch.await(); //等待计数器归零再向下执行
//计数器
public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {//总数是6CountDownLatch countDownLatch = new CountDownLatch(6);for(int i=0;i<6;i++){new Thread(()->{System.out.println(Thread.currentThread().getName()+"go out");countDownLatch.countDown(); //-1},String.valueOf(i)).start();}countDownLatch.await(); //等待计数器归零再向下执行System.out.println("Close door");
// countDownLatch.countDown(); //-1}
}
CyclicBarrier
加法计数器
有两种构造参数,一个是传个计数,一个是传个计数完之后要执行的线程。
public class CyclicBarrierDemo {public static void main(String[] args) {CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{System.out.println("g盖亚!!!");});for(int i=0;i<7;i++){final int temp=i;//lamda表达式能操作到i吗?new Thread(()->{System.out.println(Thread.currentThread().getName()+"收集"+temp+"个"); //间接获得try {cyclicBarrier.await(); //等待} catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e) {throw new RuntimeException(e);}}).start();}}
}
如果计数为8但是线程只有7个的话,就会永远卡死在一个地方。
Semaphore(信号量)
这个 的实现也有两种参数
public class SemaphoreDemo {public static void main(String[] args) {//线程数量:停车位Semaphore semaphore = new Semaphore(3);for(int i=1;i<=6;i++){new Thread(()->{//acquire()得到try {semaphore.acquire();System.out.println(Thread.currentThread().getName()+"抢到车位");TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName()+"离开车位 ");} catch (InterruptedException e) {throw new RuntimeException(e);}finally {//realease() 释放semaphore.release();}},String.valueOf(i)).start();}}
}
一开始只有三个进去了,后面三个都出去了,后三个才能进来,这里的主要应用场景就是限流。
原理:
semaphore.acquire(); //获取,假设已经的满了,就等待到资源被释放为止。
semaphore.release(); //释放,将当前信号量释放+1,然后唤醒等待线程。
作用:多个共享资源的互斥使用。并发限流,控制最大线程数。
读写锁
readwritelock只有一个实现类,可重入的读写锁. 读的时候可以多个线程同时读,但是写的时候只能一个线程在写。
如下所示的一个自定义缓存类读写操作
/*** readwritelock*/
public class readwritelockdemo {public static void main(String[] args) {MyCache myCache=new MyCache();//写入for(int i=0;i<5;i++){final int temp=i;new Thread(()->{myCache.put(temp+"",temp+"");},String.valueOf(i)).start();}//读取for(int i=0;i<5;i++){final int temp=i;new Thread(()->{myCache.get(temp+"");},String.valueOf(i)).start();}}
}/*** 自定义缓存*/
class MyCache{private volatile Map<String,Object> map=new HashMap<>();//存入public void put(String key,Object value){System.out.println(Thread.currentThread().getName()+"写入"+key);map.put(key,value);System.out.println(Thread.currentThread().getName()+"写入完毕");}//读取public void get(String key){System.out.println(Thread.currentThread().getName()+"读取"+key);Object o=map.get(key);System.out.println(Thread.currentThread().getName()+"读取完毕");}
}
输出如下,在一个线程写入的过程中另一个线程也在写入,这种情况是不能发生的
使用了读写锁之后,写操作只会允许一个线程执行,读操作则会有多个线程同时进行.。
/*** readwritelock*/
public class readwritelockdemo {public static void main(String[] args) {
// MyCache myCache=new MyCache();MyCacheLock myCacheLock=new MyCacheLock();//写入for(int i=0;i<5;i++){final int temp=i;new Thread(()->{myCacheLock.put(temp+"",temp+"");},String.valueOf(i)).start();}//读取for(int i=0;i<5;i++){final int temp=i;new Thread(()->{myCacheLock.get(temp+"");},String.valueOf(i)).start();}}
}
/*** 加上锁之后*/
class MyCacheLock{private volatile Map<String,Object> map=new HashMap<>();//读写锁,可以更加细力度的控制private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();//存入,只有一个线程写public void put(String key,Object value){readWriteLock.writeLock().lock();try{System.out.println(Thread.currentThread().getName()+"写入"+key);map.put(key,value);System.out.println(Thread.currentThread().getName()+"写入完毕");}catch (Exception e){e.printStackTrace();}finally {readWriteLock.writeLock().unlock();}}//读取,所有线程都能读public void get(String key){readWriteLock.readLock().lock();try{System.out.println(Thread.currentThread().getName()+"读取"+key);Object o=map.get(key);System.out.println(Thread.currentThread().getName()+"读取完毕");}catch (Exception e){e.printStackTrace();}finally {readWriteLock.readLock().unlock();}}
}
阻塞队列
JUC有这样一个阻塞队列的接口,t它的实现有一个SynchronousQueue同步队列,还有一些数组阻塞队列,和链表阻塞队列等等
可以看见Queue和List和Set是同一级别的,在Queue接口下有这个BlockingQueue接口和Deque和AbstractQueue。
典型使用场景: 多线程并发处理,线程池,生产者消费者。
阻塞队列四组API
1.抛出异常2.不会抛出异常3.阻塞等待4.超时等待
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
添加 | add | offer() | put() | offer() |
移除 | remove | poll() | take() | poll() |
检测队首元素 | element | peek |
public class Test {public static void main(String[] args) throws InterruptedException {
// test1();test4();}/*** 抛出异常*/public static void test1(){//设置队列大小ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.add("a"));System.out.println(blockingQueue.add("b"));System.out.println(blockingQueue.add("c"));//Exception in thread "main" java.lang.IllegalStateException: Queue full
// blockingQueue.add("d");System.out.println("_______________");System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());//Exception in thread "main" java.util.NoSuchElementException
// System.out.println(blockingQueue.remove());}/*** 有返回值,不抛出异常*/public static void test2(){ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);System.out.println(blockingQueue.offer("a"));System.out.println(blockingQueue.offer("b"));System.out.println(blockingQueue.offer("c"));System.out.println(blockingQueue.offer("d")); //不抛出异常,返回falseSystem.out.println("_______________————————————————————");System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll()); //也不抛出异常,返回null}/*** 等待,阻塞(一直阻塞)*/public static void test3() throws InterruptedException {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.put("a");blockingQueue.put("b");blockingQueue.put("c");
// blockingQueue.put("d"); //一直阻塞System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());//也是一直阻塞}/*** 等待,阻塞(等待超时)*/public static void test4() throws InterruptedException {ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);blockingQueue.offer("a");blockingQueue.offer("b");blockingQueue.offer("c");blockingQueue.offer("d",2, TimeUnit.SECONDS); //超时时间和单位System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll());System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));}
}
同步队列SynchronousQueue
没有容量,进去一个元素必须等待取出来之后,才能再往里面放一个元素。
/*** 同步队列*/
public class SynchronousQueueDemo {public static void main(String[] args) {SynchronousQueue<String> blockingQueue = new SynchronousQueue<>();//同步队列new Thread(()->{try {System.out.println(Thread.currentThread().getName()+"put 1");blockingQueue.put("1");System.out.println(Thread.currentThread().getName()+"put 2");blockingQueue.put("2");System.out.println(Thread.currentThread().getName()+"put 3");blockingQueue.put("3");} catch (InterruptedException e) {throw new RuntimeException(e);}},"t1").start();new Thread(()->{try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());} catch (InterruptedException e) {throw new RuntimeException(e);}},"t2").start();}
}
线程池(重点)
线程池:三大方法,7大参数,4中拒绝策略。
池化技术:事先准备好资源,来人就用,用完放回。
程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术
线程池,连接池,内存池,对象池。
线程池的好处:
1、降低资源的消耗
2、提高响应的速度
3、方便管理。
线程复用、可以控制最大并发数、管理线程
线程的三大方法
//Executors 工具类、三大方法。
public class Demo01 {public static void main(String[] args) {
// ExecutorService threadPoll = Executors.newSingleThreadExecutor();//单个线程
// ExecutorService threadPoll =Executors.newFixedThreadPool(5); //创建一个固定线程池的大小ExecutorService threadPoll =Executors.newCachedThreadPool(); //可伸缩try {for(int i=0;i<10;i++){//使用了线程池后,使用线程池创建线程threadPoll.execute(()->{System.out.println(Thread.currentThread().getName()+" OK");});}} catch (Exception e) {throw new RuntimeException(e);}finally {//线程池用完,程序结束,关闭线程池threadPoll.shutdown();}}
}
7大参数
源码分析
第一个方法的源码
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
第二个方法的源码
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
第三个方法的源码
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
可以看见三种开启方法都是用的ThreadPoolExecutor,它的源码如下
可以看见有7个参数
1.核心线程池大小2.最大核心线程池大小3.存活时间,4.超时单位5.阻塞队列6.线程工厂,用于创建线程7.拒绝策略。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
前面三个方法的前两个参数分别是1,1 5,5 0,Inter.MaxValue。(21亿大小)
因此阿里巴巴的手册里面才会这样写。
四大策略
核心线程池大小为2,最大为5,一开始只有2个,但是阻塞队列里面满了之后又来人了会开放剩下三个,又慢了之后就不给进了,这就是拒绝策略。
等到了那三个队列空闲后后,经过了超时时间就会关闭释放。
四个实现类对应四大策略
自定义线程池、
拒绝策略
会抛出异常.
//Executors 工具类、三大方法。
public class Demo01 {public static void main(String[] args) {ExecutorService threadPoll =new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(), //一般不变new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略是银行满了,还有人进来时就不处理该人并抛出异常。try {for(int i=0;i<10;i++){//使用了线程池后,使用线程池创建线程threadPoll.execute(()->{System.out.println(Thread.currentThread().getName()+" OK");});}} catch (Exception e) {throw new RuntimeException(e);}finally {//线程池用完,程序结束,关闭线程池threadPoll.shutdown();}}
}
第二策略
哪来的回哪里去,由原本的线程来执行。
//Executors 工具类、三大方法。
public class Demo01 {public static void main(String[] args) {ExecutorService threadPoll =new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(), //一般不变new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里!try {for(int i=0;i<10;i++){//使用了线程池后,使用线程池创建线程threadPoll.execute(()->{System.out.println(Thread.currentThread().getName()+" OK");});}} catch (Exception e) {throw new RuntimeException(e);}finally {//线程池用完,程序结束,关闭线程池threadPoll.shutdown();}}
}
第三策略
队列满了不会抛出异常。会直接丢掉任务。
package com.yhy.pool;import java.util.concurrent.*;/*** new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略是银行满了,还有人进来时就不处理该人并抛出异常。* new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里!* new ThreadPoolExecutor.DiscardPolicy()); //队列满了就踢了并且不抛出异常。* new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!*/
public class Demo01 {public static void main(String[] args) {ExecutorService threadPoll =new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(), //一般不变new ThreadPoolExecutor.DiscardPolicy()); try {for(int i=0;i<10;i++){//使用了线程池后,使用线程池创建线程threadPoll.execute(()->{System.out.println(Thread.currentThread().getName()+" OK");});}} catch (Exception e) {throw new RuntimeException(e);}finally {//线程池用完,程序结束,关闭线程池threadPoll.shutdown();}}
}
可以看见只有8条输出,有两条被踢了。
第四策略
//Executors 工具类、三大方法。
public class Demo01 {public static void main(String[] args) {ExecutorService threadPoll =new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(), //一般不变new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!try {for(int i=0;i<10;i++){//使用了线程池后,使用线程池创建线程threadPoll.execute(()->{System.out.println(Thread.currentThread().getName()+" OK");});}} catch (Exception e) {throw new RuntimeException(e);}finally {//线程池用完,程序结束,关闭线程池threadPoll.shutdown();}}
}
CPU密集型,IO密集型(扩展)
经常会被问,池的最大大小如何去设置。
CPU密集型: 12核的CPU最多12条线程同时执行,多少核就
IO密集型: 程序 15个大型任务 IO十分占用资源 ,一般设置为两倍30个线程。
/*** new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略是银行满了,还有人进来时就不处理该人并抛出异常。* new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里!* new ThreadPoolExecutor.DiscardPolicy()); //队列满了就踢了并且不抛出异常。* new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!*/
public class Demo01 {public static void main(String[] args) {//自定义线程池! 工作 ThreadPoolExecutor//最大线程池如何定义//1、CPU 密集型,几核,就是几,可以保CPu的效率最高!//2、IO密集型// 程序 15个大型任务 IO十分占用资源//获取CPU核数System.out.println(Runtime.getRuntime().availableProcessors());ExecutorService threadPoll =new ThreadPoolExecutor(2,Runtime.getRuntime().availableProcessors(),3,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(), //一般不变new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!try {for(int i=0;i<10;i++){//使用了线程池后,使用线程池创建线程threadPoll.execute(()->{System.out.println(Thread.currentThread().getName()+" OK");});}} catch (Exception e) {throw new RuntimeException(e);}finally {//线程池用完,程序结束,关闭线程池threadPoll.shutdown();}}
}