原子操作类
-
基本类型原子类:AtomicInteger、AtomicBoolean、AtomicLong,常见API:
- get 获取当前值
- getAndSet 获取当前的值,并设置新的值
- getAndIncrement 获取当前的值,并自增
- getAndDecrement 获取当前的值,并自减
- getAndAdd 获取当前的值,并加上预期的值
- compareAndSet 如果输入的值等于预期值,则以原子方式将值更新
-
数组类型原子类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray,用法和基本类型原子类类似
AtomicIntegerArray atomicIntegerArray =new AtomicIntegerArray(new int[]{1,2,3,4});
-
引用类型原子类
- AtomicReference :原子引用
- AtomicStampedReference :携带版本号的引用类型原子类,可以解决ABA问题,记录了引用被修改过多少次
- AtomicMarkableReference:带有标记位的引用类型原子类,通过一个布尔值的状态戳,用来判断对象是否被修改过,解决一次性修改问题
-
对象的属性修改原子类
-
原子操作增强类
CountDownLatch使用场景
-
线程计数器 用于线程执行任务,计数 等待线程结束
-
例如:
public static void test() throws Exception{AtomicInteger ato = new AtomicInteger(0);Integer size = 100;//100个线程用于计算CountDownLatch countDownLatch = new CountDownLatch(100);for (int a = 0; a < size; a++) {new Thread(()->{try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}for (int b =0;b <100;b++){//自增ato.getAndIncrement();}//计算完了就减1countDownLatch.countDown();}).start();}//只有 countDownLatch 中的数量为0 才能继续执行,避免 Thread.sleep()的操作,计算完成能够即时执行countDownLatch.await();System.out.println("最终计算结果:"+ato.get());}
对象的属性修改原子类
对象的属性修改原子类,可以用线程安全的方式操作非线程安全对象内的某些字段,例如:某个对象有多个属性,但是只有少量字段需要频繁更新,加锁虽然可以保证线程安全,但是有可能会锁住整个对象,所以可以用原子更新代替加锁
-
AtomicIntegerFieldUpdater,基于反射,原子更新对象中,被volatile修饰的 int类型字段的值
-
AtomicLongFieldUpdater基于反射,原子更新对象中,被volatile修饰的 long类型字段的值
-
AtomicReferenceFieldUpdater,基于反射,原子更新引用类型,被volatile修饰的字段的值
-
使用原子更新的要:
- 更新的对象属性必须是 public volatile 修饰的
- 而且因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,通过更新器去更新属性
-
示例:如下出了金额以外的字段是不会经常更新的,针对金额字段加锁,肯定是可以保证线程安全的,但是很重量级,不加锁的话,结果会不确定,线程不安全
@Data @AllArgsConstructor @NoArgsConstructor public class AtomicEntity {//账号private String account;//联系方式private String phone;//姓名private String name;//金额private volatile Integer money;public synchronized void addMoney(Integer add) {money = money + add;}public synchronized void subMoney(Integer sub) {money = money - sub;} }private static void test3() throws Exception {AtomicEntity atomic = new AtomicEntity("123456789", "123456", "张三", 0);CountDownLatch countDownLatch = new CountDownLatch(200);for (int a = 0; a < 100; a++) {new Thread(() -> {for (int b = 0; b < 100; b++) {atomic.addMoney(1);}countDownLatch.countDown();}, "线程" + a).start();}for (int a = 0; a < 100; a++) {new Thread(() -> {for (int b = 0; b < 100; b++) {atomic.subMoney(1);}countDownLatch.countDown();}, "线程" + a).start();}countDownLatch.await();//这里执行结果肯定是正确的,如果不加锁会导致线程错乱System.out.println("最终结果" + atomic.getMoney());}
如果不想加锁,使用对象的属性修改原子类,AtomicIntegerFieldUpdater示例
@Data @AllArgsConstructor @NoArgsConstructor public class AtomicEntity {//账号private String account;//联系方式private String phone;//姓名private String name;//金额 ,修饰符必须是 public volatile public volatile int money;public AtomicEntity(String account, String phone, String name, Integer money) {this.account = account;this.phone = phone;this.name = name;this.money = money;}AtomicIntegerFieldUpdater<AtomicEntity> ato =AtomicIntegerFieldUpdater.newUpdater(AtomicEntity.class, "money");public void atoAddMoney(AtomicEntity atomic, Integer add) {ato.getAndAdd(atomic, add);}public void atoSubMoney(AtomicEntity atomic, Integer sub) {ato.getAndAdd(atomic, -sub);} }private static void test4() throws Exception {AtomicEntity atomic = new AtomicEntity("123456789", "123456", "李四", 0);CountDownLatch countDownLatch = new CountDownLatch(200);for (int a = 0; a < 100; a++) {new Thread(() -> {for (int b = 0; b < 100; b++) {atomic.atoAddMoney(atomic,1);}countDownLatch.countDown();}, "线程" + a).start();}for (int a = 0; a < 100; a++) {new Thread(() -> {for (int b = 0; b < 100; b++) {atomic.atoSubMoney(atomic,1);}countDownLatch.countDown();}, "线程" + a).start();}countDownLatch.await();//不加锁也可以保证线程安全System.out.println("最终结果" + atomic.getMoney());}
原子操作增强类
- 从 jdk 8 才引入了,原子操作增强类,如果是i++操作,推荐使用 LongAdder ,能比 AtomicLong 有更好的性能,因为LongAdder 可以减少乐观锁的重试次数
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
- 上面四个增强类继承于 Striped64 ,Striped64 继承了 Number,加上上面的16个类,统称为18罗汉
LongAdder
- 当多个线程更新用于收集统计信息,但不用于细粒度同步控制的目的公共和时,通常优于AtomicLong
- 在低并发下LongAdder和AtomicLong差距不大,高并发下明显优于AtomicLong,代价是空间消耗也大
- 常见api
- add (long x) 将当前值加x
- increment() 将当前值加1
- decrement() 将当前值减1
- sum() 返回当前值,如果是并发更新,sum返回的值不精确
- reset() 将value重置为0,这个方法只能在没有并发更新时使用
- sumThenReset() 获取当前值,并将value重置为0
- 只能计算加法,且必须从0开始计算
LongAccumulator
-
能够自定义计算规则
@FunctionalInterface public interface LongBinaryOperator {//left是初始值long applyAsLong(long left, long); }private static void test1() {//实现函数式接口 LongBinaryOperator ,自定义计算规则,第二个参数是初始值LongAccumulator longAccumulator = new LongAccumulator((x, y) -> {return x * y;}, 1);//这里的参数 y:5,和原本的值1,通过自定义的计算规则,得到两者的计算结果longAccumulator.accumulate(5);longAccumulator.accumulate(5);//结果为25System.out.println(longAccumulator.longValue());}
计数器案例
原子操作和同步方法的性能比较
/*** 计数器*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class LikeCounter {public volatile long num;public LikeCounter(long num) {this.num = num;}/*** 第一种同步方法*/public synchronized void add() {num++;}/*** atomicLong原子操作*/AtomicLong atomicLong = new AtomicLong(num);public void addAto() {atomicLong.getAndIncrement();}public long getAto() {return atomicLong.get();}/*** 对象的属性修改原子类 atomicLongFieldUpdater*/AtomicLongFieldUpdater<LikeCounter> atomicLongFieldUpdater = AtomicLongFieldUpdater.newUpdater(LikeCounter.class, "num");public void addAtoField(LikeCounter likeCounter) {atomicLongFieldUpdater.getAndIncrement(likeCounter);}/*** 原子扩展类 longAdder*/LongAdder longAdder = new LongAdder();public void addLongAdder() {longAdder.increment();}public long getLongAdder() {return longAdder.longValue();}/*** 原子扩展类 LongAccumulator*/LongAccumulator longAccumulator =new LongAccumulator((x,y)->{return x+y;},num);public void addlongAccumulatorr() {longAccumulator.accumulate(1);}public long getlongAccumulator() {return longAccumulator.get();}}
public static void main(String[] args) {try {test1();test2(); test3();test4();test5();} catch (Exception e) {throw new RuntimeException(e);}}private static void test1() throws Exception {long l1 = System.currentTimeMillis();LikeCounter like1 = new LikeCounter(0);CountDownLatch countDownLatch = new CountDownLatch(10000);for (int a = 0; a < 10000; a++) {new Thread(() -> {for (int b = 0; b < 10000; b++) {like1.add();}countDownLatch.countDown();}).start();}countDownLatch.await();System.out.println("调用synchronized方法,执行结果为:"+like1.getNum()+",耗时:"+(System.currentTimeMillis()-l1));}private static void test2() throws Exception {long l1 = System.currentTimeMillis();LikeCounter like1 = new LikeCounter(0);CountDownLatch countDownLatch = new CountDownLatch(10000);for (int a = 0; a < 10000; a++) {new Thread(() -> {for (int b = 0; b < 10000; b++) {like1.addAto();}countDownLatch.countDown();}).start();}countDownLatch.await();System.out.println("调用atomicLong方法,执行结果为:"+like1.getAto()+",耗时:"+(System.currentTimeMillis()-l1));}private static void test3() throws Exception {long l1 = System.currentTimeMillis();LikeCounter like1 = new LikeCounter(0);CountDownLatch countDownLatch = new CountDownLatch(10000);for (int a = 0; a < 10000; a++) {new Thread(() -> {for (int b = 0; b < 10000; b++) {like1.addAtoField(like1);}countDownLatch.countDown();}).start();}countDownLatch.await();System.out.println("调用atomicLongFieldUpdater方法,执行结果为:"+like1.getNum()+",耗时:"+(System.currentTimeMillis()-l1));}private static void test4() throws Exception {long l1 = System.currentTimeMillis();LikeCounter like1 = new LikeCounter(0);CountDownLatch countDownLatch = new CountDownLatch(10000);for (int a = 0; a < 10000; a++) {new Thread(() -> {for (int b = 0; b < 10000; b++) {like1.addLongAdder();}countDownLatch.countDown();}).start();}countDownLatch.await();System.out.println("调用longAdder方法,执行结果为:"+like1.getLongAdder()+",耗时:"+(System.currentTimeMillis()-l1));}private static void test5() throws Exception {long l1 = System.currentTimeMillis();LikeCounter like1 = new LikeCounter(0);CountDownLatch countDownLatch = new CountDownLatch(10000);for (int a = 0; a < 10000; a++) {new Thread(() -> {for (int b = 0; b < 10000; b++) {like1.addlongAccumulatorr();}countDownLatch.countDown();}).start();}countDownLatch.await();System.out.println("调用longAccumulator方法,执行结果为:"+like1.getlongAccumulator()+",耗时:"+(System.currentTimeMillis()-l1));}
调用结果如下:
- synchronized方法最慢,
- longAdder 和 longAccumulato 两者最快,性能基本相当
调用synchronized方法,执行结果为:100000000,耗时:4621
调用atomicLong方法,执行结果为:100000000,耗时:1522
调用atomicLongFieldUpdater方法,执行结果为:100000000,耗时:2611
调用longAdder方法,执行结果为:100000000,耗时:368
调用longAccumulator方法,执行结果为:100000000,耗时:333
longAdder
Striped64 内部结构
-
内部变量 base :并发低的时候,直接累加到base上
-
另外还有一个内部类cell ,和 cell[],并发高的时候,会把线程分散到自己的槽 cell[i] 中
longAdder 为什么快
- AtomicLong的底层是cas,即使并发很大,一次也只能有一个线程完成cas操作,剩下的线程只能自旋等待,就会导致大量的cpu空转
- longAdder的同样是cas
- 但是采用了分散热点的思想,将value值分散到一个cell 数组中,不同的线程会命中数组中不同的槽,每个线程对自己槽中的值进行cas操作,这样就分摊了压力,减少了线程冲突的概率,也就减少了线程自旋等待的时间,
- 如果要获取真正的value值时,sum方法会将所有的cell数组中的value值和base累加作为返回值
- 所以如果并发不大,longAdder和AtomicLong差别不大,都是对一个base进行操作
- 但是高并发下longAdder会采用空间换时间的做法,使用一个cell数组拆分value值,多个线程需要同时对value值进行操作的时候,先通过线程 id 的 hash 值映射到数组对应位置,再对该位置的值进行操作,当所有线程都执行完毕,base的值加上cell 数组中的值就是最终结果
longAdder 和 AtomicLong对比
- AtomicLong是多个线程对单个热点值进行原子更新,是线程安全的,会损失一些性能,再高精度要求时,可以使用
- longAdder 再高并发下,有较好的性能,对值精确度要求不高时,可以使用,每个线程都有自己的槽,各个线程一般只对自己槽中的值进行cas操作
add 方法
-
方法入参是要增加的值
-
刚开始 cell[] 等于null,尝试用cas操作更新base值,cas执行成功,把base值改为了期望值,本次add就结束了
-
随着并发的升高,cas操作失败,就需要执行 longAccumulate方法,去初始化cell数组分散压力
-
一旦 cell[] 数组初始化完成,就需要判断当前线程所在的 cell 是否为null
-
为 null,执行 Striped64 的 longAccumulate方法,来初始化对应位置的cell
-
不为null,就执行对应 cell 的 cas操作,
- 执行成功就没有冲突,结束本次add操作
- 执行失败,表示本次操作有冲突,需要执行 Striped64 的 longAccumulate方法来扩容 cell []
-
public void add(long x) {//b获取的base值,v表示期望值 ,m为数组长度,a表示当前线程命中的数组单元格Cell[] as; long b, v; int m; Cell a;//刚开始cells等于null,执行casBase,结果取反if ((as = cells) != null || !casBase(b = base, b + x)) {//这个boolean值代表cell数组有没有冲突boolean uncontended = true;//判断数组有没有初始化完成if (as == null || (m = as.length - 1) < 0 ||//判断线程有没有冲突,等于null说明没有冲突,getProbe()计算hash值(a = as[getProbe() & m]) == null ||//如果线程映射的对应位置不为null,就执行对应 cell 的 cas操作,执行成功返回true,取反得到false表没有冲突,结束本次add操作!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended);}}
longAccumulate
-
方法属于 Striped64,方法入参:
- x 是需要增加的值,
- longAdder 中 fn 默认是null, LongAccumulator会传递计算规则进来
- wasUncontended 是竞争标志,只有cell[]初始化完成,且cas竞争失败从才会是false
-
getProbe()方法,获取线程的hash值,如果返回0,会重新计算一个hash值,重新计算后,认为线程本次操作没有竞争关系,把竞争标志改为 true ,也就是不存在冲突
-
advanceProbe(h),重置当前线程的hash值,让线程再次参与竞争
-
longAccumulate 方法分为三大部分
-
数组没有初始化
- 如果数组没有初始化,就需要加锁去初始cell数组
- 这里没有使用synchronized加锁,而是使用内部变量cellsBusy,0表示无锁状态,1表示被其他线程持有了锁
- 如果是无锁状态,就会使用cas操作把该值更新为1,更新成功代表加锁成功,去完成数组的初始化,cell数组,默认长度为2,同时初始化当前线程hash对应数组位置的cell对象,数组初始化完成后释放锁
- 如果数组没有初始化,就需要加锁去初始cell数组
-
数组正在初始化,也计是其他未拿到锁的线程,作为兜底方案,会再这个分支把值直接累加到base上
-
数组初始化完成
-
如果数组初始化完成,但是线程对应位置的cell对象为null,就需要初始化cell对象
- 初始化cell对象,同样依靠内部变量cellsBusy,0表示无锁状态,1表示被其他线程持有了锁,值为0就使用cas把值更新为1,代表加锁
- cell对象初始化完成后,释放锁
-
如果线程竞争标志为false 存在冲突,就把竞争标识改为 true,然后重置当前线程的hash值,重新计算线程的槽位,让线程重新循环参与竞争
-
如果通过cas操作重新竞争成功,就跳出循环
-
如果数组的长度 n大于当前cpu的核数,就不可扩容,然后重置当前线程的hash值,让线程重新循环参与竞争
-
如果cell[] 需要扩容,同样需要拿到cas锁,新数组的长度是原数组的两倍,把原本的cell拷贝到新数组,数组的引用指向新数组后,释放锁
-
-
//x 是需要增加的值,fn 默认是null, wasUncontended 是竞争标志,只有cell[]初始化完成,且cas竞争失败从才会是falsefinal void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // force initializationh = getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {Cell[] as; Cell a; int n; long v;//数组初始化完成if ((as = cells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}//重制竞争标志else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash//重新竞争cas操作else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;else if (n >= NCPU || cells != as)collide = false; // At max size or staleelse if (!collide)collide = true;else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) { // Expand table unless staleCell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}//重置hash值h = advanceProbe(h);}//数组没有初始化else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize tableif (cells == as) {//如果cell为null,初始化cell数组,默认长度为2Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}//数组正在初始化,作为一种兜底,把值直接累加到base上else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}
sum方法
- 将所有cell数组中的value值和base累加作为返回值
- 但是再sum执行时,并没有限制对base和cell的更新,所以longAdder 不是强一致性的,而是保证最终一致性
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}