本文只是分析Letture类型的Redis 池化连接出现的连接超时异常、读超时异常问题。
1.RedisConnectionException
默认是10秒。
通过如下可以配置:
public class MyLettuceClientConfigurationBuilderCustomizer implements LettuceClientConfigurationBuilderCustomizer {@Overridepublic void customize(LettuceClientConfiguration.LettuceClientConfigurationBuilder clientConfigurationBuilder) {
// spring.redis.timeout等价clientConfigurationBuilder.commandTimeout(Duration.ofSeconds(100));
// 控制连接超时时间,默认是10秒ClientOptions.Builder builder = ClientOptions.builder().socketOptions(SocketOptions.builder().connectTimeout(Duration.ofMillis(10)).build());clientConfigurationBuilder.clientOptions(builder.build());}
}
2.RedisCommandTimeoutException
结论:抛出RedisCommandTimeoutException异常并非数据的写一定失败。只不过Redis内部超时逻辑跟写逻辑是异步处理,所以存在写成功后仍然抛出异常的情况。但是只要出现该异常必须得处理。
io.lettuce.core.RedisCommandTimeoutException: Command timed out after 2 second(s)
2.1.CommandHandler限制
CommandHandler是Netty中的handler。只不过其同时充当入栈、出栈handler。通俗点讲是Redis Write & Flush 数据到通道NioSocketChannel的必经之路。
AsyncCommand存在两大核心功能。其一是包裹用户数据,其二是实现了CompletableFuture接口,方便实现异步相关操作。
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise){if (!isWriteable(command)) {// 此处的就是判断是否存在超时的核心之处,借用CompletableFuture异步操作功能promise.trySuccess();// 此时说明数据写过程中超过用户设定的超时时间return;}...ctx.write(command, promise);// 数据正常写入,最终归宿是服务端}private static boolean isWriteable(RedisCommand<?, ?, ?> command) {return !command.isDone();}
}
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {public boolean isDone() {return result != null;}
}
通过上述得知,只要CompletableFuture内部的变量result不为空,则就会按TimeoutException处理。所以result值何时赋值呢?
如下所示,CommandHandler处理完数据之后会通过如下方式设置result的值。
public class AsyncCommand<K, V, T> extends CompletableFuture<T> implements RedisCommand<K, V, T>, RedisFuture<T>,CompleteableCommand<T>, DecoratedCommand<K, V, T> {protected void completeResult() {if (command.getOutput() == null) {complete(null);} else if (command.getOutput().hasError()) {doCompleteExceptionally(ExceptionFactory.createExecutionException(command.getOutput().getError()));} else {// 成功发送则Output为字符串类型的“ok”complete(command.getOutput().get());}}
}
3.超时处理时机
3.1.阻塞等待超时
class FutureSyncInvocationHandler extends AbstractInvocationHandler {protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {try {Method targetMethod = this.translator.get(method);// CommandExpiryWriter#writerObject result = targetMethod.invoke(asyncApi, args);if (result instanceof RedisFuture<?>) {...long timeout = getTimeoutNs(command);//用户自定义的超时时间//command:AsyncCommand 利用CompletableFuture异步特性return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);}return result;} catch (InvocationTargetException e) {throw e.getTargetException();}}
}
public class LettuceFutures {public static <T> T awaitOrCancel(RedisFuture<T> cmd, long timeout, TimeUnit unit) {try {//阻塞 其实内存调用的CompletableFuture#get阻塞方法。如果返回TimeoutExeception则说明AsyncCommand没有执行完毕,超时处理if (!cmd.await(timeout, unit)) {cmd.cancel(true);throw ExceptionFactory.createTimeoutException(Duration.ofNanos(unit.toNanos(timeout)));}return cmd.get();//这个方法还是调用CompletableFuture#get阻塞方法,但是该方法是根据内部属性result进行判断} catch (RuntimeException e) {throw e;} catch (ExecutionException e) {if (e.getCause() instanceof RedisCommandExecutionException) {throw ExceptionFactory.createExecutionException(e.getCause().getMessage(), e.getCause());}if (e.getCause() instanceof RedisCommandTimeoutException) {throw new RedisCommandTimeoutException(e.getCause());}throw new RedisException(e.getCause());} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RedisCommandInterruptedException(e);} catch (Exception e) {throw ExceptionFactory.createExecutionException(null, e);}}
}
如下伪代码所示:如果result不为null,则根据result类型抛出相关异常。否则通过timedGet在规定时间内阻塞等待,超时则抛出TimeoutException。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {Object r;long nanos = unit.toNanos(timeout);return reportGet((r = result) == null ? timedGet(nanos) : r);}
}
3.2.定时任务超时处理
public class CommandExpiryWriter implements RedisChannelWriter {public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {potentiallyExpire(command, getExecutorService());//开启定时任务return writer.write(command);//DefaultEndPoint#write}private void potentiallyExpire(RedisCommand<?, ?, ?> command, ScheduledExecutorService executors) {long timeout = applyConnectionTimeout ? this.timeout : source.getTimeout(command);//用户自定义的超时时间if (timeout <= 0) {return;}// 从 timeout 时间之后开始执行定时任务。此时result值如果存在值则说明超时ScheduledFuture<?> schedule = executors.schedule(() -> {if (!command.isDone()) {command.completeExceptionally(ExceptionFactory.createTimeoutException(Duration.ofNanos(timeUnit.toNanos(timeout))));}}, timeout, timeUnit);if (command instanceof CompleteableCommand) {((CompleteableCommand) command).onComplete((o, o2) -> {if (!schedule.isDone()) {schedule.cancel(false);}});}}
}
重点:定时任务判断成功与否的唯一条件就是result是否存在值。情况1,如果此时result = ok表明数据已经在服务端落盘成功,但是同样会抛出RedisCommandTimeoutException。情况2,发送之前出现异常则result值为Throwable类型的异常值。