实现观察者模式以提供Java事件通知似乎是一件容易的事。 但是,容易陷入一些陷阱。 这是我在各种场合不慎造成的常见错误的解释……
Java事件通知
让我们从一个简单的bean StateHolder
开始,它封装了带有适当访问器的私有int
字段state
:
public class StateHolder {private int state;public int getState() {return state;}public void setState( int state ) {this.state = state;}
}
考虑到我们已经决定我们的bean应该向注册的观察者广播state changes
的消息。 没问题! 方便的事件和侦听器定义很容易创建...
// change event to broadcast
public class StateEvent {public final int oldState;public final int newState;StateEvent( int oldState, int newState ) {this.oldState = oldState;this.newState = newState;}
}// observer interface
public interface StateListener {void stateChanged( StateEvent event );
}
…接下来我们需要能够在StateHolder
实例上注册StatListeners
…
public class StateHolder {private final Set<StateListener> listeners = new HashSet<>();[...]public void addStateListener( StateListener listener ) {listeners.add( listener );}public void removeStateListener( StateListener listener ) {listeners.remove( listener );}
}
…最后但并非最不重要的StateHolder#setState
必须进行调整,以触发有关状态更改的实际通知:
public void setState( int state ) {int oldState = this.state;this.state = state;if( oldState != state ) {broadcast( new StateEvent( oldState, state ) );}
}private void broadcast( StateEvent stateEvent ) {for( StateListener listener : listeners ) {listener.stateChanged( stateEvent );}
}
答对了! 这就是全部。 作为专业人士,我们甚至可能已经实施了此测试驱动程序,并且对我们全面的代码覆盖范围和绿色标杆感到满意。 无论如何,这不是我们从网络教程中学到的吗?
坏消息来了:解决方案有缺陷……
并发修改
给定上述StateHolder
,即使仅在单线程限制内使用,也可以很容易地遇到ConcurrentModificationException
。 但是是谁引起的,为什么会发生呢?
java.util.ConcurrentModificationExceptionat java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)at java.util.HashMap$KeyIterator.next(HashMap.java:1453)at com.codeaffine.events.StateProvider.broadcast(StateProvider.java:60)at com.codeaffine.events.StateProvider.setState(StateProvider.java:55)at com.codeaffine.events.StateProvider.main(StateProvider.java:122)
查看stacktrace会发现该异常是由我们使用的HashMap
的Iterator
引发的。 只是我们在代码中没有使用任何迭代器,还是我们? 好吧,我们做到了。 broadcast
for each
构造的for each
基于Iterable
,因此在编译时转换为迭代器循环。
因此,侦听器在事件通知期间将自己从StateHolder
实例中删除可能会导致ConcurrentModificationException
。 因此,代替研究原始数据结构,一种解决方案是遍历侦听器的快照 。
这样,侦听器的删除将不再干扰广播机制(但请注意,通知语义也将稍有更改,因为在broadcast
执行时快照不会反映这种删除):
private void broadcast( StateEvent stateEvent ) {Set<StateListener> snapshot = new HashSet<>( listeners );for( StateListener listener : snapshot ) {listener.stateChanged( stateEvent );}
}
但是,如果要在多线程上下文中使用StateHolder
怎么办?
同步化
为了能够在多线程环境中使用StateHolder
,它必须是线程安全的。 这可以很容易地实现。 向我们类的每个方法添加同步应该可以解决问题,对吗?
public class StateHolder {public synchronized void addStateListener( StateListener listener ) { [...]public synchronized void removeStateListener( StateListener listener ) { [...]public synchronized int getState() { [...]public synchronized void setState( int state ) { [...]
现在,通过其内部锁来保护对StateHolder
实例的读/写访问。 这使公共方法具有原子性,并确保了不同线程的正确状态可见性。 任务完成!
不完全是……尽管该实现是线程安全的,但它冒着使用它死锁应用程序的风险。
考虑以下情况: Thread
A更改StateHolder
S的状态。在通知S的侦听器期间, Thread
B尝试访问S并被阻塞。 如果B对即将由S的侦听器之一通知的对象持有同步锁,则我们将陷入死锁。
这就是为什么我们需要缩小同步范围以声明状态并在受保护的段落之外广播事件:
public class StateHolder {private final Set<StateListener> listeners = new HashSet<>();private int state;public void addStateListener( StateListener listener ) {synchronized( listeners ) {listeners.add( listener );}}public void removeStateListener( StateListener listener ) {synchronized( listeners ) {listeners.remove( listener );}}public int getState() {synchronized( listeners ) {return state;}}public void setState( int state ) {int oldState = this.state;synchronized( listeners ) {this.state = state;}if( oldState != state ) {broadcast( new StateEvent( oldState, state ) );}}private void broadcast( StateEvent stateEvent ) {Set<StateListener> snapshot;synchronized( listeners ) {snapshot = new HashSet<>( listeners );}for( StateListener listener : snapshot ) {listener.stateChanged( stateEvent );}}
}
清单显示了从以前的片段演变而来的实现,该实现使用Set
实例作为内部锁提供了适当的(但有些过时的)同步。 侦听器通知发生在受保护的块之外,因此避免了循环等待 。
注意:由于系统具有并发性,因此该解决方案不能保证更改通知按其发生的顺序到达侦听器。 如果需要有关观察者端的实际状态值的更多准确性,请考虑提供
StateHolder
作为事件对象的源。如果事件顺序是至关重要的一个会想到一个线程安全的FIFO结构来缓冲在的守卫块根据听众快照一起事件
setState
。 只要FIFO结构不为空( Producer-Consumer-Pattern ),一个单独的线程就可以从不受保护的块中触发实际的事件通知。 这应该确保按时间顺序排列,而不会冒死机的危险。 我说应该,因为我从来没有尝试过这个解决方案。
鉴于先前实现的语义,使用诸如CopyOnWriteArraySet
和AtomicInteger
类的线程安全类来构成我们的类,会使解决方案的详细程度降低:
public class StateHolder {private final Set<StateListener> listeners = new CopyOnWriteArraySet<>();private final AtomicInteger state = new AtomicInteger();public void addStateListener( StateListener listener ) {listeners.add( listener );}public void removeStateListener( StateListener listener ) {listeners.remove( listener );}public int getState() {return state.get();}public void setState( int state ) {int oldState = this.state.getAndSet( state );if( oldState != state ) {broadcast( new StateEvent( oldState, state ) );}}private void broadcast( StateEvent stateEvent ) {for( StateListener listener : listeners ) {listener.stateChanged( stateEvent );}}
}
由于CopyOnWriteArraySet
和AtomicInteger
是线程安全的,因此我们不再需要受保护的块。 但请稍等! 我们不是只是学习使用快照进行广播,而不是遍历原始集的隐藏迭代器吗?
可能有点令人困惑,但是CopyOnWriteArraySet
提供的Iterator
已经是快照。 CopyOnWriteXXX
集合是专为此类用例而发明的-如果大小较小则非常有效,针对内容很少变化的频繁迭代进行了优化。 这意味着我们的代码是安全的。
在Java 8中,使用Iterable#forEach
结合lambda可以进一步简化broadcast
方法。 该代码当然是安全的,因为还在快照上执行了迭代:
private void broadcast( StateEvent stateEvent ) {listeners.forEach( listener -> listener.stateChanged( stateEvent ) );
}
异常处理
这篇文章的最后一部分讨论了如何处理抛出意外RuntimeException
的破碎侦听器。 尽管我通常严格选择快速失败的方法,但是在这种情况下,让此类异常不予处理可能是不合适的。 特别考虑到该实现可能在多线程环境中使用。
中断的侦听器以两种方式损害系统。 首先,它可以防止我们的柏忌后通知那些观察者。 其次,它可能损害可能没有准备好处理该问题的调用线程。 概括而言,它可能导致多种潜行故障,而最初的原因可能很难追查。
因此,将每个通知屏蔽在try-catch块中可能会很有用:
private void broadcast( StateEvent stateEvent ) {listeners.forEach( listener -> notifySafely( stateEvent, listener ) );
}private void notifySafely( StateEvent stateEvent, StateListener listener ) {try {listener.stateChanged( stateEvent );} catch( RuntimeException unexpected ) {// appropriate exception handling goes here...}
}
结论
如以上各节所示,Java事件通知有几点需要牢记。 确保在事件通知期间遍历侦听器集合的快照,将事件通知置于同步块之外,并在适当的情况下安全地通知侦听器。
希望我能够以一种易于理解的方式解决这些细微问题,并且不会特别弄乱并发部分。 如果您发现一些错误或需要分享其他智慧,请随时使用下面的评论部分。
翻译自: https://www.javacodegeeks.com/2015/03/getting-java-event-notification-right.html