那么,批处理如何神奇地减少延迟呢? 这取决于采用什么算法和数据结构。 在分布式环境中,我们经常不得不将消息/事件分批放入网络数据包中以实现更大的吞吐量。 我们还采用类似的技术来缓冲对存储的写入,以减少IOPS的数量。 该存储可以是块设备支持的文件系统或关系数据库。 大多数IO设备每秒只能处理少量的IO操作,因此最好高效地填充这些操作。 许多批处理方法都涉及等待超时发生,这本质上会增加等待时间。 批处理也可以在超时发生之前被填满,从而使延迟更加不可预测。
图1 |
上面的图1.描绘了通过引入类似队列的结构来暂存要发送的消息/事件,以及通过进行批量处理以写入到设备的线程,将对IO设备的访问以及对访问它的争用分离。
算法
批处理方法使用Java伪代码中的以下算法:
public final class NetworkBatcherimplements Runnable
{private final NetworkFacade network;private final Queue<Message> queue;private final ByteBuffer buffer;public NetworkBatcher(final NetworkFacade network,final int maxPacketSize,final Queue<Message> queue){this.network = network;buffer = ByteBuffer.allocate(maxPacketSize);this.queue = queue;}public void run(){while (!Thread.currentThread().isInterrupted()){while (null == queue.peek()){employWaitStrategy(); // block, spin, yield, etc.}Message msg;while (null != (msg = queue.poll())){if (msg.size() > buffer.remaining()){sendBuffer();}buffer.put(msg.getBytes());}sendBuffer();}}private void sendBuffer(){buffer.flip();network.send(buffer);buffer.clear();}
}
基本上,等待数据可用,并立即将其发送。 在发送前一条消息或等待新消息时,可能会到达一连串的流量,所有流量都可以批量发送,直到缓冲区的大小,然后发送到基础资源。 此方法可以使用ConcurrentLinkedQueue ,它提供低延迟并避免锁定。 但是,如果线程的速度超过批处理程序的速度,则不会产生使生产/发布线程停顿的反压力,因为队列不受限制,因此队列可能会失去控制。 我经常不得不包装ConcurrentLinkedQueue来跟踪其大小,从而产生背压。 根据我的经验,此大小跟踪可以使使用此队列的处理成本增加50%。
该算法遵循单一写入器原理 ,可在写入网络或存储设备时经常使用,因此避免了第三方API库中的锁争用。 通过避免争用,由于对锁的排队效应,我们避免了通常与资源争用相关的J曲线延迟配置文件。 使用此算法,随着负载的增加,延迟会保持恒定,直到底层设备的流量饱和为止,从而导致比“ J曲线”更多的“浴缸”配置文件。
让我们举一个处理10个消息的示例,这些消息作为流量突发而到达。 在大多数系统中,流量是突发的,很少在时间上均匀地间隔开。 一种方法将假定不进行批处理,并且线程将直接写入设备API,如上面的图1所示。 另一个将使用无锁数据结构来收集消息,并按照上述算法在循环中收集消耗消息的单个线程。 对于该示例,我们假设花费100 µs的时间将单个缓冲区作为同步操作写入网络设备并得到确认。 当等待时间很关键时,缓冲区的大小最好小于网络的MTU。 许多网络子系统都是异步的,并且支持流水线化,但是我们将做出上述假设以阐明示例。 如果网络操作在REST或Web服务下使用HTTP之类的协议,则此假设与基础实现相匹配。
最佳(µs) | 平均值(µs) | 最差(µs) | 发送的数据包 | |
---|---|---|---|---|
序列号 | 100 | 500 | 1,000 | 10 |
智能配料 | 100 | 150 | 200 | 1-2 |
如果从线程发起数据直接将消息发送到资源(如果资源无竞争),则将实现绝对最低的延迟。 上表显示了发生争用并产生排队效应时发生的情况。采用串行方法时,将必须发送10个单独的数据包,并且这些数据包通常需要排队等待管理对资源的访问的锁,因此将按顺序进行处理。 上图假定锁定策略在没有可察觉开销的情况下完美工作,而这在实际应用中是不可能的。
对于批处理解决方案,如果并发队列有效,则很有可能在首批中拾取所有10个数据包,从而提供最佳的延迟情况。 在最坏的情况下,在第一批中仅发送一条消息,在下一批中发送其他九条消息。 因此,在最坏的情况下,一条消息的延迟为100 µs,随后的9条消息的延迟为200 µs,因此,最坏情况的平均值为190 µs,这比串行方法要好得多。
当最简单的解决方案由于争用而过于简单时,这就是一个很好的例子。 批处理解决方案有助于在突发条件下实现一致的低延迟,并且最适合吞吐量。 它在接收端的整个网络上也具有很好的效果,因为接收器必须处理更少的数据包,因此使两端的通信效率更高。
大多数硬件都会处理缓冲区中的数据(最大固定大小)以提高效率。 对于存储设备,通常为4KB块。 对于网络,这将是MTU,对于以太网,通常为1500字节。 批处理时,最好了解底层硬件并以理想的缓冲区大小写下批处理,以实现最佳效率。 但是请记住,某些设备需要封装数据,例如,网络数据包的以太网和IP标头,因此缓冲区需要考虑到这一点。
线程切换总是会增加等待时间,并且通过数据结构进行交换的成本也会增加。 但是,使用无锁技术可以使用许多非常好的非阻塞结构。 对于Disruptor,这种类型的交换可以在短短的50-100ns内完成,因此对于低延迟或高吞吐量的分布式系统而言,选择智能批处理方法毫无困难。
这项技术可以用于许多问题,而不仅仅是IO。 当发布者突发事件并超过EventProcessor时,Disruptor的核心使用此技术来帮助重新平衡系统。 可以在BatchEventProcessor内部看到该算法。
注意:为了使该算法起作用,排队结构必须比基础资源更好地处理争用。 许多队列实现在管理争用方面非常差。 在得出结论之前,请运用科学和测量。
使用干扰器批量处理
下面的代码显示了使用Disruptor的EventHandler机制执行的相同算法。 以我的经验,这是一种非常有效的技术,可以有效地处理任何IO设备,并在处理负载或突发流量时保持较低的延迟。
public final class NetworkBatchHandlerimplements EventHander<Message>
{private final NetworkFacade network;private final ByteBuffer buffer;public NetworkBatchHandler(final NetworkFacade network,final int maxPacketSize){this.network = network;buffer = ByteBuffer.allocate(maxPacketSize);}public void onEvent(Message msg, long sequence, boolean endOfBatch) throws Exception{if (msg.size() > buffer.remaining()){sendBuffer();}buffer.put(msg.getBytes());if (endOfBatch){sendBuffer();}} private void sendBuffer(){buffer.flip();network.send(buffer);buffer.clear();}
}
与上述算法中的double循环相比,endOfBatch参数大大简化了批处理。
我简化了示例以说明算法。 显然,需要考虑错误处理和其他边缘条件。
IO与工作处理的分离
还有另一个很好的理由将IO与执行工作处理的线程分开。 将IO移交给另一个线程意味着一个或多个工作线程可以继续处理而不会以一种友好的缓存友好方式进行阻塞。 我发现这对于实现高性能吞吐量至关重要。
如果基础IO设备或资源短暂饱和,则可以将消息排队等待批处理程序线程,以允许工作处理线程继续进行。 然后,批处理线程以最有效的方式将消息馈送到IO设备,从而允许数据结构处理突发数据,如果已完全施加必要的反压力,则可以很好地分离工作流程中的关注点。
结论
所以你有它。 智能批处理可与适当的数据结构配合使用,以实现一致的低延迟和最大吞吐量。
参考:来自Mechanical慰问博客的JCG合作伙伴 Martin Thompson提供的智能配料 。
翻译自: https://www.javacodegeeks.com/2012/08/smart-batching.html