基于trace_id实现SpringCloudGateway网关的链路追踪

之前写的两篇关于基于 trace_id 的链路追踪的文章:

  • 基于trace_id的链路追踪(含Feign、Hystrix、线程池等场景)
  • 基于trace_id的链路追踪(ForkJoinPool场景)

一、引言

在之前的文章中,我们讨论了基于 trace_id 的链路追踪的常见场景。然而,最近我意识到在微服务架构中,我们还缺少对一个非常常见场景的探讨:在网关中如何处理 trace_id,尤其是在 Reactor 异步模式下的处理。因此,我决定记录下这些思考和解决方案。

二、具体场景

在Spring Cloud Gateway网关中,我们需要实现请求访问日志的打印功能,以便更好地排查问题。具体的实现方式包括两个全局过滤器:

  • TraceIdGlobalFilter:实现 trace_id 全局拦截(先执行)。
  • AccessLogGlobalFilter:实现请求访问日志的打印(后执行)。

在正常情况下,这两个过滤器可以打印请求的 request 日志和 response 日志,并且日志中都包含相同的 trace_id。然而,在开发调试过程中,我发现了一种异常情况:request 日志中总能打印出 trace_id,而 response 日志中则有时能打印出 trace_id,有时却不能。这导致了 request 日志和 response 日志无法关联的问题。

三、分析

1. 为什么 response 日志没有打印 trace_id
通过分析日志,我发现打印 response 日志的线程与打印 request 日志的线程并不是同一个线程。基于此,我们可以判断,trace_id 没有传递到打印 response 日志的线程中。

2. 为什么 trace_id 没有传递到打印 response 日志的线程中?
我们知道 Spring Cloud Gateway 是基于 WebFlux Reactor 异步模式实现的,因此一个请求的 request response 可能由不同的线程来执行。在 TraceIdGlobalFilter 中,我们使用了 MDC来传递 trace_id。然而,MDC 在普通的多线程环境中有效,但在 Reactor 异步模式下并不起作用。这是因为 Reactor 异步模式需要通过另外一种方式来传递 trace_id

四、解决方案

在 WebFlux Reactor 异步模式下,我们需要使用 reactor.util.context.Context 来传递 trace_id。核心逻辑如下:
透传 trace_id 通过 Mono.contextWrite(context) 往 context 中设置 trace_id
取出 trace_id 通过 Flux.deferContextual(context) 从 context 中获取 trace_id
具体实现代码示例如下:

// 设置 trace_id 
Mono.contextWrite(context -> context.put("trace_id", traceId));// 获取 trace_id
Flux.deferContextual(context -> {String traceId = context.get("trace_id");// 可将 traceId 设置到MDC中供当前线程使用return Flux.just(traceId);
});

通过这种方式,我们可以确保 trace_id 在整个请求处理链路中都能被正确传递和使用,解决了 request 日志和 response 日志断联的问题。

五、具体代码

TraceIdGlobalFilter

/*** trace_id 全局拦截器*/
@Slf4j
@Component
public class TraceIdGlobalFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String traceId = request.getHeaders().getFirst(TraceConsts.TRACE_ID);// trace_idtraceId = MdcUtil.attachTraceId(traceId);// 将traceId传递给下游微服务String finalTraceId = traceId;Consumer<HttpHeaders> headersConsumer = httpHeaders -> {httpHeaders.set(TraceConsts.TRACE_ID, finalTraceId);};ServerHttpRequest requestNew = exchange.getRequest().mutate().headers(headersConsumer).build();return chain.filter(exchange.mutate().request(requestNew).build()).doFinally(s -> {// 清除MDCMdcUtil.detachTraceId();});}@Overridepublic int getOrder() {return -100;}}

AccessLogGlobalFilter


