Flume的安装及使用

Flume的安装及使用

文章目录

  • Flume的安装及使用
      • Flume的安装
        • 1、上传至虚拟机,并解压
        • 2、重命名目录,并配置环境变量
        • 3、查看flume版本
        • 4、测试flume
        • 5、flume的使用

Flume的安装

1、上传至虚拟机,并解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/

在环境变量中增加如下命令,可以使用 soft 快速切换到 /usr/local/soft

alias soft=‘cd /usr/local/soft/’

2、重命名目录,并配置环境变量
mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile
3、查看flume版本
flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 
4、测试flume
  • 监控端口,将数据打印至控制台

    # a1表示 agent名称,具体定义在启动命令中 -n 参数中
    # r1表示a1中的source名称
    a1.sources = r1
    # k1表示a1的sink名称
    a1.sinks = k1
    # c1表示a1中的channel名称
    a1.channels = c1# 表示a1的输入源类型为netcat端口类型
    a1.sources.r1.type = netcat# 表示a1的监听主机,表示任意ip都可以
    a1.sources.r1.bind = 0.0.0.0
    # 表示a1的监听端口号
    a1.sources.r1.port = 6666# 表示a1的sinkc输出的目的地是控制台的logger类型
    a1.sinks.k1.type = logger# 表示channel类型是memory内存类型
    a1.channels.c1.type = memory# 表示a1的channel总容量为1000 event
    a1.channels.c1.capacity=1000
    a1.channels.c1.transactionCapacity = 100# 表示将r1和c1连接起来
    a1.sources.r1.channels = c1# 表示将K1 和 c1 连接起来
    a1.sinks.k1.channel=c1
    

启动命令

flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/job/natcat2logger.conf -Dflume.root.logger=INFO,console

​ 参数说明:

​ -c 表示配置文件存储在conf目录中

​ -n 表示agent起名为a1

​ -f 表示flume启动读取的配置文件

​ -Dflume.root.logger=INFO,console 表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别 ,日志级别包括: log、info、warn、 error

6666端口来源:
在这里插入图片描述

在这里插入图片描述

  • 监控一个目录,将数据打印出来

    • 配置文件
    # 首先先给agent起一个名字 叫a1
    # 分别给source channel sink取名字
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1# 分别对source、channel、sink进行配置# 配置source
    # 将source的类型指定为 spooldir 用于监听一个目录下文件的变化
    # 因为每个组件可能会出现相同的属性名称,所以在对每个组件进行配置的时候 
    # 需要加上 agent的名字.sources.组件的名字.属性 = 属性值
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.fileSuffix = .ok
    a1.sources.r1.fileHeader = true# 给r1这个souces配置一个拦截器并取名为 i1
    a1.sources.r1.interceptors = i1 
    # 将拦截器i1的类型设置为timestamp 会将处理数据的时间以毫秒的格式插入event的header中
    # a1.sources.r1.interceptors.i1.type = timestamp
    # 将拦截器i1的类型设置为regex_filter 会根据正则表达式过滤数据
    a1.sources.r1.interceptors.i1.type = regex_filter
    # 配置正则表达式
    # 如果匹配到那么获取整行数据 
    a1.sources.r1.interceptors.i1.regex = [0-9]+
    # excludeEvents = true 表示将匹配到的过滤,未匹配到的放行
    # a1.sources.r1.interceptors.i1.excludeEvents = true# 配置sink
    # 使用logger作为sink组件,可以将收集到数据直接打印到控制台
    a1.sinks.k1.type = logger# 配置channel
    # 将channel的类型设置为memory,表示将event缓存在内存中
    a1.channels.c1.type = memory# 组装
    # 将sources的channels属性指定为c1
    a1.sources.r1.channels = c1# 将sinks的channel属性指定为c1
    a1.sinks.k1.channel = c1
    
    • 启动agent
    flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
    
    • 新建/root/data目录
    mkdir /root/data
    
    • 在/root/data/目录下新建文件,输入内容,观察flume进程打印的日志
    # 随意在a.txt中加入一些内容
    vim /root/data/a.txt
    
