免费网站建设讯息/百度官方下载

免费网站建设讯息,百度官方下载,产品页面设计模板,江西省住房保障建设厅网站1. Flume 简介 Apache Flume 是一个专门用于高效地 收集、聚合、传输 大量日志数据的 分布式、可靠 的系统。它特别擅长将数据从各种数据源(如日志文件、消息队列等)传输到 HDFS、HBase、Kafka 等大数据存储系统。 特点: 可扩展&#xff1…

1. Flume 简介

Apache Flume 是一个专门用于高效地 收集、聚合、传输 大量日志数据的 分布式、可靠 的系统。它特别擅长将数据从各种数据源(如日志文件、消息队列等)传输到 HDFSHBaseKafka 等大数据存储系统。

特点

  • 可扩展:支持大规模数据传输,灵活扩展
  • 容错性:支持数据恢复和失败重试,确保数据不丢失
  • 多种数据源:支持日志文件、网络数据、HTTP请求、消息队列等多种来源
  • 流式处理:数据边收集边传输,支持实时传输

2. Flume 架构

Flume 的核心架构由三大组件构成,理解它们对掌握 Flume 的原理至关重要:

2.1 Source(来源)

负责从数据源获取数据,比如:

  • taildir(监听日志文件)
  • exec(执行命令读取数据)
  • kafka(从 Kafka 消费数据)
  • netcat(监听端口接收数据)

2.2 Channel(通道)

作为 缓冲区,临时存储数据,支持两种常见类型:

  • Memory Channel(内存通道):速度快,但重启可能丢数据
  • File Channel(文件通道):写入磁盘,保证数据持久性

2.3 Sink(下游输出)

负责将数据写入目标位置,支持:

  • HDFS(写入 Hadoop 分布式文件系统)
  • HBase(写入 HBase 数据库)
  • Kafka(推送到 Kafka)
  • ElasticSearch(支持实时检索)

补充组件

  • Sink Processor:管理多个 Sink,支持负载均衡、故障转移
  • Interceptor:在数据进入 Channel 前拦截处理,比如格式转换、过滤数据等

3. Flume 数据流动原理

数据在 Flume 中是按事件 (Event) 传输的,基本流程如下:

1️⃣ Source 从外部采集数据,将每条数据封装为一个 Event
2️⃣ Event 进入 Channel 暂存
3️⃣ Sink 从 Channel 拉取数据,写入目标系统

👉 示例流程
Web日志 -> Source(taildir) -> Channel(Memory Channel) -> Sink(HDFS)


4. Flume 部署模式

Flume 支持灵活的部署方式,主要有三种:

  • 单机模式:Source、Channel、Sink 都在同一节点,简单但不适合大规模数据
  • 多机流模式:多个 Flume 节点串联,Source 采集数据,Sink 输出到下一个 Flume 节点的 Source,逐层转发
  • 多 Agent 模式:多个 Flume Agent 独立采集数据,汇总到统一 Sink

5. 本地部署(单机)

下载地址:http://archive.apache.org/dist/flume/

tar -zxf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server/
mv apache-flume-1.9.0-bin/ flume--将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
cd flume/lib
rm guava-11.0.2.jar

6.Flume 入门案例

6.1 监控端口数据官方案例

案例需求: 使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

实现步骤:

yum install -y nc--判断 44444 端口是否被占用
netstat -nlp | grep 44444cd /export/server/flume/
mkdir job
cd job/vim net-flume-logger.conf
--添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 开启 flume 监听端口

第一种写法:

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/net-flume-logger.conf -
Dflume.root.logger=INFO,console

第二种写法:

bin/flume-ng agent -c conf/ -n a1 -f job/net-flume-logger.conf -Dflume.root.logger=INFO,console

参数说明:

--conf/-c:表示配置文件存储在 conf/目录

--name/-n:表示给 agent 起名为 a1

--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。

-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error。

此时服务端已经开启,新建一个会话作为客户端,使用 netcat 工具向本机的 44444 端口发送内容

nc localhost 44444
hello

在 Flume 监听页面观察接收数据情况

6.2 实时监控单个追加文件

案例需求:实时监控 Hive 日志,并上传到 HDFS 中

实现步骤:

确认 Hadoop 和 Hive 环境已经配置,没有配置的可以参考这两篇文章

本地部署HDFS集群https://blog.csdn.net/m0_73641796/article/details/145998092?spm=1001.2014.3001.5501


本地部署Hive集群https://blog.csdn.net/m0_73641796/article/details/146078614?spm=1001.2014.3001.5501

创建 flume-file-hdfs.conf 文件 

