点一下关注吧!!!非常感谢!!持续更新!!!
Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(已更完)
- DataX(已更完)
- Tez(已更完)
- 数据挖掘(已更完)
- Prometheus(已更完)
- Grafana(已更完)
- 离线数仓(正在更新…)
章节内容
上节我们完成了如下的内容:
- 广告业务 点击次数 ADS层
- 广告效果分析 ADS 层
- 需求分析与加载
导入数据
Flume Agent
Flume 是一个分布式、可靠且可扩展的系统,用于收集、聚合和传输大量日志数据。它常用于从各种数据源(例如日志文件、应用程序、系统等)收集数据并将其传输到 Hadoop 生态系统(例如 HDFS、Hive、HBase 等)进行进一步处理。Flume 主要由多个组件构成,其中 Flume Agent 是核心的执行单元。
Flume Agent 是 Flume 架构中的基本执行单元,负责处理数据流的接收、传输和存储。它可以独立运行或作为 Flume 集群的一部分来提供更高的可扩展性。每个 Flume Agent 由以下几部分组成:
- Source:用于接收数据。
- Channel:用于在 Source 和 Sink 之间暂时存储数据。
- Sink:用于将数据传送到外部存储系统(如 HDFS、HBase 等)。
flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf -name a1 -Dflume.roog.logger=INFO,console
具体内容如下所示:
扩展性与容错性
Flume Agent 支持分布式部署,可以通过多个 Agent 在不同节点之间传递数据,适应大规模数据流转的需求。它的容错机制保证了即使某个组件出现故障,数据也不会丢失,依靠 Channel 和 Sink 的队列机制,事件可以被持久化,直到成功传送。
使用场景
Flume Agent 广泛应用于各种日志收集和大数据处理场景:
- 日志收集:Flume 可以从多个日志源收集数据,并将其统一存储到 Hadoop 系统(如 HDFS、HBase)中进行后续分析。
- 实时数据传输:Flume 可以作为实时数据流管道,将数据从不同的数据源实时地传输到目标存储。
- 数据聚合与整合:Flume 支持将多种类型的数据源进行聚合,提供统一的流处理方式。
准备数据
这里准备了 event 数据,如下图所示,将这批数据上传到指定的目录下,Flume会根据配置进行解析:
上传到服务器上:
观察结果
观察Flume的日志,可以看到如下的结果:
24/08/31 16:47:59 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=Unknown//startlog..1725094067630.tmp
24/08/31 16:48:00 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
24/08/31 16:48:00 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-29//startlog..1725094080328.tmp
24/08/31 16:48:01 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
24/08/31 16:48:01 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-30//startlog..1725094081742.tmp
24/08/31 16:48:03 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
24/08/31 16:48:03 INFO hdfs.BucketWriter: Creating /user/data/logs/event/dt=2020-07-31//startlog..1725094083220.tmp
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0726.log, inode: 2172609, pos: 7815141
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0725.log, inode: 2172608, pos: 7817434
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0727.log, inode: 2172610, pos: 7813191
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0729.log, inode: 2172613, pos: 7833524
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0728.log, inode: 2172612, pos: 7841254
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0721.log, inode: 2164743, pos: 7795972
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0731.log, inode: 2172615, pos: 7804311
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0723.log, inode: 2164790, pos: 7810323
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0730.log, inode: 2172614, pos: 7814852
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0722.log, inode: 2164744, pos: 7841582
24/08/31 16:49:53 INFO taildir.TaildirSource: Closed file: /opt/wzk/logs/event/event0724.log, inode: 2164798, pos: 7860701
对应的截图如下所示:
同时我们查看HDFS中的数据情况,如下所示:
调用顺序
脚本的调用顺序,下面是我们在广告业务中编写的脚本:
ods_load_event_log.sh
dwd_load_event_log.sh
dwd_load_ad_log.shads_load_ad_show.sh
ads_load_ad_show_rate.sh
ads_load_ad_show_page.sh
ads_load_ad_show_page_window.sh
加载ODS层
之前编写的:
sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-21
执行脚本,结果如下所示:
在Hive中查看对应的数据:
hive use ods;
select * from ods_log_event limit 5;
可以看到数据已经加载进来了:
这里我是把所有数据都加载了,后续执行:
sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-22
sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-23
sh /opt/wzk/hive/ods_load_event_log.sh 2020-07-24
...省略
最终的数据大约有:
加载DWD层
event_log
sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-21
执行结果如下所示:
在Hive中查看对应的内容:
hive use dwd;
select * from dwd_event_log limit 5;
执行结果如下:
这里我是把所有的数据都加载了,如下所示:
sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-22
sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-23
sh /opt/wzk/hive/dwd_load_event_log.sh 2020-07-24
...省略
加载完成之后,Hive中的数据量如下所示:
ad_log
sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-21
运行结果如下图所示:
查看Hive中的数据:
hive use dwd;
select * from dwd_ad limit 5;
运行结果如下图所示:
继续加载其他的数据:
sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-22
sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-23
sh /opt/wzk/hive/dwd_load_ad_log.sh 2020-07-24
...省略
最终Hive中的数据总量的结果是:
select count(*) from dwd_ad;
执行结果如下图所示: