Flume 的基本介绍和安装部署

一、Flume 概述

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的框架服务
Flume 基于流式架构,灵活简单,能够实时读取服务器本地磁盘的数据,将数据写入到 HDFS

在这里插入图片描述

二、Flume 基础架构

在这里插入图片描述

  • Agent:一个 JVM 进程,它以事件的形式将数据从源头送至目的,一个 Flume 集群有多个 Agent。Agent 主要有 3 个部分组成:Source、Channel、Sink
  • Source:负责接收数据到 Flume Agent 的组件。可以处理各种类型、各种格式的日志数据,包括:avro、 thrift、 exec、 jms、 spooling directory、 netcat、 taildir、sequence generator、syslog、http、legacy
  • Sink:不断地轮询 Channel 中的 Event 且批量地移除 Event,并将这些 Event 批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。Sink 组件目的地包括:hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义
  • Channel:位于 Source 和 Sink 之间的缓冲区。Channel 允许 Source 和 Sink 运作在不同的速率上,是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。Flume 自带两种 Channel:
    • Memory Channel:内存中的队列,在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失
    • File Channel:将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据
  • Event:Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组(byte array),通过设置不同的 Header 可以让同一 Source 的 Event 进入不同的 Channel 和 Sink

三、Flume 安装部署

1. 下载

  • 进入下载地址:http://archive.apache.org/dist/flume/
  • 选择对应的版本点击进入
    在这里插入图片描述
  • 点击下载对应的 tar 安装包
    在这里插入图片描述

2. 安装

  • 将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的 /opt/software 目录下

  • 解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下

    tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
    
  • 将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

    rm -rf /opt/module/flume/lib/guava-11.0.2.jar
    
  • 注意:系统中必须配置 Java 和 Hadoop 的环境变量

四、入门案例

1. 监控端口数据

  • 需求:使用 Flume 监听一个端口,收集该端口数据,并打印到控制台

  • 思路:

    • Flume 监控本机的 44444 端口,通过 Source 端读取数据
    • 使用 netcat 工具向本机的 44444 端口发送数据
    • Flume 将获取的数据通过 Sink 端打印到控制台
  • 实现:

    • 前期准备

      # 安装 netcat 工具
      sudo yum install -y nc# 判断 44444 端口是否被占用
      sudo netstat -nlp | grep 44444
      
    • Flume 配置

      # 在 flume 安装目录下创建 job 文件夹
      cd /opt/module/flume
      mkdir job# 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf 并添加内容
      vim job/flume-netcat-logger.conf# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
    • 开启 flume 监听端口

      # 语法一
      bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console# 语法二
      bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf 
      -Dflume.root.logger=INFO,console
      
      • --conf/-c:表示配置文件存储在 conf/ 目录
      • --name/-n:表示给 agent 起名为 a1
      • --conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件
      • -Dflume.root.logger=INFO,console:-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error
    • 使用 netcat 工具向本机的 44444 端口发送内容

      nc localhost 44444
      hello 
      
    • 在 Flume 监听页面观察接收数据情况

2. 实时监控单文件追加

  • 需求:实时监控 Hive 日志,并上传到 HDFS 中

  • 思路:

    • Flume 通过 Source 端执行 tail -F 命令监控 Hive 日志文件
    • 执行 Hive 操作命令生成日志信息
    • Flume 通过 Sink 端将获取到的日志信息传输到 HDFS
  • 实现:

    • 确保系统中配置 Java 和 Hadoop 环境变量

    • Flume 配置

      # 在 job 目录下创建 file-flume-hdfs.conf 文件并添加内容
      cd /opt/module/flumevim job/file-flume-hdfs.conf# Name the components on this agenta2.sources = r2a2.sinks = k2 a2.channels = c2# Describe/configure the sourcea2.sources.r2.type = execa2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log# Describe the sinka2.sinks.k2.type = hdfsa2.sinks.k2.hdfs.path = hdfs://hadoop102:9820/flume/%Y%m%d/%H#上传文件的前缀a2.sinks.k2.hdfs.filePrefix = logs-#是否按照时间滚动文件夹a2.sinks.k2.hdfs.round = true#多少时间单位创建一个新的文件夹a2.sinks.k2.hdfs.roundValue = 1#重新定义时间单位a2.sinks.k2.hdfs.roundUnit = hour#是否使用本地时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = true#积攒多少个 Event 才 flush 到 HDFS 一次a2.sinks.k2.hdfs.batchSize = 100#设置文件类型,可支持压缩a2.sinks.k2.hdfs.fileType = DataStream#多久生成一个新的文件a2.sinks.k2.hdfs.rollInterval = 60#设置每个文件的滚动大小a2.sinks.k2.hdfs.rollSize = 134217700#文件的滚动与 Event 数量无关a2.sinks.k2.hdfs.rollCount = 0# Use a channel which buffers events in memorya2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2
      

      对于所有与时间相关的转义序列, Event Header 中必须存在以 “timestamp” 的key (除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)

    • 运行 Flume

      bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
      
    • 开启 Hadoop 和 Hive 并操作 Hive 产生日志

      sbin/start-dfs.sh
      sbin/start-yarn.sh
      bin/hive
      
    • 在 HDFS 上查看文件

  • 总结:Exec source 适用于监控一个实时追加的文件,但不能实现断点续传

3. 实时监控目录新增文件

  • 需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

  • 思路:

    • Flume 通过 Source 端监控一个目录
    • 向被监控的目录中添加文件,已经上传的后缀为 .completed,未上传的为 .tmp
    • Flume 通过 Sink 端将目录中后缀为 .completed 的文件传输到 HDFS
  • 实现:

    • Flume 配置

      # 在 job 目录下创建配置文件 dir-flume-hdfs.conf 并添加内容
      cd /opt/module/flumevim job/dir-flume-hdfs.conf# Name the components on this agenta3.sources = r3a3.sinks = k3a3.channels = c3# Describe/configure the sourcea3.sources.r3.type = spooldira3.sources.r3.spoolDir = /opt/module/flume/uploada3.sources.r3.fileSuffix = .COMPLETEDa3.sources.r3.fileHeader = true#忽略所有以.tmp 结尾的文件,不上传a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Describe the sinka3.sinks.k3.type = hdfsa3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload/%Y%m%d/%H#上传文件的前缀a3.sinks.k3.hdfs.filePrefix = upload-#是否按照时间滚动文件夹a3.sinks.k3.hdfs.round = true#多少时间单位创建一个新的文件夹a3.sinks.k3.hdfs.roundValue = 1#重新定义时间单位a3.sinks.k3.hdfs.roundUnit = hour#是否使用本地时间戳a3.sinks.k3.hdfs.useLocalTimeStamp = true#积攒多少个 Event 才 flush 到 HDFS 一次a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩a3.sinks.k3.hdfs.fileType = DataStream#多久生成一个新的文件a3.sinks.k3.hdfs.rollInterval = 60#设置每个文件的滚动大小大概是 128Ma3.sinks.k3.hdfs.rollSize = 134217700#文件的滚动与 Event 数量无关a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memorya3.channels.c3.type = memorya3.channels.c3.capacity = 1000a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3
      
    • 启动 Flume

      bin/flume-ng agent -n a3 -c conf/ -f job/dir-flume-hdfs.conf
      
    • 在被监控的目录下创建新文件

      touch hello.txt
      
    • 查看 HDFS 上的数据

  • 总结:Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