vim flume-file-hdfs.conf
--添加如下内容# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /export/server/hive/logs/hive.log# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

确保 hadoop 用户有写入权限

mkdir -p /export/server/flume/logs
chown -R hadoop:hadoop /export/server/flume/logs
chmod -R 755 /export/server/flume/logs

 运行 Flume(用Hadoop用户运行,因为要操作HDFS) 

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

新建会话,开启 Hadoop 和 Hive 并操作 Hive 产生日志

su hadoopstart-dfs.sh
start-yarn.shcd /export/server/hive/
nohup bin/hive --service metastore  >> logs/metastore.log 2>&1 &
bin/hive

在 HDFS 上查看文件

6.3 实时监控目录下多个新文件

案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

实现步骤:

创建配置文件

vim flume-dir-hdfs.conf
--添加如下内容a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /export/server/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# 忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
# 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 30
# 设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

创建目录并启动

--创建目录
mkdir upload
chown hadoop:hadoop upload/--启动flume(用hadoop用户)
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

新建会话,向 upload 文件夹中添加文件

cd /export/server/flume/upload/
echo "hello world" > 1.txt

查看 HDFS 上的数据

6.4 实时监控目录下的多个追加文件

案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

实现步骤:

创建配置文件

vim flume-taildir-hdfs.conf
--添加如下内容a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /export/server/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /export/server/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /export/server/flume/files2/.*log.*# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

创建目录并启动

--创建目录
mkdir files files2
chown hadoop:hadoop files
chown hadoop:hadoop files2--启动flume(用hadoop用户)
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf

向 upload 文件夹中添加文件

cd files
echo hello >> file1.txt

查看 HDFS 上的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/898146.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring源码解析

第一讲 容器接口 BeanFactory和ApplicationContext接口的具体继承关系: ApplicationContext 间接继承了BeanFactory BeanFactory是父接口ApplicationContext是子接口,里面一些功能调用了BeanFactory BeanFactory的功能 表面上只有 getBean&#xff0…

Django Rest Framework 创建纯净版Django项目部署DRF

描述创建纯净版的Django项目和 Django Rest Framework 环境的部署 一、创建Django项目 1. 环境说明 操作系统 Windows11python版本 3.9.13Django版本 V4.2.202. 操作步骤(在Pycharm中操作) 创建Python项目drfStudy、虚拟环境 ​虚拟环境中安装 jdangopip install django==4.…

图解AUTOSAR_CP_NetworkManagementInterface

AUTOSAR 网络管理接口(Nm)详解 AUTOSAR 网络管理接口规范与实现指南 目录 1. 概述 1.1. 网络管理接口的作用1.2. 网络管理接口的特点 2. 网络管理接口架构 2.1. 架构概览2.2. 模块交互关系 3. 网络管理状态机 3.1. 状态定义3.2. 状态转换 4. 协调算法 4.1. 协调关闭流程4.2. 同…

java学习总结(八):Spring boot

一、SpringBoot简介 传统Spring开发缺点: 1、导入依赖繁琐 2、项目配置繁琐 Spring Boot是全新框架(更像是一个工具, 脚手架),是Spring提供的一个子项目, 用于快速构建Spring应用程序。 随着Spring 3.0的发布,Spring 团…

openEuler24.03 LTS下安装MySQL8

前提条件 拥有openEuler24.03 LTS环境,可参考:Vmware下安装openEuler24.03 LTS 步骤 卸载原有mysql及mariadb sudo systemctl stop mysql mysqld 2>/dev/null sudo rpm -qa | grep -i mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps 2>/dev/…

【失败了】LazyGraphRAG利用本地ollama提供Embedding model服务和火山引擎的deepseek API构建本地知识库

LazyGraphRAG测试结果如下 数据: curl https://www.gutenberg.org/cache/epub/24022/pg24022.txt -o ./ragtest/input/book.txt 失败了 气死我也!!!对deepseek-V3也不是很友好啊,我没钱prompt 微调啊,晕死…

ccfcsp3402矩阵重塑(其二)

//矩阵重塑&#xff08;其二&#xff09; #include<iostream> using namespace std; int main(){int n,m,t;cin>>n>>m>>t;int c[10000][10000];int s0,sum0;int d[10000],k[100000];for(int i0;i<n;i){for(int j0;j<m;j){cin>>c[i][j];d[s…

Unity Shader - UI Sprite Shader之简单抠图效果

Sprite抠图效果&#xff1a; 前言 在PhotoShop中我们经常会用到抠图操作&#xff0c;现在就用Shader实现一个简单的抠图效果。 实现原理&#xff1a; 使用当前像素颜色与需要抠掉的颜色相减作比较&#xff0c;然后与一个指定的阈值比较以决定是否将其显示出来&#xff1b; U…

