【数仓】flume软件安装及配置

相关文章

  • 【数仓】基本概念、知识普及、核心技术
  • 【数仓】数据分层概念以及相关逻辑
  • 【数仓】Hadoop软件安装及使用(集群配置)
  • 【数仓】Hadoop集群配置常用参数说明
  • 【数仓】zookeeper软件安装及集群配置
  • 【数仓】kafka软件安装及集群配置
  • 【数仓】flume软件安装及配置
  • 【数仓】flume常见配置总结,以及示例

一、flume有什么作用

Apache Flume是一个分布式、可靠且可用的大数据日志采集、聚合和传输系统。它主要用于将大量的日志数据从不同的数据源收集起来,然后通过通道(Channel)进行传输,最终将数据传输到指定的目的地,如HDFS、HBase等。Flume具有高度可扩展性、容错性和灵活性,可以适应各种复杂的数据采集场景。

Flume的核心组件包括Source、Channel和Sink。Source负责从数据源中读取数据,可以是文件、网络套接字、消息队列等;Channel是数据的缓冲区,用于在Source和Sink之间传输数据;Sink负责将数据写入目标存储系统,如HDFS、HBase、Kafka等。此外,Flume还支持多种类型的Source、Channel和Sink,用户可以根据实际需求进行选择和配置。

Flume的主要作用是实现大规模数据采集和传输,实现数据的实时处理和分析,从而为企业提供更好的业务决策支持。在实际应用中,Flume可以用于日志收集、事件跟踪、数据流处理等场景。通过将数据从不同的数据源采集并传输到指定的目的地,Flume可以帮助企业实现数据的集中存储和管理,为后续的数据分析和挖掘提供基础。

此外,Flume还具有可靠性机制和故障转移和恢复机制,能够保证数据传输的可靠性和安全性。同时,Flume还支持客户扩展和自定义开发,用户可以根据自己的需求进行扩展和优化,使其更加适合特定的应用场景。

总的来说,Apache Flume是一个功能强大、灵活可靠的大数据日志采集、聚合和传输系统,它在大数据处理中起到了至关重要的作用。

二、环境准备

准备1台虚拟机

  • Hadoop131:192.168.56.131

本例系统版本 CentOS-7.8,已安装jdk1.8

关闭防火墙

systemctl stop firewalld

zookeeper、kafka 已安装,且已启动

三、flume安装配置

1、flume下载安装

# 下载解压
wget --no-check-certificate https://dlcdn.apache.org/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz
tar -xzvf apache-flume-1.11.0-bin.tar.gz
mv apache-flume-1.11.0-bin/ /data/flume/

2、配置环境变量

新增环境变量文件

vi /etc/profile.d/flume_env.sh

export FLUME_HOME=/data/flume
export PATH=$PATH:$FLUME_HOME/bin

使用source让新增环境生效

source /etc/profile

在命令行中输入flume-ng version命令,如果返回Flume的版本信息,则说明安装成功。

3、配置flume服务器属性

本例演示 flume 读取日志文件,然后发送到kafka中

1)配置日志存储路径

在flume主目录,执行 vim conf/log4j2.xml

<Property name="LOG_DIR">/data/flume/logs</Property>

2)新建配置文件 conf/job/file_to_kafka.conf,内容如下:

# 定义Agent的组件
# 设置source的名称为rl
al.sources = rl
# 设置channel的名称为cl
al.channels = cl# 配置source
# 指定source的类型为TAILDIR,这是一个能够追踪文件变化并读取新增内容的source
al.sources.rl.type = TAILDIR
# 定义文件组fl,这里fl是一个标识符,可以定义多个文件组,每个文件组可以包含多个文件模式
al.sources.rl.filegroups = fl
# 指定文件组fl的文件路径模式,/data/applog/log/app.* 表示匹配/data/applog/log/目录下以app开头的所有文件
al.sources.rl.filegroups.fl = /data/applog/log/app.*
# 指定positionFile的位置,该文件用于记录TAILDIR source读取文件的偏移量,以便在Flume重启后可以从上次的位置继续读取
al.sources.rl.positionFile = /data/flume/data/taildir_position.json# 配置channel
# 设置channel的类型为KafkaChannel,即数据将发送到Kafka
al.channels.cl.type = org.apache.flume.channel.kafka.KafkaChannel
# 指定Kafka集群的地址和端口,这里配置了3个Kafka broker
al.channels.cl.kafka.bootstrap.servers = hadoop131:9092,hadoop132:9092,hadoop133:9092
# 设置发送到Kafka的主题名称
al.channels.cl.kafka.topic = topic_log
# 设置parseAsFlumeEvent为false,表示发送到Kafka的数据不会被封装为Flume的Event格式,而是保持原始格式
al.channels.cl.parseAsFlumeEvent= false# 组装source和channel
# 将source rl连接到channel cl,表示rl读取的数据将发送到cl指定的Kafka channel中
al.sources.rl.channels = cl