5、flume的使用
  • spoolingToHDFS.conf

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定spooldir的属性
    a.sources.r1.type = spooldir 
    a.sources.r1.spoolDir = /root/data 
    a.sources.r1.fileHeader = true 
    a.sources.r1.interceptors = i1 
    a.sources.r1.interceptors.i1.type = timestamp
    #指定sink的类型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /flume/data/dir1
    # 指定文件名前缀
    a.sinks.k1.hdfs.filePrefix = student
    # 指定达到多少数据量写一次文件 单位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少条写一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件类型为 流 来什么输出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件输出格式 为text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后缀
    a.sinks.k1.hdfs.fileSuffix = .txt#指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 在 /root/data/目录下准备数据
    The Zen of Python, by Tim PetersBeautiful is better than ugly.
    Explicit is better than implicit.
    Simple is better than complex.
    Complex is better than complicated.
    Flat is better than nested.
    Sparse is better than dense.
    Readability counts.
    Special cases aren't special enough to break the rules.
    Although practicality beats purity.
    Errors should never pass silently.
    Unless explicitly silenced.
    In the face of ambiguity, refuse the temptation to guess.
    There should be one-- and preferably only one --obvious way to do it.
    Although that way may not be obvious at first unless you're Dutch.
    Now is better than never.
    Although never is often better than *right* now.
    If the implementation is hard to explain, it's a bad idea.
    If the implementation is easy to explain, it may be a good idea.
    Namespaces are one honking great idea -- let's do more of those!
    
    • 启动agent
    flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
    

    注意如果Hadoop是3.x版本,那么需要将Hdfs中的lib目录下的guava包复制到Flume

    执行如下:

    mv $FLUME_HOME/lib/guava-11.0.2.jar $FLUME_HOME/lib/guava-11.0.2.jar.bak

    cd /usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs/lib

    cp guava-27.0-jre.jar $FLUME_HOME/lib/

  • httpToLogger

    • 配置文件
    # a表示给agent命名为a
    # 给source组件命名为r1
    a.sources = r1
    # 给sink组件命名为k1
    a.sinks = k1 
    # 给channel组件命名为c1
    a.channels = c1
    #指定http的属性
    a.sources.r1.type = http
    a.sources.r1.port = 6666 #指定sink的类型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次会从channel里取多少数据
    a.channels.c1.transactionCapacity = 100
    # 组装
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 启动

      • 先启动agent
      flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 再使用curl发起一个http请求
      curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"},{ "headers" :{"a2" : "a11","b2" : "b11"},"body" : "hello~http~flume2~"}]' http://master:6666
      
  • exec

    • 配置文件

      # Name the components on this agent
      a2.sources = r2
      a2.sinks = k2
      a2.channels = c2# Describe/configure the source
      a2.sources.r2.type = exec
      a2.sources.r2.command = tail -F /usr/local/soft/hive-3.1.2/log/hive.log# Describe the sink
      a2.sinks.k2.type = hdfs
      a2.sinks.k2.hdfs.path = /flume/hivelog/%Y%m%d/%H
      #上传文件的前缀
      a2.sinks.k2.hdfs.filePrefix = logs-
      #是否按照时间滚动文件夹
      a2.sinks.k2.hdfs.round = true
      #多少时间单位创建一个新的文件夹
      a2.sinks.k2.hdfs.roundValue = 1
      #重新定义时间单位
      a2.sinks.k2.hdfs.roundUnit = hour
      #是否使用本地时间戳
      a2.sinks.k2.hdfs.useLocalTimeStamp = true
      #积攒多少个Event才flush到HDFS一次
      a2.sinks.k2.hdfs.batchSize = 100
      #设置文件类型,可支持压缩
      a2.sinks.k2.hdfs.fileType = DataStream
      #多久生成一个新的文件
      a2.sinks.k2.hdfs.rollInterval = 60
      #设置每个文件的滚动大小
      a2.sinks.k2.hdfs.rollSize = 134217700
      #文件的滚动与Event数量无关
      a2.sinks.k2.hdfs.rollCount = 0a2.channels.c2.type = memory
      a2.channels.c2.capacity = 1000
      a2.channels.c2.transactionCapacity = 100a2.sources.r2.channels = c2
      a2.sinks.k2.channel = c2 
    • 启动

      启动flume

      bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
      

      启动hive,之后查看HDFS

  • 单数据源多输出

    ​ Flume1监控文件内容变动,将监控到的内容分别给到flume2和flume3,flume2将内容写到HDFS, Flume3将数据写到本地文件系统

    • 配置文件

      flume1.conf #Named
      a1.sources = r1
      a1.channels = c1 c2
      a1.sinks = k1 k2#Source
      a1.sources.r1.type = TAILDIR
      a1.sources.r1.filegroups = f1
      a1.sources.r1.filegroups.f1 = /usr/local/soft/flume-1.9.0/job/taildir/.*\.txt
      # 对于监听的文件监听位置信息需要保存到该json文件中
      a1.sources.r1.positionFile = /usr/local/soft/flume-1.9.0/job/position/position.json# channel selector
      a1.sources.r1.selector.type = replicating#Channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memory
      a1.channels.c2.capacity = 10000
      a1.channels.c2.transactionCapacity = 100#Sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = localhost
      a1.sinks.k1.port = 7777a1.sinks.k2.type = avro
      a1.sinks.k2.hostname = localhost
      a1.sinks.k2.port = 8888#Bind
      a1.sources.r1.channels = c1 c2  
      a1.sinks.k1.channel = c1 
      a1.sinks.k2.channel = c2 flume2.confa2.sources = r1
      a2.channels = c1
      a2.sinks = k1 #Source
      a2.sources.r1.type = avro
      a2.sources.r1.bind = localhost
      a2.sources.r1.port = 7777#Channel
      a2.channels.c1.type = memory
      a2.channels.c1.capacity = 10000
      a2.channels.c1.transactionCapacity = 100#Sink
      a2.sinks.k1.type = hdfs
      a2.sinks.k1.hdfs.path = /flume/replicating/%Y%m%d/%H
      a2.sinks.k1.hdfs.filePrefix = logs-
      a2.sinks.k1.hdfs.round = true
      a2.sinks.k1.hdfs.roundValue = 1
      a2.sinks.k1.hdfs.roundUnit = hour
      a2.sinks.k1.hdfs.useLocalTimeStamp = true
      a2.sinks.k1.hdfs.batchSize = 100
      a2.sinks.k1.hdfs.fileType = DataStream
      a2.sinks.k1.hdfs.rollInterval = 60
      a2.sinks.k1.hdfs.rollSize = 134217700
      a2.sinks.k1.hdfs.rollCount = 0#Bind
      a2.sources.r1.channels = c1 
      a2.sinks.k1.channel = c1 flume3.conf #Named
      a3.sources = r1
      a3.channels = c1
      a3.sinks = k1 #Source
      a3.sources.r1.type = avro
      a3.sources.r1.bind = localhost
      a3.sources.r1.port = 8888#Channel
      a3.channels.c1.type = memory
      a3.channels.c1.capacity = 10000
      a3.channels.c1.transactionCapacity = 100#Sink
      a3.sinks.k1.type = file_roll
      # 将数据保存到本地路径中 不断滚动创建,如果某个时间段没有数据,那么会创建一个空的文件 
      a3.sinks.k1.sink.directory = /usr/local/soft/flume-1.9.0/job/fileroll#Bind
      a3.sources.r1.channels = c1 
      

    a3.sinks.k1.channel = c1

    
    * 启动```shell
    flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,consoleflume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,consoleflume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/827583.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git TortoiseGit 详细安装使用教程

前言 Git 是一个免费的开源分布式版本控制系统,是用来保存工程源代码历史状态的命令行工具,旨在处理从小型到非常大型的项目,速度快、效率高。《请查阅Git详细说明》。TortoiseGit 是 Git 的 Windows Shell 界面工具,基于 Tortoi…

05_FreeRTOS信号量

信号量 信号量信号量简介常用信号量API函数 信号量 信号量简介 信号量(Semaphore)是一种实现任务间通信的机制,可以实现任务之间同步或临界资源的互斥访问,常用于协助一组相互竞争的任务来访问临界资源。在多任务系统中&#xf…

wireshark RTP分析参数

主要看丢弃和Delta, 丢弃就是丢掉的udp包,所占的比率 Delta是当前udp包接收到的时间减去上一个udp包接收到的时间 根据载荷可以知道正确的delta应该是多少,比如G711A,ptime20,那么delta理论上应该趋近于20. 这里的de…

python创建线程和结束线程

👽发现宝藏 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 python创建线程和结束线程 在 Python 中,线程是一种轻量级的执行单元&#xff…

进程概念(进程第1篇)【Linux复习篇】

目录 1、冯诺依曼体系结构怎么画?中央处理器是什么?存储器是什么?每个部分有什么作用? 2、什么是操作系统? 3、什么叫进程?操作系统如何管理进程的? 4、怎么查看进程? 5、C语言…

基于MLP算法实现交通流量预测(Pytorch版)

在海量的城市数据中,交通流量数据无疑是揭示城市运行脉络、洞察出行规律的关键要素之一。实时且精准的交通流量预测不仅能为交通规划者提供科学决策依据,助力提升道路使用效率、缓解交通拥堵,还能为公众出行提供参考,实现个性化导…

指令和界面【Linux】

指令和界面 前言一、指令 vs 界面交互的需求满足需求的第一阶段——指令满足需求的第二阶段-界面时间 二、指令和界面交互区别为什么要学命令行总结 前言 Linux操作系统提供了丰富的命令行界面和图形用户界面工具,用户可以根据自己的需求选择适合的界面进行操作。命…

【好书推荐7】《机器学习平台架构实战》

【好书推荐7】《机器学习平台架构实战》 写在最前面《机器学习平台架构实战》编辑推荐内容简介作者简介目  录前  言本书读者内容介绍充分利用本书下载示例代码文件下载彩色图像本书约定 🌈你好呀!我是 是Yu欸 🌌 2024每日百字篆刻时光&…

论文阅读:BEVBert: Multimodal Map Pre-training for Language-guided Navigation

BEVBert:语言引导导航的多模态地图预训练 摘要 现存的问题:目前大多数现有的预训练方法都采用离散的全景图来学习视觉-文本关联。这要求模型隐式关联全景图中不完整、重复的观察结果,这可能会损害智能体的空间理解。 本文解决方案&#xf…

TikTok账号0播放是限流了吗?想要播放破万,试试这些方法!

前言 账号0播放问题,想必困扰着许多的TikTok运营同学。精心制作的短视频发布在TikTok,不是零播放,就是仅自己可见。那么,TikTok账号0播放是不是真的意味着被限流了呢?本篇总结了账号0播放的原因并附上解决方案&#xf…

[Flutter3] Json转dart模型举例

记录一下 Android studio plugin -> FlutterJsonBeanFactory 处理json转dart 模型 案例 json字符串, 一个 response的data返回数据 {"code":1,"msg":"\u64cd\u4f5c\u6210\u529f","data":{"list":{"id":"8…

SwiftUI 5.0(iOS 17.0)触摸反馈“震荡波”与触发器模式趣谈

概览 要想创作出一款精彩绝伦的 App,绚丽的界面和灵动的动画并不是唯一吸引用户的要素。有时我们还希望让用户真切的感受到操作引发的触觉反馈,直击使用者的灵魂。 所幸的是新版 SwiftUI 原生提供了实现触觉震动反馈的机制。在介绍它之后我们还将进一步…

等保测评之主机测评详解(二级)

等保测评之主机测评详解(二级)服务器——Windows 身份鉴别: 测评项a): a)应对登录的用户进行身份标识和鉴别,身份标识具有唯一性,身份鉴别信息具有复杂度要求并定期更换; 整改方…

antd中Upload上传图片宽高限制以及上传文件的格式限制

项目中有一个需求,要上传轮播图,且有尺寸要求,所以就需要在上传图片的时候进行尺寸限制,使用了Upload组件,需要在组件的beforeUpload方法中进行限制。 定义一个上传前的方法,并且添加一个图片尺寸获取的方…

【Redis】Zset 数据类型

文章目录 常用命令zaddzcard & zcountzrange & zrevrangezpopmax & bzpopmaxzpopmin & bzpopminzrank & zrevrankzscore & zremzremrangebyrank & zremrangebyscorezincrby 多个集合间的交互命令交集 & zinterstore并集 & sunionstore 内部…

【声呐仿真】学习记录0.5-配置ssh远程连接docker、在docker中使用nvidia显卡

【声呐仿真】学习记录0.5-配置ssh远程连接docker、在docker中使用nvidia显卡 配置ssh远程连接docker1.端口映射2.配置ssh 在docker中使用nvidia显卡配置CUDA 注意:之前已经创建过容器的,需要打包成镜像,重新创建容器,因为要在创建…

【C++庖丁解牛】C++11---右值引用和移动语义

🍁你好,我是 RO-BERRY 📗 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 🎄感谢你的陪伴与支持 ,故事既有了开头,就要画上一个完美的句号,让我们一起加油 目录 1 左值引用和右值引用2 左…

第一个Spring Boot程序

目录 一、Spring Boot介绍 二、创建Spring Boot项目 1、插件安装(专业版不需要) 2、创建SpringBoot项目 (1)这里如果插件下载失败,解决方案: (2)项目启动失败,解决…

web测试基础知识

目录 web系统的基础 web概念(worldwideweb) 网络结构 发展 架构 B/S C/S P2P 工作原理 静态页面 动态页面 web客户端技术 浏览器的核心--渲染引擎 web服务器端技术 web服务器 应用服务器 集群环境 数据库 案例-URL 协议类型 主机名 端口 IP地址 分类 …

C#开发的全套成熟的LIS系统源码JavaScript+SQLserver 2012区域云LIS系统源码

C#开发的全套成熟的LIS系统源码JavaScriptSQLserver 2012区域云LIS系统源码 医院云LIS系统是一套成熟的实验室信息管理系统,目前已在多家三级级医院应用,并不断更新。云LIS系统是为病人为中心、以业务处理为基础、以提高检验科室管理水平和工作效率为目标…