4. 实时监控目录多文件追加

  • 需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

  • 实现:

    • Flume 配置

      # 在 job 目录下创建 taildir-flume-hdfs.conf 配置文件并添加内容
      cd /opt/module/flumevim job/taildir-flume-hdfs.confa3.sources = r3a3.sinks = k3a3.channels = c3# Describe/configure the sourcea3.sources.r3.type = TAILDIRa3.sources.r3.positionFile = /opt/module/flume/tail_dir.jsona3.sources.r3.filegroups = f1 f2a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*# Describe the sinka3.sinks.k3.type = hdfsa3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H#上传文件的前缀a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹a3.sinks.k3.hdfs.round = true#多少时间单位创建一个新的文件夹a3.sinks.k3.hdfs.roundValue = 1#重新定义时间单位a3.sinks.k3.hdfs.roundUnit = hour#是否使用本地时间戳a3.sinks.k3.hdfs.useLocalTimeStamp = true#积攒多少个 Event 才 flush 到 HDFS 一次a3.sinks.k3.hdfs.batchSize = 100#设置文件类型,可支持压缩a3.sinks.k3.hdfs.fileType = DataStream#多久生成一个新的文件a3.sinks.k3.hdfs.rollInterval = 60#设置每个文件的滚动大小大概是 128Ma3.sinks.k3.hdfs.rollSize = 134217700#文件的滚动与 Event 数量无关a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memorya3.channels.c3.type = memorya3.channels.c3.capacity = 1000a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3
      
    • 启动 Flume

      bin/flume-ng agent -n a3 -c conf/ -f job/taildir-flume-hdfs.conf
      
    • 向被监控的目录中添加文件并追加内容

      echo hello >> file1.txt
      echo flume >> file2.txt
      
    • 查看 HDFS 上的数据

  • 说明:

    • Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传

    • Position File 的格式:

      {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
      {"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
      
    • Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件

  • 问题:

    • Taildir Source 是通过 inode 和 fileName 共同监控一个文件的,只要其中一个发生改变,Flume 即认为文件发生变化,就会同步上传数据到 HDFS
    • 很多程序使用的日志框架为 log4j,其特点是当天生成的日志文件名为 xxx.log,第二天将前一天的日志文件更名为 xxx.log-yyyy-MM-dd,所以 Flume 会重复上传日志数据
    • 解决:
      • 更换日志框架为 logback
      • 修改源码
        1. 下载源码包 src (flume-taildir-source)
        2. 修改 TailFile 类的 updatePos 方法和 RliableTaildirEventReader 类的 256 行
        3. 重新打包源码
        4. 将修改后的源码包上传到 flume 安装目录下的 lib 目录中替换原来的 jar 包
  • 总结:Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/14161.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cloneable 接口和深拷贝,浅拷贝

目录 一.Cloneable 接口 二.浅拷贝 三.深拷贝 四.comparable接口、 五.comparator接口 1.Java 中内置了一些很有用的接口 , Cloneable 就是其中之一 . Object 类中存在一个 clone 方法 , 调用这个方法可以创建一个对象的 " 拷贝 ". 2.来说说调用 clone 方法…

基于深度学习的表情识别系统

欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 随着人工智能技术的快速发展,表情识别成为了人机交互领域的一个研究热点。表情识别技术旨…

Python数字比大小获取大的数

目录 一、引言 二、数字比较的基本语法 三、获取较大的数 使用条件语句 使用内置函数 四、处理特殊情况 比较非数字类型 处理无穷大和NaN 五、应用实例 在游戏开发中比较分数 在数据分析中找出最大值 六、优化与性能 七、总结 一、引言 在Python编程的广阔天地中…

巧秒用AI写作工具做影视解说文案,效率高!

在自媒体内容输出的快节奏当下,影视解说已经成为一种受欢迎的内容形式。然而,创作高质量的影视解说文案往往需要花费大量的时间和精力。随着人工智能技术的不断发展,AI写作工具为我们提供了一种全新的、高效的解决方案。 AI写作工具利用先进的…

AI服务器 IO互联芯片解决方案pcie switch国产替代博通

服务器是大数据、人工智能、区块链、云计算、元宇宙等的基础设施,全国每年400万台服务器出货,预计 2025年超过500万台(中商产业研究院),高性能企业级互联芯片控制着服务器的神经系统和循环系统。 市场痛点&#xff1…

大厂程序员离职,开发一个盲盒小程序2万,一周开发完!

大家好,我是程序员小孟! 前面接了一个盲盒的小程序,主要的还是商城,盲盒的话只是其中的有一个活动。 现在的年轻人是真的会玩,越来越新的东西出来,越来越好玩的东西流行。 就像最近很火的地摊盲盒。 讲…

第N4周:中文文本分类——Pytorch实现

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 | 接辅导、项目定制 数据集:train 一.加载数据 import torch import torch.nn as nn import torchvision from torchvision import transforms,d…

vue3的核心API功能:computed()API使用

常规使用方法: 这样是常规使用方法. 另一种使用方法: 这样分别定义computed的get回调函数和set回调函数, 上面例子定义了plusOne.value的值为1, 那么这时候就走了computed的set回调函数,而没有走get回调函数. 当我们打印plusOne.value的值的时候,走的是get的回调函数而不是…

ios 原生项目迁移flutter第一天环境

由于公司已经有第一个吃螃蟹的项目组,我在迁移的时候想着站在巨人的肩膀上,但是搭配环境一定要问清楚对方flutter版本,路径也要安排好,不然就不行。 对着自己的项目照着葫芦画瓢,我刚开始为了配置管理图个方便随便放&…

Unity3D读取Excel表格写入Excel表格

系列文章目录 unity工具 文章目录 系列文章目录👉前言👉一、读取Excel表格👉二、写入Excel表格👉三、Fileinfo和Directoryinfo的操作👉四、壁纸分享👉总结 👉前言 有时候难免会遇到读取文件写…

提供一个c# winform的多语言框架源码,采用json格式作为语言包,使用简单易于管理加载且不卡UI,支持“语言分级”管理

提供一个c# winform的多语言框架源码,采用json格式作为语言包,不使用resx资源,当然本质一样的,你也可以改为resx 一、先看下测试界面 演示了基本的功能:切换语言,如何加载语言,如何分级加载语…

【webrtc】内置opus解码器的移植

m98 ,不知道是什么版本的opus,之前的交叉编译构建: 【mia】ffmpeg + opus 交叉编译 【mia】ubuntu22.04 : mingw:编译ffmpeg支持opus编解码 看起来是opus是1.3.1 只需要移植libopus和opus的webrtc解码部分即可。 linux构建的windows可运行的opus库 G:\NDDEV\aliply-0.4\C…

如何为社交feed场景设计缓存体系?no.35

Feed 流场景分析 Feed 流是很多移动互联网系统的重要一环,如微博、微信朋友圈、QQ 好友动态、头条/抖音信息流等。虽然这些产品形态各不相同,但业务处理逻辑却大体相同。用户日常的“刷刷刷”,就是在获取 Feed 流,这也是 Feed 流的…

达梦数据库详解

达梦认证是指针对中国数据库管理系统(DBMS)厂商达梦公司所推出的数据库产品,即达梦数据库(DMDB),进行的一种官方认证体系。达梦认证旨在验证数据库管理人员对达梦数据库产品的掌握程度,及其在数…

【HUST】信道编码|基于LDPC码的物理层安全编码方案概述

本文对方案的总结是靠 Kimi 阅读相关论文后生成的,我只看了标题和摘要感觉确实是这么回事,并没有阅读原文。 行文逻辑:是我自己设定的,但我并不是这个研究领域的,所以如果章节划分时有问题,期待指出&#x…

FTP文件传输议

FTP是一种文件传输协议:用来上传和下载,实现远程共享文件,和统一管理文件 工作原理:用于互联网上的控制文件的双向传输是一个应用程序。工作在TCP/IP协议簇的,其传输协议是TCP协议提高文件传输的共享性和可靠性&#…

8.STL中Vector容器的常见操作(附习题)

目录 1.vector的介绍 2 vector的使用 2.1 vector的定义 2.2 vector iterator 的使用 2.3 vector 空间增长问题 2.3 vector 增删查改 2.4 vector 迭代器失效问题 2.5 vector 在OJ中的使用 1.vector的介绍 vector是表示可变大小数组的序列容器。 就像数组一样&#xff0…

【Unitydemo制作】音游制作—控制器与特效

👨‍💻个人主页:元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 收录于专栏:就业…

儿童卧室灯品牌该如何挑选?几款专业儿童卧室灯品牌分享

近视在儿童中愈发普遍,许多家长开始认识到,除了学业成绩之外,孩子的视力健康同样重要。毕竟,学业的落后可以逐渐弥补,而一旦孩子近视,眼镜便可能成为长期伴随。因此,专业的护眼台灯对于每个家庭…

大泽动力应急排水方舱功能介绍

一、排水方舱简介及其应用 排水方舱,亦被称为扬水设备,主要用于排除船舶内的积水,保证船体内的稳定与干燥。它常与抽水设备结合使用,能将船体内的水抽离并排放到外部,从而确保船只的正常运行。 二、排水方舱的运作方式…