离线数仓同步数据1

用户行为表数据同步

  • 2.1.4 日志消费Flume测试

[gpb@hadoop104 ~]$ cd /opt/module/flume/
[gpb@hadoop104 flume]$ cd job/
[gpb@hadoop104 job]$ rm file_to_kafka.conf

com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置sources
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.kafka.consumer.group.id=topic_log
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 3#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.1.3 日志消费Flume配置实操
1)创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_log.conf
[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf 
2)配置文件内容如下#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置优化
1)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
2HDFS Sink优化
(1HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。(2HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
3)编写Flume拦截器
(1)数据漂移问题(2)在com.atguigu.gmall.flume.interceptor包下创建TimestampInterceptor类
package com.atguigu.gmall.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}3)重新打包(4)需要先将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下面。

2.1.4 日志消费Flume测试

1)启动Zookeeper、Kafka集群
2)启动日志采集Flume
[atguigu@hadoop102 ~]$ f1.sh start
3)启动hadoop104的日志消费Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
4)生成模拟数据
[atguigu@hadoop102 ~]$ lg.sh 
5)观察HDFS是否出现数据
2.1.5 日志消费Flume启停脚本
若上述测试通过,为方便,此处创建一个Flume的启停脚本。
1)在hadoop102节点的/home/atguigu/bin目录下创建脚本f2.sh
[atguigu@hadoop102 bin]$ vim f2.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop104 日志数据flume-------"ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")echo " --------停止 hadoop104 日志数据flume-------"ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod 777 f2.sh
3)f2启动
[atguigu@hadoop102 module]$ f2.sh start
4)f2停止
[atguigu@hadoop102 module]$ f2.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/77041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringMvc增删改查

SpringMvc增删改查 一、前期准备二、逆向生成增删改查2.2.aspect切面层2.3.Mybatis generator逆向生成2.4.根据生成代码编写Biz层与实现类 三、controller层代码编写四、前台代码与分页代码五、案例测试 一、前期准备 1.2.导入pom.xml依赖 <?xml version"1.0" …

基于springboot的新闻门户网站

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

接口测试 —— Requests库GET请求

Requests库GET请求是使用HTTP协议中的GET请求方式对目标网站发起请求。 &#xff08;不带参数的GET请求请看上一篇文章的练习&#xff09; 1、Requests库待参数的GET请求 使用Get方法带参数请求时&#xff0c;是params参数字典&#xff0c;而不是data参数字典。data参数字典…

基本Dos命令

1.打开cmd的方式 &#xff08;1&#xff09;winR&#xff0c;输入cmd即可 &#xff08;2&#xff09;在任意文件夹下面&#xff0c;按住shift键后点击鼠标右键&#xff0c;即可在此文件夹目录下打开命令行窗口。 &#xff08;3&#xff09;资源管理器的地址栏前面加上 cmd…

uni-app直播从0到1实战

1.安装开发工具 2.创建项目 参考&#xff1a;uniapp从零到一的学习商城实战_云澜哥哥的博客-CSDN博客 3.编写公共样式&#xff1a;common.css & free.css App.vue引入公共文件&#xff1a; 图标库&#xff1a;iconfont-阿里巴巴矢量图标库

第一次面试

1.多态的原理 2.编译原理 3.HTTPS的加密原理 4.说一说C11新特性 5.平时用过哪些STL容器 6.STL的比较器 原来就是自定义工具类hhhhhh 7.函数指针用过吗 8.I/O多路复用 9.Redis 问的基本都背过&#xff0c;但是一紧张啥都忘了hhhhhhhhh

android 布局 横屏 android横屏适配

一、刘海屏适配 1、layoutInDisplayCutoutMode属性 Android 9.0系统中提供了3种layoutInDisplayCutoutMode属性来允许应用自主决定该如何对刘海屏设备进行适配。 LAYOUT_IN_DISPLAY_CUTOUT_MODE_DEFAULT 这是一种默认的属性&#xff0c;在不进行明确指定的情况下&#xff0c;系…

出行计划(2023寒假每日一题 16)

最近西西艾弗岛上出入各个场所都要持有一定时限内的核酸检测阴性证明。 具体来时&#xff0c;如果在 t t t 时刻做了核酸检测&#xff0c;则经过一段时间后可以得到核酸检测阴性证明。 这里我们假定等待核酸检测结果需要 k k k 个单位时间&#xff0c;即在 t k tk tk 时刻…

MapTR v2文章研读

