flume案例

在构建数仓时,经常会用到flume接收日志数据,通常涉及到的组件为kafka,hdfs等。下面以一个flume接收指定topic数据,并存入hdfs的案例,大致了解下flume相关使用规则。

版本:1.9

Source

Kafka Source就是一个Apache Kafka消费者,它从Kafka的topic中读取消息。 如果运行了多个Kafka Source,则可以把它们配置到同一个消费者组,以便每个source都读取一组唯一的topic分区。

目前支持Kafka 0.10.1.0以上版本,最高已经在Kafka 2.0.1版本上完成了测试,这已经是Flume 1.9发行时候的最高的Kafka版本了。

属性名

默认值

解释

channels

与Source绑定的channel,多个用空格分开

type

组件类型,这个是: org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers

Source使用的Kafka集群实例列表

kafka.consumer.group.id

flume

消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID,表示它们是同一个消费者组

kafka.topics

将要读取消息的目标 Kafka topic 列表,多个用逗号分隔

kafka.topics.regex

会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级,如果这两个参数同时存在,则会覆盖kafka.topics的配置。

batchSize

1000

一批写入 channel 的最大消息数

batchDurationMillis

1000

一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。

backoffSleepIncrement

1000

当Kafka topic 显示为空时触发的初始和增量等待时间(毫秒)。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适, 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

maxBackoffSleep

5000

Kafka topic 显示为空时触发的最长等待时间(毫秒)。默认的5秒钟对于获取数据比较合适,但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。

useFlumeEventFormat

false

默认情况下,从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true,会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联,这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。

setTopicHeader

true

当设置为 true 时,会把存储Event的topic名字存储到header中,使用的key就是下面的 topicHeader 的值。

topicHeader

topic

如果 setTopicHeader 设置为 true ,则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心,避免又循环将消息又发送回 topic。

kafka.consumer.security.protocol

PLAINTEXT

设置使用哪种安全协议写入Kafka。可选值:SASL_PLAINTEXTSASL_SSLSSL ,有关安全设置的其他信息,请参见下文。

more consumer security props

如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议,参考 Kafka security 来为消费者增加安全相关的参数配置

Other Kafka Consumer Properties

其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数,比如: kafka.consumer.auto.offset.reset

必需的参数已用 粗体 标明。

已经弃用的一些属性:

属性名

默认值

解释

topic

改用 kafka.topics

groupId

flume

改用 kafka.consumer.group.id

zookeeperConnect

自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接

migrateZookeeperOffsets

true

如果找不到Kafka存储的偏移量,去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果在Zookeeper未找到偏移量,则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。

 Channel

此处选择memory channel,内存 channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。 必需的参数已用 粗体 标明。

属性

默认值

解释

type

组件类型,这个是: memory

capacity

100

内存中存储 Event 的最大数

transactionCapacity

100

source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大)

keep-alive

3

添加或删除一个 Event 的超时时间(秒)

byteCapacityBufferPercentage

20

指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比

byteCapacity

Channel 中最大允许存储所有 Event 的总字节数(bytes)。默认情况下会使用JVM可用内存的80%作为最大可用内存(就是JVM启动参数里面配置的-Xmx的值)。 计算总字节时只计算 Event 的主体,这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意,当你在一个 Agent 里面有多个内存 channel 的时候, 而且碰巧这些 channel 存储相同的物理 Event(例如:这些 channel 通过复制机制( 复制选择器 )接收同一个 source 中的 Event), 这时候这些 Event 占用的空间是累加的,并不会只计算一次。如果这个值设置为0(不限制),就会达到200G左右的内部硬件限制。

Sink

HDFS Sink ,这个Sink将Event写入Hadoop分布式文件系统(也就是HDFS)。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件(关闭当前文件并创建新文件)。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符,会由HDFS Sink进行动态地替换,以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop, 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意, 需要使用支持sync() 调用的Hadoop版本

属性名

默认值

解释

channel

与 Sink 连接的 channel

