核心接口
Filter
package com.xxx.arch.mw.nbp.common.extension;import com.xxx.commons.data.domain.Result;/*** @date 2023/08/25*/
/**
 * A single interception step in a dispatch invocation chain.
 *
 * <p>{@code FilterChain#buildInvokerChain} wraps every registered filter around the next
 * {@link Invoker}, so an implementation decides what to do before and after handing the
 * call on to {@code invoker}.
 */
public interface Filter {

    /**
     * Processes the invocation, usually by delegating to {@code invoker}.
     *
     * @param invoker    the next element of the chain
     * @param invocation the call being processed
     * @return the result of the call
     */
    Result invoke(Invoker invoker, Invocation invocation);
}
Invoker
package com.xxx.arch.mw.nbp.common.extension;import com.xxx.commons.data.domain.Result;/*** @date 2023/08/25*/
/**
 * Executes an {@link Invocation} and reports the outcome as a {@link Result}.
 *
 * <p>It has a single abstract method, so {@code FilterChain} implements it with lambdas that
 * wrap each {@code Filter}.
 */
public interface Invoker {

    /**
     * Performs the call described by {@code invocation}.
     *
     * @param invocation the call to execute
     * @return the result of the call
     */
    Result invoke(Invocation invocation);
}
Invocation
package com.xxx.arch.mw.nbp.common.extension;import com.xxx.arch.mw.nbp.share.dto.PropertyDTO;/*** @date 2023/08/25*/
/**
 * Describes one call travelling through the filter chain.
 *
 * @param <T> type of the payload returned by {@link #getDetail()}
 */
public interface Invocation<T> {

    /**
     * @return the payload carried by this invocation
     */
    T getDetail();

    /**
     * @return the property object that accompanies the payload
     */
    PropertyDTO getProperty();