MapTR v2论文来了&#xff0c;本文仅介绍v2相较于v1有什么改进之处&#xff0c;如果想了解v1版本的论文细节&#xff0c;可见链接。 相较于maptr&#xff0c;maptr v2改进之处&#xff1a; 在分层query机制中引进解耦自注意力机制&#xff0c;有效降低了内存消耗&#xff1b;…

Android 11 系统默认语言修改

Android 系统原版默认的语言为英文,但是对于中国大陆 Android 产品厂商来说,我们定制系统可能需要用户一开机就是简体中文。所以把 Android 系统出厂设置为简体中文对于 Android 系统产品化非常重要,我们可以通过修改系统属性来达到默认语言的作用。本文主要是在 Android 11…

【mysql】出现 slow sql 问题及建议

文章目录 1. SQL 执行什么情况下会变慢&#xff1f;2. 影响 SQL 语句执行效率的主要因素有哪些&#xff1f;3. 慢 SQL 是如何拖垮数据库的&#xff1f;4. 最佳实践建议 1. SQL 执行什么情况下会变慢&#xff1f; ● 数据量增加&#xff1a;数据库中的数据量可能会逐渐增加&…

Jenkins 页面部分显示Http状态403 被禁止

前言 生产环境Jenkins部署了一段时间了&#xff0c;结果今天在流水线配置中&#xff0c;部分页面显示Jenkins 页面部分显示Http状态403 被禁止&#xff0c;修改配置点击保存之后偶尔也会出现这个。 问题 以下是问题图片 解决 在全局安全配置里面&#xff0c;勾选上启用代…

电脑磁盘分区形式是什么?如何更改?

磁盘分区形式介绍 在了解为什么以及如何更改分区形式之前&#xff0c;让我们对磁盘分区形式有一个基本的了解。一般来说&#xff0c;分区形式是指主引导记录&#xff08;MBR&#xff09;和 GUID 分区表&#xff08;GPT&#xff09;。 MBR和GPT是Windows系统中常用…

003传统图机器学习、图特征工程

文章目录 一. 人工特征工程、连接特征二. 在节点层面对连接特征进行特征提取三. 在连接层面对连接特征进行特征提取四. 在全图层面对连接特征进行特征提取 一. 人工特征工程、连接特征 节点、连接、子图、全图都有各自的属性特征&#xff0c; 属性特征一般是多模态的。除属性特…

从构建者到设计者的低代码之路

低代码开发技术&#xff0c;是指无需编码或通过少量代码就可以快速生成应用程序的工具&#xff0c;一方面可降低企业应用开发人力成本和对专业软件人才的需求&#xff0c;另一方面可将原有数月甚至数年的开发时间成倍缩短&#xff0c;帮助企业实现降本增效、灵活迭代。那么&…

MySQL 8.0 驱动与阿里druid版本兼容操作

注意&#xff1a;这个异常表面druid数据源的版本与MySql 8.0的驱动版本不匹配&#xff0c;解决方法如下&#xff1a; 确保MySql 8.0的驱动如下网址&#xff1a; <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifact…

Linux——环境变量

✅<1>主页&#xff1a;&#xff1a;我的代码爱吃辣 &#x1f4c3;<2>知识讲解&#xff1a;Linux——环境变量 ☂️<3>开发环境&#xff1a;Centos7 &#x1f4ac;<4>前言&#xff1a;环境变量(environment variables)一般是指在操作系统中用来指定操作…

嵌入式学习之链表

对于链表&#xff0c;要重点掌握链表和数组区别和实现&#xff0c;链表静态添加和动态遍历&#xff0c;链表中pointpoint-next,链表节点个数的查找&#xff0c;以及链表从指定节点后方插入新节点的知识。

服务器巡检表-监控指标

1、巡检指标 系统资源K8S集群NginxJAVA应用RabbitMQRedisPostgreSQLElasticsearchELK日志系统 2、巡检项 检查项目 检查指标 检查标准 系统资源 CPU 使用率 正常&#xff1a;&#xff1c;70% 低风险&#xff1a;≥ 70% 中风险&#xff1a;≥ 85% 高风险&#xff1a;≥ 9…

图片怎么压缩大小?这样压缩图片很简单

在日常生活中&#xff0c;我们常常需要处理各种各样的图片文件&#xff0c;但有时候图片的大小可能会成为问题。比如在上传图片到网站或者将图片发送给朋友时&#xff0c;过大的图片可能会导致上传速度变慢或者占用过多内存。这时&#xff0c;我们就需要用到图片压缩了&#xf…