【Mac】安装 Parallels Desktop、Windows、Rocky Linux

一、安装PD 理论上&#xff0c;PD只支持试用15天&#xff01;当然&#xff0c;你懂的。 第一步&#xff0c;在 Parallels Desktop for Mac 官网 下载 Install Parallels Desktop.dmg第二步&#xff0c;双击 Install Parallels Desktop.dmg 第三步&#xff0c;双击安装Paralle…

学习单片机需要多长时间才能进行简单的项目开发?

之前有老铁问我&#xff0c;学单片机到底要多久&#xff0c;才能进行简单的项目开发&#xff1f;是三个月速成&#xff0c;还是三年磨一剑&#xff1f; 今天咱们就来聊聊这个话题&#xff0c;我不是什么高高在上的专家&#xff0c;就是个踩过无数坑、烧过几块板子的“技术老友”…

centos操作系统上传和下载百度网盘内容

探序基因 整理 进入百度网盘官网百度网盘 客户端下载 下载linux的rpm格式的安装包 在linux命令行中输入&#xff1a;rpm -ivh baidunetdisk_4.17.7_x86_64.rpm 出现报错&#xff1a; 错误&#xff1a;依赖检测失败&#xff1a; libXScrnSaver 被 baidunetdisk-4.17.7-1.x8…

蓝牙系统的核心组成解析

一、硬件层&#xff1a;看得见的物理载体 1. 射频模块&#xff08;Radio Frequency Module&#xff09; 专业描述&#xff1a;工作在2.4GHz ISM频段&#xff0c;支持GFSK/π/4 DQPSK/8DPSK调制方式 功能类比&#xff1a;相当于人的"嘴巴"和"耳朵" 发射端…

go的gmp

参考链接&#xff1a;https://www.bilibili.com/video/BV19r4y1w7Nx Golang的GMP调度模型(协程调度器)是其并发编程的核心。GMP代表Goroutine、Machine和Processor三个关键组成部分。Goroutine是Go语言中的轻量级线程&#xff0c;Machine是操作系统的线程&#xff0c;Processor…

Vue3-高级特性

一、Vue中自定义指令 1.认识自定义指令 在Vue的模板语法中我们学习过各种各样的指令&#xff1a;v-show、v-for、v-model等等&#xff0c;除了使用这些指令之外&#xff0c;Vue也允许我们来 自定义自己的指令。 注意&#xff1a;在Vue中&#xff0c;代码的复用和抽象主要还是…

基于CPLD电力/轨道交通3U机箱开关量输出板(DO)

板卡简介&#xff1a; 本板为开关量输出板&#xff08;DO&#xff09;&#xff0c;采用固态继电器用于电平输出或负载驱动&#xff0c;典型输出高电平为DC110V&#xff0c;低电平为0V。 性能规格&#xff1a; 电源&#xff1a;DC5V&#xff0c;DC3.3V&#xff0c;DC15V&#…

【C++经典例题】反转字符串中单词的字符顺序:两种实现方法详解

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;C经典例题 期待您的关注 目录 问题描述 基于快慢指针的解法 基于索引的解法 两种方法的比较 问题描述 在处理字符串相关的问题…

Java基础语法练习45(网络编程)

目录 一、网络的相关概念 1.网络通信 2.网络 3.ip 地址 4.ipv4 地址分类 5.域名 6.网络通信协议 7.TCP 和 UDP 二、InetAddress类 1.相关方法 2.代码示例如下&#xff1a; 三、Socket 1.基本介绍 四、TCP 网络通信编程 1.基本介绍 2.应用示例&#xff1a; 2.1…

【Json—RPC框架】:宏定义不受命名空间限制,续行符的错误使用造成的bug

为什么不受命名空间的限制&#xff1f; 宏处理在预处理阶段&#xff0c; 预处理在编译之前&#xff0c;编译才进行语法分析&#xff0c;语义分析。命名空间也只能限制这部分。 在Json-RPC框架的实现中&#xff0c;遇到如下问题。一开始以为是在实现日志宏的时候&#xff0c;有…

四川省包含哪些水系

背景&#xff1a; 想知道四川省包含哪些水系&#xff0c;以及各个水系的分布&#xff0c;起点、流经省市、终点等 {label: "嘉陵江",value: "嘉陵江",},{label: "渠江",value: "渠江",},{label: "涪江",value: "涪江&q…

子序列问题写法

子序列问题可以按照动态规划的思想去写。 子序列问题类型 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。 例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7] 的子序列。 写法思路 创建两层for循环…