分布式微服务架构日志调用链路跟踪-traceId

分布式微服务架构日志调用链路跟踪-traceId

在ELK日志集成平台里(日志的写入,收集,跟踪,搜索,分析)

背景知识

在xxx(博主之前的公司),每个前端请求里面,都会在request的header区携带一个traceId 随机数值,用来跟踪在后端的调用链路栈打印.通过ES收集的日志数据,在ELK日志集成平台里,用traceId就能得到用户请求的完整链路,排查问题的效率非常高.那么具体是怎么设计的?
同样的,每个请求的用户会话信息该怎么传递?
traceId,UserId,facilityId,tenantId.

遇到的场景

有几个难题需要思考: 1.客户端调用方法中,遇到了rpc该怎么传递traceId? 2.遇到了feign接口该怎么传递traceId? 3.遇到了mq该怎么传递tranceId? 4.遇到了线程池里的新线程该怎么传递tranceId? 下面针对各个场景进行逐一的分析

xxx的实现方案:

在gateway中对sid进行校验和初步存储 gateway-GlobalFilter-SIDCheckedFilter.java gateway的原理:一系列的router和filter集合.

常见:

  • IP白名单和黑名单控制
  • SID检测
  • 请求签名检测,防止恶意构建请求
  • 用户会话检测,防止非系统用户请求
  • private static String getSID(ServerHttpRequest request) {String sid = request.getHeaders().getFirst(Constants.HEADER_SID);// 将sid放到日志的变量中MDC.put(Constants.HEADER_SID,sid);return sid;
    }
    @Override
    public Mono<Void> execute(ServerHttpRequest request, ServerWebExchange exchange, GatewayFilterChain chain) {String sid = getSID(request);if(logger.isInfoEnabled()) {String url = StringUtils.defaultIfBlank(request.getPath().value(), "");String ip = IpUtils.getIp(request);MDC.put(MDC_SID,sid);MDC.put(MDC_URI,url);MDC.put(MDC_IP,ip);MDC.put(MDC_USER,null);logger.info("网关请求..");}if (validSid) {if (StringUtils.isBlank(sid)) {logger.info("{} SID为空", this.getRequestUri(request));// 构造返回错误信息ApiResponse<String> responseMap = ApiResponse.failResp(RequestCode.SID_ISNULL);return returnError(exchange, HttpStatus.BAD_REQUEST, responseMap);}// 解决高并发状态的多次重复请求问题String redisSidKey = GatewayRedisKeyConstants.GATEWAY_SID_KEY_PREFIX + sid;boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(redisSidKey, GATEWAY_SID_KEY_DEF_VAL, GATEWAY_SID_KEY_EXPIRE_TIME, TimeUnit.MINUTES);if (!isSuccess) {logger.info("SID已存在,重复请求:{}", sid);ApiResponse<String> responseMap = ApiResponse.failResp(RequestCode.SID_REPEATED_REQUESTS);return returnError(exchange, HttpStatus.BAD_REQUEST, responseMap);}}exchange = afterValidHandler(exchange);//验证过后处理器return chain.filter(exchange);
    }/*** 验证过后处理器** @param exchange* @return*/
    @Override
    public ServerWebExchange afterValidHandler(ServerWebExchange exchange) {ServerHttpRequest request = exchange.getRequest();String sid = request.getHeaders().getFirst(Constants.HEADER_SID);//不要验证SID的情况下,如果SI为空,默认生成一个SID,供下游系统使用if (StringUtils.isBlank(sid)) {//生成sidString gatewaySid = "gateway_" + DateTimeUtils.format(LocalDateTime.now(), DateTimeUtils.FORMAT_1) + RandomUtil.randomNumbers(6);//构造新的ServerHttpRequestServerHttpRequest.Builder builder = request.mutate()//往header中添加网关生成的SID.header(com.ess.framework.commons.constant.Constants.HEADER_SID, gatewaySid);// 将sid放到日志的变量中MDC.put(Constants.HEADER_SID,gatewaySid);exchange = exchange.mutate().request(builder.build()).build();}return exchange;
    }
    

    定义个framework-boot的依赖包

    这个依赖包内使用interceptor拦截器对web请求做拦截,拿到traceId,通过threadLocal对象放入一个Context对象中.每个请求对应一个ThreadLocal对象. 每个服务的全局拦截器
    @RefreshScope
    public abstract class BootWebConfigurer implements WebMvcConfigurer {protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate RequestMappingHandlerAdapter requestMappingHandlerAdapter;@Bean@RefreshScopepublic GatewayInterceptor getGatewayInterceptor() {return new GatewayInterceptor();}@Bean@RefreshScopepublic SIDInterceptor getSIDInterceptor() {return new SIDInterceptor();}@Bean@RefreshScopepublic InnerInterceptor getInnerInterceptor() {return new InnerInterceptor();}@Beanpublic TokenInterceptor getTokenInterceptor() {return new TokenInterceptor();}@Beanpublic LogMdcInterceptor getLogMdcInterceptor() {return new LogMdcInterceptor();}@Beanpublic UnionInterceptor getUnionInterceptor() {return new UnionInterceptor();}@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(getSIDInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE);registry.addInterceptor(getGatewayInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE);registry.addInterceptor(getInnerInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE);registry.addInterceptor(getTokenInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE);registry.addInterceptor(getLogMdcInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE);// 添加联盟拦截器registry.addInterceptor(getUnionInterceptor()).addPathPatterns("/**").excludePathPatterns(ResouceConstants.EXCLUDE_STATIC_RESOURCE);}/*** 初始化SIDReturnValueHandler  对所有的响应返回SID字段*/@PostConstructpublic void initSidHandler() {final List<HandlerMethodReturnValueHandler> originalHandlers = new ArrayList<>(requestMappingHandlerAdapter.getReturnValueHandlers());final int deferredPos = obtainValueHandlerPosition(originalHandlers, DeferredResultMethodReturnValueHandler.class);SIDReturnValueHandler decorator = null;for (HandlerMethodReturnValueHandler handler : originalHandlers) {if (handler instanceof RequestResponseBodyMethodProcessor) {decorator = new SIDReturnValueHandler((RequestResponseBodyMethodProcessor) handler);break;}}originalHandlers.add(deferredPos + 1, decorator);requestMappingHandlerAdapter.setReturnValueHandlers(originalHandlers);}private int obtainValueHandlerPosition(final List<HandlerMethodReturnValueHandler> originalHandlers, Class<?> handlerClass) {for (int i = 0; i < originalHandlers.size(); i++) {final HandlerMethodReturnValueHandler valueHandler = originalHandlers.get(i);if (handlerClass.isAssignableFrom(valueHandler.getClass())) {return i;}}return -1;}@Overridepublic void addResourceHandlers(ResourceHandlerRegistry registry) {registry.addResourceHandler("/");registry.addResourceHandler("/favicon.ico").addResourceLocations("classpath:/favicon.ico");registry.addResourceHandler("app.html").addResourceLocations("classpath:/");registry.addResourceHandler("/error");// Swagger2配置registry.addResourceHandler("/v2/**");registry.addResourceHandler("/swagger-resources/**");registry.addResourceHandler("/csrf");registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/");registry.addResourceHandler("/swagger-ui.html").addResourceLocations("classpath:/META-INF/resources/");registry.addResourceHandler("/doc.html").addResourceLocations("classpath:/META-INF/resources/");}/*** 自定义扩展消息转换器** @param converters*/@Overridepublic void extendMessageConverters(List<HttpMessageConverter<?>> converters) {//定义fastjson转换消息对象FastJsonHttpMessageConverter fastJsonConverter = new FastJsonHttpMessageConverter();//添加fastjson全局配置FastJsonConfig fastJsonConfig = new FastJsonConfig();fastJsonConfig.setSerializerFeatures(// 排序配置SerializerFeature.SortField,SerializerFeature.MapSortField,// 避免对象重复引用SerializerFeature.DisableCircularReferenceDetect,// 格式化输出SerializerFeature.PrettyFormat);fastJsonConfig.setCharset(Charset.forName("UTF-8"));fastJsonConfig.setDateFormat("yyyy-MM-dd HH:mm:ss");//默认json会对属性里的json字符串值进行排序,加了这个Feature.OrderedField则会禁止排序fastJsonConfig.setFeatures(Feature.OrderedField);fastJsonConverter.setFastJsonConfig(fastJsonConfig);//添加支持的MediaType类型fastJsonConverter.setSupportedMediaTypes(Lists.newArrayList(MediaType.APPLICATION_JSON));converters.set(0, fastJsonConverter);//BigDecimal格式化SerializeConfig serializeConfig = SerializeConfig.getGlobalInstance();serializeConfig.put(BigDecimal.class, BigDecimalConfigure.instance);fastJsonConfig.setSerializeConfig(serializeConfig);//字节数组消息转换器(供文件下载使用)converters.add(new ByteArrayHttpMessageConverter());Map<String, HttpMessageConverter<?>> convertersMap = new LinkedHashMap<>();//转换器去重,并设置所有转换器的默认编码for (HttpMessageConverter converter : converters) {String name = converter.getClass().getSimpleName();if (converter instanceof StringHttpMessageConverter) {//设置StringHttpMessageConverter的默认编码为:UTF-8((StringHttpMessageConverter) converter).setDefaultCharset(Charset.forName("UTF-8"));}if (!convertersMap.containsKey(name)) {convertersMap.put(name, converter);}}converters.clear();converters.addAll(Lists.newArrayList(convertersMap.values()));}@Beanpublic Converter<String, LocalDate> localDateConverter() {return new Converter<String, LocalDate>() {@Overridepublic LocalDate convert(String source) {return LocalDate.parse(source, Constants.DATE_FORMATTER);}};}@Beanpublic Converter<String, LocalDateTime> localDateTimeConverter() {return new Converter<String, LocalDateTime>() {@Overridepublic LocalDateTime convert(String source) {return LocalDateTime.parse(source, Constants.DATE_TIME_FORMATTER);}};}/*** Java常用时间类型序列化和反序列化* LocalDateTime、LocalDate、LocalTime、java.util.Date、java.sql.Date、Calendar、Timestamp* 解决通过Feign调用时,客户端是以上类型会报错问题。*/@Beanpublic ObjectMapper objectMapper() {ObjectMapper objectMapper = new ObjectMapper();JavaTimeModule javaTimeModule = new JavaTimeModule();//定义序列化日期时间类型转换器javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeUtils.DEFAULT_FORMATTER));javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeUtils.YEAR_MONTH_DAY_FORMATTER));javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeUtils.HOUR_MINUTE_SECOND_FORMATTER));javaTimeModule.addSerializer(Date.class, new DateSerializer(true, new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT)));javaTimeModule.addSerializer(java.sql.Date.class, new SqlDateSerializer().withFormat(true, new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT)));javaTimeModule.addSerializer(Calendar.class, new CalendarSerializer(true, new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT)));//定义反序列化日期时间类型转换器javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeUtils.DEFAULT_FORMATTER));javaTimeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeUtils.YEAR_MONTH_DAY_FORMATTER));javaTimeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeUtils.HOUR_MINUTE_SECOND_FORMATTER));javaTimeModule.addDeserializer(Date.class, new DateDeserializers.DateDeserializer(DateDeserializers.DateDeserializer.instance,new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT));javaTimeModule.addDeserializer(java.sql.Date.class, new DateDeserializers.SqlDateDeserializer(new DateDeserializers.SqlDateDeserializer(),new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT));javaTimeModule.addDeserializer(Calendar.class, new DateDeserializers.CalendarDeserializer(new DateDeserializers.CalendarDeserializer(),new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT));javaTimeModule.addDeserializer(Timestamp.class, new DateDeserializers.TimestampDeserializer(new DateDeserializers.TimestampDeserializer(),new SimpleDateFormat(DateTimeUtils.DEFAULT_FORMAT), DateTimeUtils.DEFAULT_FORMAT));objectMapper.registerModule(javaTimeModule);objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);return objectMapper;}
    }

    sid拦截器

    
    package com.ess.framework.boot.interceptor;import com.ess.framework.commons.constant.Constants;
    import com.ess.framework.commons.response.RequestCode;
    import com.ess.framework.commons.utils.EssContextHolder;
    import com.ess.framework.commons.utils.ExceptionUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;/*** SID 拦截器 ,验证SID是否为空*/
    @RefreshScope
    public class SIDInterceptor implements HandlerInterceptor {protected final Logger logger = LoggerFactory.getLogger(this.getClass());/*** 是否验证SID*/@Value("${valid.sid:false}")private boolean validSid = false;@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {String sid = request.getHeader(Constants.HEADER_SID);// 验证sid是否为空if (validSid && StringUtils.isBlank(sid)) {logger.warn("{} {}", request.getRequestURI(), RequestCode.SID_ISNULL.message());ExceptionUtils.throwBusiness(RequestCode.SID_ISNULL);}// 设置SID到线程变量 这是传输的关键if (StringUtils.isNotBlank(sid)) {EssContextHolder.setSID(sid);} else {EssContextHolder.setSID(null);}return true;}
    }
    

    关于feign接口的传递tranceId

    
    package com.ess.framework.boot.interceptor;import com.ess.framework.commons.constant.Constants;
    import com.ess.framework.commons.utils.EssContextHolder;
    import com.ess.framework.commons.utils.IpUtils;
    import feign.RequestInterceptor;
    import feign.RequestTemplate;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.MDC;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;/*** @ClassName FeignRequestInterceptor* @Description feign拦截器* @Date 2021/5/27 21:56* @Version*/
    @Slf4j
    public class FeignRequestInterceptor implements RequestInterceptor {private final static String HEADER_ESS_FEIGN_IP = "ess-feign-ip";private final static String HEADER_IP = "ip";@Overridepublic void apply(RequestTemplate requestTemplate) {try {String ip = null;String sid = null;String token = null;String unionId = null;/* ExecuteTaskUtils 线程池处理方案**/String threadName = Thread.currentThread().getName();if (StringUtils.startsWith(threadName, "ess-task-pool") || StringUtils.startsWith(threadName, "ess-async-pool")) {sid = MDC.get(Constants.HEADER_SID);ip = MDC.get(HEADER_IP);unionId  = EssContextHolder.getUnionId();token = EssContextHolder.getToken();} else {ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (attributes == null) {return;}HttpServletRequest request = attributes.getRequest();token = request.getHeader(Constants.HEADER_TOKEN);sid = request.getHeader(Constants.HEADER_SID);ip = IpUtils.getIPAddress(request);unionId  = request.getHeader(Constants.HEADER_UNIONID);}requestTemplate.header(Constants.HEADER_TOKEN, token);requestTemplate.header(Constants.HEADER_SID, sid);requestTemplate.header(Constants.HEADER_UNIONID, unionId);requestTemplate.header(HEADER_ESS_FEIGN_IP, ip);// feign调用都设置网关标识设置为true,防止被拦截requestTemplate.header(Constants.PASS_GATEWAY, "true");} catch (Exception e) {log.error("拦截器异常", e);}}
    }

    由此可见,feign请求的本质就是requestTemplate,而原理上,就是把sid重新塞到request的header里面.

    线程池sid的传递

    那么怎么传递到另外的线程里呢? ==答案就是threadLocal作为管道存储对象.
    
    package com.ess.framework.commons.utils;/*** 线程变量上下文*/
    public class EssContextHolder {private EssContextHolder() {}/*** sid*/private final static ThreadLocal<String> SID = new ThreadLocal<>();/*** token*/private final static ThreadLocal<String> TOKEN = new ThreadLocal<>();/*** 联盟code*/private final static ThreadLocal<String> UNION_CODE = new ThreadLocal<>();/*** 联盟unionId*/private final static ThreadLocal<String> UNION_ID = new ThreadLocal<>();/*** 设置SID** @param sid*/public static void setSID(String sid) {EssContextHolder.SID.set(sid);}/*** 获取SID*/public static String getSID() {return EssContextHolder.SID.get();}/*** 设置TOKEN** @param token*/public static void setToken(String token) {EssContextHolder.TOKEN.set(token);}/*** 获取TOKEN*/public static String getToken() {return EssContextHolder.TOKEN.get();}/*** 设置unionCode*/public static void setUnionCode(String unionCode) {EssContextHolder.UNION_CODE.set(unionCode);}/*** 获取unionCode*/public static String getUnionCode() {return EssContextHolder.UNION_CODE.get();}/*** 设置unionId*/public static void setUnionId(String unionId) {EssContextHolder.UNION_ID.set(unionId);}/*** 获取联盟unionId*/public static String getUnionId() {return EssContextHolder.UNION_ID.get();}
    }

    重载线程池实现细节,这样springboot在使用自定义线程池时,就会初始化个性化的实现细节,把sid等会话状态传递到线程的ThreadLocal里.

    package com.ess.framework.boot.asynctask;import com.ess.framework.commons.utils.EssContextHolder;
    import org.slf4j.MDC;import java.util.concurrent.*;/*** @ClassName ttt* @Description TODO* @Author shengfq* @Date 2021/5/28 0028 上午 10:53* @Version*/
    public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {public ThreadPoolExecutorMdcWrapper(AsyncTaskThreadPoolConfig config,ThreadFactory threadFactory,RejectedExecutionHandler handler ) {this(config.getCorePoolSize(), config.getMaxPoolSize(), config.getKeepAliveSecond(), TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(config.getQueueCapacity()),threadFactory,handler);}public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);}public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);}public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overridepublic void execute(Runnable task) {String sid =   EssContextHolder.getSID();String token = EssContextHolder.getToken();String unionId = EssContextHolder.getUnionId();super.execute(ThreadMdcUtil.wrap(sid,token,unionId,task, MDC.getCopyOfContextMap()));}@Overridepublic <T> Future<T> submit(Runnable task, T result) {String sid =   EssContextHolder.getSID();String token = EssContextHolder.getToken();String unionId = EssContextHolder.getUnionId();return super.submit(ThreadMdcUtil.wrap(sid,token,unionId,task, MDC.getCopyOfContextMap()), result);}@Overridepublic <T> Future<T> submit(Callable<T> task) {String sid =   EssContextHolder.getSID();String token = EssContextHolder.getToken();String unionId = EssContextHolder.getUnionId();return super.submit(ThreadMdcUtil.wrap(sid,token,unionId,task, MDC.getCopyOfContextMap()));}@Overridepublic Future<?> submit(Runnable task) {String sid =   EssContextHolder.getSID();String token = EssContextHolder.getToken();String unionId = EssContextHolder.getUnionId();return super.submit(ThreadMdcUtil.wrap(sid,token,unionId,task, MDC.getCopyOfContextMap()));}
    }

    mq消息的sid传递

    如何传递给mq的消费者呢?实际上有了EssContextHolder对象,在Thread里获取sid很方便,如何通过mq传递sid,可以把sid放入message的header区,在消费端取出来,存入消费端线程的ThreadLocal对象中,就能起到传递的作用.
    消息发送

    消息接收

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.function.Consumer;import org.hzero.core.exception.OptimisticLockException;
    import org.hzero.core.message.MessageAccessor;
    import org.springframework.amqp.core.Message;import com.rabbitmq.client.Channel;
    import com..mom.me.common.constants.MeBaseConstants;
    import com..mom.me.common.mq.MessageHandler;import cn.hutool.core.util.ObjectUtil;
    import cn.hutool.json.JSONObject;
    import cn.hutool.json.JSONUtil;import io.choerodon.core.exception.CommonException;
    import io.choerodon.core.oauth.CustomUserDetails;
    import io.choerodon.core.oauth.DetailsHelper;import lombok.extern.slf4j.Slf4j;/*** <p>* 消息处理类* </p>*/
    @Slf4j
    public abstract class AbstractMessageHandlers implements MessageHandler {private AbstractMessageHandlers() {}public static void handleMessage(Message msg, Map<String, Object> headers, Channel channel, Consumer<String> fn) {String consumerQueue = msg.getMessageProperties().getConsumerQueue();try {log.trace("消息队列[{}]处理, headers={}", consumerQueue, headers);fn.accept(new String(msg.getBody(), StandardCharsets.UTF_8));} catch (CommonException e) {log.error("消息队列[{}], 消息处理失败业务异常->{}", consumerQueue,MessageAccessor.getMessage(e.getCode(), e.getParameters(), e.getMessage()).desc());} catch (IllegalArgumentException e) {log.error("消息队列[{}], 消息处理失败业务异常: {}", consumerQueue, e.getMessage());} catch (OptimisticLockException e) {// 如果乐观锁异常 则触发重试log.warn("消息队列[{}], 数据处理乐观锁异常触发重试", consumerQueue);throw e;} catch (Exception e) {log.error("消息队列[{}], 消息处理失败系统异常", consumerQueue, e);}}/*** 携带用户会话信息 -> 消息头获取用户会话信息* * 经AbstractMessageSender 发送MQ消息会在消息头附加用户会话信息* * JSON示例如下* * @param msg* @param headers* @param channel* @param fn*/public static void handleWithUser(Message msg, Map<String, Object> headers, Channel channel, Consumer<String> fn) {try {// 生产执行发出的mq消息头带有用户会话信息 切勿用于其他系统、模块的mq消息处理Object userDetails = headers.get(Constants.HEADER_SID);if (ObjectUtil.isNull(userDetails)) {throw new CommonException("MQ消息头获取用户会话信息失败!!!");}JSONObject jsonObject = JSONUtil.parseObj(userDetails);log.trace("设置用户会话信息,传入JSONObject信息: {}", jsonObject);CustomUserDetails details = new CustomUserDetails(jsonObject.getStr("username"), MeBaseConstants.DEFAULT);details.setTenantId(jsonObject.getLong("tenantId"));details.setUserId(jsonObject.getLong("userId"));details.setOrganizationId(jsonObject.getLong("organizationId"));details.setRealName(jsonObject.getStr("realName"));details.setLanguage(jsonObject.getStr("language"));details.setAdditionInfo(jsonObject.getJSONObject("additionInfo"));DetailsHelper.setCustomUserDetails(details);// 设置用户会话异常} catch (Exception e) {log.error("消息队列[{}], 消息处理失败", msg.getMessageProperties().getConsumerQueue(), e);return;}handleMessage(msg, headers, channel, fn);}public static void handleWithNack(Message msg, Map<String, Object> headers, Channel channel, Consumer<String> fn) {String consumerQueue = msg.getMessageProperties().getConsumerQueue();try {log.trace("消息队列[{}]处理, headers={}", consumerQueue, headers);fn.accept(new String(msg.getBody(), StandardCharsets.UTF_8));ack(msg, channel);} catch (CommonException e) {log.error("消息队列[{}], 消息处理失败业务异常->{}", consumerQueue,MessageAccessor.getMessage(e.getCode(), e.getParameters(), e.getMessage()).desc());nack(msg, channel);} catch (IllegalArgumentException e) {log.error("消息队列[{}], 消息处理失败业务异常: {}", consumerQueue, e.getMessage());nack(msg, channel);} catch (Exception e) {log.error("消息队列[{}], 消息处理失败系统异常:{}", consumerQueue, e);nack(msg, channel);}}public static void handleWithUserThrowEx(Message msg, Map<String, Object> headers, Channel channel,Consumer<String> fn) {String consumerQueue = msg.getMessageProperties().getConsumerQueue();try {// 生产执行发出的mq消息头带有用户会话信息 切勿用于其他系统、模块的mq消息处理Object userDetails = headers.get(MeBaseConstants.USER_DETAILS_KEY);if (ObjectUtil.isNull(userDetails)) {throw new CommonException("MQ消息头获取用户会话信息失败!!!");}JSONObject jsonObject = JSONUtil.parseObj(userDetails);log.trace("设置用户会话信息,传入JSONObject信息: {}", jsonObject);CustomUserDetails details = new CustomUserDetails(jsonObject.getStr("username"), MeBaseConstants.DEFAULT);details.setTenantId(jsonObject.getLong("tenantId"));details.setUserId(jsonObject.getLong("userId"));details.setOrganizationId(jsonObject.getLong("organizationId"));details.setRealName(jsonObject.getStr("realName"));details.setLanguage(jsonObject.getStr("language"));details.setAdditionInfo(jsonObject.getJSONObject("additionInfo"));DetailsHelper.setCustomUserDetails(details);log.trace("消息队列[{}]处理, headers={}", consumerQueue, headers);fn.accept(new String(msg.getBody(), StandardCharsets.UTF_8));} catch (CommonException e) {log.error("消息队列[{}], 消息处理失败业务异常->{}, 详细信息:", consumerQueue,MessageAccessor.getMessage(e.getCode(), e.getParameters(), e.getMessage()).desc(), e);throw e;} catch (Exception e) {log.error("消息队列[{}], 消息处理失败系统异常:{}", consumerQueue, e);throw e;}}private static void ack(Message msg, Channel channel) {try {long deliverTag = msg.getMessageProperties().getDeliveryTag();channel.basicAck(deliverTag, false);} catch (IOException ioe) {log.error("消息处理ack异常{}", ioe.getMessage(), ioe);}}private static void nack(Message msg, Channel channel) {try {long deliverTag = msg.getMessageProperties().getDeliveryTag();channel.basicNack(deliverTag, false, true);} catch (IOException ioe) {log.error("消息处理nack异常{}", ioe.getMessage(), ioe);}}}

    总结

    综上所述,sid的设置,拦截,传递,存储都有对应的机制.那么在日志系统中的存储则是通过org.slf4j.MDC;这个对象来实现的.

    这个对象的实现机制不在本文中展开细节,后续日志的入库,检索,都是通过slf4j这个日志框架和ELK日志系统进行.本文着重讲解了在应用服务中一个跟踪请求的链路id是如何传递,存储的.

    属于抛砖引玉的内容,内容较为主观,希望能对自己和其他开发者在分布式日志调用链路上有点启发.谢谢

    参考

    SpringBoot + MDC 实现全链路调用日志跟踪
    微服务-网关Spring Gateway进阶-日志跟踪唯一ID
    日志全链路追踪之MDC
    秒杀 : 做一个完善的全链路日志实现方案有多简单

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/224991.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

idea__SpringBoot微服务12——整合Mybatis框架(新依赖)(新注解)

整合Mybatis框架 完整项目地址&#xff1a;一、创建一个新的项目&#xff0c;导入mybatis依赖&#xff0c;lombok依赖。二、idea内置数据库管理工具连接数据库。三、编写实体类。四、编写Mapper接口。&#xff08;新注解&#xff09;五、编写Mapper.XML。六、编写数据库连接文件…

OpenCV中的格式转换

目录 1. 前言2. 采集到播放数据流的走向3. OpenCV中的格式转换3.1 RGB3.2 YUV3.2.1 YUV420{P}3.2.2 YUV420SP 4 简单应用5. 关联文章 1. 前言 实际音视频开发过程芯片是做了硬件加速的&#xff0c;主要涉及到视频编解码。二次开发过程中需要对SDK做一定的封装使用才行。 写这…

JMeter安装RabbitMQ测试插件

整体流程如下&#xff1a;先下载AMQP插件源码&#xff0c;可以通过antivy在本地编译成jar包&#xff0c;再将jar包导入JMeter目录下&#xff0c;重启JMeter生效。 Apache Ant 是一个基于 Java 的构建工具。Ant 可用于自动化构建和部署 Java 应用程序&#xff0c;使开发人员更轻…

【算法】动态规划(dp问题),持续更新

文章目录 0. 动态规划&#x1f3af;五个思考步骤 和 注意事项&#x1f3af;技巧优化思路 1. 子数组系列1.1 乘积为正数的最长子数组长度1.2 等差数列划分1.3 最长湍流子数组1.4 单词拆分1.5 环绕字符串中的子字符串 0. 动态规划 介绍本篇之前&#xff0c;我想先用人话叙述一般…

MYSQl基础操作命令合集与详解

MySQL入门 先来个总结 SQL语言分类 DDL&#xff08;Data Definition Language&#xff09; - 数据定义语言: 用于定义和管理数据库结构&#xff0c;包括创建、修改和删除数据库对象。 示例&#xff1a;CREATE, ALTER, DROP等语句。 DML&#xff08;Data Manipulation Lan…

常见箱包五金ERP有哪些?箱包五金ERP哪个好用

不同的箱包五金有不同的营销渠道和经营模式&#xff0c;而每一个营销渠道的商品信息维护流程和方式也不尽相同。另外&#xff0c;箱包五金价格制定、品质检验、产品种类、物料编码、批号追踪等环节的管理也比较繁琐。 近些年数字科技也被广泛的应用于箱包五金领域&#xff0c;…

Module ‘app‘: platform ‘android-33‘ not found.

目录 一、报错信息 二、解决方法 一、报错信息 Module app: platform android-33 not found. 检查你的应用程序的build.gradle文件中的targetSdkVersion和compileSdkVersion是否正确设置为已安装的Android SDK版本。 确保你的Android Studio已正确安装并配置了所需的Android …

spring-cloud-starter-gateway-mvc的网关实现

一 概括 最近&#xff0c;我也一直在使用SpringCloudGateway开发我们自己的网关产品。根据我对这份正式文件的理解&#xff0c;内容如下&#xff1a; SpringCloudGateway的默认底层依赖项是SpringWebflux。我们知道Spring Webflux是异步和响应式编程&#xff0c;并且编程范式…

TCP为什么可靠之“拥塞控制”

拥塞控制是对网络层面的控制&#xff0c;主要是为了避免发送方发送过多的数据导致网络阻塞&#xff0c;以及出现网络阻塞时能够调整数据发送速率&#xff0c;达到对网络阻塞的一个控制。 拥塞窗口 拥塞窗口cwnd&#xff0c;是发送方维护的一个状态变量&#xff0c;会根据网络…

Windows 11上边两个空格导致我多熬了1个多小时

将图中的文件路径复制&#xff0c;然后到文件管理器里边去搜索。 发现找不到&#xff0c;可是明明就在这里啊。 我百思不得其解&#xff0c;还以为是IDEA出了问题&#xff0c;我只能是重新启动项目&#xff0c;结果还是告诉我找不到文件。 要是同一个目录下已经有一个名为a…

什么是CORS?如何在PHP中处理CORS问题?

CORS&#xff08;Cross-Origin Resource Sharing&#xff09;是一种机制&#xff0c;它使用额外的 HTTP 头来告诉浏览器是否允许在 Web 页面上访问来自不同域的资源。在默认情况下&#xff0c;浏览器限制跨域请求&#xff0c;以防止潜在的安全风险。CORS 允许服务器指定哪些源&…

【设计模式--行为型--中介者模式】

设计模式--行为型--中介者模式 中介者模式定义结构案例实现优缺点使用场景 中介者模式 定义 又叫调停模式&#xff0c;定义一个中介角色来封装一系列对象之间的交互&#xff0c;使原有对象之间的耦合松散&#xff0c;且可以独立的改变它们之间的交互。 结构 抽象中介者角色…

React中简单实现路由守卫(主要演示其原理)

路由守卫在后台管理系统两种经典的跳转情况&#xff1a; 如果访问的是登录页面&#xff0c; 并且有token&#xff0c; 跳转到首页 如果访问的不是登录页面&#xff0c;并且没有token&#xff0c; 跳转到登录页 其余的都可以正常放行 下面简单实现React路由守卫功能&…

Python学习之复习MySQL-Day2(DML)

目录 文章声明⭐⭐⭐让我们开始今天的学习吧&#xff01;DML介绍添加数据给指定字段添加数据给全部字段添加数据给指定字段添加多条数据给全部字段添加多条数据 修改数据修改指定条件的记录的数据修改全部记录的数据 删除数据 文章声明⭐⭐⭐ 该文章为我&#xff08;有编程语言…

矩阵理论及其应用邱启荣习题3.5题解

(1) P ( − 1 0 1 − 1 − 1 2 1 1 − 1 ) \begin{pmatrix} -1 & 0&1 \\ -1 & -1&2\\1&1&-1 \end{pmatrix} ​−1−11​0−11​12−1​ ​ A ( 1 0 1 1 1 0 − 1 2 1 ) \begin{pmatrix} 1 & 0&1 \\ 1 & 1&0\\-1&2&1 \end{pmat…

MySQL5.7忘记root密码

1&#xff09;停止mysql服务 systemctl stop mysqld2&#xff09;跳过权限验证启动mysql mysqld --skip-grant-tables --userroot3&#xff09;新开一个shell客户端&#xff0c;直接输入mysql回车就能登陆 [rootlocalhost ~]# mysql Welcome to the MySQL monitor. Commands…

如何培养孩子的自信心

当谈论培养孩子的自信心时&#xff0c;许多家长可能会感到困惑。自信心是一个孩子成长过程中非常重要的品质&#xff0c;它可以帮助孩子在面对挑战时更加勇敢和坚定。那么&#xff0c;如何培养孩子的自信心呢&#xff1f;以下是一些建议&#xff0c;希望能对您有所帮助。 鼓励孩…

git checkout进行更改分支

git clone https://gitee.com/yaleguo1/minit-learning-demo.git下载代码。 cd minit-learning-demo/进入目录里边。 ls -l看一下当前分支的内容。 git checkout geek_chapter02更改分支到geek_chapter02。 ls -l看一下目录里边的内容。

【Rust日报】2023-12-14 Mojo 也要支持生存期

【帖子】学习 Rust 的经历&#xff0c;好坏参半 这篇 Reddit 帖子中&#xff0c;一位用户分享了他学习 Rust 编程语言的经历&#xff0c;并表示他的体验褒贬不一。他提到自己是一名有 15 年经验的开发者&#xff0c;曾使用过多种编程语言&#xff0c;包括 Go、Java、PHP、JavaS…

Sui第八轮资助:七个项目获得资助

今天&#xff0c;Sui基金会宣布本月的资助获得者&#xff0c;他们因构建项目以推动Sui的采用和发展而获得资助。要获得资助&#xff0c;项目必须提交提案&#xff0c;详细说明他们正在构建的内容、预算明细、关键里程碑、团队经验以及对Sui社区的预期贡献。 以下七个项目致力于…