异步重试
当您有一段经常失败且必须重试的代码时,此Java 7/8库提供了丰富且简洁的API以及针对此问题的快速且可扩展的解决方案:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
RetryExecutor executor = new AsyncRetryExecutor(scheduler).retryOn(SocketException.class).withExponentialBackoff(500, 2). //500ms times 2 after each retrywithMaxDelay(10_000). //10 secondswithUniformJitter(). //add between +/- 100 ms randomlywithMaxRetries(20);
现在,您可以运行任意代码块,并且库将为您重试该代码块,以防它抛出SocketException
:
final CompletableFuture<Socket> future = executor.getWithRetry(() ->new Socket("localhost", 8080)
);future.thenAccept(socket ->System.out.println("Connected! " + socket)
);
请仔细看! getWithRetry()
不会阻止。 它立即返回CompletableFuture
并异步调用给定的函数。 您可以一次收听该Future
甚至是多个Future
,并同时进行其他工作。 因此,这段代码的作用是:尝试连接到localhost:8080
,如果由于SocketException
失败,它将在500毫秒后重试(带有一些随机抖动),每次重试后的延迟加倍,但不超过10秒。
等效但更简洁的语法:
executor.getWithRetry(() -> new Socket("localhost", 8080)).thenAccept(socket -> System.out.println("Connected! " + socket));
这是您可能期望的示例输出:
TRACE | Retry 0 failed after 3ms, scheduled next retry in 508ms (Sun Jul 21 21:01:12 CEST 2013)
java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Retry 1 failed after 0ms, scheduled next retry in 934ms (Sun Jul 21 21:01:13 CEST 2013)
java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Retry 2 failed after 0ms, scheduled next retry in 1919ms (Sun Jul 21 21:01:15 CEST 2013)
java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Successful after 2 retries, took 0ms and returned: Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]Connected! Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]
想象一下,您连接到两个不同的系统,一个系统速度很慢 ,第二个系统不可靠并且经常失败:
CompletableFuture<String> stringFuture = executor.getWithRetry(ctx -> unreliable());
CompletableFuture<Integer> intFuture = executor.getWithRetry(ctx -> slow());stringFuture.thenAcceptBoth(intFuture, (String s, Integer i) -> {//both done after some retries
});
当缓慢而又不可靠的系统最终没有任何失败地进行答复时,异步执行thenAcceptBoth()
回调。 类似地(使用CompletableFuture.acceptEither()
),您可以同时异步调用两个或多个不可靠的服务器,并在重试几次后第一个成功时会收到通知。
我对此不够强调–重试是异步执行的,并且有效地使用了线程池,而不是盲目地睡眠。
基本原理
通常我们被迫重试给定的代码段,因为它失败了,我们必须再次尝试,通常会稍有延迟以节省CPU。 这项要求非常普遍,并且很少有现成的通用实现,其中RetryTemplate
是通过RetryTemplate
类在Spring Batch中提供重试支持 。 但是几乎没有其他类似的方法( [1] , [2] )。 所有这些尝试(我敢打赌,你们中的许多人自己都实现了类似的工具!)遇到了同样的问题-它们正在阻塞,从而浪费了大量资源,并且扩展性不好。
这本身并不坏,因为它使编程模型更加简单-库负责重试,而您只需要等待比平常更长的返回值即可。 但是,这不仅会造成泄漏的抽象(由于重试和延迟,通常非常快的方法突然变得很慢),而且浪费了宝贵的线程,因为此类工具将在重试之间花费大部分时间。 因此
创建了Async-Retry
实用程序,该实用程序针对Java 8 (现有Java 7 backport )并解决了上述问题。
主要的抽象是RetryExecutor
,它提供了简单的API:
public interface RetryExecutor {CompletableFuture<Void> doWithRetry(RetryRunnable action);<V> CompletableFuture<V> getWithRetry(Callable<V> task);<V> CompletableFuture<V> getWithRetry(RetryCallable<V> task);<V> CompletableFuture<V> getFutureWithRetry(RetryCallable<CompletableFuture<V>> task);
}
不必担心RetryRunnable
和RetryCallable
–为方便起见,它们允许使用已检查的异常,并且在大多数情况下,无论如何我们都会使用lambda表达式。
请注意,它返回CompletableFuture
。 我们不再假装调用错误方法很快。 如果库遇到异常,它将使用预先配置的退避延迟重试我们的代码块。 调用时间将从几毫秒飞涨到几秒钟。 CompletableFuture
清楚地表明了这一点。 而且,它不是愚蠢的java.util.concurrent.Future
我们都知道– Java 8中的CompletableFuture
非常强大 ,最重要的是–默认情况下是非阻塞的。
如果您毕竟需要阻止结果,只需在Future
对象上调用.get()
。
基本API
该API非常简单。 您提供了一个代码块,该库将多次运行它,直到它正常返回为止,而不是引发异常。 它也可能在重试之间设置可配置的延迟:
RetryExecutor executor = //...executor.getWithRetry(() -> new Socket("localhost", 8080));
一旦成功连接到localhost:8080
将解析返回的CompletableFuture<Socket>
。 (可选)我们可以使用RetryContext
来获取额外的上下文,例如当前正在执行的重试:
executor.getWithRetry(ctx -> new Socket("localhost", 8080 + ctx.getRetryCount())).thenAccept(System.out::println);
此代码比看起来更聪明。 在第一次执行期间ctx.getRetryCount()
返回0
,因此我们尝试连接到localhost:8080
。 如果失败,则下一次重试将尝试localhost:8081
( 8080 + 1
),依此类推。 而且,如果您意识到所有这些都是异步发生的,那么您可以扫描多台计算机的端口,并收到有关每个主机上第一个响应端口的通知:
Arrays.asList("host-one", "host-two", "host-three").stream().forEach(host ->executor.getWithRetry(ctx -> new Socket(host, 8080 + ctx.getRetryCount())).thenAccept(System.out::println));
对于每个主机, RetryExecutor
将尝试连接到端口8080,并尝试使用更高的端口。
getFutureWithRetry()
需要特别注意。 我想重试已经返回CompletableFuture<V>
:例如异步HTTP调用的结果:
private CompletableFuture<String> asyncHttp(URL url) { /*...*/}//...final CompletableFuture<CompletableFuture<String>> response = executor.getWithRetry(ctx -> asyncHttp(new URL("http://example.com")));
将asyncHttp()
传递给getWithRetry()
将产生CompletableFuture<CompletableFuture<V>>
。 与它一起工作不仅很尴尬,而且还很麻烦。 该库将仅调用asyncHttp()
并仅在失败时重试,但在返回时不重试
CompletableFuture<String>
失败。 解决方案很简单:
final CompletableFuture<String> response =executor.getFutureWithRetry(ctx ->asyncHttp(new URL("http://example.com")));
在这种情况下, RetryExecutor
将理解从asyncHttp()
返回的内容实际上只是一个Future
并且将(异步)等待结果或失败。 该库功能更强大,因此让我们深入了解:
配置选项
通常,您可以配置两个重要因素: RetryPolicy
,用于控制是否应进行下一次重试;以及Backoff
(可以有选择地增加后续重试之间的延迟)。
默认情况下, RetryExecutor
在每个Throwable
上无限重复用户任务,并在RetryExecutor
重试之间增加1秒的延迟。
创建
RetryExecutor
默认实现是AsyncRetryExecutor
,您可以直接创建:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();RetryExecutor executor = new AsyncRetryExecutor(scheduler);//...scheduler.shutdownNow();
唯一需要的依赖项是JDK的标准ScheduledExecutorService
。 在许多情况下,一个线程就足够了,但是如果您要同时处理数百个或更多任务的重试,请考虑增加池大小。
注意, AsyncRetryExecutor
不会关闭ScheduledExecutorService
。 这是一个有意识的设计决策,将在后面进行解释。
AsyncRetryExecutor
几乎没有其他构造函数,但是在大多数情况下,更改类的行为最方便的方法是with*()
方法链接调用。 您将看到大量以此方式编写的示例。 稍后,我们将仅使用executor
引用而不定义它。 假设它是RetryExecutor
类型。
重试政策
例外情况
默认情况下,从用户任务抛出的每个Throwable
(特殊AbortRetryException
除外)都会导致重试。 显然,这是可配置的。 例如,在JPA中,您可能想重试由于OptimisticLockException
而失败的事务-但其他所有异常都应立即失败:
executor.retryOn(OptimisticLockException.class).withNoDelay().getWithRetry(ctx -> dao.optimistic());
其中dao.optimistic()
可能会引发OptimisticLockException
。 在这种情况下,您可能不希望重试之间有任何延迟,以后再说。 如果您不喜欢在每个Throwable
上重试的默认设置,只需使用retryOn()
来限制它:
executor.retryOn(Exception.class)
当然,也可能需要相反的做法–中止重试并在抛出某些异常的情况下立即失败而不是重试。 就这么简单:
executor.abortOn(NullPointerException.class).abortOn(IllegalArgumentException.class).getWithRetry(ctx -> dao.optimistic());
显然,您不想重试NullPointerException
或IllegalArgumentException
因为它们指示编程错误,而不是瞬时失败。 最后,您可以结合使用重试和中止策略。 如果发生任何retryOn()
异常(或子类),则用户代码将重试,除非它应该abortOn()
指定的异常。 例如,我们想重试每个IOException
或SQLException
但是如果遇到FileNotFoundException
或java.sql.DataTruncation
(顺序无关),则中止:
executor.retryOn(IOException.class).abortIf(FileNotFoundException.class).retryOn(SQLException.class).abortIf(DataTruncation.class).getWithRetry(ctx -> dao.load(42));
如果这还不够,您可以提供将在每次失败时调用的自定义谓词:
executor.abortIf(throwable ->throwable instanceof SQLException &&throwable.getMessage().contains("ORA-00911"));
最大重试次数
中断重试“循环”的另一种方法(请记住此过程是异步的,没有阻塞循环 )是通过指定最大重试次数:
executor.withMaxRetries(5)
在极少数情况下,您可能希望禁用重试,而几乎不利用异步执行。 在这种情况下,请尝试:
executor.dontRetry()
重试之间的延迟(退避)
有时需要在失败后立即重试(请参阅OptimisticLockException
示例),但是在大多数情况下,这是一个坏主意。 如果您无法连接到外部系统,请稍等片刻,然后再尝试下一次尝试。 您可以节省CPU,带宽和其他服务器的资源。 但是有很多选择要考虑:
- 我们应该以固定的间隔重试还是增加每次失败后的延迟 ?
- 轮候时间是否应该有上限和下限?
- 我们是否应该添加随机“抖动”来延迟时间以及时分散许多任务的重试?
该库回答了所有这些问题。
重试间隔固定
默认情况下,每次重试之前都有1秒的等待时间。 因此,如果初始尝试失败,则将在1秒后执行第一次重试。 当然,我们可以将默认值更改为200毫秒:
executor.withFixedBackoff(200)
如果我们已经在此处,则默认情况下,执行用户任务后将应用退避。 如果用户任务本身消耗一些时间,则重试的频率将降低。 例如,重试延迟为RetryExecutor
毫秒,而用户任务失败所需的平均时间约为50毫秒, RetryExecutor
将每秒重试约4次(50毫秒+ RetryExecutor
毫秒)。 但是,如果要将重试频率保持在更可预测的水平,则可以使用fixedRate
标志:
executor.withFixedBackoff(200).withFixedRate()
这类似于ScheduledExecutorService
“固定速率”与“固定延迟”方法。 顺便说一句,不要期望RetryExecutor
会非常精确,这是最好的,但是在很大程度上取决于前面提到的ScheduledExecutorService
准确性。
重试之间的间隔呈指数增长
它可能是一个活跃的研究主题,但是总的来说,您可能希望随着时间的推移扩展重试延迟,假设如果用户任务多次失败,我们应该减少尝试次数。 例如,假设我们从100ms的延迟开始,直到进行第一次重试为止,但是如果该尝试也失败了,我们应该再等待两次(200ms)。 再过400毫秒,800毫秒……您就会明白:
executor.withExponentialBackoff(100, 2)
这是一个指数函数,可以快速增长。 因此,将最大退避时间设置在某个合理的水平(例如10秒)非常有用:
executor.withExponentialBackoff(100, 2).withMaxDelay(10_000) //10 seconds
随机抖动
在严重停机期间经常观察到的一种现象是系统趋于同步。 想象一个繁忙的系统突然停止响应。 成百上千的请求失败并被重试。 这取决于您的退避,但默认情况下,所有这些请求都会在一秒钟后重试,而一秒钟会产生大量流量。 最后,此类故障会传播到其他系统,这些系统又会进行同步。
为避免此问题,随着时间的推移扩展重试,使负载平坦化是很有用的。 一个简单的解决方案是添加随机抖动来延迟时间,以便并非所有请求都计划在完全相同的时间重试。 您可以在均匀抖动(随机值从-100ms到100ms)之间进行选择:
executor.withUniformJitter(100) //ms
…和比例抖动,将延迟时间乘以随机因子,默认情况下为0.9到1.1(10%):
executor.withProportionalJitter(0.1) //10%
您还可以对延迟时间设置严格的下限,以避免安排较短的重试时间:
executor.withMinDelay(50) //ms
实施细节
该库是在考虑Java 8的情况下构建的,以利用lambda和新的CompletableFuture
抽象(但存在具有Guava依赖项的Java 7 port )。 它在下面使用ScheduledExecutorService
来运行任务并计划将来的重试-这样可以最大程度地利用线程。
但是真正有趣的是,整个库是完全不变的,根本没有单个可变字段。 起初这可能是违反直觉的,例如以以下简单代码示例为例:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();AsyncRetryExecutor first = new AsyncRetryExecutor(scheduler).retryOn(Exception.class).withExponentialBackoff(500, 2);AsyncRetryExecutor second = first.abortOn(FileNotFoundException.class);AsyncRetryExecutor third = second.withMaxRetries(10);
似乎所有with*()
方法或retryOn()
/ abortOn()
方法retryOn()
现有执行程序变异。 但是事实并非如此,每次配置更改都会创建一个新实例 ,而旧实例则保持不变。 因此,例如,当first
执行者将重试FileNotFoundException
, second
和third
执行者则不会。 但是,它们都共享同一scheduler
。 这就是AsyncRetryExecutor
不关闭ScheduledExecutorService
(甚至没有任何close()
方法)的原因。 由于我们不知道有多少个AsyncRetryExecutor
副本指向同一调度程序,因此我们甚至不尝试管理其生命周期。 但是,这通常不是问题(请参见下面的Spring集成 )。
您可能想知道,为什么这么笨拙的设计决定? 有以下三个原因:
- 在编写并发代码时,不变性可以大大降低多线程错误的风险。 例如,
RetryContext
保留重试次数。 但是,我们无需更改变量,而只需创建具有递增但final
计数器的新实例(副本)。 没有比赛条件或能见度。 - 如果给您现有的
RetryExecutor
几乎完全是您想要的,但是您需要进行一些细微调整,则只需调用executor.with...()
并获取一个新副本。 您不必担心使用同一执行程序的其他地方(请参阅: Spring集成以获取更多示例) - 如今,功能编程和不可变数据结构非常流行 。
注意: AsyncRetryExecutor
未标记为final
,您可以通过将其子类化并添加可变状态来打破不变性。 请不要这样做,子类只允许更改行为。
依存关系
该库需要Java 8和SLF4J进行记录。 Java 7端口还取决于Guava 。
Spring整合
如果您即将在Spring中使用RetryExecutor
,请随时使用,但配置API可能对您不起作用。 Spring通过大量的设置来促进(或用于促进)可变服务的约定。 在XML中,您定义bean并在其上调用setter(通过<property name="..."/>
)。 该约定假定存在变异设置器。 但是我发现这种方法在某些情况下容易出错并且违反直觉。
假设我们全局定义了org.springframework.transaction.support.TransactionTemplate
bean并将其注入到多个位置。 大。 现在有一个请求,它的超时要求略有不同:
@Autowired
private TransactionTemplate template;
后来在同一个班级:
final int oldTimeout = template.getTimeout();
template.setTimeout(10_000);
//do the work
template.setTimeout(oldTimeout);
此代码在许多级别上都是错误的! 首先,如果发生故障,我们将永远不会恢复oldTimeout
。 好了, finally
救了。 但还要注意我们如何更改全局共享的TransactionTemplate
实例。 谁知道不知道更改配置的其他几个bean和线程将要使用它?
即使您确实想全局更改事务超时,也足够公平,但是这样做仍然是错误的方法。 private timeout
字段不是volatile
,因此对其进行的更改对于其他线程可能可见,也可能不可见。 真是一团糟! 同样的问题出现在许多其他类(如JmsTemplate
。
你知道我要去哪里吗? 只需创建一个不变的服务类,并在需要时通过创建副本来安全地对其进行调整。 现在,使用此类服务同样简单:
@Configuration
class Beans {@Beanpublic RetryExecutor retryExecutor() {return new AsyncRetryExecutor(scheduler()).retryOn(SocketException.class).withExponentialBackoff(500, 2);}@Bean(destroyMethod = "shutdownNow")public ScheduledExecutorService scheduler() {return Executors.newSingleThreadScheduledExecutor();}}
嘿! 进入21世纪,我们在Spring不再需要XML。 Bootstrap也很简单:
final ApplicationContext context = new AnnotationConfigApplicationContext(Beans.class);
final RetryExecutor executor = context.getBean(RetryExecutor.class);
//...
context.close();
如您所见,将现代的,不可变的服务与Spring集成非常简单。 顺便说一句,如果您在设计自己的服务时没有准备好进行如此大的更改,请至少考虑构造函数注入 。
到期
该库包含大量的单元测试。 但是,尚未在任何生产代码中使用它,并且该API可能会更改。 当然,我们鼓励您提交错误,功能请求和提取请求 。 它是在考虑到Java 8的情况下开发的,但是Java 7 backport存在一些更冗长的API和强制性Guava依赖关系( ListenableFuture
而不是
Java 8的CompletableFuture
)。
GitHub上的完整源代码。
翻译自: https://www.javacodegeeks.com/2013/08/asynchronous-retry-pattern.html
异步重试