这份配置文件定义了一个简单的Flume Agent,它使用TAILDIR source来监控某个目录下的日志文件变化,并将新增的日志内容发送到Kafka。配置文件中的注释详细解释了每个配置项的作用和含义。在实际部署时,需要根据实际环境调整配置文件中的路径、Kafka集群地址、主题名称等参数。

另外,请注意,Flume的Kafka Channel在某些版本中可能已经被标记为过时,推荐使用Kafka Sink。如果你使用的是较新的Flume版本,并且希望使用推荐的配置,那么应该使用Kafka Sink而不是Kafka Channel。在这种情况下,你需要配置一个Kafka Sink并将其绑定到一个普通的Memory Channel或File Channel。

使用Kafka Sink的配置示例如下:

# Define the components of the agent
agent.sources = tailSource
agent.channels = memoryChannel
agent.sinks = kafkaSink# Configure the source - TAILDIR
agent.sources.tailSource.type = TAILDIR
agent.sources.tailSource.filegroups = f1
agent.sources.tailSource.filegroups.f1 = /path/to/your/logfile.log
agent.sources.tailSource.positionFile = /path/to/flume/taildir_position.json
agent.sources.tailSource.fileHeader = true# Configure the channel - Memory
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000# Configure the sink - KafkaSink
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.bootstrap.servers = kafka-server-1:9092,kafka-server-2:9092
agent.sinks.kafkaSink.kafka.topic = flume-logs
agent.sinks.kafkaSink.channel = memoryChannel# Bind the source and channel, and the sink and channel
agent.sources.tailSource.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel

在这个示例中,我们配置了一个TAILDIR Source、一个Memory Channel和一个Kafka Sink。TAILDIR Source读取日志文件,Memory Channel在内存中缓存事件,Kafka Sink负责将事件发送到Kafka。

5、启动flume

1)创建flume启动脚本f1.sh

vi /usr/bin/f1.sh
# 修改文件权限
chmod 777 /usr/bin/f1.sh

2)复制如下内容

#!/bin/bash#1. 判断参数个数
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
ficase $1 in
"start")#遍历集群所有机器for host in hadoop131doecho --------------------  $host flume 启动 --------------------ssh $host "/data/flume/bin/flume-ng agent -n al -c /data/flume/conf/ -f /data/flume/conf/job/file_to_kafka.conf >/dev/null 2>&1 &"done
;;
"stop")#遍历集群所有机器for host in hadoop131doecho --------------------  $host flume 停止 --------------------ssh $host "ps -ef | grep file_to_kafka | grep -v grep | awk '{print \$2}' |xargs -n1 kill 9"done
;;
*)echo "Input Args Error..."
;;
esac

3)通过集群脚本 f1.sh 操作

f1.sh start

flume启动命令说明

以下是flume启动命令的常用参数:

参数默认值说明
--name-n无默认值,必须指定指定启动的Flume Agent的名称。这个名称应该与配置文件中定义的agent的名称一致。
--conf-c无默认值,通常设置为flume配置文件的目录指定Flume配置文件的目录。这个目录下应该包含flume的配置文件。
--conf-file-f无默认值,必须指定指定具体的Flume配置文件名。这个文件应该包含了Flume Agent的配置信息。
--zkConnString-z无默认值当Flume配置使用Zookeeper进行集群管理时,指定Zookeeper的连接字符串。格式为主机名:端口号,多个节点用逗号分隔。
-Dflume.root.logger无默认值,通常设置为INFO,console设置Flume的日志级别和输出方式。例如,INFO,console表示日志级别为INFO,并输出到控制台。也可以设置为输出到日志文件。
--no-reload-conffalse如果设置为true,那么Flume将不会重新加载配置文件,即使配置文件发生了变化。
--help-h无默认值显示帮助信息,列出所有可用的启动参数。

