跟上一篇文章比较,这次改进了之前的代码,使用了Lock Condition 和并发集合.代码量减了一些,并且更加容易读了.
这篇代码是上一篇的改进版,逻辑在前篇有说明,以防大家看不到,我再重现贴一遍.后续会使用高阶的线程工具再次改进,以求代码更简单.代码的逻辑:
1)SProducer不停的产生number到queue中.
2)3个carrier不停的取出queue中的number.
3)如果queue中存在10个剩余number时,SProducer会停下来等Carrier消费一些后再生产.
4)如果Carrier发现queue中没有number时,会等待.
5)如果Carrier取出的数字末尾为4, 则会挑起罢工事件.
6)Carrier罢工会引发一个Negotiation线程进行谈判.
7)罢工阶段SProducer和所有Carrier均停工.
8)Negotiation如果发现取出的number首位为3或者7,将引发谈判失败.
9)如果谈判成功,则恢复工作,如果谈判失败,则破产,所有线程均退出.
注意:使用lock的时候一定要注意, lock()和unlock()方法一定要成对出现. 最好放到try{}finally{}中,这样,unlock()方法必会调到.倘若没有使用unlock()就return了,会导致线程死锁.
package concurrency;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Producer extends Thread {
private final static ArrayBlockingQueue numbers = new ArrayBlockingQueue(10);
private final static ArrayList threads = new ArrayList();
private volatile boolean negotiating = false;
private ReentrantLock negotiationLock = new ReentrantLock();
private Condition negotiationCondition = negotiationLock.newCondition();
private class Negotiation implements Runnable {
private String number;
private Negotiation(String number) {
this.number = number;
}
public void run() {
try {
System.out.println("Start negotiation...");
sleep(5000);
if (number.startsWith("7") || number.startsWith("3")) {
System.out.println("Negotiation broken.");
for (Thread t : threads) {
t.interrupt();
}
System.out.println("Negotiation broken post handle.");
return;
}
System.out.println("Negotiation succeeds.");
} catch (InterruptedException e) {
System.out.println("Middle man is killed.");
}
negotiationLock.lock();
negotiating = false;
negotiationCondition.signalAll();
negotiationLock.unlock();
}
}
private class Carrier implements Runnable {
private String name;
private Carrier(String name) {
this.name = name;
}
public void run() {
while(true) {
try{
negotiationLock.lock();
while(negotiating) {
try {
System.out.println("Carrier ["+name+"] join stricks.");
negotiationCondition.await();
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Carrier [" + name + "] retires.");
return;
}
}
} finally {
negotiationLock.unlock();
}
String number;
try {
number = numbers.take();
System.out.println("Carrier [" + name + "] carries "+ number +" out of List;");
} catch (InterruptedException e1) {
System.out.println("Negotiation fails. Carrier [" + name + "] retires.");
return;
}
if (number.endsWith("4")) {
try {
negotiationLock.lock();
while (negotiating) {
try {
negotiationCondition.await();
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Carrier [" + name + "] retires.");
return;
}
}
negotiating = true;
System.out.println("Stricks happen on number:"+number);
new Thread(new Negotiation(number)).start();
} finally {
negotiationLock.unlock();
}
}
}
}
}
public void run() {
Thread a = new Thread(new Carrier("a"));
Thread b = new Thread(new Carrier("b"));
Thread c = new Thread(new Carrier("c"));
threads.add(this);
threads.add(a);
threads.add(b);
threads.add(c);
a.start();
b.start();
c.start();
this.produceNumbers();
}
private void produceNumbers() {
while (true) {
while(negotiating) {
negotiationLock.lock();
try {
System.out.println("Stricking... Producer waiting for negotiation result.");
negotiationCondition.await();
System.out.println("Negotiation succeeds. Producer happy.");
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Producer breaks up.");
return;
} finally {
negotiationLock.unlock();
}
}
String number = ""+new java.util.Random().nextInt(47);
try {
numbers.put(number);
System.out.println("Produce number " + number + " into List;");
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Producer breaks up.");
return;
}
}
}
public static void main(String[] args) {
new Producer().start();
}
}