分布式日志治理:Log4j2自定义Appender写日志到RocketMQ

🧑 博主简介:CSDN博客专家历代文学网(PC端可以访问:https://literature.sinhy.com/#/?__c=1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编程高并发设计Springboot和微服务,熟悉LinuxESXI虚拟化以及云原生Docker和K8s,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。
技术合作请加本人wx(注明来自csdn):foreast_sea

在这里插入图片描述

在这里插入图片描述


文章目录

  • Log4j2自定义Appender写日志到RocketMQ
      • 引言:分布式系统下的日志治理新范式——基于Log4j2与RocketMQ的高效实践
      • 1. 添加Maven依赖
      • 2. 实现自定义Appender
      • 3. 配置log4j2.xml
      • 4. 关键点说明
      • 5. 注意事项

Log4j2自定义Appender写日志到RocketMQ

引言:分布式系统下的日志治理新范式——基于Log4j2与RocketMQ的高效实践

在云原生与微服务架构大行其道的今天,日志管理已从简单的本地文件存储演化为支撑系统可观测性的核心支柱。传统日志处理方式在面对日均TB级的日志量、跨地域服务调用链追踪、实时异常检测等场景时,往往陷入存储碎片化、检索效率低下、处理延迟高的困境。尤其在金融交易、物联网、在线教育等高并发领域,日志数据不仅是问题排查的"黑匣子",更是业务洞察的"数据金矿",亟需一种能够兼顾实时性、可靠性和可扩展性的新型日志处理方案。

Apache RocketMQ作为阿里巴巴开源的高性能分布式消息中间件,凭借其毫秒级消息投递、万亿级消息堆积能力和完善的事务机制,为日志数据的异步化处理提供了理想通道。而Log4j2作为Java生态中最主流的日志框架,其插件化架构和异步日志特性,使得开发者能够通过自定义Appender将日志生产与传输逻辑解耦。二者的结合,不仅实现了日志从"被动记录"到"主动流转"的范式升级,更构建起日志采集、传输、存储、分析的全链路解决方案。

本文深入探讨如何基于Log4j2最新架构扩展日志输出能力,通过构建自定义RocketMQAppender实现日志数据的实时投递。该方案突破传统日志文件的物理边界,使日志数据可无缝对接ElasticsearchFlinkSpark等大数据处理平台,为实时监控、安全审计、用户行为分析等场景提供高时效数据源。

本文从Maven依赖配置、Appender线程模型设计、RocketMQ生产者最佳实践等维度展开,详细解析如何在高并发场景下保障日志传输的可靠性与性能平衡,并针对消息压缩、失败重试、资源监控等关键问题给出工程级解决方案。通过此实践,开发者可将日志系统的吞吐量提升1-2个数量级,同时显著降低日志丢失风险,为构建企业级可观测性平台奠定坚实基础。

以下是基于Java Log4j2自定义Appender将日志写入RocketMQ的步骤:

1. 添加Maven依赖

<!-- Log4j2 核心依赖 -->
<dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.23.1</version>
</dependency><!-- RocketMQ客户端 -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.1.4</version>
</dependency>

2. 实现自定义Appender

import org.apache.logging.log4j.core.*;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.*;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.SendResult;import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;@Plugin(name = "RocketMQAppender",category = Core.CATEGORY_NAME,elementType = Appender.ELEMENT_TYPE,printObject = true
)
public final class RocketMQAppender extends AbstractAppender {private Producer producer;private final String namesrvAddr;private final String topic;private final String producerGroup;private final int sendTimeout;protected RocketMQAppender(String name, Filter filter, Layout<? extends Serializable> layout,String namesrvAddr, String topic, String producerGroup, int sendTimeout) {super(name, filter, layout, true, Property.EMPTY_ARRAY);this.namesrvAddr = namesrvAddr;this.topic = topic;this.producerGroup = producerGroup;this.sendTimeout = sendTimeout;}@Overridepublic void start() {try {final ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(namesrvAddr);ProducerBuilder producerBuilder = provider.newProducerBuilder().setClientConfiguration(builder.build()).setTopics(topic);if (producerGroup != null) {producerBuilder.setProducerGroup(producerGroup);}producer = producerBuilder.build();} catch (ClientException e) {LOGGER.error("Initialize RocketMQ Producer failed", e);}super.start();}@Overridepublic void append(LogEvent event) {if (producer == null) return;try {byte[] body = getLayout().toByteArray(event);String messageBody = new String(body, StandardCharsets.UTF_8);final ClientServiceProvider provider = ClientServiceProvider.loadService();Message message = provider.newMessageBuilder().setTopic(topic).setBody(body).build();SendResult sendResult = producer.send(message);// 可添加发送结果处理逻辑} catch (Exception e) {LOGGER.error("Send log to RocketMQ failed", e);}}@Overridepublic void stop() {super.stop();if (producer != null) {try {producer.close();} catch (Exception e) {LOGGER.error("Close RocketMQ Producer failed", e);}}}@PluginFactorypublic static RocketMQAppender createAppender(@PluginAttribute("name") String name,@PluginElement("Filter") Filter filter,@PluginElement("Layout") Layout<? extends Serializable> layout,@PluginAttribute("namesrvAddr") String namesrvAddr,@PluginAttribute("topic") String topic,@PluginAttribute(value = "producerGroup", defaultString = "LogProducerGroup") String producerGroup,@PluginAttribute(value = "sendTimeout", defaultInt = 3000) int sendTimeout) {if (name == null) {LOGGER.error("No name provided for RocketMQAppender");return null;}return new RocketMQAppender(name, filter, layout, namesrvAddr, topic, producerGroup, sendTimeout);}
}

3. 配置log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN"><Appenders><RocketMQAppender name="RocketMQ"namesrvAddr="localhost:8081"topic="LOG_TOPIC"producerGroup="LOG_PRODUCER_GROUP"sendTimeout="5000"><PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/></RocketMQAppender></Appenders><Loggers><Root level="info"><AppenderRef ref="RocketMQ"/></Root></Loggers>
</Configuration>

4. 关键点说明

  1. 线程安全设计

    • RocketMQ Producer是线程安全的,可以复用实例
    • 在start()中初始化,stop()中销毁
  2. 异常处理

    • 在send方法中添加try-catch防止日志记录阻塞主线程
    • 建议添加失败重试机制(示例未展示)
  3. 性能优化建议

    // 可添加批量发送支持
    producer.send(List<Message> messages, SendReceipt sendReceipt);// 或使用异步发送
    CompletableFuture<SendResult> future = producer.sendAsync(message);
    
  4. 扩展功能建议

    • 添加消息Tag支持
    • 支持自定义Key/Value属性
    • 添加消息压缩功能
    • 支持同步/异步发送模式切换

5. 注意事项

  1. 版本兼容性

    • RocketMQ 5.x+ 使用新的客户端API
    • 旧版本(4.x)需要调整客户端实现
  2. 资源管理

    • 确保Producer在JVM关闭时正确关闭
    • 建议添加发送队列积压监控
  3. 安全配置

    // 如果需要认证
    ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(namesrvAddr).setCredentialProvider(new StaticSessionTokenCredentialProvider("accessKey", "secretKey"));
    
  4. 日志格式化

    • 建议使用JSON格式方便后续处理
    • 可添加TraceID等全链路追踪信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/76730.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【HTML】html文件

HTML文件全解析&#xff1a;搭建网页的基石 在互联网的广袤世界里&#xff0c;每一个绚丽多彩、功能各异的网页背后&#xff0c;都离不开HTML文件的默默支撑。HTML&#xff0c;即超文本标记语言&#xff08;HyperText Markup Language&#xff09;&#xff0c;作为网页创建的基…

oracle命令上下左右键无法使用如何解决?

1、问题如图 2、解决办法 (1) 安装readline yum -y install readline* &#xff08;2&#xff09;安装 rlwrap ##下载 wget http://files.cnblogs.com/files/killkill/rlwrap-0.30.tar.gz.zip ##解压 tar -xzvf rlwrap-0.30.tar.gz.zip ##编译安装 ./configure make &&…

vue事假机制都有哪些

Vue 的事件机制主要包含以下几种类型和方式&#xff0c;可以分为组件内部事件、父子组件通信事件、原生 DOM 事件封装、修饰符增强等&#xff0c;下面详细分类介绍&#xff1a; 一、DOM 事件绑定&#xff08;最基础的事件&#xff09; 使用 v-on&#xff08;或简写 &#xff0…

系统编程2(消息队列)

⦁ 消息队列概念 Linux系统中消息队列&#xff08;Message Queue&#xff09;是进程间通信的一种方式&#xff0c;这种通信机制的好处是可以传输指定类型(用户可以自行定义)的数据&#xff0c;相同类型的数据根据到达顺序在队列中进行排队。 当然&#xff0c;不同类型的数据不…

Pytorch深度学习框架60天进阶学习计划 - 第41天:生成对抗网络进阶(二)

Pytorch深度学习框架60天进阶学习计划 - 第41天&#xff1a;生成对抗网络进阶&#xff08;二&#xff09; 7. 实现条件WGAN-GP # 训练条件WGAN-GP def train_conditional_wgan_gp():# 用于记录损失d_losses []g_losses []# 用于记录生成样本的多样性&#xff08;通过类别分…

python 微博爬虫 01

起因&#xff0c; 目的: ✅下载单个视频&#xff0c;完成。✅ 获取某用户的视频列表&#xff0c;完成。剩下的就是&#xff0c; 根据视频列表&#xff0c;逐个下载视频&#xff0c;我没做&#xff0c;没意思。获取视频的评论&#xff0c;以后再说。 关键点记录: 1. 对一个视…

Servlet、HTTP与Spring Boot Web全面解析与整合指南

目录 第一部分&#xff1a;HTTP协议与Servlet基础 1. HTTP协议核心知识 2. Servlet核心机制 第二部分&#xff1a;Spring Boot Web深度整合 1. Spring Boot Web架构 2. 创建Spring Boot Web应用 3. 控制器开发实践 4. 请求与响应处理 第三部分&#xff1a;高级特性与最…

vue中根据html动态渲染内容2.0

上次使用的是p标签用的contenteditable代替的可编辑的input&#xff0c;最后实现还是选择了用el-input的textarea方式。 一开始考虑的是需要根据用户输入自动撑开输入框&#xff0c;所以选择了p标签可编辑。 最后发现还是el-input会更好一点&#xff0c;只不过需要处理输入框撑…

CentOS 系统磁盘扩容并挂载到根目录(/)的详细步骤

在使用 CentOS 系统时&#xff0c;经常会遇到需要扩展磁盘空间的情况。例如&#xff0c;当虚拟机的磁盘空间不足时&#xff0c;可以通过增加磁盘容量并将其挂载到根目录&#xff08;/&#xff09;来解决。以下是一个完整的操作流程&#xff0c;详细介绍了如何将新增的 10G 磁盘…

LINUX基础 [二] - Linux常见指令

目录 &#x1f4bb;前言 &#x1f4bb;指令 &#x1f3ae;ls指令 &#x1f3ae;pwd指令 &#x1f3ae;whoami指令 &#x1f3ae;cd指令 &#x1f3ae;clear指令 &#x1f3ae;touch指令 &#x1f3ae;mkdir指令 &#x1f3ae;rmdir指令 &#x1f3ae;rm指令 &#…

基于php的成绩分析和预警与预测网站(源码+lw+部署文档+讲解),源码可白嫖!

摘要 人类现已迈入二十一世纪&#xff0c;科学技术日新月异&#xff0c;经济、资讯等各方面都有了非常大的进步&#xff0c;尤其是资讯与网络技术的飞速发展&#xff0c;对政治、经济、军事、文化、教育等各方面都有了极大的影响。 利用电脑网络的这些便利&#xff0c;发展一套…

《从底层逻辑剖析:分布式软总线与传统计算机硬件总线的深度对话》

在科技飞速发展的当下&#xff0c;我们正见证着计算机技术领域的深刻变革。计算机总线作为信息传输的关键枢纽&#xff0c;其发展历程承载着技术演进的脉络。从传统计算机硬件总线到如今备受瞩目的分布式软总线&#xff0c;每一次的变革都为计算机系统性能与应用拓展带来了质的…

Spring Boot 3.5新特性解析:自动配置再升级,微服务开发更高效

&#x1f4dd; 摘要 Spring Boot 3.5作为Spring生态的最新版本&#xff0c;带来了多项令人振奋的改进。本文将深入解析其中最核心的自动配置增强特性&#xff0c;以及它们如何显著提升微服务开发效率。通过详细的代码示例和通俗易懂的讲解&#xff0c;您将全面了解这些新特性在…

【前端】webpack一本通

今日更新完毕&#xff0c;不定期补充&#xff0c;建议关注收藏点赞。 目录 简介Loader和Plugin的不同&#xff1f;&#xff08;必会&#xff09; 使用webpack默认只能处理js文件 ->引入加载器对JS语法降级&#xff0c;兼容低版本语法合并文件再次打包进阶 工作原理Webpack 的…

leetcode 264. Ugly Number II

动态规划解决。 关键是理解如何生成新的丑数。这道题和经典的斐波那契数列问题其实是一样的。求第n个数&#xff0c;需要用第n个数前面的数来求。不同的是&#xff0c;斐波那契数列不会重复。而本题的丑数&#xff0c;会重复出现。 class Solution { public:int nthUglyNumbe…

深入理解 HTML5 语义元素:提升网页结构与可访问性

引言 在构建网页的过程中&#xff0c;合理的结构与清晰的语义对于网页的质量、可维护性以及搜索引擎优化&#xff08;SEO&#xff09;都至关重要。HTML5 引入了一系列语义元素&#xff0c;为开发者提供了更精准描述网页内容的工具。本文将深入探讨 HTML5 语义元素的作用、使用…

PyCharm显示主菜单和工具栏

显示主菜单 新版 PyCharm 是不显示主菜单的&#xff0c;要想显示主菜单和工具栏&#xff0c;则通过 “视图” → “外观” &#xff0c;勾选 “在单独的工具栏中显示主菜单” 和 “工具栏” 即可。 设置工具栏 此时工具栏里并没有什么工具&#xff0c;因此我们需要自定义工具…

CyclicBarrier 基本用法

CyclicBarrier 基本用法 简介 CyclicBarrier 是 Java 并发包&#xff08;java.util.concurrent&#xff09;中的一个同步辅助类。它允许一组线程相互等待&#xff0c;直到到达某个公共屏障点&#xff08;common barrier point&#xff09;。只有当所有参与的线程都到达屏障点…

[特殊字符] 手机连接车机热点并使用 `iperf3` 测试网络性能

好的&#xff0c;以下是根据你的描述整理出来的步骤及解释&#xff1a; &#x1f4f6; 手机连接车机热点并使用 iperf3 测试网络性能 本文将通过 iperf3 来测试手机和车机之间的网络连接性能。我们会让车机作为服务端&#xff0c;手机作为客户端&#xff0c;进行 UDP 流量传输…

FPGA上实现SD卡连续多块读的命令

在FPGA上实现SD卡连续多块读的命令 CMD17命令一次只能读取1个块 CMD18命令一次可以连续读取多个块&#xff0c;直到停止命令CMD12 CMD18命令读的块数程序可任意设置 目录 前言 一、SD卡多块读命令CMD18 二、停止读命令CMD12 三、SD卡初始化SD卡连续块读操作的verilog代码 …