需要注意的是,Flume的启动参数可能会因版本和具体的使用场景而有所不同。上表中的参数是最常用的,但并不是所有的参数都在所有版本的Flume中都可用。在实际使用时,建议查阅对应版本的Flume官方文档或使用flume-ng agent --help命令查看可用的参数列表。

6、验证日志采集通路

1)在指定的log目录中生成日志文件

cat app.log >> /data/applog/log/app1.log

2)查看flume.log日志文件,发现自动读取到文件

Opening file: /data/applog/log/app1.log, inode: 34663712, pos: 0

3)查看kafka数据,发现自动创建配置的topic,topic_log

[root@hadoop131 kafka]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
topic_log

参考

  • https://flume.apache.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/725706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决WordPress更新插件或者更新版本报WordPress 需要访问您网页服务器的权限的问题

文章目录 前言一、原因二、解决步骤总结 前言 当对WordPress的插件或者版本进行更新时报错&#xff1a;要执行请求的操作&#xff0c;WordPress 需要访问您网页服务器的权限。 请输入您的 FTP 登录凭据以继续。 如果您忘记了您的登录凭据&#xff08;如用户名、密码&#xff09…

光线追踪7 - 抗锯齿(Antialiasing)

目前为止&#xff0c;如果你放大渲染出的图像&#xff0c;可能会注意到图像边缘的明显“阶梯状”效果。这种阶梯效果通常被称为“走样”或“锯齿”。当真实相机拍摄图片时&#xff0c;边缘通常没有锯齿&#xff0c;因为边缘像素是一些前景和一些背景的混合。请考虑&#xff0c;…

5. 链接和加载(linker and loader)

链接和加载(linker and loader)&#xff1a; linker即链接器&#xff0c;它负责将多个.c编译生成的.o文件&#xff0c;链接成一个可执行文件或者是库文件&#xff1b; loader即加载器&#xff0c;它原本的功能很单一只是将可执行文件的段拷贝到编译确定的内存地址即可&#x…

英福康INFICON残余气体RGA General Chinese中文培训PPT课件

英福康INFICON残余气体RGA General Chinese中文培训PPT课件

【树上倍增】【割点】 【换根法】3067. 在带权树网络中统计可连接服务器对数目

作者推荐 视频算法专题 本文涉及知识点 树上倍增 树 图论 并集查找 换根法 深度优先 割点 LeetCode3067. 在带权树网络中统计可连接服务器对数目 给你一棵无根带权树&#xff0c;树中总共有 n 个节点&#xff0c;分别表示 n 个服务器&#xff0c;服务器从 0 到 n - 1 编号…

Java | 在消息对话框中显示文本

首先需要导入JOptionPane类&#xff0c;JOptionPane类属于Swing组件中的一种&#xff0c;其导入方式如下&#xff1a; import javax.swing.JOptionPane;可以使用JOptionPane的showMessageDialog方法显示消息文本。 参数格式&#xff1a; JOptionPane.showMessageDialog(paren…

【C语言】指针详细解读2

1.const 修饰指针 1.1 const修饰变量 变量是可以修改的&#xff0c;如果把变量的地址交给⼀个指针变量&#xff0c;通过指针变量的也可以修改这个变量。 但是如果我们希望⼀个变量加上⼀些限制&#xff0c;不能被修改&#xff0c;怎么做呢&#xff1f;这就是const的作⽤。 …

AI推介-多模态视觉语言模型VLMs论文速览(arXiv方向):2024.03.01-2024.03.05

论文目录~ 1.CLEVR-POC: Reasoning-Intensive Visual Question Answering in Partially Observable Environments2.Feast Your Eyes: Mixture-of-Resolution Adaptation for Multimodal Large Language Models3.MADTP: Multimodal Alignment-Guided Dynamic Token Pruning for …

RK3568平台开发系列讲解(基础篇)注册字符设备

