共享模型之无锁
问题提出
package com.zjy.unlock;import java.util.ArrayList;
import java.util.List;public class AccountDemo {public static void main(String[] args) {Account account = new AccountUnsafe(10000);Account.demo(account);}
}class AccountUnsafe implements Account{private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {return balance;}@Overridepublic void withdraw(Integer amount) {balance -= amount;}
}interface Account{//获取余额Integer getBalance();//取款void withdraw(Integer amount);/*** 方法内会启动1000个线程,每个线程-10的操作* 如果初始余额为10000那么正确的结果应该为当是0*/static void demo(Account account){List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(()->{account.withdraw(10);}));}long start = System.nanoTime();ts.forEach(Thread::start);ts.forEach(t->{try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+"元"+(end-start)/1000_000+"ms");}
}
因为临界区的共享变量没有被锁保护所以结果并不能达到我们想要的
有锁解决方法
package com.zjy.unlock;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class AccountDemo {public static void main(String[] args) {Account account = new AccountCas(10000);Account.demo(account);}
}class AccountCas implements Account{private AtomicInteger balance;public AccountCas(int balance) {this.balance = new AtomicInteger(balance);}@Overridepublic Integer getBalance() {return balance.get();}@Overridepublic void withdraw(Integer amount) {while (true){//获取余额的最新值int pre = balance.get();//要修改的余额int next = pre - amount;//真正修改if(balance.compareAndSet(pre,next)){break;}}}
}class AccountUnsafe implements Account{private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {synchronized (this){return balance;}}@Overridepublic void withdraw(Integer amount) {synchronized (this){balance -= amount;}}
}interface Account{//获取余额Integer getBalance();//取款void withdraw(Integer amount);/*** 方法内会启动1000个线程,每个线程-10的操作* 如果初始余额为10000那么正确的结果应该为当是0*/static void demo(Account account){List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(()->{account.withdraw(10);}));}long start = System.nanoTime();ts.forEach(Thread::start);ts.forEach(t->{try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+"元"+(end-start)/1000_000+"ms");}
无锁的解决方法
package com.zjy.unlock;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class AccountDemo {public static void main(String[] args) {Account account = new AccountCas(10000);Account.demo(account);}
}class AccountCas implements Account{private AtomicInteger balance;public AccountCas(int balance) {this.balance = new AtomicInteger(balance);}@Overridepublic Integer getBalance() {return balance.get();}@Overridepublic void withdraw(Integer amount) {while (true){//获取余额的最新值int pre = balance.get();//要修改的余额int next = pre - amount;//真正修改if(balance.compareAndSet(pre,next)){break;}}}
}interface Account{//获取余额Integer getBalance();//取款void withdraw(Integer amount);/*** 方法内会启动1000个线程,每个线程-10的操作* 如果初始余额为10000那么正确的结果应该为当是0*/static void demo(Account account){List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(()->{account.withdraw(10);}));}long start = System.nanoTime();ts.forEach(Thread::start);ts.forEach(t->{try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+"元"+(end-start)/1000_000+"ms");}
}
CAS与volatile
慢动作分析
前面看到的AtomicInteger的解决方法,内部并没有用锁来保护共享变量的线程安全。它是如何实现的呢?
public void withdraw(Integer amount){while(true){int pre = balance.get();int next = pre - amount;//比较并设置if(balance.compareAndSet(pre,next)){break;}}
}
其中的关键是compareAndSet,它的简称就是CAS(也有Compare And Swap 的说法),它必须是原子操作。
注意
其实CAS的底层是lock cmpxchg 指令(X86框架),在单核CPU和多核CPU下都能够保证比较-交换的原子性
· 在多核状态下,某个核执行到带lock的指令时,CPU会让总线锁住,当这个核把此指令执行完毕,再开启总线程。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
volatile
获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。
CAS必须借助volatile才能读取到共享变量的最新值来实现比较并交换的效果
为什么无锁效率高
· 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
· 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦上下文切换,就好比赛车要减速、熄火,等待唤醒又得重新打火、启动、加速…恢复到高速运行,代价比较大。
· 但无锁状态下,因为线程要保持运行,需要额外CPU的支持,CPU在这里就好比高速赛道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但是没有分到时间片,让然会进入可运行状态,还是会导致上下文切换。(所以CAS的情况最好是多核CPU,并且CPU核数超过线程数)
CAS特点
结合CAS和volatile可以实现无锁并发,适用于线程数少、多核CPU的场景下。
· CAS是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算修改了也没关系,再重试。
· synchronized是基于悲观锁的思想: 最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别修改,我改完了解锁,你们才有机会。
· CAS体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
· 因为没有使用synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
· 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
原子整数
JUC并发包提供了:
· AtomicBoolean
· AtomicInteger
· AtomicLong
以AtomicInteger为例
AtomicInteger i = new AtomicInteger(0);
//自增并获取值
System.out.println(i.incrementAndGet());//++i
//获取并自增
System.out.println(i.getAndIncrement());//i++
//自减并获取
System.out.println(i.decrementAndGet());//--i
//增加n并获取值
System.out.println(i.addAndGet(5));// i += n
//修改并获取值 箭头前是读取到的值 箭头后是将来要设置的值
i.updateAndGet(value -> value * 10);
//获取最新值
System.out.println(i.get());
updateAndGet的实现
public static void main(String[] args) {AtomicInteger i = new AtomicInteger(5);System.out.println(updateAndGet(i,p -> p * 10));
}public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator){while (true){int pre = i.get();int next = operator.applyAsInt(pre);if(i.compareAndSet(pre,next)){return next;}}
}
原子引用
原子引用类
· AtomicReference
· AtomicMarkableReference
· AtomicStampedReference
AtomicReference的使用
public class AccountDemo2 {public static void main(String[] args) {DecimalAccount account = new DecimalAccountCas(new BigDecimal(10000));DecimalAccount.demo(account);}
}class DecimalAccountCas implements DecimalAccount{private AtomicReference<BigDecimal> balance;public DecimalAccountCas(BigDecimal balance) {this.balance = new AtomicReference<>(balance);}@Overridepublic BigDecimal getBalance() {return balance.get();}@Overridepublic void withdraw(BigDecimal amount) {balance.updateAndGet(t -> t.subtract(amount));}
}interface DecimalAccount{//获取余额BigDecimal getBalance();//取款void withdraw(BigDecimal amount);/*** 方法内会启动1000个线程,每个线程做-10元操作* 如果初始余额为10000那么正确结果应该是0*/static void demo(DecimalAccount account){List<Thread> list = new ArrayList<>();for (int i = 0; i < 1000; i++) {list.add(new Thread(()->{account.withdraw(BigDecimal.TEN);}));}list.forEach(t->t.start());list.forEach(t->{try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}
}
ABA问题
@Slf4j(topic = "c.ABADemo")
public class ABADemo {static AtomicReference<String> ref = new AtomicReference<>("A");public static void main(String[] args) throws InterruptedException {log.debug("main start...");//获取值A//这个共享变量被其它线程修改?String pre = ref.get();other();Thread.sleep(1000);log.debug("A->C {}",ref.compareAndSet(pre,"C"));}private static void other() throws InterruptedException {new Thread(()->{log.debug("A->B {}",ref.compareAndSet(ref.get(),"B"));},"t1").start();Thread.sleep(500);new Thread(()->{log.debug("B->A {}",ref.compareAndSet(ref.get(), "A"));},"t2").start();}
}
主线程仅能判断出共享变量的值与最初值A是否相同,不能感知到这种从A改为B又改为A的情况,如果主线程希望:
只要有其它线程动过了共享变量,那么自己的cas就算失败,这时仅比较值是不够的,需要再加一个版本号
AtomicStampedReference
@Slf4j(topic = "c.ABADemo")
public class ABADemo {static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);public static void main(String[] args) throws InterruptedException {log.debug("main start...");//获取值A//这个共享变量被其它线程修改?String pre = ref.getReference();//获取版本号int stamp = ref.getStamp();log.debug("版本号 {}",stamp);other();Thread.sleep(1000);log.debug("版本号 {}",stamp);log.debug("A->C {}",ref.compareAndSet(pre,"C",stamp,stamp+1));}private static void other() throws InterruptedException {new Thread(()->{int stamp = ref.getStamp();log.debug("版本号 {}",stamp);log.debug("A->B {}",ref.compareAndSet(ref.getReference(),"B", stamp, stamp+1));},"t1").start();Thread.sleep(500);new Thread(()->{int stamp = ref.getStamp();log.debug("版本号 {}",stamp);log.debug("B->A {}",ref.compareAndSet(ref.getReference(), "A", stamp, stamp+1));},"t2").start();}
}
AtomicStampedReference可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A -> C,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但有时候,并不关心引用变量更改了几次,只是单纯关心是否被更改过,所以有了AtomicMarkableReference
AtomicMarkableReference
@Slf4j(topic = "c.AtomicMarkableReferenceDemo")
public class AtomicMarkableReferenceDemo {public static void main(String[] args) throws InterruptedException {GarbageBag bag = new GarbageBag("装满了垃圾");//参数2 mark 可以看作一个标记,表示垃圾袋满了AtomicMarkableReference<GarbageBag> ref =new AtomicMarkableReference<>(bag,true);log.debug("start...");GarbageBag pre = ref.getReference();log.debug(pre.toString());new Thread(()->{log.debug("start...");bag.setDesc("空垃圾袋");ref.compareAndSet(bag,bag,true,false);log.debug(bag.toString());},"保洁阿姨").start();Thread.sleep(1000);log.debug("想换一只新塑料袋");boolean success = ref.compareAndSet(pre, new GarbageBag("空垃圾袋"), true, false);log.debug("换了么?{}",success);log.debug(ref.getReference().toString());}
}class GarbageBag{private String desc;public GarbageBag(String desc) {this.desc = desc;}@Overridepublic String toString() {return "GarbageBag{" +"desc='" + desc + '\'' +'}';}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}
}
原子数组
· AtomicIntegerArray
· AtomicLongArray
· AtomicReferenceArray
通过一段测试代码来检测普通数组与原子数组
public class ThreadTestDemo {public static void main(String[] args) {long l = System.nanoTime();demo(()->new int[10],(array)-> array.length,(array,index)-> array[index]++,(array)-> System.out.println(Arrays.toString(array)));long l1 = System.nanoTime();demo(()->new AtomicIntegerArray(10),(array)-> array.length(),(array,index)-> array.getAndIncrement(index),(array)-> System.out.println(array));long l2 = System.nanoTime();System.out.println(l1-l);System.out.println(l2-l1);}/*** 参数1, 提供数组、可以是线程不安全数组或线程安全数组* 参数2, 获取数组长度的方法* 参数3, 自增方法,回传array,index* 参数4, 打印数组的方法*/private static <T> void demo(Supplier<T> arraySupplier,Function<T,Integer> lengthFun,BiConsumer<T,Integer> putConsumer,Consumer<T> printConsumer){List<Thread> ts = new ArrayList<>();T array = arraySupplier.get();int length = lengthFun.apply(array);for (int i = 0; i < length; i++) {//每个线程对数组作10000次操作ts.add(new Thread(()->{for (int j = 0; j < 10000; j++) {putConsumer.accept(array,j%length);}}));}ts.forEach(i->i.start());ts.forEach(i-> {try {i.join();} catch (InterruptedException e) {e.printStackTrace();}});printConsumer.accept(array);}
}
字段更新器
· AtomicReferenceFieldUpdater
· AtomicIntegerFieldUpdater
· AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合volatile修饰的字段使用,否则会出现异常
Exceptin in thread "main" java.lang.IllegalArgumentException: Must be volatile type
示例演示
@Slf4j(topic = "c.AtomicFieldUpdaterDemo")
public class AtomicFieldUpdaterDemo {public static void main(String[] args) {Student student = new Student();AtomicReferenceFieldUpdater updater = //类名 属性类型 属性名AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");System.out.println(updater.compareAndSet(student, null, "张三"));System.out.println(student);}
}class Student{volatile String name;@Overridepublic String toString() {return "Student{" +"name='" + name + '\'' +'}';}
}
原子累加器(LongAdder)
累加器性能比较
每个累加器需要执行的代码
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action){T adder = adderSupplier.get();List<Thread> ts = new ArrayList<>();//4个线程,每人累加50万for (int i = 0; i < 4; i++) {ts.add(new Thread(()->{for (int j = 0; j < 500000; j++) {action.accept(adder);}}));}long st = System.currentTimeMillis();ts.forEach(i->i.start());ts.forEach(i-> {try {i.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.currentTimeMillis();System.out.println(adder + "cost:" + (end-st));
}
让每个累加器预热
public static void main(String[] args) {for (int i = 0; i < 5; i++) {demo(() -> new AtomicLong(0),(adder) -> adder.getAndIncrement());}for (int i = 0; i < 5; i++) {demo(() -> new LongAdder(),(adder) -> adder.increment());}
}
得出结果
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Thread-0累加Cell[0],而Thread-1累加Cell[1]…最后将结果汇总。这样它们在累加时操作的不同的Cell变量,因此减少了CAS重试失败,从而提高性能
LongAdder源码
LongAdder是并发大师@author Doug Lea(大哥李)的作品,设计的非常精巧
LongAdder类有几个关键域
//累加单元数组,懒惰初始化
transient volatile Cell[] cells;
//基础值,如果没有竞争,则用cas累加这个域
transient volatile long base;
//在cells创建或扩容时,置为1,表示加锁
transient volatile int cellsBusy;
cas锁(用java给的就行了,别自己去手搓了)
@Slf4j(topic = "c.CASLockDemo")
public class CASLockDemo {//0 没加锁 1 加锁了private AtomicInteger state = new AtomicInteger(0);public void lock(){while (true){if(state.compareAndSet(0,1)){break;}}}public void unlock(){log.debug("unlock...");state.set(0);}public static void main(String[] args) {CASLockDemo lock = new CASLockDemo();new Thread(()->{log.debug("bagin...");lock.lock();try {log.debug("lock...");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {lock.unlock();}}).start();new Thread(()->{log.debug("bagin...");lock.lock();try {log.debug("lock...");}finally {lock.unlock();}}).start();}
}
Cell累加单元
//防止缓存行伪共享
@sun.misc.Contended
static final class Cell{volatile long value;Cell(long x){value = x;}//最重要的方法,用来cas方式进行累加,prev表示旧值,next表示新值final boolean cas(long prev,long next){return UNSAFE.compareAndSwapLong(this,valueOffset,prev,next);}//省略不重要代码
}
得从缓存说起
缓存与内存的速度比较
因为CPU与内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是64byte(8个long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU要保证数据的一致性,如果某个CPU核心更改了数据,其它CPU核心对应的整个缓存行必须失效
因为Cell是数组形式,在内存中是连续存储的,一个Cell为24字节(16字节的对象头和8字节的value),因此缓存行可以存下来2个的Cell对象。这样问题来了:
· Core-0 要修改 Cell[0]
· Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方Core的缓存行失效,比如Core-0中Cell[0]=6000,Cell[1]=8000要累加Cell[0]=6001,Cell[1]=8000,这时会让Core-1的缓存行失效
@sun.misc.Contended用来解决这个问题,它的原理是在使用此注释的对象或字段的前后各增加128字节大小的padding,从而让CPU将对象预读至缓存时占用不同的缓存行,这样不会造成对方缓存行的失效
LongAdder.add()
longAccumulate()
当cells为空时
当cells不为空时
当cells存在&cell已创建
获取最终结果通过sum方法
public long sum(){Cell[] as = cell;Cell a;long sum = base;if(as != null){for(int i = 0;i < as.length; ++i){if((a = as[i]) != null){sum += a.value;}}}return sum;
}
Unsafe
概述
Unsafe对象提供了非常底层的,操作内存、线程的方法,Unsafe对象不能直接调用,只能通过反射获得
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);//设置是否可以访问私有成员变量Unsafe unsafe = (Unsafe) theUnsafe.get(null);System.out.println(unsafe);
}
Unsafe CAS操作
public class UnsafeDemo {public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);//设置是否可以访问私有成员变量Unsafe unsafe = (Unsafe) theUnsafe.get(null);System.out.println(unsafe);// 1.获取域的偏移地址long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));Teacher teacher = new Teacher();// 2.执行CAS操作unsafe.compareAndSwapInt(teacher,idOffset,0,1);unsafe.compareAndSwapObject(teacher,nameOffset,null,"张三");// 3.验证System.out.println(teacher);}
}
基于Unsafe封装MyAtomicInteger
//测试类
public class MyAtomicIntegerDemo {public static void main(String[] args) {Account account = new MyAtomicInteger(100);List<Thread> ts = new ArrayList<>();for (int i = 0; i < 10; i++) {ts.add(new Thread(()->{for (int j = 0; j < 50; j++) {account.withdraw(1);}}));}ts.forEach(Thread::run);ts.forEach(thread -> {try {thread.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}
}//封装类
class MyAtomicInteger implements Account{private volatile int value;private static final long valueOffset;private static final Unsafe UNSAFE;static {UNSAFE = UnsafeAccessor.getUnsafe();try {valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));} catch (NoSuchFieldException e) {throw new RuntimeException(e);}}private int getValue(){return value;}private void decrement(int amount){while (true){if (UNSAFE.compareAndSwapInt(this,valueOffset,value,value-amount)){break;}}}public MyAtomicInteger(int value) {this.value = value;}@Overridepublic Integer getBalance() {return value;}@Overridepublic void withdraw(Integer amount) {decrement(amount);}
}