协作基础(wait/notify)
Java的根父类是Object,Java在Object类而非Thread类中,定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类,一类是wait,另一类是notify。
wait方法主要有两个:
public final void wait() throws InterruptedException public final native void wait(long timeout) throws InterruptedException;
一个带时间参数,单位是毫秒,表示最多等待这么长时间,参数为0表示无限期等待。一个不带时间参数,表示无限期等待,实际就是调用wait(0)。在等待期间都可以被中断,如果被中断,会抛出InterruptedException。
wait实际上做了什么呢?每个对象都有一把锁和一个锁等待队列,一个线程在进入synchronized代码块时,会尝试获取锁,获取不到的话会把当前线程加入等待队列中。其实,除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用Object的notify方法:
public final native void notify(); public final native void notifyAll();
notify做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll和notify的区别是,它会移除条件队列中所有的线程并全部唤醒。
wait/notify方法只能在synchronized代码块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常java.lang.IllegalMonitorStateException。
wait的具体过程是:
- 把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING
- 等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁
-
- 如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回
- 否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从wait调用中返回
线程从wait调用中返回后,不代表其等待的条件就一定成立了,它需要重新检查其等待的条件,一般的调用模式是:
synchronized (obj) {while (条件不成立)obj.wait();... // 条件满足后的操作 }
生产者/消费者模式
下面来看一个生产者和消费者的例子:
/*** @author 沉默哥* */ public class MyProducerConsumerDemo {static class GoodsQueue {private int size;private Queue<String> que = new ArrayDeque<String>();public GoodsQueue(int size) {// 维护一个有界队列,传入队列的最大容量super();this.size = size;}public synchronized void put(String e) throws InterruptedException {while (que.size() == size) {System.out.println("队列已满,生产者等待");wait();}que.add(e);System.out.println("生产者生产:" + e);notify();}public synchronized String take() throws InterruptedException {while (que.size() == 0) {System.out.println("队列为空,消费者等待");wait();}String e = que.poll();System.out.println("消费者消费" + e);notify();return e;}}static class Producer extends Thread {GoodsQueue que;Random rad = new Random();public Producer(GoodsQueue que) {super();this.que = que;}@Overridepublic void run() {int i = 0;try {while (true) {String e = String.valueOf(i);que.put(e);i++;Thread.sleep(rad.nextInt(1000));// 生产者休息准备下一次生产 }} catch (InterruptedException e1) {}}}static class Consumer extends Thread {GoodsQueue que;Random rad = new Random();public Consumer(GoodsQueue que) {super();this.que = que;}@Overridepublic void run() {try {while (true) {que.take();Thread.sleep(rad.nextInt(1000));// 消费者休息准备下一次消费 }} catch (InterruptedException e) {}}}public static void main(String[] args) throws InterruptedException {GoodsQueue que = new GoodsQueue(1);Producer pro = new Producer(que);Consumer con = new Consumer(que);con.start();Thread.sleep(500);pro.start();} }