大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 了 拦截器实现 Java

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,consoleflume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
</dependencies>

编写代码

package icu.wzk;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class CustomerInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 这里是逐条处理String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);// 获取Event的HeaderMap<String, String> headerMap = event.getHeaders();// 解析Body获取JSON字符串String[] bodyArr = eventBody.split("\\s+");try {String jsonStr = bodyArr[6];// 解析JSON字符串获取时间戳JSONObject jsonObject = JSON.parseObject(jsonStr);String timestampStr = jsonObject.getJSONObject("app_active").getString("time");// 将时间戳转换字符串 yyyy-MM-dd// 将字符串转换为Longlong timestampLong = Long.parseLong(timestampStr);DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");Instant instant = Instant.ofEpochMilli(timestampLong);LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());String date = formatter.format(localDateTime);// 将转换后的字符串放置header中headerMap.put("logtime", date);event.setHeaders(headerMap);} catch (Exception e) {headerMap.put("logtime", "Unknown");event.setHeaders(headerMap);}return event;}@Overridepublic List<Event> intercept(List<Event> list) {List<Event> lstEvent = new ArrayList<>();for (Event event : list) {Event outEvent = intercept(event);if (outEvent != null) {lstEvent.add(outEvent);}}return lstEvent;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomerInterceptor();}@Overridepublic void configure(Context context) {}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无人机动力系统测试-实测数据与CFD模拟仿真数据关联对比分析

我们经常被问到这样的问题&#xff1a;“我们计划运行 CFD 仿真&#xff0c;我们还需要对电机和螺旋桨进行实验测试吗&#xff1f;我们可能有偏见&#xff0c;但我们的答案始终是肯定的&#xff0c;而且有充分的理由。我们自己执行了大量的 CFD 仿真&#xff0c;但我们承认&…

验证双随机矩阵(doubly stochastic matrix) 满足C(P)=C(P^T)

验证双随机矩阵(doubly stochastic matrix) 满足C( P P P)C(P T ^T T) 双随机矩阵&#xff1a; 在数学中&#xff0c;一个双随机矩阵&#xff08;doubly stochastic matrix&#xff09;是一个满足以下条件的矩阵&#xff1a; 非负矩阵&#xff1a;矩阵中的每个元素都是非负的…

Chrome 浏览器开启打印模式

打开开发者工具ctrl shift p输入print 找到 Emulate CSS print media type

Vite初始化Vue3+Typescrpt项目

初始化项目 安装 Vite 首先&#xff0c;确保你的 Node.js 版本 > 12.0.0。然后在命令行中运行以下命令来创建一个 Vite Vue 3 TypeScript 的项目模板&#xff1a; npm init vitelatest进入项目目录 创建完成后&#xff0c;进入项目目录&#xff1a; cd vue3-demo启动…

24 年第十届数维杯国际数模竞赛赛题浅析

本次万众瞩目的数维杯国际大学生数学建模赛题已正式出炉&#xff0c;无论是赛题难度还是认可度&#xff0c;该比赛都是数模届的独一档&#xff0c;含金量极高&#xff0c;可以用于综测加分、保研、简历添彩等各方面。考虑到大家解题实属不易&#xff0c;为了帮助大家取得好成绩…

自动语音识别(ASR)与文本转语音(TTS)技术的应用与发展

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

CentOS8 启动错误,enter emergency mode ,开机直接进入紧急救援模式,报错 Failed to mount /home 解决方法

先看现场问题截图&#xff1a; 1.根据提示 按 ctrld 输入 root 密码&#xff0c;进入系统。 2. 在紧急模式下运行&#xff1a;journalctl -xe &#xff0c;查看相关日志&#xff0c;找到关键点&#xff1a; Failed to mount /home 3.接着执行修复命令&#xff1a; xfs_repa…