🚀返回专栏总目录 文章目录 一、字符设备初始化二、字符设备的注册和注销三、实验代码沉淀、分享、成长,让自己和他人都能有所收获!😄 注册字符设备可以分为两个步骤: 字符设备初始化字符设备的添加一、字符设备初始化 字符设备初始化所用到的函数为 cdev_init(…),在对…

Django面对高并发现象时处理方法

首先&#xff0c;我们需要使用适当的数据库引擎来处理高并发。默认情况下&#xff0c;Django使用的是SQLite数据库&#xff0c;但在高并发的情况下&#xff0c;它可能会变得非常慢。我们可以考虑使用更适合高并发的数据库&#xff0c;如MySQL或PostgreSQL。这些数据库引擎具有更…

解决QMYSQL driver not loaded问题

前言 之前都是在Qt5.51上开发&#xff0c;连接mysql数据库一直没有问题&#xff0c;换到5.15.2后一直报错 一查才发现\5.15.2\msvc2019_64\plugins\sqldrivers目录下没有qsqlmysql了&#xff0c;5.5.1是有的&#xff0c;5.15.2是要自己编译的。。。 下载源码 安装qt的时候没…

什么是IoC和AOP?

如何在实际项目中应用这些设计模式&#xff1f; 在实际项目中应用设计模式需要根据项目的需求和特点进行具体的选择和实现。以下是一些常见的方法和建议&#xff1a; 了解设计模式&#xff1a; 首先需要对各种设计模式有深入的了解&#xff0c;包括它们的原理、优缺点以及适用…

Vue tree树状结构数据转扁平数据

//数据结构可参考饿了么UItreeData: [{id: 1,label: Level one 1,type: 1,children: [{id: 4,label: Level two 1-1,type: 2,children: [{id: 9,label: Level three 1-1-1,type: 3}, {id: 10,label: Level three 1-1-2,type: 3}]}, {id: 11,label: Level three 1-2,type: 2,chi…

查看kafka消息消费堆积情况

查看主题命令 展示topic列表 ./kafka-topics.sh --list --zookeeper zookeeper_ip:2181描述topic ./kafka-topics.sh --describe --zookeeper zookeeper_ip:2181 --topic topic_name查看topic某分区偏移量最大&#xff08;小&#xff09;值 ./kafka-run-class.sh kafka.too…

2024-3-6 python列表的切片赋值

切片赋值 如果把切片放在赋值语句的左边&#xff0c;或把它作为del操作的对象&#xff0c;我们就可以对序列进行嫁接、切除 或就地修改操作。 >>> l [i for i in range(20)] >>> l [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 1…

小白如何快速入门计算机视觉?

去年 11 月的时候&#xff0c;给自己了一个目标&#xff0c;希望在未来的 3个月时间里&#xff0c;写满100篇关于从零入门AI视觉的算法、代码文字。 历经 3 个月&#xff0c;终于在今天 100 篇文章写完了&#xff0c;代码也全部调试完成&#xff0c;上传到 github 上开源给大家…

旧物回收小程序开发:环保与科技的创新结合

随着科技的飞速发展&#xff0c;我们的日常生活越来越离不开手机应用程序。而在环保日益成为社会焦点的今天&#xff0c;如何将科技与环保相结合&#xff0c;成为了一个值得深思的问题。今天&#xff0c;我们将探讨旧物回收小程序的开发&#xff0c;它如何助力环保&#xff0c;…

【重要公告】BSV区块链上线TypeScript SDK,未来将支持更多开发语言

​​发表时间&#xff1a;2024年2月21日 BSV区块链协会宣布上线JavaScript和TypeScript SDK&#xff08;即“标准开发工具包”&#xff09;。TypeScript SDK旨在为开发者提供新版统一核心代码库&#xff0c;以便利开发者在BSV区块链上开发能够任意扩容的应用程序。新上线的SDK替…

SpringBoot集成自然语言处理hanlp工具包

HanLP是一系列模型与算法组成的NLP工具包&#xff0c;目标是普及自然语言处理在生产环境中的应用。 下载与配置 <dependency><groupId>com.hankcs</groupId><artifactId>hanlp</artifactId><version>portable-1.8.4</version> <…