/*** 请求访问日志 全局拦截器*/
@Slf4j
@Component
public class AccessLogGlobalFilter implements GlobalFilter, Ordered {/*** gateway access log 日志开关* <p>* 特别注意:高并发业务场景下,可以关闭日志来提升性能*/@Value("${com.gateway.access.log.enabled:true}")private boolean logEnabled;private final HandlerStrategies handlerStrategies = HandlerStrategies.withDefaults();@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {StopWatch stopWatch = new StopWatch();stopWatch.start();ServerHttpRequest httpRequest = exchange.getRequest();// 日志开关,直接进入下一个Filterif (!logEnabled) {return chain.filter(exchange).then(Mono.fromRunnable(() -> {stopWatch.stop();// 为了方便排查问题,还是打印一个简单的日志if (log.isDebugEnabled()) {log.debug("请求参数 [{}] [{}] query:{}, time: {} ms", httpRequest.getURI().getPath(), httpRequest.getMethod(), httpRequest.getURI().getRawQuery(), stopWatch.getTotalTimeMillis());}}));}// Request 处理ServerRequest request = ServerRequest.create(exchange, handlerStrategies.messageReaders());// header 参数HttpHeaders httpHeaders = request.headers().asHttpHeaders();// 是否为文件上传,若是文件上传,则不打印bodyboolean isFile = null != httpHeaders.getContentType() && AccessLogUtil.isBinayBodyData(httpHeaders.getContentType().toString());// response 包装ServerHttpResponseDecorator responseDecorator = responseDecoratorAndRecordLog(exchange, stopWatch);if (isFile) {// 打印请求日志this.reqLog(request, isFile, null);// 执行过滤器return chain.filter(exchange.mutate().request(request.exchange().getRequest()).response(responseDecorator).build())// 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它// webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id.contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));}Mono<String> modifiedBody = request.bodyToMono(String.class).defaultIfEmpty(CommonConsts.NULL).flatMap(body -> {// 打印请求日志this.reqLog(request, isFile, body);return Mono.just(body);});// 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次// BodyInserters.fromPublisher 不支持文件上传,所以不能用BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);HttpHeaders headers = new HttpHeaders();headers.putAll(exchange.getRequest().getHeaders());headers.remove(HttpHeaders.CONTENT_LENGTH);CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {// request 包装ServerHttpRequestDecorator requestDecorator = requestDecorator(exchange, headers, outputMessage);// 执行过滤器return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build())// 从最初的Mono本身解析一个值,并将其放入上下文context中,以便下游可以通过上下文context API访问它// webflux reactor 异步模式下:通过 contextWrite 往context中设置trace_id.contextWrite(context -> context.put(TraceConsts.TRACE_ID, MdcUtil.getTraceId()));}));}@Overridepublic int getOrder() {return -90;}/*** 打印 request log*/private void reqLog(ServerRequest request, boolean isFile, String body) {// URL query 参数String queryString = request.uri().getRawQuery();// header 参数HttpHeaders headers = request.headers().asHttpHeaders();String headersParams = headersToString(headers);if (isFile) {if (log.isInfoEnabled()) {log.info("请求参数 [{}] [{}] query:{}, headers:{}", request.uri().getPath(), request.methodName(), queryString, headersParams);}return;}// request body 长度处理,避免太长,打印耗性能String requestBody = AccessLogUtil.fixFieldAndReplaceWhite(body, AccessLogUtil.DEF_MAX_LEN);if (log.isInfoEnabled()) {log.info("请求参数 [{}] [{}] query:{}, headers:{}, body:{}", request.uri().getPath(), request.methodName(), queryString, headersParams, requestBody);}}/*** 过滤headers,避免打印过多的日志*/private String headersToString(HttpHeaders headers) {Map<String, String> map = new HashMap<String, String>();for (Map.Entry<String, List<String>> entry : headers.entrySet()) {if (RequestParamUtil.containsHeader(entry.getKey())) {map.put(entry.getKey(), entry.getValue().toString());}}return JSON.toJSONString(map);}/*** Request装饰器,重新计算 headers*/private ServerHttpRequestDecorator requestDecorator(ServerWebExchange exchange, HttpHeaders headers,CachedBodyOutputMessage outputMessage) {return new ServerHttpRequestDecorator(exchange.getRequest()) {@Overridepublic HttpHeaders getHeaders() {long contentLength = headers.getContentLength();HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.putAll(super.getHeaders());if (contentLength > 0) {httpHeaders.setContentLength(contentLength);} else {httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");}return httpHeaders;}@Overridepublic Flux<DataBuffer> getBody() {return outputMessage.getBody();}};}/*** Response装饰器,记录响应日志* <p>* 通过 DataBufferFactory 解决响应体分段传输问题。*/private ServerHttpResponseDecorator responseDecoratorAndRecordLog(ServerWebExchange exchange, StopWatch stopWatch) {ServerHttpResponse response = exchange.getResponse();DataBufferFactory bufferFactory = response.bufferFactory();return new ServerHttpResponseDecorator(response) {@Overridepublic Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {stopWatch.stop();if (!(body instanceof Flux)) {return super.writeWith(body);}// 获取响应类型String responseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);if (AccessLogUtil.isBinayBodyData(responseContentType)) {if (log.isInfoEnabled()) {log.info("响应参数: time {} ms", stopWatch.getTotalTimeMillis());}return super.writeWith(body);}// info及以上日志级别才做如下处理if (log.isInfoEnabled()) {Flux<? extends DataBuffer> fluxBody = Flux.from(body).flatMap(dataBuffer -> Flux.deferContextual(context -> {// webflux reactor 异步模式下:通过 deferContextual 取出context中的trace_idMdcUtil.putTraceId(context.get(TraceConsts.TRACE_ID));if (log.isDebugEnabled()) {log.debug("spring cloud gateway webflux reactor 异步模式下,透传trace_id: {}", MdcUtil.getTraceId());}return Flux.just(dataBuffer);})).doFinally(signalType -> {// 清理掉trace_idMdcUtil.removeTraceId();});return super.writeWith(fluxBody.buffer().map(dataBuffers -> {// 合并多个流集合,解决返回体分段传输DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();DataBuffer join = dataBufferFactory.join(dataBuffers);byte[] content = new byte[join.readableByteCount()];join.read(content);// 释放掉内存DataBufferUtils.release(join);String responseBody = new String(content, StandardCharsets.UTF_8);// response body 长度处理,避免太长,打印耗性能responseBody = AccessLogUtil.fixFieldAndReplaceWhite(responseBody, AccessLogUtil.DEF_MAX_LEN);log.info("响应参数: {}, time {} ms", responseBody, stopWatch.getTotalTimeMillis());return bufferFactory.wrap(content);}));}return super.writeWith(body);}};}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/48432.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ue5笔记

1 点光源 聚光源 矩形光源 参数比较好理解 &#xff08;窗口里面&#xff09;环境光混合器&#xff1a;快速创造关于环境光的组件 大气光源&#xff1a;太阳光&#xff0c;定向光源 天空大气&#xff1a;蓝色的天空和大气 高度雾&#xff1a;大气下面的高度感的雾气 体积…

【HarmonyOS】HarmonyOS NEXT学习日记:五、交互与状态管理

【HarmonyOS】HarmonyOS NEXT学习日记&#xff1a;五、交互与状态管理 在之前我们已经学习了页面布局相关的知识&#xff0c;绘制静态页面已经问题不大。那么今天来学习一下如何让页面动起来、并且结合所学完成一个代码实例。 交互 如果是为移动端开发应用&#xff0c;那么交…

自主巡航,目标射击

中国机器人及人工智能大赛 参赛经验&#xff1a; 自主巡航赛道 【机器人和人工智能——自主巡航赛项】动手实践篇-CSDN博客 主要逻辑代码 #!/usr/bin/env python #coding: utf-8import rospy from geometry_msgs.msg import Point import threading import actionlib impor…

SQL Server分布式查询:跨数据库的无缝数据探索

SQL Server分布式查询&#xff1a;跨数据库的无缝数据探索 在当今的企业环境中&#xff0c;数据往往分散在不同的数据库和服务器上。SQL Server的分布式查询功能提供了一种强大的手段&#xff0c;允许用户编写单一的查询来访问和操作分散在不同SQL Server实例中的数据。本文将…

鸿蒙开发 03 封装 @ohos/axios (最新深度封装)

鸿蒙开发 03 封装 ohos/axios &#xff08;最新深度封装&#xff09; 1、安装 ohos/axios2、开始封装2.1 新建 utils 文件夹 和 api 文件夹2.2 在 utils 文件夹里新建 http.ts2.3 在 api 文件夹里新建 api.ets 3、页面调用4、打印结果 1、安装 ohos/axios ohpm install ohos/a…

linux环境交叉编译openssl库,以使Qt支持https

一.前言 Qt若需要支持https&#xff0c;则需要openssl的支撑,并且要注意&#xff0c;Qt不同版本会指定对应的openssl版本库&#xff0c;比方我用的Qt5.15.10他要求用的openssl版本是1.1.1&#xff0c;你就不能用其他版本&#xff0c;不然基本就是失败报错。 如何查看Qt对应ope…

无人机反制技术常见的有哪些?

随着无人机技术的迅速发展和广泛应用&#xff0c;无人机在民用、军事等领域都发挥着重要作用。然而&#xff0c;无人机的滥用和非法入侵也带来了严重的安全隐患。为了维护国家安全和社会稳定&#xff0c;无人机反制技术应运而生。本文将详细介绍无人机反制技术的常见类型&#…

【Git学习 | 第2篇】在IDEA中使用Git

文章目录 在IDEA中使用Git1. IDEA中配置Git2. 获取Git仓库2.1 本地初始化仓库2.2 从远程仓库克隆 3. 本地仓库操作4. 远程仓库操作5. 分支操作 在IDEA中使用Git 1. IDEA中配置Git IDEA中使用Git&#xff0c;本质上使用的本地安装的Git软件配置步骤&#xff1a; 2. 获取Git仓库…

Unity UGUI 之 RectTransform

本文仅作学习笔记与交流&#xff0c;不作任何商业用途 本文包括但不限于unity官方手册&#xff0c;唐老狮&#xff0c;麦扣教程知识&#xff0c;引用会标记&#xff0c;如有不足还请斧正 Unity - Manual: Rect Transform 1.Rect Transform是什么 2.轴心与锚点的映射关系 首先…

【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【29】Sentinel

持续学习&持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【29】Sentinel 简介熔断降级什么是熔断什么是降级相同点不同点 整合Sentinel自定义sentinel流控返回数据使用Sentinel来保护feign远程调用自定义资源给网关整合Sentinel参考 简介 熔断降…

分层评估的艺术:sklearn中的策略与实践

分层评估的艺术&#xff1a;sklearn中的策略与实践 在机器学习中&#xff0c;评估模型性能是一个至关重要的步骤。然而&#xff0c;对于不平衡的数据集&#xff0c;传统的评估方法可能会产生误导性的结果。分层评估&#xff08;Stratified Evaluation&#xff09;是一种确保评…

阿里开源的音频模型_原理与实操

英文名称: FunAudioLLM: Voice Understanding and Generation Foundation Models for Natural Interaction Between Humans and LLMs 中文名称: FunAudioLLM: 人与LLMs之间自然互动的语音理解和生成基础模型 论文地址: http://arxiv.org/abs/2407.04051v3 相关论文&#xff1a;…

人话讲下如何用github actions编译flutter应用-以编译windows为例

actions的脚本看下这个&#xff0c;有简单的说明&#xff0c;有关于编译个平台的脚本&#xff1a; https://github.com/marketplace/actions/flutter-action 打开你要编译的项目点击那个Actions按钮 然后随便点击一个脚本会跳到白框编辑界面 打开上文提到的网址随便抄下就ok …

达梦数据库(一)mysql2dm

达梦数据库(一)mysql2dm 文章目录 达梦数据库(一)mysql2dm一、安装篇ForWindows二、数据库初始化篇三、数据迁移篇出现的问题找不到对应表或者视图 注意字符集模式迁移出错大小写敏感解决方案 四、 代码修改篇group_concatGROUP BY方法一方法二(最笨)方法补充 多表联查更新参考…

求职学习day7

今天回顾&#xff1a; 广东省税务局事业编考&#xff1a; 睡的比较靠窗&#xff0c;早上6点就醒了。七点多感觉醒的差不多就玩了一下手机&#xff0c;将近八点感觉饿了就去吃早餐准备去华南理工考场。数推&#xff0c;图推&#xff0c;计算题&#xff0c;综合分析有三十几题根…

1.17、基于竞争层的竞争学习(matlab)

1、基于竞争层的竞争学习简介及原理 竞争学习是一种无监督学习方法&#xff0c;其中的竞争层神经元之间互相竞争以学习输入模式的表示。竞争学习的一个经典模型是竞争神经网络&#xff08;Competitive Neural Network&#xff0c;简称CNN&#xff09;&#xff0c;其核心部分是…

android include 和 merge 区别

在 Android 开发中&#xff0c;<include> 和 <merge> 是用来复用布局的两个标签&#xff0c;但它们的用途和行为有所不同。以下是它们的区别以及 Kotlin 代码示例&#xff1a; <include> 标签 <include> 标签允许你在一个布局中嵌入另一个布局文件。…

Linux复习02

一、什么是操作系统 操作系统是一款做软硬件管理的软件&#xff01; 一个好的操作系统&#xff0c;衡量的指标是&#xff1a;稳定、快、安全 操作系统的核心工作&#xff1a; 通过对下管理好软硬件资源的手段&#xff0c;达到对上提供良好的&#xff08;稳定&#xff0c;快…

什么是单例模式,有哪些应用?

目录 一、定义 二、应用场景 三、6种实现方式 1、懒汉式&#xff0c;线程不安全。 2、懒汉式&#xff0c;线程安全 3、双检锁/双重校验锁&#xff08;DCL&#xff0c;即 double-checked locking&#xff09; 4、静态内部类方式-------只适用于静态域 5、饿汉式 6、枚举…

嵌入式C++、STM32、树莓派4B、OpenCV、TensorFlow/Keras深度学习:基于边缘计算的实时异常行为识别

1. 项目概述 随着物联网和人工智能技术的发展,智能家居安全系统越来越受到人们的关注。本项目旨在设计并实现一套基于边缘计算的智能家居安全系统,利用STM32微控制器和树莓派等边缘设备,实时分析摄像头数据,识别异常行为(如入侵、跌倒等),并及时发出警报,提高家庭安全性。 系…