二百零七、Flume——Flume实时采集5分钟频率的Kafka数据直接写入ODS层表的HDFS文件路径下

一、目的

在离线数仓中,需要用Flume去采集Kafka中的数据,然后写入HDFS中。

由于每种数据类型的频率、数据大小、数据规模不同,因此每种数据的采集需要不同的Flume配置文件。玩了几天Flume,感觉Flume的使用难点就是配置文件

二、使用场景

转向比数据是数据频率为5分钟的数据类型代表,数据量很小、频率不高,因此搞定了转向比数据的采集就搞定了这一类低频率数据的实时采集问题

1台设备每日的转向比数据规模是30KB,25台设备的数据规模则是750KB

三、转向比数据ODS层建表

create external table  if not exists  ods_turnratio(turnratio_json  string
)
comment '转向比数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by '\x001'
lines terminated by '\n'
stored as SequenceFile
tblproperties("skip.header.line.count"="1");

四、转向比数据的配置文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_turnratio
a1.sources.s1.kafka.consumer.group.id = turnratio_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/turnratio
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/turnratio

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_turnratio/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = turnratio
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 62500
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 600
a1.sinks.k1.hdfs.minBlockReplicas = 1

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

注意:62500约为61KB

五、Flume写入HDFS结果

Flume根据时间戳按照ODS层表的分区,将数据写入对应HDFS文件

25台设备,50分钟1个文件,文件大小66.18 KB 

六、ODS表刷新分区后查验数据

(一)刷新表分区

MSCK REPAIR TABLE ods_turnratio;

(二)查看表数据

select * from ods_turnratio;

(三)验证数据完整性

--2023-11-19 数据基本完整  23时297条 标准300  少3条
--2023-11-20 数据基本完整  23时299条 标准300  少1条

数据基本完整,尤其是调度文件大小之后

19日a1.sinks.k1.hdfs.rollSize = 31250        数据基本完整 23时297条 标准300 少3条

20日a1.sinks.k1.hdfs.rollSize = 62500        数据基本完整 23时299条 标准300 少1条

七、注意点

(一)配置文件中的重点是红色标记的几点

a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 62500
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 600
a1.sinks.k1.hdfs.minBlockReplicas = 1

(二)任务配置文件中rollSize参数设置可大不可小

rollSize参数小的话数据会丢失,大的话没问题

配置文件的参数还是不断调试中,争取调到最优的状态。能够及时、完整的消费Kafka数据,并且能够最大化的利用HDFS资源。

目前就先这样,如果有问题的话后面再更新!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/157024.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

信号的处理时机(内核态,用户态,如何/为什么相互转换,内核空间,cpu寄存器),信号的处理流程详细介绍+抽象图解

目录 信号的处理时机 引入 思考 -- 什么时候才能算合适的时候呢? 用户态转为内核态 引入 内核地址空间 引入 思考 -- 进程为什么能切换成内核态呢? 虚拟地址空间 注意点 原理 (总结一下) 为什么如何进入内核态 引入 介绍 底层原理(int 80) cpu的寄存器 用…

nodejs express vue uniapp新闻发布系统源码

开发技术: node.js,mysql5.7,vscode,HBuilder nodejs express vue uniapp 功能介绍: 用户端: 登录注册 首页显示搜索新闻,新闻分类,新闻列表 点击新闻进入新闻详情&#xff0…

Jetson JetPack-5.1.2-L4T-R35.4.1 修复deskew algorithm的问题

1. 前言 官方Orin Nano开发套件 版本: JetPack 5.1.2 2. 问题描述 使用的是带有Orin Nano EVK的GMSL相机,但我无法看到MIPI帧。 这里是媒体设备信息: Media controller API version 5.10.120Media device information ------------------------ driver tegra-…

MySQL内部存储代码常用实现方式

MySQL内部存储代码方式包括SQL语言、存储过程、触发器、函数等。其中,最常用的是SQL语言和存储过程。 一个应用场景是,假设有一个电商网站的订单系统,需要在用户下单成功后自动发送一封邮件通知用户订单号和交易金额。这时可以通过存储过程实…

SSH配置免密登录

配置免密登录的步骤 要配置SSH免密登录,需要进行以下几步: 步骤一:生成SSH密钥对 首先,在本地计算机上生成SSH密钥对。这包括一个私钥(private key)和一个公钥(public key)。 打…

计算机网络——网络可靠性及网络出口配置

1. 前言: 学习目标: 1.了解链路聚合的作用 2. 了解ACL的工作原理 3. 了解NAT的工作原理和配置 2. 网络可靠性方案 网络可靠性是指网络在面对各种异常情况或故障时,能够维持正常运行和提供服务的能力。这包括防止网络中断、减小数据丢失的可能…

spring boot零配置

