线程通信概念:
线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程之间的通信就成为整体的必用方式之一。当线程存在通信指挥,系统间的交互性会更强大,在提高CPU利用率的同时还会对线程任务在处理过程中进行有效的把控与监督。
为了支持多线程之间的协作,JDK提供了两个非常重要的接口线程等待wait()方法和通知notify()方法。这两个方法并不是在Thread类中的,而是输出Object类。这也意味着任何对象都可以调用这2个方法。
我们先看一个简单的例子:
1 public class ListAdd1 { 2 private volatile static List list = new ArrayList(); 3 public void add(){ 4 list.add("jianzh5"); 5 } 6 public int size(){ 7 return list.size(); 8 } 9 10 public static void main(String[] args) { 11 final ListAdd1 list1 = new ListAdd1(); 12 Thread t1 = new Thread(new Runnable() { 13 @Override 14 public void run() { 15 try { 16 for(int i = 0; i <10; i++){ 17 list1.add(); 18 System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素.."); 19 Thread.sleep(500); 20 } 21 } catch (InterruptedException e) { 22 e.printStackTrace(); 23 } 24 } 25 }, "t1"); 26 27 Thread t2 = new Thread(new Runnable() { 28 @Override 29 public void run() { 30 while(true){ 31 if(list1.size() == 5){ 32 System.out.println("当前线程收到通知:" + Thread.currentThread().getName() + " list size = 5 线程停止.."); 33 throw new RuntimeException(); 34 } 35 } 36 } 37 }, "t2"); 38 t1.start(); 39 t2.start(); 40 } 41 }
代码很简单,这是在没使用JDK线程协作时的做法。线程t2一直在死循环,当list的size等于5时退出t2,t1则继续运行。
这样其实也可以是说线程之间的协作,但是问题就是t2会一直循环运行,浪费了CPU资源(PS:list必须使用关键字volatile修饰)。
我们再看使用wait和notify时的代码:
1 public class ListAdd2 { 2 private volatile static List list = new ArrayList(); 3 4 public void add(){ 5 list.add("jianzh5"); 6 } 7 public int size(){ 8 return list.size(); 9 } 10 11 public static void main(String[] args) { 12 13 final ListAdd2 list2 = new ListAdd2(); 14 final byte[] lock = new byte[0]; 15 Thread t1 = new Thread(new Runnable() { 16 @Override 17 public void run() { 18 try { 19 synchronized (lock) { 20 System.out.println("t1启动.."); 21 for(int i = 0; i <10; i++){ 22 list2.add(); 23 System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素.."); 24 Thread.sleep(500); 25 if(list2.size() == 5){ 26 System.out.println("已经发出通知.."); 27 lock.notify(); 28 } 29 } 30 } 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 35 } 36 }, "t1"); 37 38 Thread t2 = new Thread(new Runnable() { 39 @Override 40 public void run() { 41 synchronized (lock) { 42 System.out.println("t2启动.."); 43 if(list2.size() != 5){ 44 try { 45 lock.wait(); 46 } catch (InterruptedException e) { 47 e.printStackTrace(); 48 } 49 } 50 System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止.."); 51 throw new RuntimeException(); 52 } 53 } 54 }, "t2"); 55 t2.start(); 56 t1.start(); 57 } 58 }
这里首先创建了一个的byte[]对象lock,然后线程t1,t2使用synchronzied关键字同步lock对象。线程t1一直往list添加元素,当元素大小等于5的时候调用lock.notify()方法通知lock对象。线程t2在size不等于5的时候一直处于等待状态。
这里使用byte[0]数组是因为JVM创建byte[0]所占用的空间比普通的object对象小,而花费的代价也最小。
运行结果如下:
看到这里可能会有疑问,为什么t1通知了t2线程运行而结果却是t1先运行完后t2再运行。
说明如下:
1、wait() 和 notify()必须配合synchrozied关键字使用,无论是wait()还是notify()都需要首先获取目标对象的一个监听器。
2、wait()释放锁,而notify()不释放锁。
线程t2一开始处于wait状态,这时候释放了锁所以t1可以一直执行,而t1在notify的时候并不会释放锁,所以t1还会继续运行。
知识拓展
现在我们来探讨一下有界阻塞队列的实现原理并模拟一下它的实现 :
1、有界队列顾名思义是有容器大小限制的
2、当调用put()方法时,如果此时容器的长度等于限定的最大长度,那么该方法需要阻塞直到队列可以有空间容纳下添加的元素
3、当调用take()方法时,如果此时容器的长度等于最小长度0,那么该方法需要阻塞直到队列中有了元素能够取出
4、put() 和 take()方法是需要协作的,能够及时通知状态进行插入和移除操作
根据以上阻塞队列的几个属性,我们可以使用wait 和notify实现以下它的实现原理:
/*** 自定义大小的阻塞容器*/ public class MyQueue {//1、初始化容器private final LinkedList<Object> list = new LinkedList<>();//2、定义计数器private AtomicInteger count = new AtomicInteger(0);//3、设定容器的上限和下限private final int minSize = 0;private final int maxSize;//4、构造器public MyQueue(int size) {this.maxSize = size;}//5、定义锁对象private final Object lock = new Object();//6、阻塞增加方法public void put(Object obj) {synchronized (lock) {while (count.get() == this.maxSize) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}//加入元素 计数器累加 唤醒取数线程可以取数 list.add(obj);count.incrementAndGet();lock.notify();System.out.println("新增的元素:" + obj);}}public Object take() {Object result = null;synchronized (lock) {while (count.get() == this.minSize) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}//移除元素 计数器递减 唤醒添加的线程可以添加元素result = list.removeFirst();count.decrementAndGet();lock.notify();}return result;}public int getSize() {return this.count.get();}public static void main(String[] args) {final MyQueue myQueue = new MyQueue(5);myQueue.put("a");myQueue.put("b");myQueue.put("c");myQueue.put("d");myQueue.put("e");System.out.println("当前队列长度:" + myQueue.getSize());Thread t1 = new Thread(new Runnable() {@Override public void run() {myQueue.put("f");myQueue.put("g");}}, "t1");t1.start();Thread t2 = new Thread(new Runnable() {@Override public void run() {Object obj = myQueue.take();System.out.println("移除的元素为:"+obj);Object obj2 = myQueue.take();System.out.println("移除的元素为:"+obj2);}},"t2");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}t2.start();} }
实现过程如下:
1、通过构造器初始化指定容器的大小
2、程序内部有一个AtomicInteger的计数器,当调用put()操作时此计数器加1;当调用take()方法时此计数器减1
3、在进行相应的take()和put()方法时会使用while判断进行阻塞,会一直处于wait状态,并在可以进行操作的时候唤醒另外一个线程可以进行相应的操作。
4、将此代码运行可以看到相应的效果。