Java项目实战II基于微信小程序的课堂助手(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在数字化教…

借助 Pause 容器调试 Pod

借助 Pause 容器调试 Pod 在 K8S 中&#xff0c;Pod 是最核心、最基础的资源对象&#xff0c;也是 Kubernetes 中调度最小单元。在介绍 Pause 容器之前需要先说明下 Pod 与容器的关系来理解为什么需要 Pause 容器来帮助调试 1. Pod 与 容器的关系 Pod 是一个抽象的逻辑概念&…

IDEA自定义文件打开格式

介绍在IDEA中自定义文件打开格式的方法&#xff0c;比如一个文件&#xff0c;可以选择用txt格式打开&#xff0c;也可以选择用xml格式打开&#xff0c;也可以用java格式打开等等&#xff0c;通过这个方法可以方便的用任意格式在idea中打开想要打开的文件。 下面分别讨论三种不…

Git 分⽀规范 Git Flow 模型

前言 GitFlow 是一种流行的 Git 分支管理策略&#xff0c;由 Vincent Driessen 在 2010 年提出。它提供了一种结构化的方法来管理项目的开发、发布和维护&#xff0c;特别适合大型和复杂的项目。GitFlow 定义了一套明确的分支模型和工作流程&#xff0c;使得团队成员可以更有效…

ECG心电前级信号提取

由于ECG信号很微弱&#xff0c;处于mV级别&#xff0c;还有很多干扰信号&#xff0c;所以采集信号时需要进行滤波和放大处理&#xff0c;然后使用模数转换。为了滤波高频干扰和工频噪声&#xff0c;需要使用低通滤波器和陷波器抑制噪声&#xff0c;有时也要使用高通滤波器滤除低…

【Android】逆向开发与反逆向开发入门知识(一)

目录 逆向开发反编译 & 反混淆反编译工具反编译反混淆 修改预置资源文件抓包前期准备二次打包重签名 如何预防 App 被逆向开发&#xff1f;代码混淆应用加固防止动态调试Root 检测二次打包检测 警告&#xff1a;逆向开发相关知识请在法律规定范围内使用&#xff0c;请勿使用…

华为Mate 70临近上市:代理IP与抢购攻略

随着科技的飞速发展&#xff0c;智能手机已经成为我们日常生活中不可或缺的一部分。而在众多智能手机品牌中&#xff0c;华为一直以其卓越的技术和创新力引领着行业的发展。近日&#xff0c;华为Mate 70系列手机的发布会正式定档在11月26日&#xff0c;这一消息引发了众多科技爱…

【Linux之权限】理论篇

前言 Linux的权限是我们学习Linux初期非常重要的基础知识&#xff0c;接下来我将通过一个系列【Linux之权限】&#xff0c;共三篇文章&#xff0c;对此进行较为全面和详细的解说。 sudo 情况&#xff1a;如果我们不是超级管理员&#xff0c;但是想执行一个权限级别比较高的指…

[C++] 智能指针

文章目录 智能指针的使用原因及场景分析为什么需要智能指针&#xff1f;异常抛出导致的资源泄漏问题分析 智能指针与RAIIC常用智能指针 使用智能指针优化代码优化后的代码优化点分析 析构函数中的异常问题解决方法 RAII 和智能指针的设计思路详解什么是 RAII&#xff1f;RAII 的…

spark性能优化调优指导性文件

1.让我们看一下前面的核心参数设置&#xff1a; num-executors10||20&#xff0c;executor-cores1||2&#xff0c;executor-memory10||20&#xff0c;driver-memory20&#xff0c;spark.default.parallelism64 假设我们的火花队列资源如下&#xff1a; 内存1T&#xff0c;内…

视频流媒体播放器EasyPlayer.js RTSP播放器视频颜色变灰色/渲染发绿的原因分析

EasyPlayer.js RTSP播放器属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#xff0c;无须安装任何插件&#xff0c;起播快、延迟低、兼容性强&#xff0c;使用非常便捷。 EasyPlayer.js播放器不仅支持H.264与H.265视频编码格式&#xff0…

零售项目管理的核心问题:如何让协作更高效?

在零售行业&#xff0c;团队协作的效率直接影响到市场反应速度和客户满意度。商品的上下架、库存管理、促销活动的策划与执行、跨部门的沟通与协作……每一个环节都需要精准的协调。而在这些纷繁复杂的任务中&#xff0c;项目管理软件正成为零售行业的关键工具&#xff0c;帮助…

用appinventor制作艾宾浩斯遗忘曲线app

&#xff08;呕心沥血 仅供参考&#xff09; 测试效果演示 用appinventor制作课本记背应用程序&#xff08;基于遗忘曲线设计&#xff09; 目录 效果演示 项目重难点 总体设计 系统功能模块设计 总体结构如下图所示&#xff1a; 功能模块 详细设计与实现 登录界面 界…