大多数应用程序至少具有一个批处理任务,在后台执行特定的逻辑。 编写批处理作业并不复杂,但是您需要了解一些基本规则,我将列举一些我发现最重要的规则。
从输入类型的角度来看,处理项目可以通过轮询处理项目存储库来实现,也可以通过将它们通过队列推送到系统中来实现。 下图显示了典型批处理系统的三个主要组件:
- 输入组件(通过轮询或从输入队列加载项目)
- 处理器:主要处理逻辑组件
- 输出组件:输出结果的输出通道或存储位置
1.始终分批轮询
您一次只能检索一批项目。 我最近不得不在尝试检索所有可能的项目进行处理时,诊断由计划作业抛出的OutOfMemoryError。
系统集成测试正使用少量数据,因此通过了,但是由于某些部署问题,当计划的作业脱机两天时,由于没有人消耗它们,因此要处理的项目数已经累积起来。 ,并且当调度程序重新联机时,由于它们不适合调度程序的内存堆,因此无法使用它们。 因此,仅设置高调度频率速率是不够的。
为了避免这种情况,您只需要获取一批物料,将它们消耗掉即可,然后您可以重新运行该过程,直到没有剩余要处理的东西为止。
2.编写线程安全的批处理程序
通常,无论您选择并行运行多少个作业,计划作业都应正确运行。 因此,批处理处理器应该是无状态的,仅使用本地作业执行上下文将状态从一个组件传递到另一个组件。 毕竟,即使是踩踏安全的全局变量也不是那么安全,因为作业的数据可能在并发执行时混杂在一起。
3.节流
使用队列(输入或在批处理程序中)时,您应该始终有一个限制策略。 如果物品的生产率始终高于消耗的物品,那么您将遭受灾难。 如果排队的项目保留在内存中,最终将用完它。 如果项目存储在持久队列中,则会用完空间。 因此,您需要一种平衡生产者和消费者的机制。 只要生产率是有限的,您只是要确保您有合适的消费者数量来平衡生产率。
当队列大小超过给定阈值时,自动扩展消费者就像开始新的消费者一样,是一种合适的自适应策略。 当队列大小低于其他阈值时杀死使用者,可以释放不必要的空闲线程。
create-new-consumer阈值应大于kill-idle阈值,因为如果它们相等,则当队列大小在阈值大小附近波动时,您将获得create-kill抖动。
4.存储工作结果
在内存中存储作业结果不是很周到的事情。 选择一个持久性存储(MongoDb限制的集合)是一个更好的选择。
如果结果保存在内存中,而您忘记将它们限制在一个上限,则批处理处理器最终将耗尽内存。 重新启动计划程序将清除您以前的工作结果,这是非常有价值的,因为这是您获得的唯一反馈。
5.泛滥外部服务提供商
for(GeocodeRequest geocodeRequest : batchRequests) {mapsService.resolveLocation(geocodeRequest);
}
这段代码充斥着您的地图提供商,因为一旦您完成一项请求,几乎立即就会发出一个新请求,这给他们的服务器带来了很大压力。 如果batchRequests数目足够高,那么您可能会被禁止。
您应该在两次请求之间添加一个短暂的延迟,但是不要让当前的睡眠状态变慢,而应使用EIP延迟器。
6.对批处理
尽管程序风格的编程是大多数程序员的默认思维方式,但许多批处理任务更适合企业集成模式设计。 使用EIP工具更容易实现所有上述规则:
- 消息队列
- 投票渠道
- 变形金刚
- 分离器/聚合器
- 延迟器
使用EIP组件可简化测试,因为您一次只专注于一项职责。 EIP组件通过队列传递的消息进行通信,因此将一个同步处理通道更改为调度的线程池只是一个配置细节。
有关EIP的更多信息,请查看出色的Spring Integration框架。 我已经使用了三年了,接种疫苗后,您会更喜欢它而不是过程编程。
翻译自: https://www.javacodegeeks.com/2013/11/batch-processing-best-practices.html