Preface
This article takes a look at the RequestFilter in AsyncHttpClient.
RequestFilter
org/asynchttpclient/filter/RequestFilter.java
/**
 * A Filter interface that gets invoked before making an actual request.
 */
public interface RequestFilter {

    /**
     * An {@link org.asynchttpclient.AsyncHttpClient} will invoke {@link RequestFilter#filter} and will use the
     * returned {@link FilterContext#getRequest()} and {@link FilterContext#getAsyncHandler()} to continue the request
     * processing.
     *
     * @param ctx a {@link FilterContext}
     * @param <T> the handler result type
     * @return {@link FilterContext}. The {@link FilterContext} instance may not the same as the original one.
     * @throws FilterException to interrupt the filter processing.
     */
    <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException;
}
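RequestFilter defines a single filter method. It receives a FilterContext and returns a FilterContext (possibly a new instance), and it can throw a FilterException to interrupt processing.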
ThrottleRequestFilter
org/asynchttpclient/filter/ThrottleRequestFilter.java
/**
 * A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached,
 * waiting for the response to arrives before executing the next request.
 */
public class ThrottleRequestFilter implements RequestFilter {
    private static final Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
    private final Semaphore available;
    private final int maxWait;

    public ThrottleRequestFilter(int maxConnections) {
        this(maxConnections, Integer.MAX_VALUE);
    }

    public ThrottleRequestFilter(int maxConnections, int maxWait) {
        this(maxConnections, maxWait, false);
    }

    public ThrottleRequestFilter(int maxConnections, int maxWait, boolean fair) {
        this.maxWait = maxWait;
        available = new Semaphore(maxConnections, fair);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Current Throttling Status {}", available.availablePermits());
            }
            if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
                throw new FilterException(String.format("No slot available for processing Request %s with AsyncHandler %s",
                        ctx.getRequest(), ctx.getAsyncHandler()));
            }
        } catch (InterruptedException e) {
            throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s",
                    ctx.getRequest(), ctx.getAsyncHandler()));
        }

        return new FilterContext.FilterContextBuilder<>(ctx)
                .asyncHandler(ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available))
                .build();
    }
}
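ThrottleRequestFilter implements the RequestFilter interface and uses a Semaphore to throttle requests. If no permit can be acquired within maxWait milliseconds, or the waiting thread is interrupted, it throws a FilterException. Otherwise it wraps the asyncHandler with ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available) so that the permit is released once the request finishes.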
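The acquire-with-timeout step can be tried in isolation with nothing but the JDK. The following is a minimal sketch, not the library's code: ThrottleSketch, tryEnter and leave are hypothetical names, and a boolean return stands in for the FilterException the real filter throws.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the throttling step; not an AsyncHttpClient class.
class ThrottleSketch {
    private final Semaphore available;
    private final int maxWaitMillis;

    ThrottleSketch(int maxConnections, int maxWaitMillis) {
        this.available = new Semaphore(maxConnections);
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Returns true if a permit was obtained within maxWaitMillis, false on timeout or interrupt. */
    boolean tryEnter() {
        try {
            return available.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        }
    }

    /** Gives the permit back; the real filter does this through the wrapped handler. */
    void leave() {
        available.release();
    }

    public static void main(String[] args) {
        ThrottleSketch throttle = new ThrottleSketch(1, 50);
        System.out.println(throttle.tryEnter()); // true: the only permit is taken
        System.out.println(throttle.tryEnter()); // false: times out after about 50 ms
        throttle.leave();
        System.out.println(throttle.tryEnter()); // true: the permit is free again
    }
}
```

Note that the single-argument constructor of ThrottleRequestFilter passes Integer.MAX_VALUE as maxWait, so in that configuration a caller effectively blocks until a permit becomes available.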
ReleasePermitOnComplete
org/asynchttpclient/filter/ReleasePermitOnComplete.java
/**
 * Wrapper for {@link AsyncHandler}s to release a permit on {@link AsyncHandler#onCompleted()}. This is done via a
 * dynamic proxy to preserve all interfaces of the wrapped handler.
 */
public class ReleasePermitOnComplete {

    /**
     * Wrap handler to release the permit of the semaphore on {@link AsyncHandler#onCompleted()}.
     *
     * @param handler   the handler to be wrapped
     * @param available the Semaphore to be released when the wrapped handler is completed
     * @param <T>       the handler result type
     * @return the wrapped handler
     */
    @SuppressWarnings("unchecked")
    public static <T> AsyncHandler<T> wrap(final AsyncHandler<T> handler, final Semaphore available) {
        Class<?> handlerClass = handler.getClass();
        ClassLoader classLoader = handlerClass.getClassLoader();
        Class<?>[] interfaces = allInterfaces(handlerClass);

        return (AsyncHandler<T>) Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            try {
                return method.invoke(handler, args);
            } finally {
                switch (method.getName()) {
                    case "onCompleted":
                    case "onThrowable":
                        available.release();
                    default:
                }
            }
        });
    }

    //......
}
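ReleasePermitOnComplete.wrap creates a dynamic proxy around the original handler. Every call is delegated to the handler, and for onCompleted and onThrowable the finally block calls available.release(), so the permit is returned even if the handler method itself throws.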
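The proxy technique can be reproduced with the JDK alone. The sketch below is an illustration under assumptions: Handler is a hypothetical, much smaller stand-in for AsyncHandler, and the sketch proxies that one interface only, whereas the library collects all interfaces of the handler class. It also unwraps InvocationTargetException, which the excerpt above does not do.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.Semaphore;

class ReleaseOnCompleteSketch {

    // Hypothetical, simplified stand-in for AsyncHandler.
    interface Handler {
        void onBodyPart(String part);
        String onCompleted();
        void onThrowable(Throwable t);
    }

    /** Wraps the handler so that a permit is released after onCompleted or onThrowable. */
    static Handler wrap(Handler handler, Semaphore available) {
        return (Handler) Proxy.newProxyInstance(
                Handler.class.getClassLoader(),
                new Class<?>[] {Handler.class},
                (proxy, method, args) -> {
                    try {
                        return method.invoke(handler, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // surface the handler's own exception
                    } finally {
                        String name = method.getName();
                        if (name.equals("onCompleted") || name.equals("onThrowable")) {
                            available.release();
                        }
                    }
                });
    }

    /** Returns the available permits while the request is running and after it completed. */
    static int[] demo() {
        Semaphore available = new Semaphore(1);
        available.acquireUninterruptibly(); // what the filter does before the request starts
        Handler wrapped = wrap(new Handler() {
            public void onBodyPart(String part) { }
            public String onCompleted() { return "done"; }
            public void onThrowable(Throwable t) { }
        }, available);

        wrapped.onBodyPart("chunk");        // not a terminal callback, permit stays taken
        int duringRequest = available.availablePermits();
        wrapped.onCompleted();              // terminal callback, permit is released
        int afterCompletion = available.availablePermits();
        return new int[] {duringRequest, afterCompletion};
    }

    public static void main(String[] args) {
        int[] permits = demo();
        System.out.println(permits[0] + " " + permits[1]); // 0 1
    }
}
```

A proxy keeps the wrapper independent of the concrete handler type, which is why the library's Javadoc mentions preserving all interfaces of the wrapped handler.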
preProcessRequest
org/asynchttpclient/DefaultAsyncHttpClient.java
/**
 * Configure and execute the associated {@link RequestFilter}. This class
 * may decorate the {@link Request} and {@link AsyncHandler}
 *
 * @param fc {@link FilterContext}
 * @return {@link FilterContext}
 */
private <T> FilterContext<T> preProcessRequest(FilterContext<T> fc) throws FilterException {
    for (RequestFilter asyncFilter : config.getRequestFilters()) {
        fc = asyncFilter.filter(fc);
        assertNotNull(fc, "filterContext");
    }

    Request request = fc.getRequest();
    if (fc.getAsyncHandler() instanceof ResumableAsyncHandler) {
        request = ResumableAsyncHandler.class.cast(fc.getAsyncHandler()).adjustRequestRange(request);
    }

    if (request.getRangeOffset() != 0) {
        RequestBuilder builder = new RequestBuilder(request);
        builder.setHeader("Range", "bytes=" + request.getRangeOffset() + "-");
        request = builder.build();
    }
    fc = new FilterContext.FilterContextBuilder<>(fc).request(request).build();
    return fc;
}
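DefaultAsyncHttpClient's preProcessRequest iterates over config.getRequestFilters() and calls asyncFilter.filter(fc) on each filter in turn, passing each one the FilterContext returned by its predecessor. Afterwards it lets a ResumableAsyncHandler adjust the request range and adds a Range header when the range offset is non-zero.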
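The chaining loop is easy to model on its own. In this minimal sketch Context and Filter are hypothetical simplifications (the real FilterContext carries a Request and an AsyncHandler), and Objects.requireNonNull is used as an assumed equivalent of the library's assertNotNull check.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class FilterChainSketch {

    // Hypothetical immutable context holding only a request description.
    static final class Context {
        final String request;

        Context(String request) {
            this.request = request;
        }
    }

    // Hypothetical counterpart of RequestFilter, without the checked exception.
    interface Filter {
        Context filter(Context ctx);
    }

    /** Runs the filters in order; each one receives the context returned by the previous one. */
    static Context preProcess(Context ctx, List<Filter> filters) {
        for (Filter filter : filters) {
            ctx = Objects.requireNonNull(filter.filter(ctx), "filterContext");
        }
        return ctx;
    }

    static String demo() {
        Filter addAuth = ctx -> new Context(ctx.request + " +auth");
        Filter addTrace = ctx -> new Context(ctx.request + " +trace");
        return preProcess(new Context("GET /"), Arrays.asList(addAuth, addTrace)).request;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // GET / +auth +trace
    }
}
```

Because each filter returns a context rather than mutating shared state, the order of registration determines the order in which decorations are applied.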
executeRequest
org/asynchttpclient/DefaultAsyncHttpClient.java
public <T> ListenableFuture<T> executeRequest(Request request, AsyncHandler<T> handler) {
    if (config.getCookieStore() != null) {
        try {
            List<Cookie> cookies = config.getCookieStore().get(request.getUri());
            if (!cookies.isEmpty()) {
                RequestBuilder requestBuilder = new RequestBuilder(request);
                for (Cookie cookie : cookies) {
                    requestBuilder.addOrReplaceCookie(cookie);
                }
                request = requestBuilder.build();
            }
        } catch (Exception e) {
            handler.onThrowable(e);
            return new ListenableFuture.CompletedFailure<>("Failed to set cookies of request", e);
        }
    }

    if (noRequestFilters) {
        return execute(request, handler);
    } else {
        FilterContext<T> fc = new FilterContext.FilterContextBuilder<T>().asyncHandler(handler).request(request).build();
        try {
            fc = preProcessRequest(fc);
        } catch (Exception e) {
            handler.onThrowable(e);
            return new ListenableFuture.CompletedFailure<>("preProcessRequest failed", e);
        }

        return execute(fc.getRequest(), fc.getAsyncHandler());
    }
}
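When noRequestFilters is false, executeRequest builds a FilterContext and runs preProcessRequest. If that throws, the handler's onThrowable is invoked and a CompletedFailure future is returned; otherwise the request is executed with the request and handler taken from the resulting FilterContext.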
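One consequence of this control flow is that a rejecting filter does not throw to the caller: the failure reaches the handler and an already-failed future. The sketch below models only that behaviour with JDK types. Requests are plain strings, CompletableFuture stands in for ListenableFuture, an unchecked exception stands in for FilterException, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical model of the control flow only.
class ExecuteRequestSketch {

    static CompletableFuture<String> executeRequest(String request,
                                                    List<UnaryOperator<String>> filters,
                                                    Consumer<Throwable> onThrowable) {
        if (filters.isEmpty()) {
            return execute(request); // fast path: no filters configured
        }
        String filtered = request;
        try {
            for (UnaryOperator<String> filter : filters) {
                filtered = filter.apply(filtered);
            }
        } catch (RuntimeException e) {
            onThrowable.accept(e); // notify the handler first
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e); // then return an already-failed future
            return failed;
        }
        return execute(filtered);
    }

    // Stand-in for the real network call.
    private static CompletableFuture<String> execute(String request) {
        return CompletableFuture.completedFuture("sent: " + request);
    }

    static String demo() {
        StringBuilder log = new StringBuilder();
        UnaryOperator<String> addHeader = r -> r + " [X-Trace]";
        UnaryOperator<String> reject = r -> {
            throw new IllegalStateException("no slot");
        };

        log.append(executeRequest("GET /a", Collections.<UnaryOperator<String>>emptyList(), t -> { }).join());
        log.append(" | ");
        log.append(executeRequest("GET /b", Collections.singletonList(addHeader), t -> { }).join());
        log.append(" | ");
        CompletableFuture<String> failed = executeRequest("GET /c", Arrays.asList(addHeader, reject),
                t -> log.append("handler saw: ").append(t.getMessage()).append(" | "));
        log.append("failed: ").append(failed.isCompletedExceptionally());
        return log.toString();
    }

    public static void main(String[] args) {
        // sent: GET /a | sent: GET /b [X-Trace] | handler saw: no slot | failed: true
        System.out.println(demo());
    }
}
```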
Summary
AsyncHttpClient's RequestFilter defines a filter method. One implementation is ThrottleRequestFilter, which uses a semaphore to throttle requests. DefaultAsyncHttpClient's executeRequest runs preProcessRequest when noRequestFilters is false, and preProcessRequest iterates over config.getRequestFilters(), calling asyncFilter.filter(fc) on each filter in turn.