07用户行为日志数据采集

用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。
在这里插入图片描述
按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
此处选择KafkaSource、FileChannel、HDFSSink。关键配置如下:
在这里插入图片描述

日志消费者 Flume 实操

  1. 在hadoop101 节点的Flume 的 job目录下创建 kafka_to_hdfs_log.conf,内容如下
    配置注释:

    • FileChannel优化:配置 dataDirsk可以通过逗号分隔指向多个路径,每个路径对应不同硬盘,可以增加吞吐量。
    • 新增checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
    #定义组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1#配置source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092
    a1.sources.r1.kafka.topics=topic_log
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6#配置sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log
    a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip#组装 
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  2. HDFS Sink 优化

    • HDFS存入大量小文件,有什么影响?
      元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。
    • HDFS小文件处理。
      官方默认三个参数配置写入HDFS 后会产生小文件: hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount。
      本次配置的参数为hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0。意味着文件在达到128M时会滚动生成新文件,或者文件超过 3600 秒会生成新文件。
  3. 编写 Flume 拦截器

    • 解决问题
      在这里插入图片描述
    • 在com.logan.gmall.flume.interceptor包下创建TimestampInterceptor类
    package com.logan.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 获取header和body数据Map<String, String> headers = event.getHeaders();String body = new String(event.getBody(), StandardCharsets.UTF_8);// 将body转换成JsonObject类型JSONObject jsonObject = JSONObject.parseObject(body);// 将header中的timestamp时间转换成body中的timestamp(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}@Overridepublic void close() {}
    }
  4. 将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下

