大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,consoleflume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
</dependencies>

编写代码

package icu.wzk;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class CustomerInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 这里是逐条处理String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);// 获取Event的HeaderMap<String, String> headerMap = event.getHeaders();// 解析Body获取JSON字符串String[] bodyArr = eventBody.split("\\s+");try {String jsonStr = bodyArr[6];// 解析JSON字符串获取时间戳JSONObject jsonObject = JSON.parseObject(jsonStr);String timestampStr = jsonObject.getJSONObject("app_active").getString("time");// 将时间戳转换字符串 yyyy-MM-dd// 将字符串转换为Longlong timestampLong = Long.parseLong(timestampStr);DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");Instant instant = Instant.ofEpochMilli(timestampLong);LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());String date = formatter.format(localDateTime);// 将转换后的字符串放置header中headerMap.put("logtime", date);event.setHeaders(headerMap);} catch (Exception e) {headerMap.put("logtime", "Unknown");event.setHeaders(headerMap);}return event;}@Overridepublic List<Event> intercept(List<Event> list) {List<Event> lstEvent = new ArrayList<>();for (Event event : list) {Event outEvent = intercept(event);if (outEvent != null) {lstEvent.add(outEvent);}}return lstEvent;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomerInterceptor();}@Overridepublic void configure(Context context) {}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/60005.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

编译OpenCV的速度,家里和公司的电脑相差很大

这一段时间&#xff0c;研究OpenCV带ffmpeg的编译问题。然后发现&#xff0c;同样是虚拟机&#xff0c;编译速度&#xff0c;家里的电脑明显比公司慢多了。 都是在SSD上。虚拟机内存&#xff0c;家里是16G&#xff0c;公司是8G。CPU&#xff0c;家里是E5 2667&#xff0c;公司…

Qt 的 QThread:多线程编程的基础

Qt 的 QThread&#xff1a;多线程编程的基础 在现代应用程序中&#xff0c;尤其是需要处理大量数据、进行长时间计算或者进行 I/O 操作时&#xff0c;多线程编程变得至关重要。Qt 提供了一个功能强大且易于使用的线程类 QThread&#xff0c;可以帮助开发者在 Qt 应用程序中实现…

Java 线程池介绍与实践

文章目录 引言概念优势Java 中的线程池实现线程池的核心参数1. corePoolSize&#xff1a;核心线程数2. maximumPoolSize&#xff1a;最大线程数3. keepAliveTime&#xff1a;线程空闲时间4. unit&#xff1a;时间单位5. workQueue&#xff1a;任务队列6. threadFactory&#xf…

富格林:安全指正规防欺诈套路

富格林指出&#xff0c;在现货黄金投资操作中&#xff0c;有众多的投资技巧和投资方式&#xff0c;但其实并不是所有的都适用。投资者应该注意选择安全、可信的投资方式去规防欺诈套路。值得提醒的是&#xff0c;现货黄金虽然拥有很多获利的机会&#xff0c;但也有不少欺诈套路…

PyAEDT:Ansys Electronics Desktop API 简介

在本文中&#xff0c;我将向您介绍 PyAEDT&#xff0c;这是一个 Python 库&#xff0c;旨在增强您对 Ansys Electronics Desktop 或 AEDT 的体验。PyAEDT 通过直接与 AEDT API 交互来简化脚本编写&#xff0c;从而允许在 Ansys 的电磁、热和机械求解器套件之间无缝集成。通过利…

SpringBoot(二十六)SpringBoot自定义注解

注解在springboot日常开发中使用的频率是很高的,官方为我们提供了很多注解,比如:@Autowired、@GetMapping等…… 但是我们有些特定的需求官方提供的注解是没有的。我们可以自定义注解。 下面我们来了解一下自定义注解的过程。 一:元注解 Java为我们提供了几个元注解来自定…

DHTMLX-gantt组件显示不同的颜色

在 dhtmlxGantt 组件中&#xff0c;你可以通过自定义任务的颜色来显示不同的任务类型或状态。这通常通过配置任务的 color 属性来实现。你可以在初始化 Gantt 图表时或在动态添加任务时设置这个属性。 以下是一些常见的方法来为任务设置不同的颜色&#xff1a; 1. 初始化时设…

什么是迭代器?Python迭代器及其用法

迭代器是一种对象&#xff0c;它表示一个数据流&#xff0c;可以一次访问一个成员&#xff08;元素&#xff09;。 迭代器从集合的第一个元素开始访问&#xff0c;直到所有的元素被访问完结束。迭代器只能往前不会后退。 在Python中&#xff0c;迭代器是一个实现了迭代协议的…

什么是C++中的友元函数和友元类?

