原子性: 单独的,不可分割的操作
- 不要使用过期状态值来决策当下的状态, 一定要先检查再执行(不检查, 将引发数据修改,丢失)
- 避免延迟初始化(懒加载: 先查看对象 == null, 然后new), 有可能查看对象状态的时候, 对象已经new出来, 只不过还没将对象赋给引用
- 避免复合操作, 例: count++; 非原子性操作, 包含读-改-写三个过程, 不加锁, 必然出错; 强行加锁, 使得整个复合操作变为原子性
- 使用atomic包下类(线程安全类), 读写操作都是加锁状态
- 修饰代码块:大括号括起来,作用于对象
- 修饰方法:方法名前使用, 作用于对象
在方法内部使用synchronized代码块,与修饰方法时效果是一致的 - 修饰静态方法:整个静态方法,作用于所有对象
- 修饰类:类名前添加,作用于所有对象
注:
1. 作用于对象时,不同调用对象之间不影响
2. 使用synchronized的父类(synchronized不属于方法声明的一部分),子类继承后需要对其方法重新添加synchronized修饰,不然不能使用同步 - 特点:
synchronized:不可中断锁,适用于竞争不激烈,可读性好
Lock:可中断锁,多样化同步,竞争激烈时能维持常态
Atomic:竞争激烈时能维持常态,比Lock性能好
锁
- 在线程中使用atomic包下类, 不一定是线程安全的, 例:
AtomicInteger i1 = new AtomicInteger(1);AtomicInteger i2 = new AtomicInteger(1);if(i1.incrementAndGet() > 1){//输出简写sout(i2.get());}
看似i1, i2各自是线程安全的, 但是他们合起来的操作就不再是原子性的, 多个线程操作必然出错.
- synchronized可以锁住对象引用, 也可锁对象本身, 从Thread, Runnable实现线程就可知
/*** @author regotto* 测试线程锁住公共资源*/public class Demo2 {public static void main(String[] args) {StringBuffer sb = new StringBuffer();new People("张三", sb).start();new People("李四", sb).start();}}class People extends Thread{private StringBuffer sb;public People(){}public People(String name, StringBuffer sb){this.setName(name);this.sb = sb;}@Overridepublic void run() {final int c = 10;/*这里的线程都是在extends Thread的基础上这里不能使用synchronized(this)(在函数名前加synchronized效果一样),由于前面新建两个线程,对应的this都是不同的,获取的不是同一把锁,当使用同一个Thread就可使用synchronized(this)当使用synchronized(sb)就是正确的,此时锁住的是sb,两个线程都拿着同一个sb,所以可以正确锁住,*/// synchronized (this){// for (int i = 0; i < c; i++) {// sb.append(this.getName()).append(" ");// System.out.println("我是:"+this.getName()+" sb:"+sb);// try {// Thread.sleep(200);// } catch (InterruptedException e) {// e.printStackTrace();// }// }// }synchronized(sb){for (int i = 0; i < c; i++) {sb.append(this.getName()).append(" ");System.out.println("我是:"+this.getName()+" sb:"+sb);try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}}}}
-
重入锁(ReentrancyLock): 当前线程可以多次获得同一把锁, 例如:lock();lock();此时线程获得2把锁(线程内部具有锁计数器)
-
long/double在JVM中会被拆分为两个32位读取, 多线程下, 将会导致读取到一个凭空而来的值(而不是过期或者正确的数值)
-
volatile不具有原子性, 他不能使count++具有原子性, 他只具有可见性, 线程中共享变量使用它
-
使用volatile的标准:
1. 确保只有单一的线程修改变量的值
2. 变量不需要与其他状态变量共同参与不变约束
3. 访问变量, 没有其他原因要加锁 -
ThreadLocal, 线程本地变量, JDBC中使用ThreadLocal存储Connection, 使得每个线程都能从连接池中获得自己的Connection
-
尝试将单线程中的内容迁移到多线程中, 尝试将共享变量转化到ThreadLocal中
-
不可变对象天生就就是线程安全的, 其状态永远都无改变
-
使用不可变对象, 保证操作的原子性, 在1中说到两个原子类的和操作并不一定是原子操作(他们各自的操作是原子操作), 这里我可以提供一个不可变容器, 例如:
final AtomicInteger i3;
final AtomicInteger i4;
//不可变容器
public init(AtomicInteger i1, AtomicInteger i2){
final AtomicInteger i3 = i1; final AtomicInteger i4 = i2;
}
//执行此操作的时候, 多线程情况下不会出现i3或i4为过期值,解决了原子操作
public AtomicInteger sum(){ return i3.get()+i4.get();}
注: 每次执行都开辟内存, 但是相比于synchronized, 这点性能损耗不算什么. -
使用线程安全的容器, 例如Hashtable, synchronizedMap, ConcurrentMap, Vector, BlockingQueue, ConcurrentLinkedQueue存储对象, 多线程下保证了从容器中取出的对象总是线程安全
-
不可变对象可以通过任意机制发布, 线程安全容器中的对象必须安全发布, 可变对象必须要安全发布,并且使用锁保护
-
共享对象线程安全的有效策略:
1. 线程限制, ThreadLocal处理
2. 共享只读, 使用不可变对象/使用线程安全容器存储该对象
3. 共享线程安全, 在对象内部同步, 使用公共接口进行访问
4. 被守护对象, 只能通过特定的锁进行访问 -
设计线程安全的类:
1. 确定对象状态由哪些变量组成
2. 限制状态的不变约束
3. 制定一个并发管理策略–使用12中策略 -
任意所对象都可以用来保护对象状态:
1. 使用私有对象作为锁对象, 而不是对象内部锁;
2. 线程使用的对象可以是克隆对象, 避免线程对真正的对象产生修改 -
解决1中的问题的时候, 由于两个变量是相互独立, 且之间存在一定的联系, 所以加锁的时候可以使用公共锁进行处理使用一个公共变量作为锁对象, 每当需要处理其中一个变量的时候, 都是通过公共锁防止其他线程修改另一变量, 导致两个变量之间的关系出现变化
注: 面对这样的问题的时候, 使用volatile已经没有用, volatile不能处理这样的复合操作, 使得整个操作具有原子性 -
状态变量是否线程安全取决于当前发布的状态(暴露给用户的状态,可以对公共接口进行加锁, 对当前变量设置final, 可以使用volatile, 可以使用线程安全的容器)
-
"先检查再运行"将导致严重的线程安全问题, 例如延迟加载; 在add操作的时候,对象还没添加到容器中, 另一线程检查将会发现也可以进行add, 这样将会有多次add, 导致同一个容器存在多份相同的对象
-
向一个类中添加一个原子操作, 最有效的选择是组合
同步容器:
- 同步容器都是线程安全的, 但是针对同步容器的复合操作不是原子性的, 需要对操作之间存在关系的那个对象/变量加锁, 使得复合操作是原子性的
例如: 多个线程对Vector执行remove, get将有可能引发数组越界的问题/访问元素不存在, 这时候需要重新对list加锁, 保证只有一个线程对公共资源进行操作 - 迭代器/forEach遍历需要加锁, 在遍历过程中, 另一线程很有可能remove某些元素, 导致遍历过程出现修改异常(ConcurrentModificationException)但是迭代过程中, 如果对容器加锁, 将会导致性能极大下降, 还有可能出现死锁, 饥饿风险, 采用的解决方案是复制容器, 这样即使容器被修改了, 但是复制品也不会被修改, 避免遍历过程出现的修改异常, 但是复制也将导致性能损耗;还有一个问题就是, 在程序中很有可能出现隐藏的迭代器, 这也将引起非常隐蔽的线程安全问题
例: for(){sout("AAAAAAB"+i); //这里的字符串连接隐含这是使用迭代器遍历,多线程操作很容易引起修改异常}
- ConcurrentHashMap使得不再是只有一个线程能同时访问容器; 内部使用分离锁, 使得可以多个线程执行读, 写操作且不影响性能; 由于并发操作, 使用size, isEmpty被弱化;
synchronizedMap却是为Map中每个方法同步操作而言, 并发情况下具有性能上的损耗 - CopyOnWriteArrayList避免了迭代期间对容器的加锁与复制, 底层存在的是一个不可变的基础数组引用, 对数组的修改总是用复制品去操作, 避免线程读的时候出现修改异常,但是修改频率较高的时候就会出现性能上的下降.
- 阻塞队列–生产-消费者模式, 有效避免了负载, 例如: 线程池, BlockingQueue维护的是一个线程队列, 他不维护队列元素储存空间
- 双端队列–窃取工作, 当自己队列中任务已经完成, 就窃取其他队列末尾的工作(避免竞争), 相比于传统消费者工作模式, 双端队列具有更强的伸缩性(生产者生产的内容很多, 窃取工作很容易加快消费者的消费行为, 使得每个线程都能高效运行)
- 阻塞可中断: 可阻塞方法使用interrupt提前中断, Runnable的interrupt必须进行捕获, extends Thread的可以抛给调用者
- Synchronizer: 一个对象, 根据本身状态调节线程控制流; 例如阻塞队列, Semaphore等
- 闭锁: 延迟线程的进度直到, 直到终点状态到达, 才允许所有线程往下执行,
例如:- 资源R已经被初始化, 但R执行的活动都在闭锁中等待
- 确保一个服务不会开始, 直到他所依赖的所有服务都已开始
- 等待所有活动都做好准备, 然后接着往下执行
- CountDownLatch:允许一个或多个线程等待一个事件集的发生, 内部使用countDown计数, 当由线程到达, 执行计数且进行await,当计数器达到某一状态, 就notifyAll
- FutureTask ==> 有返回值的Runnable, 使用Future.get获得返回值, 当线程还在运行中还没返回值, 此时get将会阻塞, 直到线程具有返回值, 阻塞到一定状态将会抛出异常
- Executor利用FutureTask完成异步操作
- ThreadPoolExecutor:
1. corePoolSize: 核心线程数, 运行的线程数小于corePoolSize时,直接创建新的线程,即使存在空闲线程
2. maximumPoolSize: 线程最大线程数
3. workQueue: 阻塞队列,存储等待执行的任务,会对线程池运行过程产生重大的影响
4. 只有当线程数大于maximumPoolSzie但workQueue还没满,则将线程放入workQueue中, 若workQueue满了, 则选择合适的策略执行后续操作
5. keepAliveTime: 当其没有任务执行,除核心线程,当超出指定时间,这些线程就销毁
6. unit: keepAliveTime的时间单位
7. threadFactory: 线程工程, 用于创建线程
8. rejectHandler: workQueue中的策略处理
具有如下策略: 1.抛出异常, 2.用调用者所在的线程执行任务(默认), 3.丢弃阻塞队列中最靠前的任务,执行当前任务,4.直接丢弃当前任务 - Executor异步执行任务, 不能正确关闭将会阻止JVM结束
- 线程池状态:
- running: 可以处理新提交的任务以及阻塞队列中的任务
- shutdown: 调用shutdown()进入,只能处理新提交的任务, 但是可以处理workQueue中的剩余任务, 阻塞队列为空, 线程池中线程数为0, 进入tidying
- stop:调用shutdownNow(),不能处理任何任务(任务队列中任务全部取消),线程池中线程数为0, 进入tidying
- tidying: 线程池处理后续任务
- 常用方法
- execute(): 提交任务, 交给线程池执行
- submit(): 提交任务, 能返回执行结果, execute+Future
- shutdown(): 关闭县此次, 等待任务执行完
- shutdownNow(): 关闭线程池, 不等待任务执行完
- getTaskCount(): 线程池已执行和未执行的任务总数
- getCompletedTaskCount(): 已完成的任务总数
- getPoolSize(): 线程池当前的线程数量
- getActiveCount(): 当前线程池中正在执行的任务数量
- newFixedThreadPool: 创建定长的线程池, 每当提交一个任务就创建一个线程, 直到达到最大长度(长度就不再变化)
- newCachedThreadPool: 创建的线程超出Max,则灵活收回空闲线程,需要增加时, 灵活添加, 对池的大小不做限制
- newSingleThreadExecutor: 创建单线程化executor, 当该线程异常结束, 会立即创建新线程取代, 从而保证任务队列中任务按照规定顺序执行(FIFO, LIFO, 优先级)
- newScheduledThreadPool: 创建定长线程池, 可定时周期性任务执行
- 线程池状态:
同步容器详解
- AQS
- 使用Node实现FIFO队列, 用于构建锁或其他同步装置的基础框架
- 利用int类型表示状态, 表示当前锁是哪种类型(轻量级, 重量级, 重入锁, 偏向锁…)
- 使用方法是继承,内部使用模板方法
- 子类通过继承并通过使用实现它的方法管理其状态(acquire, release方法操作状态)
- 可以同时实现排它锁和共享锁模式(独占, 共享)
- AQS同步组件:
- CountDownLatch: 通过计数判断线程是否需要阻塞
- Semaphore: 控制同一时间的并发数目
- CyclicBarrier: 与CountDownLatch相似
- ReentrantLock
- Condition
- FutureTask
- CountDownLatch:
线程调用await将会阻塞, 其他线程调用countDown()会使计数器减1, 当计数器值为0时, 因调用await()而等待的线程将由await变为唤醒
此种情况只会出现一次, 计数器不能被重置, 业务上需要重置, 就使用CyclicBarrier
例:public static void main(String[] args){Executor exec = new Executors.newCachedThreadPool();final CountDownLatch cdl = new CountDownLatch(200);for(int i = 0; i < 200; i++){final int threadNum = i;exec.execute(()->{try{test(threadNum);}catch(Exception e){e.printStack();}finally{cdl.countDown();//一个线程执行后计数器减1}});}cdl.await();//只有当计数器为0才唤醒所有线程, 否则线程执行此行代码就线程阻塞sysout("finish");exec.shutdown();//等线程池中线程执行完,就关闭线程池}public void static test(int t){Thread.sleep();sysout(t);}
运用场景: 在指定时间内完成指定任务, cdl.await(10, TimeUtit.MILLISECONDS)//10代表大小, 后面代表单位,10毫秒,等待的时间是指从线程开始执行到await的时间.
- Semaphore: 提供对有限资源的访问次数, 类似于生产消费者模式
例:public static void main(String[] args){Executor exec = new Executors.newCachedThreadPool();final Semaphore sp = new Semaphore(3);//执行一次放出3个许可for(int i = 0; i < 200; i++){final int threadNum = i;exec.execute(()->{try{sp.acquire();//获取许可,默认获取一个,执行一次放出3个许可, 此处一次获取1个即表示一次可以同时执行3个线程//如果写成sp.acquire(3)表示一次需要获取3个许可,而构造只给出3个, 因此类似单线程执行, 每次等待3个线程,然后3个都执行完才进行releasetest(threadNum);sp.release();//释放许可,与获取许可一直, 可以单个逐一释放, 也可以一次性释放多个}catch(Exception e){e.printStack();}});}exec.shutdown();//等线程池中线程执行完,就关闭线程池}public void static test(int t){Thread.sleep();sysout(t);}///if(sp.tryAcquire()){test(threadNum);sp.release();}//表示尝试获取许可,如上述代码, 尝试获取3个许可, 当时这三个线程还未执行完, 其他线程无法获取许可, 就不能执行if中的内容, 相当于丢弃其他线程, 可传入参数设定尝试许可数以及等待时间
注: Semaphore并没有真正向线程分配许可, 一个线程得到的许可可能是由另外的一个线程释放, 使用Semaphore实现资源池, 有资源的时候acquire, 资源使用结束后release放回资源池, 一个创建资源池最简单的方式就是使用BlockingQueue
- CyclicBarrier:一组线程同时等待, 只有当所有的线程都到达屏障点, 所有线程才能继续往下执行后续代码
例:private static CyclicBarrier cb = new CyclicBarrier(5);//一次5个线程为一组, 5个执行完再执行后续线程//在声明CyclicBarrier的时候可以指定Runnable//表示线程到达await的时候优先执行Runnable中的内容//例: new CyclicBarrier(5, ()->{sysout("----------");//每一组线程都到await时,先执行Runnable,然后再执行后续代码});public static void main(String[] args){Executor exec = new Executors.newCachedThreadPool();for(int i = 0; i < 200; i++){final int threadNum = i;Thread.sleep(1000);exec.execute(()->{try{test(threadNum);}catch(Exception e){e.printStack();}});}exec.shutdown();//等线程池中线程执行完,就关闭线程池}public void static test(int t) throws Exception{Thread.sleep(1000);try{barrier.await(2000, TimeUnit.MILLISECONDS);//在规定时间等待线程, 超出2秒后就抛出异常} catch(BrokenBarrierException | TimeoutException e) {sysout(e);//不捕获异常就会导致后续代码不执行}sysout(t);}
当线程到达屏障, 调用await, 直到所有的线程都到达, 进行notifyAll
注: 可用做并行迭代算法, 问题分解
- ReentrantLock(可重入锁):属于自旋锁, 死循环调用CAS实现锁, 而非等待数据进入内核态加锁
- ReentrantLock与synchronized的区别:
可重入性,二者区别不大,使用所计数器,计算当前锁的数目
锁的实现, ReentrantLock依赖于API, synchronized依赖于JVM
性能, 当synchronized引入轻量级,偏向,重量级锁后, 二者性能差不多 - ReentrantLock独有功能
可指定公平锁还是非公平锁
提供一个Condition类, 可以分组唤醒需要唤醒的线程, 而synchronized的单个唤醒是随机的
提供中断等待锁的线程机制, lock.lockInterruptibly()实现
- ReentrantLock与synchronized的区别:
例:private final static Lock lock = new ReentrantLock();public static void main(String[] args){ExecutorService es = new Executors.newCachedThreadPool();final CountDownLatch cd = new CountDownLatch(2000);final Semaphore sp = new Semaphore(200);for(int i = 0 ;i < 2000; i++){es.execute(()->{try{sp.acquire();add();sp.release();}catch(Exception e){sysout(e);}});}cd.await();es.shutdown();sysout("finish"+count);//看执行结果是否为2000}private static void add(){lock.lock();try{count++;}finally{lock.unclock();}}
- ReentrantReadWriteLock://使用场景不多
内部两个核心成员: ReentrantReadWriteLock.ReadLock readLock; ReentrantReadWriteLock.WriteLock writeLock;
在读写未被打扰的情况下才能执行读写锁的操作
例:ReentrantReadWriteLock lock = new ReentrantReadWriteLock();Lock readLock = lock.readLock();Lock writeLock = lock.writeLock();针对读写操作分别使用readLock和writeLock加锁实现的悲观操作, 当获取writeLock的时候不允许有读操作, 当读的操作频率大于写的操作时就会造成线程饥饿(写操作总会因为读操作而中断)
-
StampedLock://在读操作较多的场景下具有优势
使用悲观锁(强行认为读的时候不能有写操作)和乐观锁(认为读写发生的冲突很小)操作
先使用乐观策略, 当出现错误的时候转为悲观锁
StampedLock sl = new StampedLock();
long stm = sl.writeLock();//加锁会有long型返回值
sl.unclock(stm);//释放锁传入stm参数 -
Condition://用的很少
ReentrantLock rl = new ReentrantLock();
Condition c = rl.newCondition();
在run()中写如下
rl.clock();//线程加锁
c.awatir();//当前线程从AQS队列中移除, 相当于锁的释放, 将该线程加入到AQS中的单向队列中, 等待信号
c.signalAll()//唤醒所有的等待线程, 也可以单个唤醒
注: 一定要记住有unclock的存在 -
FutureTask//很重要, 融合了Thread, Runnable, Future, Callable
Callable(有返回值,可抛出异常), Runnable
Future(监视目标线程调用call()情况, 获取返回值, 若call未返回值, 则Future线程阻塞)
FutureTask实现Runnable, Future, 可以操作Runnable对象也可以获得线程返回值
例:FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>(){@Overridepublic String call() throws Exception {Thread.sleep(5000);return "Done";}});new Thread(futureTask).start();Thread.sleep(1000);String result = futureTask.get();//获取返回值FutureTask的构造方法:FutureTask(Runnable r, V v);v表示返回值类型, 此时的Runnable也可具有返回值; 是将Runnable转化为Callable接口, 然后再调用call方法
-
Fork/Join
将一个大的任务拆分多个小任务, 每个小任务放置到不同的双端队列中, 每个线程执行自己队列里面的值, 当自己队列中任务执行完就去其他队列中偷取任务执行, 每次任务的偷取是从队列的首开始, 原本执行该任务的线程取任务是从队列的末端开始
局限性:
任务只能使用Fork/Join的同步操作, 不能使用其他的同步操作
任务不是抛出检查异常, 不能执行IO操作 -
BlockingQueue
对满的队列进行put(o),将阻塞; add(o),将抛出异常; offer(o)//将返回false,反之true; offer(o, timeOut, timeunit)//操作在指定时间未执行, 就返回false
对空的队列进行take(),将阻塞; remove(o)//抛出异常; poll()//返回false,同上; poll(timeout,timenuit)//同上 -
生产-消费者模式
ArrayBlockingQueue//容量有限, 初始化后不能改变, 先进先出
DelayQueue//内部有序,按照元素过期时间排序
LinkedBlockingQueue
PriorityBlockingQueue//允许插入null, 插入对象按照Comparable接口的方式排序, 使用迭代器获取元素的时候并不一定按照优先级获取
SynchronousQueue//内部仅容纳一个元素
任务执行
- 在线程中执行任务:
为任务创建线程, 例:socket,如果只是用单线程处理每个用户的请求,那么性能将会是糟糕的.final ServerSocket socket = new ServerSocket(8080);while(true){Socket connection = socket.accept();new Thread(()->{execute(connection);}).start;}
- 根据Socket得出如下结论:
- 让执行任务脱离主线程,防止因执行任务出现异常导致主线程error; 让主线程在完成前面的请求之前接收新的请求, 提高响应速度
- 并行处理任务, 使得多个请求可以同时得到服务
- 执行的任务必须保证线程安全
- Executor 线程池(生产者-消费者模式)
//创建100大小的线程池
Executor exec = Executor.newFixedThreadPool(100) - 创建周期性,延迟性任务
Timer存在缺陷, 他只采用唯一线程执行,导致多个任务执行的时候影响其他任务的准确性并且他还没有处理检查异常,线程执行的过程中将出现无法预料的错误, 应考虑ScheduleThreadPoolExecutor - 寻找可强化的并行性
1. 顺序执行页面渲染, 将导致渲染图片是cpu资源的浪费,如果将问题分散到独立并发执行中, 将会获得更好的cpu性能: 使用一个线程渲染文本, 一个线程渲染图片, 通过Future.get判断当前线程执行的程度
2. 可携带结果的任务: Callable, Future, Executor使用Runnable, 但是Runnable具有局限性(不能返回值或抛出检查异常), 可以使用PrivilegedAction将任务类型封装成Callable
3. 并行的局限性, 分配任务的时候, 任务协调上的开销, 不能多于并行性带给生产力的提高; 例如:1中页面渲染,若渲染文字的速度远远大于渲染图片,那么使用多线程将会导致程序复制到增加,代码更加冗余繁杂;
注: 大量相互独立且同类的任务进行并发处理, 将不同的任务量分配到不同的任务中才能获得性能上的提升
4. CompletionService: 执行批量操作的线程池, 当使用Executor执行批量操作的时候带来的问题就是:每获得一个结果都需要使用Future.get(),此过程还会出现error,阻塞等问题;
5. CompletionService执行批量操作(一般使用子类ExecutorCompletionService, 它在构造函数中创建BlockingQueue储存所有任务),将所有任务结果封装到一个QueueingFuture(FutureTask子类)中, 使用队列操作从中获得线程返回的结果;
多个ExecutorCompletionService可以共享单一的Executor
6. 为任务设置时限, 避免销毁过多的资源, 使用Future.get(get在计算时间的时候是使用当前时间减去预测时间, 任务最开始执行的时候得到的结果可能是负数, 在concurrent包下,所有时间负数都按0处理,所以不需要担心),当抛出TimeoutException,就可以直接Future.cancel,取消当前任务
try{//在timeLeft的时间内等待,以NANOSECONDS为单位x = f.get(timeLeft, NANOSECONDS);}catch(Exception e){x = default;//超出时间,抛异常,取消当前任务f.cancel(true);}