启动测试

  1. 启动 Zookeeper、Kafka 集群
  2. 启动 hadoop101 的消费Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
  1. 生成模拟数据[logan@hadoop101 ~]$ vim /opt/module/applog/log/app.2023-12-14.log
{"common":{"ar":"110000","ba":"vivo","ch":"oppo","is_new":"0","md":"vivo iqoo3","mid":"mid_70997","os":"Android 11.0","uid":"776","vc":"v2.1.134"},"start":{"entry":"icon","loading_time":11968,"open_ad_id":16,"open_ad_ms":7891,"open_ad_skip_ms":0},"ts":1672503309000}
{"common":{"ar":"110000","ba":"vivo","ch":"oppo","is_new":"0","md":"vivo iqoo3","mid":"mid_70997","os":"Android 11.0","uid":"776","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","order":1,"pos_id":1},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":1},{"display_type":"query","item":"9","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"18","item_type":"sku_id","order":4,"pos_id":4},{"display_type":"promotion","item":"35","item_type":"sku_id","order":5,"pos_id":4},{"display_type":"query","item":"35","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"recommend","item":"13","item_type":"sku_id","order":7,"pos_id":5}],"page":{"during_time":14287,"page_id":"home"},"ts":1672503309000}
  1. 检查HFDS是否生成数据
  2. 当 HDFS 生成数据后,增加[logan@hadoop101 bin]$ vim f2.sh
#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop101 日志数据flume-------"ssh hadoop101 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 hadoop101 日志数据flume-------"ssh hadoop101 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
  1. 最终 HDFS 文件
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/220944.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python高级算法——遗传算法(Genetic Algorithm)

Python中的遗传算法&#xff08;Genetic Algorithm&#xff09;&#xff1a;高级算法解析 遗传算法是一种启发式搜索算法&#xff0c;模拟自然选择和遗传机制&#xff0c;用于在解空间中寻找优化问题的解。它通过模拟基因的变异、交叉和选择操作&#xff0c;逐代演化产生新的解…

【Android】DeepLink

官方文档&#xff1a;创建指向应用内容的深层链接 Intro to Deep Linking on Android What is Deep linking? Deeplinks are a concept that help users navigate between the web and applications. They are basically URLs which navigate users directly to the specif…

数据集汇总

1、农业、生物、数据竞赛、教育、金融、健康汇总&#xff1a;https://github.com/awesomedata/awesome-public-datasets 2、人脸识别数据集&#xff1a;http://www.face-rec.org/databases/ 3、Yahoo实验室公开1亿Flickr图像和视频&#xff1a; http://yahoolabs.tumblr.com/po…

linux vfs 路径解析代码注释

linux版本为 v6.7 以chroot修改根目录为例&#xff0c;走一遍流程&#xff0c;重点在path_lookupat的实现。代码按逻辑组织&#xff0c;非真实代码顺序。由于涉及太多细节&#xff0c;每部分的开始会先做一个小结。 chroot 解析路径字符串&#xff0c;逐层进入&#xff0c;检…

C/C++ 表达式求值(含多位数)

个人主页&#xff1a;仍有未知等待探索_C语言疑难,数据结构,算法-CSDN博客 专题分栏&#xff1a;算法_仍有未知等待探索的博客-CSDN博客 目录 一、前言 二、解析 分析 最后直接上代码&#xff01; 一、前言 表达式求值是一个比较基础的代码关于栈的使用。在写的时候充分锻炼…

WEB 3D技术 以vue3+vite环境为例 讲解vue项目中使用three

上文 WEB 3D 技术&#xff0c;通过node环境创建一个three案例 中 我们打造了自己的第一个Web 3D界面 那么 今天 我们就来结合vue来开发我们的3D界面 这里 我们先创建一个文件夹 作为文件目录 千万不要放C盘 我们 依旧是在终端执行命令 npm init vitelatest输入一下项目名称 …

同城线下社交搭子,同城圈子交友系统

简介:打破传统耗时耗力的交友模式&#xff0c;实现1对1,点对点的快速即时交友模式&#xff0c;线上线下 整合&#xff0c;可在线查看状态以及距离远近&#xff0c;可自行设置每单的收益提成以及代理的分佣提成。 结构: TINKPHP框架 公众号H5;系统开源&#xff0c;方便二次开发…

自动机器学习是什么?概念及应用

自动机器学习 (Auto Machine Learning) 的应用和方法 随着众多企业在大量场景中开始采用机器学习&#xff0c;前后期处理和优化的数据量及规模指数级增长。企业很难雇用充足的人手来完成与高级机器学习模型相关的所有工作&#xff0c;因此机器学习自动化工具是未来人工智能 (A…

【INTEL(ALTERA)】 quartus版本 21使用SDI II IP出现错误:无法生成示例设计example_design

项目场景&#xff1a; quartus版本 21SDI II FPGA IP 设计示例生成失败怎么办 原因分析&#xff1a; 适用于 Windows* 的英特尔 Quartus Prime Pro Edition 软件版本 21.3 和版本 21.4 以及英特尔 Quartus Prime Standard Edition 软件版本 22.1 中存在问题&#xff0c;SDI I…

ICC2:low power与pg strategy(pg_macro_conn_pattern)

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球入口 创建hard macro上的stripe,参考示例: set pd_list{{DEFAULT_VA VDD_DIG VDD_DIG VSS} {PD_DSP VDD_DIG VDD_DSP VSS} } ;#两个电源域,DEFAULT_VA和PD_DSP是对应voltage area名字,其中DEFAULT_…

机器学习可重复性危机下,创建复杂数据系统的挑战

文章目录 一、前言二、主要内容三、总结 &#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 一、前言 数据科学系统已成为众多研究领域的关键性工具&#xff0c;其开发者群体呈现出多元化的背景特征。在过去十年中&#xff0c;尽管数据科学与机器学习的强…

[论文笔记] 大模型主流Benchmark测试集介绍

自然语言处理(NLP)的进步往往通过在各种benchmark测试集上的表现来衡量。随着多语言和跨语言NLP研究的兴起,越来越多的多语言测试集被提出以评估模型在不同语言和文化背景下的泛化能力。在这篇文章中,我们将介绍几个主流的多语言NLP benchmark测试集,包括ARC Challenge、H…

Android hwcomposer服务启动流程

Android hwcomposer服务启动流程 客户端 binder远程调用 服务端 surfaceflinger --binder--> hwcomposer .hal文件编译时生成支持binder进程间远程调用通信的cpp文件 在out/soong/.intermediates/hardware/interfaces/graphics/composer/2.1/ 目录下找…

测试用例设计方法:功能图

1 引言 前面几篇文章为我们讲述了因果图、判定表、正交试验等几种方法&#xff0c;主要是针对于不同条件输入输出的组合进行测试&#xff0c;但在实际需求中&#xff0c;我们也常会遇到需要对被测对象的状态流转进行验证的情况&#xff0c;此时前面几种方法将不再适用&#xf…

005 本地安全策略

一、本地安全策略 1、概念 主要是对登录计算机的账户进行一些安全设置主要影响是本地计算机安全设置 2、打开方式 开始菜单->管理工具->本地安全策略使用命令secpol.msc从本地组策略进去&#xff0c;使用命令gpedit.msc 二、账户策略 1、密码策略&#xff08;默认情…

Java安全工具Jar包加密

jar包加密有很多种方式&#xff0c;我这边记录一下使用mavenClassFinal的方式,classFinal可以通过jar包/maven的方式来使用&#xff0c;因为maven使用较为简单&#xff0c;我仅记录使用maven的方式 在需要打包的启动程序的pom文件中添加如下plugin 1.plugin需要加在spring-boot…

Windows本地的RabbitMQ服务怎么在Docker for Windows的容器中使用

1. 进入管理界面 windows安装过程请访问&#xff1a;Windows安装RabbitMQ、添加PHP的AMQP扩展 浏览器访问&#xff1a;http://127.0.0.1:15672/ 2. 创建虚拟主机 上面访问的是 RabbitMQ 的管理界面&#xff0c;可以在这个界面上进行一些操作&#xff0c;比如创建虚拟主机、…

P1000 超级玛丽游戏

题目背景 本题是洛谷的试机题目&#xff0c;可以帮助了解洛谷的使用。 建议完成本题目后继续尝试 P1001、P1008。 另外强烈推荐新用户必读贴 题目描述 超级玛丽是一个非常经典的游戏。请你用字符画的形式输出超级玛丽中的一个场景。 ******** ************ …

找到字符串中所有字母异位词-中等

leetcode ****1111 给定两个字符串 s 和 p&#xff0c;找到 s 中所有 p 的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字符串&#xff08;包括相同的字符串&#xff09;。 示例 1:输入: s "cbaebaba…

CSS彩色发光液体玻璃

效果展示 CSS 知识点 animation 综合运用animation-delay 综合运用filter 的 hue-rotate 属性运用 页面整体布局 <section><div class"glass" style"--i: 1"><div class"inner"><div class"liquid"></d…