友元函数&#xff08;Friend Function&#xff09;和 友元类&#xff08;Friend Class&#xff09;是用于控制类的访问权限的机制。这允许特定的函数或类访问另一个类的私有成员和保护成员&#xff0c;打破了 C 的封装性规则。 友元函数 定义 友元提供了不同类的成员函数之间…

深入理解 CSS 属性 pointer-events: none

pointer-events 是 CSS 中一个用于控制元素响应鼠标事件&#xff08;或触摸事件&#xff09;的属性。 通过这个属性&#xff0c;我们可以控制元素是否能够接受鼠标事件&#xff0c;例如点击、悬停、拖动等。 其中&#xff0c;pointer-events: none 是 pointer-events 属性的一…

定时器(QTimer)与随机数生成器(QRandomGenerator)的应用实践——Qt(C++)

一、QTimer与QRandomGenerator &#xff08;一&#xff09;QTimer&#xff08;定时器&#xff09;[2] QTimer类为定时功能提供了一个高级编程接口。在使用QTimer时&#xff0c;实例化一个QTimer对象并将其timeout()发射信号与合适的信号槽相连接。通过调用QTimer的start()函数…

用redis的zset实现日榜,周榜,月榜

思路&#xff1a; 1.初始化一个月的数据&#xff1a; /*** 初始化一个月数据*/Testpublic void initMonthData(){//计算当前时间小时的keylong hourSystem.currentTimeMillis()/(1000*60*60);for(int i1;i<24*30;i){String key"W_hour"(hour-i);Random random new…

《MySQL 实战教程:从零开始到高手进阶》

当然可以。下面是一篇关于MySQL的学习指南&#xff0c;它适合初学者到中级用户&#xff0c;涵盖了MySQL的基础知识、安装步骤、基本命令以及一些高级功能。 MySQL 学习指南 1. 了解 MySQL MySQL 是一个关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;由瑞典…

通过shell脚本分析部署nginx网络服务

通过shell脚本分析部署nginx网络服务 1.接收用户部署的服务名称 [rootlocalhost xzy]# vim 1.sh [rootlocalhost xzy]# chmod x 1.sh [rootlocalhost xzy]# ./1.sh2.判断服务是否安装 已安装&#xff1b;自定义网站配置路径为/www&#xff1b;并创建共享目录和网页文件&…

威胁驱动的网络安全方法论

摘要 目前的网络安全风险管理实践很大程度上是由合规性要求驱动的&#xff0c;这使得公司/组织不得不在安全控制和漏洞上投入人力/物力。&#xff08;风险管理涉及多个方面&#xff0c;包括资产、威胁、漏洞和控制&#xff0c;并根据事故发生的可能性及造成的影响进行评估。威…

『VUE』30. 生命周期的介绍(详细图文注释)

目录 生命周期生命周期的8阶段生命周期小例子总结 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 欢迎关注 『VUE』 专栏&#xff0c;持续更新中 生命周期 每个 Vue 组件实例在创建时都需要经历一系列的初始化步骤&#xff0c;比如设置好数据侦听&#xff0c;编译模板&#xf…

在 Unix 和类 Unix 操作系统中,信号是一种异步的通知机制,用于通知进程发生了一些特定的事件。

在 Unix 和类 Unix 操作系统中&#xff0c;信号是一种异步的通知机制&#xff0c;用于通知进程发生了一些特定的事件。以下是一些常见的信号及其用途和默认行为的详细介绍&#xff1a; 常见信号及其用途 SIGINT (2) 含义&#xff1a;中断信号&#xff08;Interrupt Signal&…

idea 通过git撤销commit但未push的操作

1、undo commit 适用情况&#xff1a;代码修改完了&#xff0c;已经Commit了&#xff0c;但是还未push&#xff0c;然后发现还有地方需要修改不想提交本次记录了。这时可以进行Undo Commit&#xff0c;修改后再重新Commit。注意&#xff1a;如果已经进行了Push&#xff0c;线上…

【graphics】图形绘制 C++

众所周知&#xff0c;周知所众&#xff0c;图形绘制对于竞赛学僧毫无用处&#xff0c;所以这个文章&#xff0c;专门对相关人员教学&#xff08;成长中的码农、高中僧、大学僧&#xff09;。 他人经验教学参考https://blog.csdn.net/qq_46107892/article/details/133386358?o…

Spring Boot出现java: 错误: 无效的源发行版:16的解决方式

第一步&#xff1a; 修改为SDK的目标字节码版本 第二步&#xff1a;CtrlShiftAltS进入项目结构 第三步&#xff1a;pom.xml文件中 在网上搜索和自己SDK适配的Springboot版本&#xff0c;1.8对应的是2.7.1&#xff08;可以用&#xff09; 修改Java版本为1.8 最后的最后&a…