Guarded Suspension 模式
比如,项目组团建要外出聚餐,我们提前预订了一个包间,然后兴冲冲地奔过去,到那儿后大堂经理看了一眼包间,发现服务员正在收拾,就会告诉我们:“您预订的包间服务员正在收拾,请您稍等片刻。”过了一会,大堂经理发现包间已经收拾完了,于是马上带我们去包间就餐。
那我们来看看现实世界里是如何解决这类问题的呢?现实世界里大堂经理这个角色很重要,我们是否等待,完全是由他来协调的。通过类比,相信你也一定有思路了:我们的程序里,也需要这样一个大堂经理。的确是这样,那程序世界里的大堂经理该如何设计呢?其实设计方案前人早就搞定了,而且还将其总结成了一个设计模式:Guarded Suspension。所谓Guarded Suspension,直译过来就是“保护性地暂停”。
class GuardedObject<T>{// 受保护的对象private T obj;final Lock lock = new ReentrantLock();final Condition done = lock.newCondition();final int timeout=1;// 获取受保护对象 public T get(Predicate<T> p) {lock.lock();try {//MESA 管程推荐写法while(!p.test(obj)){done.await(timeout, TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}// 返回非空的受保护对象return obj;}// 事件通知方法public void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}}
}
扩展 Guarded Suspension 模式
每个发送到 MQ 的消息,都有一个唯一性的属性 id,所以我们可以维护一个 MQ 消息 id 和GuardedObject 对象实例的关系,这个关系可以类比大堂经理大脑里维护的包间和就餐人的关系。
class GuardedObject<T>{// 受保护的对象T obj;final Lock lock = new ReentrantLock();final Condition done = lock.newCondition();final int timeout=2;// 保存所有 GuardedObjectfinal static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();// 静态方法创建 GuardedObjectpublic static <K> GuardedObject create(K key){GuardedObject go=new GuardedObject();gos.put(key, go);return go;}public static <K, T> void fireEvent(K key, T obj){GuardedObject go=gos.remove(key);if (go != null){go.onChanged(obj);}}// 获取受保护对象 public T get(Predicate<T> p) {lock.lock();try {//MESA 管程推荐写法while(!p.test(obj)){done.await(timeout, TimeUnit.SECONDS);}}catch(InterruptedException e){throw new RuntimeException(e);}finally{lock.unlock();}// 返回非空的受保护对象return obj;}// 事件通知方法void onChanged(T obj) {lock.lock();try {this.obj = obj;done.signalAll();} finally {lock.unlock();}}
}
利用扩展后的 GuardedObject 来解决消息反馈后处理很简单了
// 处理浏览器发来的请求
Respond handleWebReq(){int id= 序号生成器.get();// 创建一消息Message msg1 = new Message(id,"{...}");// 创建 GuardedObject 实例GuardedObject<Message> go= GuardedObject.create(id); // 发送消息send(msg1);// 等待 MQ 消息Message r = go.get(t->t != null);
}void onMessage(Message msg){// 唤醒等待的线程GuardedObject.fireEvent(msg.id, msg);
}