Skywalking Kafka Tracing实现

背景

Skywalking默认场景下,Tracing对于消息队列的发送场景,无法将TraceId传递到下游消费者,但对于微服务场景下,是有大量消息队列的业务场景的,这显然无法满足业务预期。

解决方案

Skywalking的官方社区中,有用户提出了该场景问题,Skywalking在补充工具包中,提供了对Kafka的tracing支持。

skywalking kafka problem

代码实现:

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-kafka</artifactId><version>${skywalking.version}</version></dependency>

对于该工具包,默认情况下,是针对KafkaTemplate进行trace,即如果使用KafkaTemplate发送消息,代码层面无需做任何改动。

如果没有使用KafkaTemplate的场景,toolkit也提供的了注解的支持:

public class ConsumerThread2 extends Thread {@Overridepublic void run() {Properties consumerProperties = new Properties();//...consumerProperties.put()KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());while (true) {if (pollAndInvoke(consumer)) break;}consumer.close();}@KafkaPollAndInvokeprivate boolean pollAndInvoke(KafkaConsumer<String, String> consumer) {try {Thread.sleep(1000);} catch (InterruptedException e) {}ConsumerRecords<String, String> records = consumer.poll(100);if (!records.isEmpty()) {OkHttpClient client = new OkHttpClient.Builder().build();Request request = new Request.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build();Response response = null;try {response = client.newCall(request).execute();} catch (IOException e) {}response.body().close();return true;}return false;}
}

异步线程Tracing

对于Kafka消息的发送,经常会配合异步线程池的场景使用,Tracing的基本原理是基于ThreadLocal进行实现的,那么对于异步场景,是会丢失TraceId,通常的解决方式,是需要手动将主线程的TraceId手动赋值给子线程,但这种方式需要手动代码侵入,并不友好。

幸运的是,Skywalking的toolkit中提供了对于异步线程tracing的支持。

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-trace</artifactId><version>${skywalking.version}</version>
</dependency>

推荐用法:

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(RunnableWrapper.of(new Runnable() {@Override public void run() {//your code}
}));

或者:

 @TraceCrossThreadpublic static class MyCallable<String> implements Callable<String> {@Overridepublic String call() throws Exception {return null;}}
...ExecutorService executorService = Executors.newFixedThreadPool(1);executorService.submit(new MyCallable());

PS:事实上,RunnableWrapper也是基于@TraceCrossThread实现。

相关文档:
https://skywalking.apache.org/docs/skywalking-java/v8.16.0/en/setup/service-agent/java-agent/application-toolkit-kafka/

https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.1.0/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.html

https://blog.51cto.com/knifeedge/5268667

https://blog.csdn.net/lijunwyf/article/details/107954543

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/49901.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习入门(三):卷积神经网络(CNN)

引入 给定一张图片&#xff0c;计算机需要模型判断图里的东西是什么&#xff1f; &#xff08;car、truck、airplane、ship、horse&#xff09; 一、卷积神经网络整体架构 CONV&#xff1a;卷积计算层&#xff0c;线性乘积求和RELU&#xff1a;激励层&#xff0c;激活函数P…

Flutter性能揭秘之RepaintBoundary

作者&#xff1a;xuyisheng Flutter会在屏幕上绘制Widget。如果一个Widget的内容需要更新&#xff0c;那就只能重绘了。尽管如此&#xff0c;Flutter同样会重新绘制一些Widget&#xff0c;而这些Widget的内容仍有部分未被改变。这可能会影响应用程序的执行性能&#xff0c;有时…

Flume、Logstash、Filebeat对比

Flume、Logstash和Filebeat是三种常用的数据采集工具,用于收集、聚合和传输日志和事件数据。它们在功能、特性和适用场景上有一些区别。以下是对它们的简要对比: Apache Flume: 用途:主要用于大规模数据采集、传输和聚合,特别适用于将数据送入Hadoop生态系统。特点: 提供…

springboot集成Graphql相关问题汇总

1、idea在debug运行时出现java.lang.NoClassDefFoundError:kotlin/collections/AbstractMutableMap 解决&#xff1a;禁用idea dubugger中kotlin coroutine agent 见&#xff1a;https://stackoverflow.com/questions/70796177/after-the-spring-boot-source-code-is-compile…

Mongodb 删除文档Delete与Remove的区别

db.collection.remove() 此方法已被 mongosh 弃用 已弃用的方法替代方法db.collection.remove()db.collection.deleteOne() db.collection.deleteMany() db.collection.findOneAndDelete() db.collection.bulkWrite() 5.0版本更改。 db.collection.remove(<query>,…

Android studio之 build.gradle配置

在使用Android studio创建项目会出现两个build.gradle&#xff1a; 一. Project项目级别的build.gradle &#xff08;1&#xff09;、buildscript{}闭包里是gradle脚本执行所需依赖&#xff0c;分别是对应的maven库和插件。 闭包下包含&#xff1a; 1、repositories闭包 2、d…

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

git 查看某个分支是从哪个分支拉出来的

原文链接&#xff1a;https://blog.csdn.net/allanGold/article/details/102478157 git reflog show 分支名git reflog --datelocal | grep 分支名git reflog --datelocal | grep 分支名 $ git reflog --datelocal | grep release3 5c50761 HEAD{Thu Jun 29 12:53:45 2023}: c…

3D旅游情景实训教学展示

随着科技的不断发展&#xff0c;情景实训教学在教育领域中的应用越来越广泛。通过虚拟现实技术&#xff0c;3D视觉技术&#xff0c;计算机技术等为学生提供了一个身临其境的学习环境&#xff0c;让他们能够在模拟的场景中学习和实践&#xff0c;从而更好地理解和掌握知识。 3D虚…

Docker搭建LNMP运行Wordpress平台

一、项目1.1 项目环境1.2 服务器环境1.3 任务需求 二、Linux 系统基础镜像三、Nginx1、建立工作目录2、编写 Dockerfile 脚本3、准备 nginx.conf 配置文件4、生成镜像5、创建自定义网络6、启动镜像容器7、验证 nginx 四、Mysql1、建立工作目录2、编写 Dockerfile3、准备 my.cnf…

红黑树遍历与Redis存储

引言 在计算机科学领域&#xff0c;红黑树&#xff08;Red-Black Tree&#xff09;是一种自平衡的二叉查找树&#xff0c;它能在O(log n)的时间复杂度内完成插入、删除和查找操作。由于其高效性和可预测性的性能&#xff0c;红黑树在许多领域都得到广泛应用。本文将重点介绍红…

Ubuntu系统安装之后首需要做的事情

Ubuntu系统的初步环境搭建 1、换源2、显卡3、浏览器4、输入法5、终端6、ROS7、VSCode8、设置时间与win一致9、 TimeShift10、 Anaconda&#xff08;考虑装不装&#xff09; 1、换源 点开Software&&Update&#xff0c;找到Ubuntu Software中的Download from&#xff0c…

html5提供的FileReader是一种异步文件读取文件中的数据

前言&#xff1a;FileReader是一种异步文件读取机制&#xff0c;结合input:file可以很方便的读取本地文件。 input:file 在介绍FileReader之前&#xff0c;先简单介绍input的file类型。 <input type"file" id"file"> input的file类型会渲染为一个按…

Web3.0

一、Web3.0是什么 Web3.0&#xff08;有时称为“分布式Web”或“去中心化Web”&#xff09;是对互联网的下一代演进的概念。它代表了一种更加分散、去中心化和用户掌控的互联网模式&#xff0c;与传统的Web2.0模型有很大不同。 以下是Web3.0的一些关键特征和概念&#xff1a;…

c++ java rgb与nv21互转

目录 jni函数 c++ rgb转nv21,可以转,不报错,但是转完只有黑白图 java yuv420保存图片,先转nv21,再保存ok: c++ yuv420月bgr互转,测试ok jni函数 JNIEXPORT void JNICALL Java_com_tencent_blazefacencnn_BlazeFaceNcnn_encode(JNIEnv *env,jobject thiz, jobject in…

Qt-事件循环与QtConcurrent、QThread结合使用时注意的点

QEventLoop和QtConcurrent可以结合使用达到主线程ui不阻塞同步执行的效果&#xff0c;但是要小心避坑&#xff0c;查看如下代码&#xff1a; QEventLoop loop; QtConcurrent::run([&]() {doSomething();loop.quit(); }); loop.exec();上述写法存在两个问题&#xff1a; Q…

Linux学习记录——이십오 多线程(2)

文章目录 1、理解原生线程库线程局部存储 2、互斥1、并发代码&#xff08;抢票&#xff09;2、锁3、互斥锁的实现原理 3、线程封装1、线程本体2、封装锁 4、线程安全5、死锁6、线程同步1、条件变量1、接口2、demo代码 1、理解原生线程库 线程库在物理内存中存在&#xff0c;也…

[bug日志]springboot多模块启动,在yml配置启动端口8081,但还是启动了8080

【问题描述】 配置的启动端口是8081&#xff0c;实际启动端口是8080 【解决方法】 1.检查application.yml的配置是否有错误(配置项中&#xff0c;显示白色就错&#xff0c;橙色无措) 2.检查pom.xml的打包方式配置项配置&#xff0c;主pom.xml中的配置项一般为&#xff1a;&l…

【hello git】初识Git

目录 一、简述Git 二、Linux 下 Git 的安装&#xff1a;CentOS 2.1 基本命令 2.2 示例&#xff1a; 三、Linux 下 Git 的安装&#xff1a;ubuntu 3.1 基本命令 3.2 示例&#xff1a; 一、简述Git Git &#xff1a;版本控制器&#xff0c;记录每次的修改以及版本迭代的一个管…

day 37 | ● 1049. 最后一块石头的重量 II ● 494. 目标和 ● 474.一和零

1049. 最后一块石头的重量 II 与前一道分割等和子集的思路差不多&#xff0c;都是01背包问题。因为是采用滚动数组的形式&#xff0c;所以必须要倒序遍历才可以。 dp[i]代表着在i的限制下最大的承重。所以另一半就是all - dp【all / 2】 func lastStoneWeightII(stones []int…