几天前,在我们的常规代码审查中,我的一位同事提出了一个问题,即如果可能,一次同时调用CDI观察者(这样的方法带有参数@Observes
的方法)会发生什么?用于不同的事件实例。 换句话说,在产生少量事件之后,以下方法是否可能同时被多个线程处理:
public void observe(@Observes MyEvent myEvent) { ... }
考虑一下之后,我决定运行一些测试并在本文中描述结果。
最初的结果:发生了CDI事件以同步模式触发,这让我有些惊讶。 为什么?
到目前为止,我是这样看的:CDI观察者允许我非常干净地将事件生产者与事件消费者分开,因此我没有任何硬编码的侦听器注册,维护侦听器列表并手动通知它们。 CDI容器为我做一切。
因此,如果我们将生产者与消费者完全分开,我认为存在某种事件总线运行在专门的线程执行程序池中,该池负责注册事件与调用的观察者方法之间的中介。 我想我是基于其他事件/侦听器解决方案(例如Google Guava EventBus)的这一假设。 它们使您有机会定义是否要使用同步(默认, EventBus )或异步事件分派器( AsyncEventBus) 。
而且,如果EJB既是生产者又是消费者,那么我认为它具有与异步EJB调用相同的功能。 异步事件观察器唯一可能的JTA事务属性是: REQUIRED
, REQUIRES_NEW
或NOT_SUPPORTED
。
现在,这就是我期望的所有工作方式,这似乎与当前状态大不相同 。 现实生活表明CDI事件是同步的。
使异步事件在CDI 1.1中可用存在一个问题,但是我不确定此功能的当前状态如何,并且在CDI 1.1(Java EE 7的一部分)中没有找到有关此功能的信息。
让我们看看如何独自处理它。
目录
- 默认同步事件
- 解决方案1 – CDI生产者和Singleton EJB作为接收者
- 解决方案2 –使用Singleton EJB作为具有读取锁定的接收器
- 解决方案3 – EJB生产者和CDI使用者
- 解决方案4 – EJB生产者和EJB使用者
- 解决方案4与解决方案2
- 解决方案5 – EJB生产者和CDI使用者II
- 解决方案6 –使用JMS进行CDI
- 结论
默认同步事件
让我们从显示问题的基本示例开始。 看一下代码–首先,CDI Bean生产者:
@Path("/produce")
public class EventGenerator {@Injectprivate Logger logger;@Injectprivate Event<MyEvent> events;@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {for (int i = 0; i < numberOfEventsToGenerate; i++) {MyEvent event = new MyEvent(i);logger.info("Generating Event: " + event);events.fire(event);}return "Finished. Generated " + numberOfEventsToGenerate + " events.";}
}
MyEvent
只是一些事件对象,在这里并不是很重要。 它存储我们在实例化时传递的事件序列号。
消费者是一个非常简单的CDI Bean:
public class EventConsumer {@Injectprivate Logger logger;public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException {logger.info("Receiving event: " + myEvent);TimeUnit.MILLISECONDS.sleep(500);}
}
请注意,我已经插入了一个线程睡眠来模拟一些长时间运行的事件接收器进程。
现在,让我们通过调用EventProducer
公开的REST命令来运行此示例。 结果(运行JBoss EAP 6.1 Alpha )将类似于以下内容:
14:15:59,196 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 14:15:59,197 [com.piotrnowicki.EventConsumer](http- / 127.0 .0.1:8080-1)接收事件:MyEvent [ seqNo = 0 ] 14:15:59,697 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 14 :15:59,698 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [ seqNo = 1 ] 14:16:00,199 [com.piotrnowicki.EventGenerator](http- / 127.0。 0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 14:16:00,200 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [ seqNo = 2 ]
它显示了CDI事件的同步性质–事件的产生和使用发生在同一线程中,一个接一个地发生。
那么,如何使用CDI实现异步事件?
解决方案1 – CDI生产者和Singleton EJB作为接收者
生产者坚持使用–纯CDI bean:
@Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}
现在,如果您将接收器变成@Singleton EJB,并将observes方法标记为@Asynchronous,如下所示:
@Singleton
public class EventConsumer {@Asynchronouspublic void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}
您将得到以下结果:
14:21:19,341 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:21:19,343 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:21:19,343 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :21: 19,347 [com.piotrnowicki.EventConsumer](EJB默认– 2)接收事件:MyEvent [seqNo = 1] 14:21: 19,848 [com.piotrnowicki.EventConsumer](EJB默认– 1)接收事件:MyEvent [seqNo = 0] 14:21: 20,350 [com.piotrnowicki.EventConsumer](EJB默认值– 3)接收事件:MyEvent [seqNo = 2]
事件是一个接一个地产生的,并且是在单独的线程中产生的。SingletonEJB一次又一次地为它们提供服务(请查看事件处理的时间。)这是因为Singleton EJB的每个业务方法都具有隐式写锁定。 因此,这是:
异步: 是
线程安全的观察者方法: 是
解决方案2 –使用Singleton EJB作为具有读取锁定的接收器
这种方法与解决方案1非常相似,但是,由于所有事件处理都是并行进行的,因此它为您提供了更高的吞吐量。
我们的生产者保持不变–它是一个CDI bean:
@Path("/produce")
public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}
我们的使用者将@Lock(READ)
添加到其@Lock(READ)
方法中; 这使得能够同时处理多个事件的魔力:
@Singleton
public class EventConsumer {@Asynchronous@Lock(LockType.READ)public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}
结果就是这样:
14:24:44,202 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:24:44,204 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:24:44,205 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :24: 44,207 [com.piotrnowicki.EventConsumer](EJB默认– 4)接收事件:MyEvent [seqNo = 0] 14:24: 44,207 [com.piotrnowicki.EventConsumer](EJB默认– 6)接收事件:MyEvent [seqNo = 2] 14:24: 44,207 [com.piotrnowicki.EventConsumer](EJB默认值– 5)接收事件:MyEvent [seqNo = 1]
同时服务事件的不同线程为您提供更大的吞吐量。 因此,这是:
异步: 是
线程安全的观察者方法: 否
解决方案3 – EJB生产者和CDI使用者
CDI允许您观察特定交易阶段的事件。 您可以使用@Observes(during=TransactionPhase...)
指定它。 在我们的情况下,我们希望CDI堆叠所有这些事件并仅在事务结束后才调用观察者。 为此,我们只需将以上属性添加到我们的CDI Bean观察器中:
public class EventConsumer { public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) { ... }
}
现在,我们只需要确保EventGenerator
方法中有正在运行的事务EventGenerator
。 我们可以通过将CDI Bean转换为@Stateless
EJB并使用其隐式REQUIRED
TransactionAttribute来快速完成此操作:
@Stateless
@Path("/produce")
public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}
这是我们可能最终得到的结果:
14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :39: 06,778 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [seqNo = 2] 14:39: 07,279 [com.piotrnowicki.EventConsumer](http- / 127.0。 0.1:8080-1)接收事件:MyEvent [seqNo = 0] 14:39: 07,780 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)
EJB EventGenerator
启动事务,并且只有在事务完成之后,才会以序列化的方式调用CDI bean观察器。
异步: 是
线程安全的观察者方法: 是
解决方案4 – EJB生产者和EJB使用者
这与解决方案3非常相似。我们的生成器保持不变(无状态EJB):
@Stateless
@Path("/produce")
public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... }
}
现在对EventConsumer
进行了更改:
@Singleton
public class EventConsumer {@Asynchronous@Lock(LockType.READ)public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) throws InterruptedException { ... }
}
结果可能如下:
14:44:09,363 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:44:09,464 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:44:09,564 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :44: 09,670 [com.piotrnowicki.EventConsumer](EJB默认– 8)接收事件:MyEvent [seqNo = 2] 14: 44 : 09,670 [com.piotrnowicki.EventConsumer](EJB默认– 2)接收事件:MyEvent [seqNo = 1] 14:44: 09,670 [com.piotrnowicki.EventConsumer](EJB默认– 1)接收事件:MyEvent [seqNo = 0]
我们在这里使用了两个功能–一个是事件使用者方法是异步的,第二个是在生产者事务完成之前不会通知使用者。 这给我们:
异步: 是
线程安全的观察者方法: 否
解决方案4与解决方案2
这两个解决方案似乎是相同的。 它们仅与消费者的注释不同: @Observes
与@Observes(during = TransactionPhase.AFTER_COMPLETION)
。 此外,对于我们的测试用例,它们的行为相同: 它们是异步的,并且多个线程可以同时处理事件接收器 。 但是,它们之间有一个很大的区别。
在我们的测试案例中,我们一个接一个地触发事件。 想象一下,事件触发之间还有其他操作。 在这种情况下:
- 解决方案2(
@Observes
)将在第一个事件触发后立即开始处理事件, - 解决方案4(
@Observes(during = TransactionPhase.AFTER_COMPLETION)
)将在事务完成后立即开始处理,因此将触发所有事件。
这显示了这种情况的可能结果:
解决方案2( @Observes
)
15:01:34,318 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 15:01:34,320 [com.piotrnowicki.EventConsumer](EJB默认– 3 )接收事件:MyEvent [ seqNo = 0 ] 15:01:34,419 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 15:01:34,420 [com .piotrnowicki.EventConsumer](EJB默认– 6)接收事件:MyEvent [ seqNo = 1 ] 15:01:34,520 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 15:01:34,521 [com.piotrnowicki.EventConsumer](EJB默认值– 9)接收事件:MyEvent [ seqNo = 2 ]
解决方案4( @Observes(during = TransactionPhase.AFTER_COMPLETION)
)
15:00:41,126 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 15:00:41,226 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 15:00:41,326 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 15 :00:41,432 [com.piotrnowicki.EventConsumer](EJB默认– 10)接收事件:MyEvent [ seqNo = 2 ] 15:00:41,432 [com.piotrnowicki.EventConsumer](EJB默认– 4)接收事件:MyEvent [ seqNo = 1 ] 15:00:41,432 [com.piotrnowicki.EventConsumer](EJB默认值– 5)接收事件:MyEvent [ seqNo = 0 ]
解决方案5 – EJB生产者和CDI使用者II
到目前为止,我们已经尝试使接收器异步。 也有相反的方法–我们可以使事件生成器异步 。 我们可以通过将生产者标记为@Stateless
并调用自己的异步方法来触发事件来实现:
@Stateless
@Path("/produce")
public class EventGenerator {// ...@Resourceprivate SessionContext sctx;@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {for (int i = 0; i < numberOfEventsToGenerate; i++) {sctx.getBusinessObject(EventGenerator.class).fireEvent(new MyEvent(i));}return "Finished. Generated " + numberOfEventsToGenerate + " events.";}@Asynchronouspublic void fireEvent(final MyEvent event) {events.fire(event);}
}
使用SessionContext
仔细研究EJB自动引用。 在这种情况下,这是必需的,因为我们希望容器分派我们的方法调用并添加它的异步性质。 我们不希望使之成为本地呼叫,所以我们拒绝使用隐含的this
对象。
另一方面,事件使用者是纯CDI bean:
public class EventConsumer {public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}
结果可能如下:
00:40:32,820 [com.piotrnowicki.EventGenerator](EJB默认– 2)正在生成事件:MyEvent [seqNo = 1] 00:40:32,820 [com.piotrnowicki.EventGenerator](EJB默认– 3)正在生成事件:MyEvent [ SEQNO = 2] 00:40:32820 [com.piotrnowicki.EventGenerator](EJB默认- 1)产生事件:MyEvent [SEQNO = 0] 00:40:32821 [com.piotrnowicki.EventConsumer](EJB默认- 1)接收事件:MyEvent [seqNo = 0] 00 : 40 : 32,821 [com.piotrnowicki.EventConsumer](EJB默认– 2)接收事件:MyEvent [seqNo = 1] 00 : 40 : 32,821 [com.piotrnowicki.EventConsumer](EJB默认– 3)接收事件:MyEvent [seqNo = 2]
异步: 是
线程安全的观察者方法: 否
解决方案6 –使用JMS进行CDI
这是Juliano Viana在他的博客上提出的解决方案。 它使用JMS作为事件总线。 生成CDI事件,然后由负责将该事件放入JMS主题/队列的某个类获取。 从主题/队列中获取消息的MDB正在生成一个调用实际接收者的事件。 这不仅为您提供了事件的异步传递,而且还为其添加了事务性质。 例如,如果事件接收者无法处理该消息–它可以回滚该事务,并且队列将确保该消息将被重新发送(也许您的事件处理器下次将能够处理此事件?)
结论
CDI 1.0不支持异步事件生成。 CDI 1.1似乎也没有这种支持。
但是,这并不意味着您无法实现异步处理。 已经存在基于EJB 3.1或现有CDI观察器属性的现有解决方案。 您还应该能够编写可移植的CDI扩展 ,以将此功能添加到代码中。
翻译自: https://www.javacodegeeks.com/2013/05/asynchronous-cdi-events.html