    /**
     * @return the async flag of this invocation
     */
    boolean isAsync();
}
关键类
FilterChain
package com.xxx.arch.mw.nbp.common.extension;import com.xxx.arch.mw.nbp.common.annotation.DispatchFilter;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;/*** @date 2023/08/28*/
public class FilterChain {private final List<Filter> filters = new ArrayList<>();public FilterChain() {}public void addFilter(Filter filter) {if (filter == null || filter.getClass().getAnnotation(DispatchFilter.class) == null) {return;}if (this.filters.stream().noneMatch(existFilter -> existFilter.getClass().equals(filter.getClass()))) {this.filters.add(filter);}}public void removeFilter(String filterName) {this.filters.removeIf(filter -> filter.getClass().getAnnotation(DispatchFilter.class).value().equals(filterName));}public void removeFilter(Filter filter) {this.filters.removeIf(aFilter -> aFilter.getClass().equals(filter.getClass()));}public Invoker buildInvokerChain(final Invoker invoker) {Invoker last = invoker;if (!filters.isEmpty()) {this.sort();for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker next = last;last = invocation -> filter.invoke(next, invocation);}}return last;}public void sort() {Collections.sort(this.filters, FilterComparator.COMPARATOR);}public List<Filter> getFilters() {return filters;}
}
FilterComparator
package com.xxx.arch.mw.nbp.common.extension;import com.xxx.arch.mw.nbp.common.annotation.DispatchFilter;import java.util.Comparator;/*** @date 2023/08/28*/
public class FilterComparator implements Comparator<Object> {public static final Comparator<Object> COMPARATOR = new FilterComparator();@Overridepublic int compare(Object o1, Object o2) {if (o1 == null && o2 == null) {return 0;}if (o1 == null) {return -1;}if (o2 == null) {return 1;}if (o1.equals(o2)) {return 0;}DispatchFilter a1 = o1.getClass().getAnnotation(DispatchFilter.class);DispatchFilter a2 = o2.getClass().getAnnotation(DispatchFilter.class);int n1 = a1 == null ? 0 : a1.order();int n2 = a2 == null ? 0 : a2.order();// never return 0 even if n1 equals n2, otherwise, o1 and o2 will override each other in collection like HashSetreturn n1 > n2 ? 1 : -1;}
}
SingleInvocation
package com.xxx.arch.mw.nbp.common.extension;import com.xxx.arch.mw.nbp.share.dto.PropertyDTO;
import com.xxx.arch.mw.nbp.share.dto.SingleDetailDTO;/*** @date 2023/08/28*/
public class SingleInvocation implements Invocation<SingleDetailDTO> {private final SingleDetailDTO singleDetailDTO;private final PropertyDTO propertyDTO;private final boolean async;public SingleInvocation(SingleDetailDTO singleDetailDTO, PropertyDTO propertyDTO) {this.singleDetailDTO = singleDetailDTO;this.propertyDTO = propertyDTO;this.async = false;}public SingleInvocation(SingleDetailDTO singleDetailDTO, PropertyDTO propertyDTO, boolean async) {this.singleDetailDTO = singleDetailDTO;this.propertyDTO = propertyDTO;this.async = async;}@Overridepublic SingleDetailDTO getDetail() {return singleDetailDTO;}@Overridepublic PropertyDTO getProperty() {return propertyDTO;}@Overridepublic boolean isAsync() {return this.async;}
}
MultiInvocation
package com.xxx.arch.mw.nbp.common.extension;import com.xxx.arch.mw.nbp.share.dto.PropertyDTO;
import com.xxx.arch.mw.nbp.share.dto.SingleDetailDTO;import java.util.List;/*** @date 2023/08/28*/
public class MultiInvocation implements Invocation<List<SingleDetailDTO>> {private final List<SingleDetailDTO> singleDetailDTOS;private final PropertyDTO propertyDTO;private final boolean async;public MultiInvocation(List<SingleDetailDTO> singleDetailDTOS, PropertyDTO propertyDTO) {this.singleDetailDTOS = singleDetailDTOS;this.propertyDTO = propertyDTO;this.async = false;}public MultiInvocation(List<SingleDetailDTO> singleDetailDTOS, PropertyDTO propertyDTO, boolean async) {this.singleDetailDTOS = singleDetailDTOS;this.propertyDTO = propertyDTO;this.async = async;}@Overridepublic List<SingleDetailDTO> getDetail() {return singleDetailDTOS;}@Overridepublic PropertyDTO getProperty() {return propertyDTO;}@Overridepublic boolean isAsync() {return this.async;}
}
自定义Filter
选取几个经典实现
ValidationFilter
package com.xxx.arch.mw.nbp.client.extension;import com.xxx.arch.mw.nbp.common.annotation.DispatchFilter;
import com.xxx.arch.mw.nbp.common.constant.FilterConstants;
import com.xxx.arch.mw.nbp.common.domain.NbpCode;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import com.xxx.arch.mw.nbp.common.extension.*;
import com.xxx.arch.mw.nbp.share.dto.SingleDetailDTO;
import com.xxx.commons.data.domain.Result;import java.util.List;/*** @date 2023/08/28*/
@DispatchFilter(group = {FilterConstants.PUBLISHER, FilterConstants.EXECUTOR}, value = "validation", order = 20)
public class ValidationFilter implements Filter {@Overridepublic Result invoke(final Invoker invoker, final Invocation invocation) {if (invoker == null) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "invoker can't be null");}if (invocation == null) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "invocation can't be null");}if (invocation.getDetail() == null) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "detail can't be null");}if (invocation.getProperty() == null) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "property can't be null");}if (invocation instanceof SingleInvocation) {this.validate(((SingleInvocation) invocation).getDetail());} else if (invocation instanceof MultiInvocation) {List<SingleDetailDTO> singleDetailDTOList = ((MultiInvocation) invocation).getDetail();if (singleDetailDTOList.isEmpty()) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "singleDetailList can't be null or empty");}for (SingleDetailDTO singleDetail : singleDetailDTOList) {this.validate(singleDetail);}}return invoker.invoke(invocation);}private void validate(SingleDetailDTO singleDetail) {if (singleDetail == null) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "singleDetail can't be null");}if (singleDetail.getTemplateCode() == null) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "templateCode can't be null");}if (singleDetail.getUserContext() == null || singleDetail.getUserContext().size() == 0) {throw new NbpException(NbpCode.ILLEGAL_PARAM.getCode(), "userContext can't be null or empty");}}
}
PublishMetricFilter
package com.xxx.arch.mw.nbp.client.extension;import com.xxx.arch.mw.nbp.client.util.EnvUtils;
import com.xxx.arch.mw.nbp.client.util.VersionUtils;
import com.xxx.arch.mw.nbp.common.annotation.DispatchFilter;
import com.xxx.arch.mw.nbp.common.constant.FilterConstants;
import com.xxx.arch.mw.nbp.common.constant.InstanceKey;
import com.xxx.arch.mw.nbp.common.constant.TraceKey;
import com.xxx.arch.mw.nbp.common.domain.NbpCode;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import com.xxx.arch.mw.nbp.common.extension.*;
import com.xxx.arch.mw.nbp.common.util.TraceUtil;
import com.xxx.arch.mw.nbp.share.dto.MultiResult;
import com.xxx.arch.mw.nbp.share.dto.SingleDetailDTO;
import com.xxx.commons.data.domain.Result;
import com.xxx.commons.data.exception.ServiceException;
import org.apache.commons.lang3.time.StopWatch;import java.util.List;
import java.util.Map;import static com.xxx.arch.mw.nbp.client.logger.LoggerInit.LOGGER_PUBLISH;/*** @date 2023/08/28*/
@DispatchFilter(group = {FilterConstants.PUBLISHER}, value = "publishMetric", order = 200)
public class PublishMetricFilter implements Filter {@Overridepublic Result invoke(final Invoker invoker, final Invocation invocation) {if (Boolean.TRUE.equals(invocation.getProperty().getPublisher().getMetricDisabled())) {return invoker.invoke(invocation);}Result result;StopWatch stopWatch = new StopWatch();try {stopWatch.start();result = invoker.invoke(invocation);stopWatch.stop();if (result.isSuccess()) {MultiResult<SingleDetailDTO> resultData = (MultiResult<SingleDetailDTO>) result.getData();for (SingleDetailDTO singleDetail : resultData.getSuccessList()) {this.log(singleDetail, true, stopWatch, null);}for (SingleDetailDTO singleDetail : resultData.getFailureList()) {this.log(singleDetail, false, stopWatch, null);}} else {// 仅PublishExceptionFilter被禁用才会走该分支if (invocation instanceof SingleInvocation) {this.log(((SingleInvocation) invocation).getDetail(), false, stopWatch,new NbpException(result.getCode(), result.getMessage(), result.getCause()));} else if (invocation instanceof MultiInvocation) {List<SingleDetailDTO> singleDetailDTOList = ((MultiInvocation) invocation).getDetail();for (SingleDetailDTO singleDetail : singleDetailDTOList) {this.log(singleDetail, false, stopWatch,new NbpException(result.getCode(), result.getMessage(), result.getCause()));}}}} catch (Throwable e) {if (stopWatch.isStarted()) {stopWatch.stop();}if (invocation instanceof SingleInvocation) {this.log(((SingleInvocation) invocation).getDetail(), false, stopWatch, e);} else if (invocation instanceof MultiInvocation) {List<SingleDetailDTO> singleDetailDTOList = ((MultiInvocation) invocation).getDetail();for (SingleDetailDTO singleDetail : singleDetailDTOList) {this.log(singleDetail, false, stopWatch, e);}}throw e;}return result;}private void log(SingleDetailDTO singleDetail, boolean success, StopWatch stopWatch,Throwable throwable) {Map<String, Object> systemContext = singleDetail.getSystemContext();LOGGER_PUBLISH.info("NBP-CLIENT METRIC 
PUBLISH","success={},elapsed={},env={},shadow={},traceId={},rpcId={},version={},"+ "code={},id={},bizKey={},triggerTime={},strategyId={},ruleId={},msgId={},"+ "publishedTime={},publishedIp={},receivedTime={},receivedIp={},"+ "errorCode={}",success, stopWatch.getTime(), EnvUtils.getCurrentEnv().name(),TraceUtil.isShadow(), TraceUtil.getTraceId(), TraceUtil.getRpcId(),VersionUtils.VERSION, singleDetail.getTemplateCode(), singleDetail.getInstanceId(),singleDetail.getBizKey(), singleDetail.getTriggerTime(),systemContext.get(InstanceKey.STRATEGY_ID),systemContext.get(InstanceKey.RULE_ID),singleDetail.getUserContext().get(InstanceKey.DISASTER_MSG_ID),systemContext.get(TraceKey.PUBLISHED_TIME),systemContext.get(TraceKey.PUBLISHED_IP),systemContext.get(TraceKey.RECEIVED_TIME),systemContext.get(TraceKey.RECEIVED_IP),throwable == null ? null : throwable instanceof ServiceException ?((ServiceException) throwable).getCode() : NbpCode.UNKNOWN.getCode(),throwable == null ? null : throwable);}}
PublishTraceFilter
package com.xxx.arch.mw.nbp.client.extension;import com.xxx.arch.mw.nbp.common.annotation.DispatchFilter;
import com.xxx.arch.mw.nbp.common.constant.CommonConstants;
import com.xxx.arch.mw.nbp.common.constant.FilterConstants;
import com.xxx.arch.mw.nbp.common.extension.Filter;
import com.xxx.arch.mw.nbp.common.extension.Invocation;
import com.xxx.arch.mw.nbp.common.extension.Invoker;
import com.xxx.arch.mw.nbp.common.util.TraceUtil;
import com.xxx.arch.mw.nbp.share.dto.PropertyDTO;
import com.xxx.arch.mw.nbp.share.facade.DispatchPublishService;
import com.xxx.commons.data.domain.Result;
import com.xxx.arch.mw.util.RequestCtxUtil;import static com.xxx.arch.mw.nbp.common.util.TraceUtil.*;
import static com.xxx.arch.mw.nbp.common.util.TraceUtil.NBP_RPC_PUB_NAME;/*** @date 2023/08/28*/
@DispatchFilter(group = {FilterConstants.PUBLISHER}, value = "publishTrace", order = 100)
public class PublishTraceFilter implements Filter {@Overridepublic Result invoke(final Invoker invoker, final Invocation invocation) {final PropertyDTO property = invocation.getProperty();if (Boolean.TRUE.equals(property.getPublisher().getEagleEyeDisabled())) {return invoker.invoke(invocation);}Result result;try {TraceUtil.startRpc(String.join(CommonConstants.COLON, DispatchPublishService.class.getCanonicalName(),invocation.getProperty().getTemplateCode()), NBP_PUB_SEND_METHOD_NAME, NBP_RPC_PUB_TYPE);TraceUtil.rpcClientSend();result = invoker.invoke(invocation);final String remoteIp = RequestCtxUtil.getProviderIp();if (remoteIp != null) {TraceUtil.remoteIp(remoteIp);}if (result.isSuccess()) {TraceUtil.attribute(RPC_NAME_KEY, NBP_RPC_PUB_NAME);TraceUtil.rpcClientRecv(RPC_RESULT_SUCCESS, "success");} else {TraceUtil.attribute(RPC_NAME_KEY, NBP_RPC_PUB_NAME);TraceUtil.rpcClientRecv(RPC_RESULT_FAILED, result.getMessage());}} catch (Throwable e) {TraceUtil.attribute(RPC_NAME_KEY, NBP_RPC_PUB_NAME);TraceUtil.rpcClientRecv(RPC_RESULT_FAILED, e.getMessage());throw e;}return result;}
}
PublishExceptionFilter
package com.xxx.arch.mw.nbp.client.extension;import com.xxx.arch.mw.nbp.common.annotation.DispatchFilter;
import com.xxx.arch.mw.nbp.common.constant.FilterConstants;
import com.xxx.arch.mw.nbp.common.domain.NbpCode;
import com.xxx.arch.mw.nbp.common.exception.DegradeException;
import com.xxx.arch.mw.nbp.common.exception.FlowException;
import com.xxx.arch.mw.nbp.common.exception.NbpException;
import com.xxx.arch.mw.nbp.common.exception.RpcException;
import com.xxx.arch.mw.nbp.common.extension.Filter;
import com.xxx.arch.mw.nbp.common.extension.Invocation;
import com.xxx.arch.mw.nbp.common.extension.Invoker;
import com.xxx.commons.data.domain.Result;
import com.xxx.arch.mw.exception.RpcException;/*** @date 2023/08/28*/
@DispatchFilter(group = {FilterConstants.PUBLISHER}, value = "publishException", order = 300)
public class PublishExceptionFilter implements Filter {@Overridepublic Result invoke(final Invoker invoker, final Invocation invocation) {try {Result result = invoker.invoke(invocation);if (!result.isSuccess()) {if (NbpCode.FLOW_CONTROL_DENIED.getCode().equals(result.getCode())) {throw new FlowException(NbpCode.FLOW_CONTROL_DENIED.getCode(), result.getMessage(), result.getCause());} else if (NbpCode.BLACKLIST_DENIED.getCode().equals(result.getCode())) {throw new FlowException(NbpCode.BLACKLIST_DENIED.getCode(), result.getMessage(), result.getCause());} else if (NbpCode.DEGRADED_DENIED.getCode().equals(result.getCode())) {throw new DegradeException(NbpCode.DEGRADED_DENIED.getCode(), result.getMessage(), result.getCause());} else {throw new NbpException(result.getCode(), result.getMessage(), result.getCause());}}return result;} catch (NbpException e) {throw e;} catch (RpcException e) {throw new RpcException(NbpCode.RPC_ERROR.getCode(), e.getMessage(), e.getCause());} catch (Throwable e) {throw new NbpException(NbpCode.UNKNOWN.getCode(), e.getMessage(), e.getCause());}}}
PublishCompressFilter
package com.xxx.arch.mw.nbp.client.extension;import com.xxx.arch.mw.nbp.common.annotation.DispatchFilter;
import com.xxx.arch.mw.nbp.common.constant.CommonConstants;
import com.xxx.arch.mw.nbp.common.constant.FilterConstants;
import com.xxx.arch.mw.nbp.common.converter.ConverterUtil;
import com.xxx.arch.mw.nbp.common.csp.Compressor;
import com.xxx.arch.mw.nbp.common.csp.CompressorEnum;
import com.xxx.arch.mw.nbp.common.csp.CompressorFactory;
import com.xxx.arch.mw.nbp.common.extension.*;
import com.xxx.arch.mw.nbp.share.dto.PropertyDTO;
import com.xxx.arch.mw.nbp.share.dto.SingleDetailDTO;
import com.xxx.commons.data.domain.Result;import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import static com.xxx.arch.mw.nbp.common.constant.InstanceKey.COMPRESSED_ALGORITHM_KEY;
import static com.xxx.arch.mw.nbp.common.constant.InstanceKey.COMPRESSED_CONTEXT_KEY;/*** @date 2023/08/28*/
@DispatchFilter(group = {FilterConstants.PUBLISHER}, value = "publishCompress", order = 700)
public class PublishCompressFilter implements Filter {@Overridepublic Result invoke(final Invoker invoker, final Invocation invocation) {final PropertyDTO propertyDTO = invocation.getProperty();final boolean compressEnabled = propertyDTO.getPublisher().getCompressDisabled() != null ?!propertyDTO.getPublisher().getCompressDisabled() : CommonConstants.USER_CONTEXT_COMPRESS_ENABLED;if (compressEnabled) {final int userContextCompressSizeThreshold = propertyDTO.getPublisher().getCompressSizeThreshold() != null ?propertyDTO.getPublisher().getCompressSizeThreshold() : CommonConstants.USER_CONTEXT_COMPRESS_SIZE_THRESHOLD;final CompressorEnum compressorEnum = CompressorEnum.toEnumFromName(propertyDTO.getPublisher().getCompressAlgorithm());final Compressor compressor = compressorEnum != null ?CompressorFactory.getCompressor(compressorEnum) : CompressorFactory.getDefaultCompressor();if (invocation instanceof SingleInvocation) {this.doProcess(((SingleInvocation) invocation).getDetail(), propertyDTO,userContextCompressSizeThreshold, compressorEnum, compressor);} else if (invocation instanceof MultiInvocation) {List<SingleDetailDTO> singleDetailDTOList = ((MultiInvocation) invocation).getDetail();for (SingleDetailDTO singleDetail : singleDetailDTOList) {this.doProcess(singleDetail, propertyDTO,userContextCompressSizeThreshold, compressorEnum, compressor);}}}return invoker.invoke(invocation);}private void doProcess(SingleDetailDTO singleDetail,PropertyDTO propertyDTO,int userContextCompressSizeThreshold,CompressorEnum compressorEnum,Compressor compressor) {byte[] body = ConverterUtil.toBody(singleDetail.getUserContext());if (body.length > userContextCompressSizeThreshold) {final Map<String, String> remainUnchangedFields = new HashMap<>();propertyDTO.getPublisher().getRemainUnchangedFields().forEach(filed -> {if (singleDetail.getUserContext().containsKey(filed)) {remainUnchangedFields.put(filed, singleDetail.getUserContext().remove(filed));}});if (remainUnchangedFields.size() > 0) 
{body = ConverterUtil.toBody(singleDetail.getUserContext(), Map.class);}final byte[] compressedBody = compressor.compress(body);final String compressedContext = Base64.getEncoder().encodeToString(compressedBody);if (body.length - compressedContext.length() > CommonConstants.USER_CONTEXT_COMPRESS_REVENUE_THRESHOLD) {singleDetail.getUserContext().clear();singleDetail.getUserContext().put(COMPRESSED_CONTEXT_KEY, compressedContext);if (compressorEnum != null && compressorEnum != Compressor.DEFAULT_COMPRESSOR_ALGORITHM) {singleDetail.getUserContext().put(COMPRESSED_ALGORITHM_KEY, compressorEnum.getName());}}singleDetail.getUserContext().putAll(remainUnchangedFields);}}
}
自定义Invoker
PublishInvoker
package com.xxx.arch.mw.nbp.client.invoker;import com.xxx.arch.mw.nbp.client.configuration.DispatchProperty;
import com.xxx.arch.mw.nbp.common.extension.Invocation;
import com.xxx.arch.mw.nbp.common.extension.Invoker;
import com.xxx.arch.mw.nbp.common.extension.MultiInvocation;
import com.xxx.arch.mw.nbp.common.extension.SingleInvocation;
import com.xxx.arch.mw.nbp.share.dto.SingleDetailDTO;
import com.xxx.arch.mw.nbp.share.facade.DispatchPublishService;
import com.xxx.commons.data.domain.Result;import java.util.ArrayList;
import java.util.List;/*** @date 2023/08/30*/
public class PublishInvoker implements Invoker {private DispatchPublishService publishService;private DispatchProperty dispatchProperty;public PublishInvoker(DispatchPublishService publishService, DispatchProperty dispatchProperty) {this.publishService = publishService;this.dispatchProperty = dispatchProperty;}@Overridepublic Result invoke(Invocation invocation) {List<SingleDetailDTO> singleDetailDTOS = new ArrayList<>();if (invocation instanceof SingleInvocation) {singleDetailDTOS.add(((SingleInvocation) invocation).getDetail());} else if (invocation instanceof MultiInvocation) {singleDetailDTOS = ((MultiInvocation) invocation).getDetail();}return this.publishService.publish(singleDetailDTOS);}public DispatchProperty getDispatchProperty() {return dispatchProperty;}}
Invoker构建与使用
// Declared in ClientFilterConstants (it is referenced as ClientFilterConstants.PUBLISH_FILTERS
// below). A plain ArrayList filled in a static initializer replaces the double-brace
// initialization, which created an anonymous ArrayList subclass. The list stays mutable.
public static final List<Filter> PUBLISH_FILTERS = new ArrayList<>();

static {
    // Registration order does not matter: FilterChain sorts by @DispatchFilter order.
    PUBLISH_FILTERS.add(new ValidationFilter());
    PUBLISH_FILTERS.add(new PublishMetricFilter());
    PUBLISH_FILTERS.add(new PublishExceptionFilter());
    PUBLISH_FILTERS.add(new PublishTraceFilter());
    PUBLISH_FILTERS.add(new PublishCompressFilter());
}

// At the publishing call site: register the filters and wrap them around the terminal invoker.
FilterChain publishChain = new FilterChain();
for (Filter filter : ClientFilterConstants.PUBLISH_FILTERS) {
    publishChain.addFilter(filter);
}
Invoker invoker = publishChain.buildInvokerChain(new PublishInvoker(publishService, this.property));

// A single detail travels as a SingleInvocation, anything else as a MultiInvocation.
Invocation invocation = singleDetailDTOList.size() == 1
        ? new SingleInvocation(singleDetailDTOList.get(0), propertyDTO)
        : new MultiInvocation(singleDetailDTOList, propertyDTO);
Result<MultiResult<SingleDetailDTO>> result = invoker.invoke(invocation);