一、同步模式之保护性暂停
即 Guarded Suspension ,用在一个线程等待另一个线程的执行结果
产生结果的线程和使用结果的线程是一一对应的,有多少个生产结果的线程就有多少个使用结果的线程。
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
(一)、带延时的写法
不用将控锁的那个对象设置成全局的
package com.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;@Slf4j(topic = "c.Test6")
public class Test6 {public static void main(String[] args){GuardedObjectV2 v2 = new GuardedObjectV2();new Thread(() -> {Thread.sleep(1);v2.complete(null);Thread.sleep(1);v2.complete(Arrays.asList("a", "b", "c"));}).start();Object response = v2.get(2500);if (response != null) {log.debug("get response: [{}] lines", ((List<String>) response).size());} else {log.debug("can't get response");}}
}@Slf4j(topic = "c.GuardedObjectV2")
class GuardedObjectV2 {private Object response;private final Object lock = new Object();public Object get(long millis) {synchronized (lock) {// 1) 记录最初时间long begin = System.currentTimeMillis();// 2) 已经经历的时间long timePassed = 0;while (response == null) {// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等long waitTime = millis - timePassed;log.debug("waitTime: {}", waitTime);if (waitTime <= 0) {log.debug("break...");break;}try {//这里不是millis,因为出现虚假唤醒的情况下只用再等 millis - timePassedlock.wait(waitTime);} catch (InterruptedException e) {e.printStackTrace();}// 3) 如果提前被唤醒,这时已经经历的时间假设为 400timePassed = System.currentTimeMillis() - begin;log.debug("timePassed: {}, object is null {}",timePassed, response == null);}return response;}}public void complete(Object response) {synchronized (lock) {// 条件满足,通知等待线程this.response = response;log.debug("notify...");lock.notifyAll();}}
}
(二)、多任务版
理解为顾客点外卖、外卖员送外卖到外卖柜、顾客收外卖更合适。
package com.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.Hashtable;
import java.util.Map;
import java.util.Set;@Slf4j(topic = "c.Test6")
public class Test6 {public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 3; i++) {new People().start();}Thread.sleep(1000);for (Integer id : Mailboxes.getIds()) {new Postman(id, "内容" + id).start();}}
}//中间解耦类,与业务不相关,这里为了方便取名为Mailboxes
class Mailboxes {private static Map<Integer, GuardedObject> boxes = new Hashtable<>();private static int id = 1;// 产生唯一 idprivate static synchronized int generateId() {return id++;}public static GuardedObject getGuardedObject(int id) {return boxes.remove(id);}public static GuardedObject createGuardedObject() {GuardedObject go = new GuardedObject(generateId());boxes.put(go.getId(), go);return go;}public static Set<Integer> getIds() {return boxes.keySet();}
}//业务相关类 People Postman
@Slf4j(topic = "c.People")
class People extends Thread{@Overridepublic void run() {// 收信GuardedObject guardedObject = Mailboxes.createGuardedObject();log.debug("开始收信 id:{}", guardedObject.getId());Object mail = guardedObject.get(5000);log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);}
}@Slf4j(topic = "c.Postman")
class Postman extends Thread {private int id;private String mail;public Postman(int id, String mail) {this.id = id;this.mail = mail;}@Overridepublic void run() {GuardedObject guardedObject = Mailboxes.getGuardedObject(id);log.debug("送信 id:{}, 内容:{}", id, mail);guardedObject.complete(mail);}
}class GuardedObject{private int id;public GuardedObject(int id) {this.id = id;}public int getId() {return id;}private Object response;public synchronized Object get(long timeout){long begin = System.currentTimeMillis();long passTime = 0;while (response == null){if (passTime > timeout ){break;}try {this.wait(passTime);} catch (InterruptedException e) {e.printStackTrace();}passTime = System.currentTimeMillis() - begin;}return response;}public synchronized void complete(Object response){this.response = response;this.notifyAll();}
}
二、异步模式之生产者/消费者
package com.itcast.test;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;@Slf4j(topic = "c.Test7")
public class Test7 {public static void main(String[] args) {MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 下载任务for (int i = 0; i < 4; i++) {int id = i;new Thread(() -> {try {log.debug("download...");List<String> response = Downloader.download();log.debug("try put message({})", id);messageQueue.put(new Message(id, response));} catch (IOException e) {e.printStackTrace();}}, "生产者" + i).start();}
// 1 个消费者线程, 处理结果new Thread(() -> {while (true) {Message message = messageQueue.take();List<String> response = (List<String>) message.getMessage();log.debug("take message({}): [{}] lines", message.getId(), response.size());}}, "消费者").start();}
}class Message {private int id;private Object message;public Message(int id, Object message) {this.id = id;this.message = message;}public int getId() {return id;}public Object getMessage() {return message;}
}@Slf4j(topic = "c.MessageQueue")
class MessageQueue {private LinkedList<Message> queue;private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;queue = new LinkedList<>();}public Message take() {synchronized (queue) {while (queue.isEmpty()) {log.debug("没货了, wait");try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}Message message = queue.removeFirst();queue.notifyAll();return message;}}public void put(Message message) {synchronized (queue) {while (queue.size() == capacity) {log.debug("库存已达上限, wait");try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(message);queue.notifyAll();}}
}
三、同步模式之顺序控制
(一)、固定运行顺序
先打印2在打印1
1、wait notify 版
package com.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test7")
public class Test7 {// 用来同步的对象static Object obj = new Object();// t2 运行标记, 代表 t2 是否执行过static boolean t2runed = false;public static void main(String[] args) {Thread t1 = new Thread(() -> {synchronized (obj) {// 如果 t2 没有执行过while (!t2runed) {try {// t1 先等一会obj.wait();} catch (InterruptedException e) {e.printStackTrace();}}}System.out.println(1);});Thread t2 = new Thread(() -> {System.out.println(2);synchronized (obj) {// 修改运行标记t2runed = true;// 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)obj.notifyAll();}});t1.start();t2.start();}
}
2、Park Unpark 版
package com.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.LockSupport;@Slf4j(topic = "c.Test7")
public class Test7 {public static void main(String[] args) {Thread t1 = new Thread(() -> {try { Thread.sleep(1000); } catch (InterruptedException e) { }// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行LockSupport.park();System.out.println("1");});Thread t2 = new Thread(() -> {System.out.println("2");// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)LockSupport.unpark(t1);});t1.start();t2.start();}
}
3、可以使用await/signal
(二)、交替输出
线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现
1、wait notify 版
package com.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test7")
public class Test7 {public static void main(String[] args) {SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);new Thread(() -> {syncWaitNotify.print(1, 2, "a");}).start();new Thread(() -> {syncWaitNotify.print(2, 3, "b");}).start();new Thread(() -> {syncWaitNotify.print(3, 1, "c");}).start();}
}class SyncWaitNotify {private int flag;private int loopNumber;public SyncWaitNotify(int flag, int loopNumber) {this.flag = flag;this.loopNumber = loopNumber;}public void print(int waitFlag, int nextFlag, String str) {for (int i = 0; i < loopNumber; i++) {synchronized (this) {while (this.flag != waitFlag) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.print(str);flag = nextFlag;this.notifyAll();}}}
}
2、Lock 条件变量版
package com.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.Test7")
public class Test7 {public static void main(String[] args) {AwaitSignal as = new AwaitSignal(5);Condition aWaitSet = as.newCondition();Condition bWaitSet = as.newCondition();Condition cWaitSet = as.newCondition();new Thread(() -> {as.print("a", aWaitSet, bWaitSet);}).start();new Thread(() -> {as.print("b", bWaitSet, cWaitSet);}).start();new Thread(() -> {as.print("c", cWaitSet, aWaitSet);}).start();as.start(aWaitSet);}
}@Slf4j(topic = "c.AwaitSignal")
class AwaitSignal extends ReentrantLock {public void start(Condition first) {this.lock();try {log.debug("start");first.signal();} finally {this.unlock();}}public void print(String str, Condition current, Condition next) {for (int i = 0; i < loopNumber; i++) {this.lock();try {current.await();log.debug(str);next.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {this.unlock();}}}// 循环次数private int loopNumber;public AwaitSignal(int loopNumber) {this.loopNumber = loopNumber;}
}
3、park/unpark
package com.itcast.test;import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;@Slf4j(topic = "c.Test7")
public class Test7 {public static void main(String[] args) {SyncPark syncPark = new SyncPark(5);Thread t1 = new Thread(() -> {syncPark.print("a");});Thread t2 = new Thread(() -> {syncPark.print("b");});Thread t3 = new Thread(() -> {syncPark.print("c\n");});syncPark.setThreads(t1, t2, t3);syncPark.start();}
}@Slf4j(topic = "c.SyncPark")
class SyncPark {private int loopNumber;private Thread[] threads;public SyncPark(int loopNumber) {this.loopNumber = loopNumber;}public void setThreads(Thread... threads) {this.threads = threads;}public void print(String str) {for (int i = 0; i < loopNumber; i++) {LockSupport.park();System.out.print(str);LockSupport.unpark(nextThread());}}private Thread nextThread() {Thread current = Thread.currentThread();int index = 0;for (int i = 0; i < threads.length; i++) {if(threads[i] == current) {index = i;break;}}if(index < threads.length - 1) {return threads[index+1];} else {return threads[0];}}public void start() {for (Thread thread : threads) {thread.start();}LockSupport.unpark(threads[0]);}
}
四、终止模式之两阶段终止模式
(一)、利用 isInterrupted
package com.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test5")
public class Test5 {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();twoPhaseTermination.start();Thread.sleep(3500);twoPhaseTermination.stop();}
}@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination{private Thread monitor;public void start(){monitor = new Thread(new Runnable() {@Overridepublic void run() {while (true){Thread current = Thread.currentThread();if (current.isInterrupted()){log.debug("进程结束之前进行的工作(料理后事)");break;}try {Thread.sleep(1000); //睡眠过程中被打断,抛出异常,isInterrupted重置为falselog.debug("执行业务的相关功能"); //此时被打断,直接在下次循环中执行打断程序} catch (InterruptedException e){log.debug("在睡眠时被打断");current.interrupt(); //将打断标记重新置为true,下次循环时执行结束程序}}}});monitor.start();}public void stop(){monitor.interrupt();}
}
(二)、利用停止标记
package com.itcast.test;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.Test5")
public class Test5 {public static void main(String[] args) throws InterruptedException {TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();twoPhaseTermination.start();Thread.sleep(3500);twoPhaseTermination.stop();}
}@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination{private Thread monitor;private volatile boolean stop = false;public void start(){monitor = new Thread(new Runnable() {@Overridepublic void run() {while (true){Thread current = Thread.currentThread();if (stop){log.debug("进程结束之前进行的工作(料理后事)");break;}try {Thread.sleep(1000); //睡眠过程中被打断,抛出异常,isInterrupted重置为falselog.debug("执行业务的相关功能"); //此时被打断,直接在下次循环中执行打断程序} catch (InterruptedException e){}}}});monitor.start();}public void stop(){stop = true;monitor.interrupt();}
}
五、同步模式之 Balking
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
比如监控电脑状态的线程只需要开启一个时:
public class MonitorService {// 用来表示是否已经有线程已经在执行启动了private volatile boolean starting;public void start() {log.info("尝试启动监控线程...");synchronized (this) {if (starting) {return;}starting = true;}// 真正启动监控线程...开启监控线程}
}
六、享元模式
(一)、体现
1、包装类
在 JDK 中 Boolean , Byte , Short , Integer , Long , Character 等包装类提供了 valueOf 方法,例如 Long 的valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
public static Long valueOf(long l) {final int offset = 128;if (l >= -128 && l <= 127) { // will cachereturn LongCache.cache[(int)l + offset];}return new Long(l);
}
注意:
- Byte, Short, Long 缓存的范围都是 -128~127
- Character 缓存的范围是 0~127
- Integer的默认范围是 -128~127
- 最小值不能变,但最大值可以通过调整虚拟机参数 ` -Djava.lang.Integer.IntegerCache.high` 来改变
- Boolean 缓存了 TRUE 和 FALSE
2、String 串池
3、BigDecimal BigInteger
是不可变的
(二)、实现一个简单的连接池
class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲, 1 表示繁忙private AtomicIntegerArray states;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize = poolSize;this.connections = new Connection[poolSize];this.states = new AtomicIntegerArray(new int[poolSize]);for (int i = 0; i < poolSize; i++) {connections[i] = new MockConnection("连接" + (i + 1));}}// 5. 借连接public Connection borrow() {while (true) {for (int i = 0; i < poolSize; i++) {// 获取空闲连接if (states.get(i) == 0) {if (states.compareAndSet(i, 0, 1)) {log.debug("borrow {}", connections[i]);return connections[i];}}}// 如果没有空闲连接,当前线程进入等待synchronized (this) {try {log.debug("wait...");this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 6. 归还连接public void free(Connection conn) {for (int i = 0; i < poolSize; i++) {if (connections[i] == conn) {states.set(i, 0);synchronized (this) {log.debug("free {}", conn);this.notifyAll();}break;}}}
}class MockConnection implements Connection {// 实现略
}
使用连接池
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {new Thread(() -> {Connection conn = pool.borrow();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}pool.free(conn);}).start();
}
七、异步模式之工作线程
让有限的工作线程( Worker Thread )来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如,如果一个餐馆的工人既要招呼客人(任务类型 A ),又要到后厨做菜(任务类型 B )显然效率不咋地,分成 服务员(线程池A )与厨师(线程池 B )更为合理,当然你能想到更细致的分工
(一)、饥饿
public class TestDeadLock {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.execute(() -> {log.debug("处理点餐...");Future<String> f = executorService.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});/*executorService.execute(() -> {log.debug("处理点餐...");Future<String> f = executorService.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) { e.printStackTrace();}});*/}
}