spring boot是如何选择tomcat还是Jett作为底层服务器的呢? springboot通过ServletWebServerApplicationContext的onRefresh()方法,会创建web服务 protected void onRefresh() {super.onRefresh();try {// 创建web服务createWebServer();}catch (Throwab…

PyTorch 之 Dataset 类入门学习

PyTorch 之 Dataset 类入门学习 Dataset 类简介 PyTorch 中的 Dataset 类是一个抽象类,用来表示数据集。通过继承 Dataset 类可以进行自定义数据集的格式、大小和其它属性,供后续使用; 可以看到官方封装好的数据集也是直接或间接的继承自 …

6.Gin 路由详解 - GET POST 请求以及参数获取示例

6.Gin 路由详解 - GET POST 请求以及参数获取示例 GET POST 请求以及参数获取示例 Get 请求:获取 Quary 参数 // 获取query参数示例:GET /user?uid20&namejack&page1 r.GET("/user", func(c *gin.Context) {// 获取参数// Query获取参…

[机缘参悟-119] :一个IT人的反思:反者道之动;弱者,道之用 VS 恒者恒强,弱者恒弱的马太效应

目录 前言: 一、道家的核心思想 二、恒者恒强,弱者恒弱的马太效应 三、马太效应与道家思想的统一 3.1 大多数的理解 3.2 个人的理解 四、矛盾的对立统一 前言: 马太效应和强弱互转的道家思想,都反应了自然规律和社会规律&…

SVN 修改版本库地址url路径

一、win11用户 1. win11系统右链菜单比较优秀,如果菜单中选择“TortoiseSVN”找不到“重新定位”,如下图所示,则需要添加右键菜单: 2.添加右键菜单:选择“TortoiseSVN”,点击设置,如下图所示&a…

Vue 项目实战——如何在页面中展示 PDF 文件以及 PDFObject 插件实战

文章目录 📋前言🎯使用 HTML 标签🧩 embed 标签🧩 object标签🧩 iframe标签🧩完整代码 🎯使用 PDFObject 插件🧩为什么使用 PDFObject 插件(AI翻译)&#x1f…

【微服务】SaaS云智慧工地管理平台源码

智慧工地系统是一种利用人工智能和物联网技术来监测和管理建筑工地的系统。它可以通过感知设备、数据处理和分析、智能控制等技术手段,实现对工地施工、设备状态、人员安全等方面的实时监控和管理。 一、智慧工地让工程施工智能化 1、内容全面,多维度数…

uniapp相关记录

一、自定义我的物品组件 my_goods.vue <template><view class"goods-item"><!-- 左侧 --><view class"goods-item-left"><radio :checked"goods.goods_state" color"#c00000" v-if"showRadio" …

【洛谷 B2003】输出第二个整数 题解(顺序结构+输入输出)

输出第二个整数 题目描述 输入三个整数&#xff0c;整数之间由一个空格分隔。把第二个输入的整数输出。 输入格式 只有一行&#xff0c;共三个整数&#xff0c;整数之间由一个空格分隔。 输出格式 只有一行&#xff0c;一个整数&#xff0c;即输入的第二个整数。 样例 #…

华为云IoT与OpenHarmony深度协同,加速设备上鸿即上云【云驻共创】

本次专题论坛探讨了华为云IoT与Open Harmony的深度协同、边缘屏蔽硬件差异、实现智慧隧道全方位智能化管理&#xff0c;以及华为云与Open Harmony生态的合作。同时也介绍了华为云物联网卡平台、HTTP2协议以及华为物联网在交通领域的应用。 一&#xff0e;华为云IoT与Open Harm…

Vue rules校验规则详解

Vue.js 提供了一套轻量级的、可扩展的模板校验规则。这些规则可以通过在v-model绑定中添加.modifier来使用&#xff0c;例如v-model.trim 下面是一些常见的 Vue.js 校验规则&#xff1a; required: 检查值是否非空email: 检查值是否符合电子邮件格式min: 检查值是否大于等于指…

数学几百年重大错误:将两异函数误为同一函数

黄小宁 因各实数都可是数轴上点的坐标所以数集A可形象化为数轴上的点集A&#xff0c;从而使x∈R变换为实数yxδ的几何意义可是&#xff1a;一维空间“管道”g内R轴上的质点x∈R(x是点的坐标)运动到新的位置yxδ还在管道g内&#xff08;设各点只作位置改变而没别的改变即变位前…

Vue学习

1。 搭框架 依赖等 创建vue项目 vue create 项目名称 vue create [options] <app-name>使用vite npm init vitelatest <app-name>-- --template vue 目录调整1 apiutilsvenderimages、styles 配置文件 jsconfig.json 配置之后路径可以直接使用 / {"comp…

mysql 查询

-- 多表查询select * from tb_dept,tb_emp; 内来链接 -- 内连接 -- A 查询员工的姓名 &#xff0c; 及所属的部门名称 &#xff08;隐式内连接实现&#xff09;select tb_emp.name,tb_dept.name from tb_emp,tb_dept where tb_emp.idtb_emp.id;-- 推荐使用select a.name,b.n…