type

组件类型,这个是: hdfs

hdfs.path

HDFS目录路径(例如:hdfs://namenode/flume/webdata/)

hdfs.filePrefix

FlumeData

Flume在HDFS文件夹下创建新文件的固定前缀

hdfs.fileSuffix

Flume在HDFS文件夹下创建新文件的后缀(比如:.avro,注意这个“.”不会自动添加,需要显式配置)

hdfs.inUsePrefix

Flume正在写入的临时文件前缀,默认没有

hdfs.inUseSuffix

.tmp

Flume正在写入的临时文件后缀

hdfs.emptyInUseSuffix

false

如果设置为 false 上面的 hdfs.inUseSuffix 参数在写入文件时会生效,并且写入完成后会在目标文件上移除 hdfs.inUseSuffix 配置的后缀。如果设置为 true 则上面的 hdfs.inUseSuffix 参数会被忽略,写文件时不会带任何后缀

hdfs.rollInterval

30

当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒

hdfs.rollSize

1024

当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节

hdfs.rollCount

10

当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)

hdfs.idleTimeout

0

关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒

hdfs.batchSize

100

向 HDFS 写入内容时每次批量操作的 Event 数量

hdfs.codeC

压缩算法。可选值:gzipbzip2lzolzop` 、 ``snappy

hdfs.fileType

SequenceFile

文件格式,目前支持: SequenceFileDataStreamCompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数

hdfs.maxOpenFiles

5000

允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭

hdfs.minBlockReplicas

指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置。

hdfs.writeFormat

Writable

文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件。

hdfs.threadsPoolSize

10

每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)

hdfs.rollTimerPoolSize

1

每个HDFS Sink实例调度定时文件滚动的线程数

hdfs.kerberosPrincipal

用于安全访问 HDFS 的 Kerberos 用户主体

hdfs.kerberosKeytab

用于安全访问 HDFS 的 Kerberos keytab 文件

hdfs.proxyUser

代理名

hdfs.round

false

是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符)

hdfs.roundValue

1

向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30

hdfs.roundUnit

second

向下舍入的单位,可选值: secondminutehour

hdfs.timeZone

Local Time

解析存储目录路径时候所使用的时区名,例如:America/Los_Angeles、Asia/Shanghai

hdfs.useLocalTimeStamp

false

使用日期时间转义符时是否使用本地时间戳(而不是使用 Event header 中自带的时间戳)

hdfs.closeTries

0

开始尝试关闭文件时最大的重命名文件的尝试次数(因为打开的文件通常都有个.tmp的后缀,写入结束关闭文件时要重命名把后缀去掉)。

如果设置为1,Sink在重命名失败(可能是因为 NameNode 或 DataNode 发生错误)后不会重试,这样就导致了这个文件会一直保持为打开状态,并且带着.tmp的后缀;

如果设置为0,Sink会一直尝试重命名文件直到成功为止;

关闭文件操作失败时这个文件可能仍然是打开状态,这种情况数据还是完整的不会丢失,只有在Flume重启后文件才会关闭。

hdfs.retryInterval

180

连续尝试关闭文件的时间间隔(秒)。 每次关闭操作都会调用多次 RPC 往返于 Namenode ,因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小,则如果第一次尝试失败,将不会再尝试关闭文件,并且可能导致文件保持打开状态或扩展名为“.tmp”。

serializer

TEXT

Event 转为文件使用的序列化器。其他可选值有: avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。

serializer.*

根据上面 serializer 配置的类型来根据需要添加序列化器的参数

完整案例如下:

a1.sources=r1
a1.channels=c1
a1.sinks=k1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hmcs030:9092,hmcs031:9092,hmcs032:9092
a1.sources.r1.kafka.topics= hmcs_network_enterprise_climb
a1.sources.r1.kafka.consumer.group.id = hmcs_network_enterprise_climb_group
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hmcs.interceptor.DecodeInterceptor$Buildera1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.parseAsFlumeEvent = falsea1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/flume/enterprise/networkEnterData/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = climbNetworkEnter-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType=DataStreama1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

启动命令如下:

nohup /usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/job/kafka_memory_hdfs.conf -n a1 -Dflume.root.logger=info,console >/usr/local/flume/logs/kafka_memory_hdfs.log 2>&1 &

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/639620.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

有色金属市场分析:预计2023年产量增幅在3.5%左右

上周各有色金属品种走势接近,均呈现出周初持续走弱、最后两个交易日反弹的走势。影响有色金属行情的主线逻辑一个是美国债务上限谈判的进展情况,另一个是全球经济衰退的预期。上周四和上周五市场整体反弹,主要由于美国债务上限谈判出现进展&a…

【FAQ】NPM 引入本地依赖包

背景 npm 本地依赖包分为 本地文件夹类型 本地文件夹类型的依赖包适用于在编写插件的 dome 示例项目时使用,在无需将包发布到 npm 仓库的情况,做到实时编译,修改 本地压缩包类型 压缩包类型的依赖包适用于没有外网和没有 npm 私有仓库&…

day27 组合总和 组合总和Ⅱ 分割回文串

题目1:39 组合总和 题目链接:39 组合总和 题意 找出无重复元素的正整数数组candidates中元素和为目标数target的所有不同组合,同一个数字可重复选取 回溯 回溯三部曲: 1)参数和返回值 2)终止条件 3…

php基础学习之常量

php常量的基本概念 常量是在程序运行中的一种不可改变的量(数据),常量一旦定义,通常不可改变(用户级别)。 php常量的定义形式 使用define函数:define("常量名字", 常量值);使用cons…

对有状态组件和无状态组件的理解及使用场景

(1)有状态组件 特点: ● 是类组件 ● 有继承 ● 可以使用this ● 可以使用react的生命周期 ● 使用较多,容易频繁触发生命周期钩子函数,影响性能 ● 内部使用 state,维护自身状态的变化,有状态组…

【C++】函数重载

C 中的函数重载(Function Overloading)是指在同一个作用域内,可以定义多个名称相同但参数列表不同的函数。通过函数重载,可以根据传递给函数的参数类型或数量的不同,选择适当的函数来执行。 函数重载的条件 C 函数重…

EasyDarwin计划新增将各种流协议(RTSP、RTMP、HTTP、TCP、UDP)、文件转推RTMP到其他视频直播平台,支持转码H.264、文件直播推送

之前我们尝试做过EasyRTSPLive(将RTSP流转推RTMP)和EasyRTMPLive(将各种RTSP/RTMP/HTTP/UDP流转推RTMP,这两个服务在市场上都得到了比较多的好评,其中: 1、EasyRTSPLive用的是EasyRTSPClient取流&#xff…

信号量、互斥锁并发机制

区分: 信号:通讯机制 信号量:并发控制 一、信号量:基于阻塞的并发控制机制 a.定义信号量 struct semaphore sem; b.初始化信号量 void sema_init(struct semaphore *sem, int val); c.获得信号量P操作 int down(struct semap…

内网穿透的应用-如何使用Docker部署Redis数据库并结合内网穿透工具实现公网远程访问

文章目录 前言1. 安装Docker步骤2. 使用docker拉取redis镜像3. 启动redis容器4. 本地连接测试4.1 安装redis图形化界面工具4.2 使用RDM连接测试 5. 公网远程访问本地redis5.1 内网穿透工具安装5.2 创建远程连接公网地址5.3 使用固定TCP地址远程访问 前言 本文主要介绍如何在Ub…

QT中QApplication对象有且只有一个

QT中QApplication对象有且只有一个 QApplication对象 QApplication对象 QApplication是应用程序对象 #include <QApplication> int main(int argc,char* argv[]); {//a对象在一个程序中有且只有一个&#xff0c;QT中要求必须有一个QApplication a&#xff08;argc,argv…

RT-Thread 17. 中断发送信号量后线程去处理信号量

1. 代码 //sem.c #include <rtthread.h> #include <rtdevice.h> #include "drv_gpio.h"#define THREAD_PRIORITY 25 #define THREAD_TIMESLICE 5#define LED3_PIN GET_PIN(B, 3) ALIGN(RT_ALIGN_SIZE) static char thread3_stack[1024]; static struct…

【Spring 篇】MyBatis多表操作:编织数据的交响乐

欢迎来到MyBatis的多表操作世界&#xff01;在这个充满交响乐的舞台上&#xff0c;我们将探索如何巧妙地编织多个数据表的数据&#xff0c;创造出一场旋律动听的数据交响曲。无需繁琐的SQL拼接&#xff0c;MyBatis让多表操作变得优雅而简单。让我们一起进入这个音乐殿堂&#x…

司铭宇老师:房地产电话营销培训:房地产电话销售话术与销售技巧:打造高转化率的电话营销策略

房地产电话营销培训&#xff1a;房地产电话销售话术与销售技巧&#xff1a;打造高转化率的电话营销策略 在房地产销售中&#xff0c;电话销售是一种常见且有效的营销手段。它不仅可以快速触达潜在客户&#xff0c;还能够建立起销售人员与客户之间的初步信任关系。然而&#xff…

大模型学习之书生·浦语大模型6——基于OpenCompass大模型评测

基于OpenCompass大模型评测 关于评测的三个问题Why/What/How Why What 有许多任务评测&#xff0c;包括垂直领域 How 包含客观评测和主观评测&#xff0c;其中主观评测分人工和模型来评估。 提示词工程 主流评测框架 OpenCompass 能力框架 模型层能力层方法层工具层 支持丰富…

C++从小白到初级工程师【个人学习笔记】

目录 1.背景2.基础二维数组概念二维数组定义方式 二维数组数组名称概念例子 函数的分文件编写概念示例 指针指针的基本概念指针变量的定义和使用 空指针和野指针空指针实例野指针实例 const修饰指针概念const修饰指针 --- 常量指针 指针和数组作用示例 指针和函数作用示例 指针…

项目解决方案:某城区(区县)社会面视频监控资源接入汇聚解决方案

目 录 一、概述 二、建设目标及需求 1.建设目标 2.需求分析 2.1 总体需求 2.2 需求细化 三、方案设计 1.设计依据 2.设计原则 3.设计方案 3.1.方案描述 3.2.组网说明 四、产品介绍 1.视频监控综合资源管理平台介绍 2.视频录像服务器和存储 2.1…

Android studio 之 弹窗PopupWindow

1.准备弹窗视图 popuop_layout.xml <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:orientation"horizontal"android:background"#00ffff&…

python算法与数据结构(搜索算法和拓扑排序算法)---深度优先搜索

课程目标 了解树/图的深度遍历&#xff0c;宽度遍历基本原理&#xff1b;会使用python语言编写深度遍历&#xff0c;广度遍历代码&#xff1b;掌握拓扑排序算法 搜索算法的意义和作用 搜索引擎 提到搜索两个子&#xff0c;大家都应该会想到搜索引擎&#xff0c;搜索引擎的基…

网络安全(初版,以后会不断更新)

1.网络安全常识及术语 资产 任何对组织业务具有价值的信息资产&#xff0c;包括计算机硬件、通信设施、IT 环境、数据库、软件、文档 资料、信息服务和人员等。 漏洞 上边提到的“永恒之蓝”就是windows系统的漏洞 漏洞又被称为脆弱性或弱点&#xff08;Weakness&#xff09;&a…

DC-7靶机做题记录

靶机下载地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1w2c_QKd_hOoR2AzNrdZjMg?pwdtdky 提取码&#xff1a;tdky 参考&#xff1a; DC7靶机地址&#xff1a;http://www.five86.com/downloads/DC-7.zipDC7靶场介绍: https://www.vulnhub.com/entry/dc-7,356/…