为什么80%的码农都做不了架构师?>>>
Lock(锁)&Condition(条件)实现线程同步通信
接下来介绍,java5线程并发库里面的锁。跟锁有关的类和接口主要是位于java.util.concurrent.locks包。
Lock(锁)接口实现线程间互斥
下面我们先来了解Lock(锁)这个接口,这个锁的作用呢就非常类实于我们传统模型线程中的synchronized关键字,它的作用主要用于线程互斥。就好比有一个人进了一个厕所,把那个厕所里面的门给锁上,那别人就进不去了,他出来的时候把那把锁拿掉,别人就可以进去了。每个人进去的时候都可以上锁,上锁了,别人就进不去了。它相比这个synchronized来说呢,它更加面向对象。锁本身就是一个对象,锁上进不去,打开,别人可以进去。这样就更面向对象。下面我们通过代码来跟synchronized关键字对比一下看这个锁怎么用。
ReentrantLock类实现可重入的互斥锁
Lock是一个接口,当我们想要获得一个它的对象的时候就要new一个它的实现了类,这里我们new ReentrantLock();这是一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。这样我们就制造了一把锁,当A线程来调用的时候,我们就在要线程互斥的代码的前面调用lock.lock();上一把锁。就是A线程一进去以后,马上就把锁锁上。当B线程再来的时候呢,能不能进呢,那就看B线程是不是也想要这把锁,也必须用这把锁,如果B线程用的是另外一把锁,B线程是可以进的,只有B线程也要抢这一把锁的时候,它抢不到了。当A线程执行完了要线程互斥的这部分代码以后呢,我们调用lock.unlock();方法,把锁交回去。这样两个线程就不会打架了。但是这个时候还有一个问题,如果在被锁上的这部分代码内部抛了异常,那么整个方法就跳出了,代码不会向下执行,这个时候这个锁还没有释放。就好比一个人进去了,好家伙,他晕倒在里面了,这时候谁也进不去了,所以,在这种情况下,我们最好是把这部分被锁包裹的代码try起来,在finally代码块里面释放锁。就是以防万一,就是说无论如何,不管是正常还是非正常的退出,我们都应该把锁给释放。这样就保证别人也可以进去,即使之前哪个线程在拿到锁之后死掉了,别人还是可以进去。否则就麻烦了。
public class LockTest {
/** * @param args */ public static void main(String[] args) { new LockTest().init(); }
private void init() { final Outputer outputer = new Outputer(); new Thread(new Runnable() { @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } outputer.output("zhangxiaoxiang"); }
} }).start();
new Thread(new Runnable() { @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } outputer.output("lihuoming"); }
} }).start();
}
static class Outputer { Lock lock = new ReentrantLock();
public void output(String name) { int len = name.length(); lock.lock(); try { for (int i = 0; i < len; i++) { System.out.print(name.charAt(i)); } System.out.println(); } finally { lock.unlock(); } } } } |
static class Outputer {
public void output(String name) { int len = name.length(); synchronized (Outputer.class) { for (int i = 0; i < len; i++) { System.out.print(name.charAt(i)); } System.out.println(); } }
|
读写锁ReadWriteLock接口和实现类ReentrantReadWriteLock可重入读写锁
接下来我们来看,java5中的锁比传统线程模型中的synchronized关键字更强。它还可以实现读写锁ReadWriteLock接口(用的是锁分离的优化思想:只要两个线程的操作互不影响,锁就可以分离)。什么叫读写锁呢?就是多个读锁不互斥,而读锁与写锁互斥,写锁与写锁互斥。这在我们实际项目当中,我们是经常需要这个功能的。我们为什么要上锁呀,就是为了保证数据的完整性。如果才读了一半,你就跑进去写,那数据改了,我都会来的就乱了。如果大家都在读,如果想在这个数据是3,现在100个人在读,请问有没有问题?肯定没有问题,怎么读都是3。就是怕这个读的过程中,别人跑去改了,如果全是读,是没有问题的。如果大家全是读的时候,大家还上锁,好不好?不好,性能低,没有并发了。我们希望可以提高性能,大家可以同时读,但是又不打架。什么时候会大家呢,就在于写数据的时候容易打架,在修改数据的时候才打架。在读的时候,又跑进去写,容易出问题。在写的时候还没写完,又跑进去写,容易出问题。下面我们就写一个读写锁的应用。我们产生3个线程负责读数据,还产生3个线程负责写数据,如果我们不上读写锁,我们会看到读和写交替的运行。就读中有写,写中有读。如果我们上了读写锁,我们将看到什么效果呢?读中有读,但是读中没有写,写中没有读,写中也没有写。读写锁非常重要,能够提高性能,又能够互斥。这个在java5以前干不了这个事,现在java5帮我们提供了这种功能。非常好。
public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for (int i = 0; i < 3; i++) { new Thread() { public void run() { while (true) { q3.get(); } }
}.start();
new Thread() { public void run() { while (true) { q3.put(new Random().nextInt(10000)); } }
}.start(); }
} }
class Queue3 { private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
ReadWriteLock rwl = new ReentrantReadWriteLock();
public void get() { rwl.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + " be ready to read data!"); Thread.sleep((long) (Math.random() * 1000)); System.out.println(Thread.currentThread().getName() + "have read data :" + data); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwl.readLock().unlock(); } }
public void put(Object data) {
rwl.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + " be ready to write data!"); Thread.sleep((long) (Math.random() * 1000)); this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); } catch (InterruptedException e) { e.printStackTrace(); } finally { rwl.writeLock().unlock(); }
} } |
下面我们来看一个读写锁的比较好的例子。
先简单说一下Hibernate中session的load和get方法的区别:
get方法是直接去数据库里面查,如果有就返回,没有就返回null.
load方法得到的是一个代理对象,这个代理对象里面封装了我们想要的真实的对象的引用和它的id作为成员字段。他不会先去查数据库,首先会把方法参数中传入的id赋值给自己的id字段。然后返回这个代理对象。当我们要调用方法获取对象中id之外的字段的时候,它会先判断成员字段,就是那个真实对象的引用是否为空,如果为空的话,他就调用get方法去查询数据库,如果查到就赋值给那个真实对象的引用变量,并返回,没查到就抛出异常。
好了,明白上面的思想,我们再来看下面的例子。
class CachedData { Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {//处理缓存数据 rwl.readLock().lock();[z1] if (!cacheValid) {//检查有没有缓存的数据 // 在获得写锁之前必须释放读锁 rwl.readLock().unlock();[z2] rwl.writeLock().lock();[z3] // 重新检查状态,因为另一个线程可能已经获得写入锁定并在我们完成之前改变了状态。 if (!cacheValid) {//如果没有缓存数据 data = ...//去获取数据 cacheValid = true;//获取数据之后把缓存检查值设为true } // 通过在释放写锁之前获取读锁来降级 rwl.readLock().lock();[z4] rwl.writeLock().unlock(); // 解锁写入,仍然保持读取 }
use(data);//使用数据 rwl.readLock().unlock(); } } |
以上代码,其实就是,大家都去读数据,然后又怕某一个线程在写数据,所以大家一开始都挂了读锁,但是发现没有数据可读呢,第一个家伙发现没有数据可读呢,他马上挂一把写锁,就让别人读的暂时先不要进来了,等着,因为现在没有数据,我把数据写完了,我再把写锁解放,我也挂读锁,恢复成为读锁,大家就又都可以读了。这个例子就是一个缓存,一个代理的缓存,一个实体的缓存,不是缓存系统。
下面我们再做一个思考题,请你写出一个缓存系统的伪代码。这道题目跟我们上面讲的缓存代理对象不一样,缓存系统就是说可以装很多个对象,我们上面的缓存代理对象只能够装一个对象。什么是缓存呢?就是人家要取数据,别人调用我,调用我的一个方法叫getData(),我就应该检查,我内部是否有这个数据,如果有这个数据,我就直接给人家,如果没有这个数据,我就去数据库里面查,我查到以后,我就把查到的数据存到我的内存里面来,下次再有别人再找我要这个数据,就直接返回给人家,就不需要再查数据库了。这就是缓存系统的概念。你要拿对象不要直接找数据库,找我。如果我内部没有,我去查数据库,给你。和你直接查数据库是一样的。好处就在于,你下次再来找我的时候,我就不再查数据库了,我直接给你。这就是Cache.
因为我们是定义自己的Cache,我要存一堆对象,那如果人家来找我取某个对象的时候,怎么来取呢?我们定义一个key(key存的就是对象的名字),让要取对象的人把key给我,我就把这个key对应的对象给他。所以在缓存对象的内部,我们要定义一个Map成员,用来有对应规则的存储对象。那么现在人家来找我取数据,我是怎么做呢?我就先到map里面去get(key),来得到一个对象,如果这个对象为null,我就去查数据库,查到以后我再返回这个对象,在返回之间,我现在自己的map里面存一份。这就是一个简单的缓存系统。
public class CacheDemo {
private Map<String, Object> cache = new HashMap<String, Object>();
public static void main(String[] args) { // TODO Auto-generated method stub
}
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public Object getData(String key) { rwl.readLock().lock(); Object value = null; try { value = cache.get(key); if (value == null) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (value == null) { value = "aaaa";// 实际是去queryDB(); } } finally { rwl.writeLock().unlock(); } rwl.readLock().lock(); } } finally { rwl.readLock().unlock(); } return value; } } |
Condition实现互斥线程间的通信
接下来我们来介绍java5并发库这个java.util.concurrent.locks子包里面的另一个东西,叫Condition,叫条件。这个Condition到底是什么呢?是执行条件,就类似与我们传统线程同步模型中的Object的wait()方法和notify()方法。我们说,锁,只能实现互斥,就是说,我进去了,你不能进来。但是无法实现通信。什么叫通信呢?说,哥们,你可以执行了。这就是通信。Condition就是用来解决这个问题的。就是说CPU即使分给了我,我进来了,但是,我可以让出CPU,说,兄弟,你干吧。我暂停,兄弟你干完了,再通知我,说,大哥,谢谢你,你再干吧。这就是通信。下面我们来看一个Condition实现线程通信的代码例子,对比Object的wait()和notify();注意:Condition是基于锁Lock之上的。Condition一定是在某个lock对象基础上,也就是说,是基于某个Lock对象以后,在lock对象下面new出一个Condition(调用lock.newCondition();方法)。
public class ConditionCommunication {
/** * @param args */ public static void main(String[] args) {
final Business business = new Business(); new Thread(new Runnable() {
@Override public void run() {
for (int i = 1; i <= 50; i++) { business.sub(i); }
} }).start();
for (int i = 1; i <= 50; i++) { business.main(i); }
}
static class Business { Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
private boolean bShouldSub = true;
public void sub(int i) { lock.lock(); try { while (!bShouldSub) { try { condition.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j = 1; j <= 10; j++) { System.out.println("sub thread sequence of " + j + ",loop of " + i); } bShouldSub = false; condition.signal(); } finally { lock.unlock(); } }
public void main(int i) { lock.lock(); try { while (bShouldSub) { try { condition.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j = 1; j <= 100; j++) { System.out.println("main thread sequence of " + j + ",loop of " + i); } bShouldSub = true; condition.signal(); } finally { lock.unlock(); } }
} } |
class Business { private boolean bShouldSub = true; public synchronized void sub(int i){ while(!bShouldSub){ try { this.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } for(int j=1;j<=10;j++){ System.out.println("sub thread sequence of " + j + ",loop of " + i); } bShouldSub = false; this.notify(); }
public synchronized void main(int i){ while(bShouldSub){ try { this.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } for(int j=1;j<=100;j++){ System.out.println("main thread sequence of " + j + ",loop of " + i); } bShouldSub = true; this.notify(); } }
|
使用Condition的好处,它可以有多路等待情况,它可以实现我们用Object的wait()和notify()这两个方法实现不了的功能。下面我们看一个Condition实现多路等待的例子:
这是一个队列,可阻塞的一个队列的底层代码实现。
我们先来了解一下什么叫阻塞队列:有一个发射机,对外发射信号。这个发射机发射的型号来自于哪里呢?来自于一个共同机,也就是一个电脑吧。电脑它连着发射机,电脑里面有很多的短信,都是通过发射机来发射出去的。那么,这个电脑又是怎么收到这个短信的呢?通过寻呼台的坐席接听客户的电话,然后讲客户要发送的信息通过网络输入到电脑。有很多个坐席,电脑收到了很多个留言,它就把这些留言交给发射机一个一个去发,这个电脑系统内部的代码怎么写?现在这个发射机对外,1秒钟可以发射一个短信,现在这1秒中正好来了一句,欸,这个发射机就发射出去了。如果这个一秒钟来了两个,这个发射机发射几个?发一个。还有一个就没了,你叫我,我是不是正在处理你呀,另外一个人也扔给我一个包,我没有时间去收它,那怎么办呢?我们准备一个队列,说白了就是一个数组,然后人家给我一个呼,我就往队列里面放,再有一个坐席给我一个呼,我又往队列里面按顺序放。依次这样操作。我有两个线程,一个线程是专门对应这些坐席,把他们的寻呼信息收进队列,我还有一个线程是专门从这个队列里面取,取出来交给发射机。这个队列总共有8个格子0-7,我取的话是从第0个格子开始取,然后依次往上增,这时候如果我已经取到了序号为7的格子,取完了,接着,我又开始从序号为0的格子开始取,这时候就要有一个变量,这个变量记录了正要取得那个格子得序号。接下来我放的时候,刚开始我放的是第0个格子,然后这样依次往上递增,当放到序号为7的格子之后,又从新从0号格子开始放,假设这个发射机还没有把这个第0个格子的信息取走,我怎么办呢?我就阻塞等在那里,等到他一取走,我马上就放进去。这就是阻塞,就是不能继续往里面放了,我没地方放了,我在外面等着。暂停,等待发射机发一个,空一个格子出来,空一个格子出来以后,我就可以放进去了。他一空出来,我马上就放进去,这是系统自动帮我们调度的。我们只要用到相应的技术就行,至于它内部怎么实现,我们并不管。现在,操作系统应该给我们提供解决这种问题的技术。
因为我现在队列已经满了,没有富余的空间放了。取的时候有没有一种可能,假如我已经放了3个,现在我取到3了,我再接着要取,是不是要取第四个?第四个你放数据没有?你要没放,那时候我怎么办?阻塞,在那里等,一旦你放了一个进来,是不是你正好就放在第四个格子了?我就把它取走了。那我现在为什么要用这个队列呢?起到缓冲的效果。不是缓存,是缓冲。缓冲区。我们是平均每一秒中发一个短信,我们也计算过,我们的客户有3000万,我们是平均每一秒钟收一个短信,所以是不是正好,我一个发射机就够呀!平均每一秒钟收一个,那就意味着有时1秒钟收6个,有时候3秒钟内一个都没收到。但是平均下来,1秒钟一个。这时候进入这个缓冲区,你这一秒钟来了3个,没关系,我放在这个队列里面,下一秒钟没有了,那我在缓冲队列里面取,慢慢发。这就是缓冲区的作用。如果说来了1万个,但我现在缓冲区只有8个格子,那就有1万-8的人在那里阻塞。如果出现这种情况,系统会不会崩?就算不崩,你公司也要崩了。这时候我赶快扩容呗。刚开搞更多的发射机呗。这个什么东西都是有个度的,你只是解决少量的这个问题,如果太大量,这个时候要解决硬件问题了,别指望这个软件崩不崩了。
那下面看,人家给我们实现了这么一个可阻塞的队列。队列,先放进来的先取走。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer { final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];//队列里面1最多装100个对象
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {//往队列里面放数据 lock.lock(); try { while (count == items.length)//假如格子放满了,就等待 notFull.await(); items[putptr] = x;//放到数组里面的哪个索引 if (++putptr == items.length)//这个指针指示数据存放到数组的哪个索引 putptr = 0; ++count;//每放一个就把count给++,count记录实际有效的内容有几个 notEmpty.signal(); } finally { lock.unlock(); } }
public Object take() throws InterruptedException {//从队列里取数据 lock.lock(); try { while (count == 0)//假如队列里面一个数据都没有,就等待 notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count;//每取走一个就把count给-- notFull.signal(); return x; } finally { lock.unlock(); } } } |
用一个Condition也能实现功能,为什么代码里面要用两个Condition?既然它用了两个,这里面一定是有原因的。如果我们只用一个Condition到底会出现什么问题?假如我现在有5个线程同时在往缓冲区里面放数据,发现缓冲区已经满了,这时,这五个线程都在放数据的方法阻塞。结果,这时有一个线程去取数据,取完了数据就通知其它线程中的一个唤醒。这五个等待放数据的线程中有一个被唤醒了,唤醒了它就去放数据,放完数据之后,他又发一个通知去唤醒正在等待的线程中的某个苏醒,那么现在问题来了,他本来是要通知取的线程,说,我已经放完了,你现在可以取了。但是由于放数据和取数据用的是同一个Condition通信,此时等待通知的线程里面除了那个取数据的线程,还有4个放数据的线程,结果它这个唤醒通知没有通知到取数据的那个哥们,而是同时到了放数据的4个里面的一个,其实刚才他放数据的时候就又把缓冲区给放满了,此时这个被唤醒的放数据的线程一看缓冲区是满的,只好又继续阻塞等待。所以,如果有两个Condition的话们就可以把放数据和取数据的线程区分开来,一个只用来唤醒放数据的线程放数据,一个只用来唤醒取数据的线程取数据。通过这样,我们就自圆其说了。
根据我们上面的理解,下面我们来写一个3个Condition通信的代码:
public class ThreeConditionCommunication {
/** * @param args */ public static void main(String[] args) {
final Business business = new Business(); new Thread(new Runnable() {
@Override public void run() {
for (int i = 1; i <= 50; i++) { business.sub2(i); }
} }).start();
new Thread(new Runnable() {
@Override public void run() {
for (int i = 1; i <= 50; i++) { business.sub3(i); }
} }).start();
for (int i = 1; i <= 50; i++) { business.main(i); }
}
static class Business { Lock lock = new ReentrantLock(); //规则,main唤醒sub2,sub2唤醒shub3,sub3唤醒main。所以我们定义3个condition各司其职。 Condition condition1 = lock.newCondition();//用于控制main的condition
Condition condition2 = lock.newCondition();//用于控制sub2的condition
Condition condition3 = lock.newCondition();//用于控制sub3的condition
private int shouldSub = 1;
public void sub2(int i) { lock.lock(); try { while (shouldSub != 2) { try { condition2.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j = 1; j <= 10; j++) { System.out.println("sub2 thread sequence of " + j + ",loop of " + i); } shouldSub = 3; condition3.signal(); } finally { lock.unlock(); } }
public void sub3(int i) { lock.lock(); try { while (shouldSub != 3) { try { condition3.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j = 1; j <= 20; j++) { System.out.println("sub3 thread sequence of " + j + ",loop of " + i); } shouldSub = 1; condition1.signal(); } finally { lock.unlock(); } }
public void main(int i) { lock.lock(); try { while (shouldSub != 1) { try { condition1.await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j = 1; j <= 100; j++) { System.out.println("main thread sequence of " + j + ",loop of " + i); } shouldSub = 2; condition2.signal(); } finally { lock.unlock(); } }
} } |
[z1]线程进来是来读数据的,所以,先拿了一个读锁锁上,这个读锁它并不排斥其它的线程来拿,大家可以同时拿读锁。
[z2]这个时候,上面的if语句检查到并没有数据可读,所以接下来,这个线程准备进行写操作,这时候它释放了之前拿到的读锁,准备去拿写锁。
[z3]线程拿了一把写锁,写锁是互斥的,同一个写锁同一时间只能有一个线程持有。
[z4]注意这段代码:它挂上了一把读锁准备去读数据,在读数据之前,它要先把之前持有的写锁释放掉。我们的疑惑是,写锁里面是不能有读锁的,因为写的时候是不允许读的。解释:我自己挂的写锁,我还是可以挂读锁的,也就说,当一个线程拿到了写锁之后,这个线程还可以继续去拿读锁。锁是用来跟别的线程互斥的,是用来挡别人的,不是用来挡自己的。它其实是这样的,我在释放写锁前去挂读锁,就会把写锁降级,降级成为更新锁。更新锁只有当前线程才可以挂,别人挂不上的。