Flume详解

Source

AVRO Source

  1. AVRO Source接收被AVRO序列化之后的数据,结合AVRO Sink,可以实现复杂的流动模型

  2. 案例

    1. 编辑文件

      cd /opt/software/flume-1.11.0/data/
      vim avrosource.properties

      在文件中添加

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      # 配置AVRO Source
      # 类型必须是avro
      a1.sources.s1.type = avro
      # 监听的主机
      a1.sources.s1.bind = 0.0.0.0
      # 监听的端口号
      a1.sources.s1.port = 6666
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = logger
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    2. 启动

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f avrosource.properties -Dflume.root.logger=INFO,console

    3. 在新窗口中启动AVRO客户端

      flume-ng avro-client -H hadoop01 -p 6666 -F a.txt

Spooling Directory Source

  1. 监听指定的目录,如果目录中产生了新的文件,那么自动的将新文件中的内容收集起来

  2. 默认情况下,这个文件如果被收集了,那么文件的后缀就是.COMPLETED

  3. 案例

    1. 创建目录

      mkdir /opt/flume_data

    2. 编辑文件

      vim spooldirsource.properties

      在文件中添加

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      # 配置Spooling Directory Source
      # 类型必须是spooldir
      a1.sources.s1.type = spooldir
      # 监听的目录
      a1.sources.s1.spoolDir = /opt/flume_data
      # 被收集过的文件后缀
      # 利用这条规则,可以过滤掉一部分不需要收集的文件
      a1.sources.s1.fileSuffix = .finished
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = logger
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    3. 执行

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f spooldirsource.properties -Dflume.root.logger=INFO,console

Taildir Source

  1. 可以用于监听一个或者一组文件,如果被监听的文件中添加了新数据,那么新添的数据会被自动收集

  2. Exec Source需要通过指定tail -F命令才能监听指定文件,Spooling Directory Source监听指定的目录,并不能确定文件中是否新添了数据

  3. 不同于Exec Source的地方在于,Taildir Source不需要指定命令,还可以监控一类文件,且Taildir Source通过记录偏移量实现断点续传效果

  4. 偏移量通过属性positionFile来决定,默认是~/.flume/taildir_position.json

  5. 需要注意的是,Taildir Source不支持在Windows中使用

  6. 案例:监听flume_data目录下所有的log和txt文件,如果文件被添加新数据,那么自动收集

    1. 编辑文件

      vim taildirsource.properties

    2. 在文件中添加

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      # 配置Taildir Source
      # 类型必须是TAILDIR
      a1.sources.s1.type = TAILDIR
      # 监听的一组文件的组名
      a1.sources.s1.filegroups = f1 f2
      # 文件组中的要监听的文件
      a1.sources.s1.filegroups.f1 = /opt/flume_data/.*log.*
      a1.sources.s1.filegroups.f2 = /opt/flume_data/.*txt.*
      # 偏移量的存储位置
      a1.sources.s1.positionFile = /opt/flume_data/taildir_position.json
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = logger
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    3. 执行

      flume-ng agent -n a1 -c $FLUME_HOME/conf -f taildirsource.properties -Dflume.root.logger=INFO,console

NetCat TCP Source

  1. Netcat TCP Source监听TCP请求,在使用的时候需要监听指定的主机和端口,从这个指定主机的指定端口来接收TCP请求,并且将TCP请求内容作为日志来进行收集

  2. 默认情况下,每一条数据大小不能超过512B,可以通过参数max-line-length来修改

Sequence Generator Source

  1. 序列产生器,从0开始递增到totalEvents,默认情况下totalEvents的值Long.MAX_VALUE

  2. 实际过程中,会利用这个Source测试流动模型是否搭建成功

  3. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    ​
    # 配置Sequence Generator Source
    # 类型必须是seq
    a1.sources.s1.type = seq
    # 最大值
    a1.sources.s1.totalEvents = 100
    ​
    a1.channels.c1.type = memory
    ​
    a1.sinks.k1.type = logger
    ​
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

HTTP Source

  1. 接收HTTP请求,并且将请求内容作为日志进行收集

  2. 只能接收GET和POST请求,其中GET请求接收只能用于实验,实际过程中使用HTTP Source来接收POST请求

  3. 案例

    1. 在文件中添加

      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      ​
      # 配置HTTP Source
      # 类型必须是http
      a1.sources.s1.type = http
      # 监听端口
      a1.sources.s1.port = 8888
      ​
      a1.channels.c1.type = memory
      ​
      a1.sinks.k1.type = logger
      ​
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1

    2. 启动Flume

    3. 发送POST请求

      curl -X POST -d '[{"headers":{"class":"flume"},"body":"welcome~~~"}]' http://hadoop01:8888

Custom Source

  1. Flume支持用户自定义Source。Flume针对Source提供了顶级接口Source,但是实际过程中,并不是实现Source接口,而是实现子接口之一:

    1. EventDrivenSource:事件驱动Source,本身是一个被动型Source,需要自己定义线程来获取数据以及封装数据

    2. PollableSource:拉取Source,本身是一个主动型Source,提供了线程来获取数据,只需要考虑数据怎么封装即可

  2. 由于在自定义Source的时候,还需要考虑获取格式文件中的参数值,所以还需要实现Configurable接口

  3. 实际过程中,考虑到要覆盖的方法比较多,所以继承AbstractSource

自定义EventDrivenSource
  1. 导入pom文件后,定义类继承AbstractSource,实现EventDrivenSourceConfigurable接口

  2. 打成jar包,上传到Flume安装目录的lib目录下

    cd /opt/software/flume-1.11.0/lib/
    rz

  3. 回到格式文件目录下,编辑文件

    cd /opt/software/flume-1.11.0/data/
    vim authdrivensource.properties

    在文件中添加

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    ​
    # 配置自定义EventDrivenSource
    # 类型必须是类的全路径名
    a1.sources.s1.type = com.fesco.source.AuthDrivenSource
    # 起始值
    a1.sources.s1.start = 10
    # 结束值
    a1.sources.s1.end = 100
    # 步长
    a1.sources.s1.step = 5
    ​
    a1.channels.c1.type = memory
    ​
    a1.sinks.k1.type = logger
    ​
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

  4. 启动Flume

自定义PollableSource
  1. 定义一个类继承AbstractSource,实现PollableSourceConfigurable接口

  2. 打成jar包,上传到lib目录下

    cd ../lib
    rz

  3. 回到格式文件目录下,编辑文件

    cd ../data/
    vim authpollablesource.properties

    在文件中添加

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    ​
    # 配置自定义PollableSource
    # 类型必须是类的全路径名
    a1.sources.s1.type = com.fesco.source.AuthPollableSource
    # 起始值
    a1.sources.s1.min = 10
    # 结束值
    a1.sources.s1.max = 1000
    # 步长
    a1.sources.s1.step = 5
    ​
    a1.channels.c1.type = memory
    ​
    a1.sinks.k1.type = logger
    ​
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

  4. 启动flume

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/764869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python的ITS 信息平台的设计与实现flask-django-nodejs-php

第二,陈列说明该系统实现所采用的架构、系统搭建采用的服务器、系统开发环境和使用的工具,以及系统后台采用的数据库。 最后,对系统进行全面测试,主要包括功能测试、查询性能测试、安全性能测试。 分析系统存在的不足以及将来改进…

stable-diffusion-electron-clickstart 支持windows AMD显卡

前言 使用vue3 vite electron element-plus构建,正好学习下electrongithub stable-diffusion “画境导航者” 启动器 简介 stable-diffusion “画境导航者” 启动器支持功能 一键启动打开文件夹(tmp、txt2img-images)等模型所在文件夹&…

ios symbolicatecrash 符号化crash

一、准备 1.1 .crash 文件获取 设备连接电脑 打开XCode, 依次 XCode -> Windows -> Device and Simulator -> Open Recent Logs 找到 (对应app名+时间点) -> 右键 Show in Finder 1.2 .dSYM 和 .app 文件获取 .dSYM是十六进制函数地址映射信息的中转文件,调试的…

中国光伏展

河北省京津冀国际光伏展是一场专注于光伏产业的展览会。作为中国光伏行业的重要展会之一,该展会旨在推动京津冀地区光伏产业的发展,促进光伏技术的交流与合作。 光伏展将汇集来自国内外的光伏企业、科研机构、专家学者等相关人士,展示最新的光…

面试算法-84-删除有序数组中的重复项

题目 给你一个 非严格递增排列 的数组 nums ,请你 原地 删除重复出现的元素,使每个元素 只出现一次 ,返回删除后数组的新长度。元素的 相对顺序 应该保持 一致 。然后返回 nums 中唯一元素的个数。 考虑 nums 的唯一元素的数量为 k &#x…

C语言中__attribute__()

__attribute__() 属性: interrupt(“”) 我们知道,当发生中断的时候,系统会通过中断向量表跳转到相应的函数中并执行。等中断服务函数执行完后,又继续回到程序中。因此这个过程中涉及到一个现场的保护和恢复。那是否需要在中断处理函数里去…

Jetson AGX ORIN 配置 FGVC-PIM 神经网络

Jetson AGX ORIN 配置 FGVC-PIM 神经网络 文章目录 Jetson AGX ORIN 配置 FGVC-PIM 神经网络配置 ORIN 环境创建 FGVC-PIM 虚拟环境安装 PyTorch安装 torchvision安装其他依赖包 配置 ORIN 环境 首先先配置 ORIN 的环境,可以参考这个链接: Jetson AGX …

Java Spring使用event-stream进行数据推送

前端使用EventSource方式向后台发送请求&#xff0c;后端接收到之后使用event-stream方式流式返回。可以应用在时钟、逐字聊天等场景。 前端js示例代码&#xff08;向后台请求数据&#xff0c;并展示到“iddate”的div上&#xff09; <script type"text/javascript&q…

【Hive】with 语法 vs cache table 语法

语法分别如下&#xff1a; cache table table_name as (select ... from ... )with table_name as (select ... from ... )需要注意&#xff0c;with语法只相当于一个视图&#xff0c;并不会将数据缓存&#xff1b;如果要将数据缓存&#xff0c;需要使用cache table语法。 参考…

nuclei使用方法

nuclei使用方法 查看帮助 nuclei -h 列出所有模板 nuclei -tl 查找某种cms的相关漏洞模板&#xff0c;wordpress为例 nuclei -tl -tc "contains(name,wordpress)"便会列出内容里含有wordpress关键字的漏洞检测模板 使用与某cms相关的所有漏洞模板进行扫描&#…

css的background详解

CSS的background属性是一个复合属性&#xff0c;用于设置元素的背景效果。这个属性实际上是多个背景属性的简写形式&#xff0c;包括背景颜色&#xff08;background-color&#xff09;、背景图片&#xff08;background-image&#xff09;、背景重复&#xff08;background-re…

基于Lealfet.js展示Turf.js生成的平滑曲线实践

目录 前言 一、问题的由来 1、创建网页框架 2、创建map对象 3、构建点位&#xff0c;生成路线 二、Turf.js平滑曲线改造 1、官网方法介绍 2、0.4弯曲度曲线 3、0.85弯曲度曲线 4、0.1度弯曲曲线 5、综合对比 总结 前言 在很多的关于路线的gis应用中&#xff0c;我们…

无法加载DLL“SQLite.Interop.dll“:找不到指定模块

当系统在加载 DLL 时出现“找不到指定模块”的错误时&#xff0c;通常是因为系统无法找到所需的 DLL 文件。这个问题通常出现在使用第三方库的时候。 在这种情况下&#xff0c;你可以尝试以下几种解决方法&#xff1a; 确认 SQLite.Interop.dll 文件是否存在于正确的目录中。 …

开源项目ChatGPT-Next-Web的容器化部署(三)-- k8s deployment.yaml部署

一、说在前面的话 有了docker镜像&#xff0c;要把一个项目部署到K8S里&#xff0c;主要就是编写deployment.yaml。 你需要考虑的是&#xff1a; 环境变量服务的健康检测持久化启动命令程序使用的数据源程序使用的配置文件 因为本前端项目比较简单&#xff0c;这里只做一个…

网络工程师笔记15(OSPF协议-2)

OSPF协议 OSPF是典型的链路状态路由协议&#xff0c;是目前业内使用非常广泛的 IGP 协议之一。 Router-ID(Router ldentifier&#xff0c;路由器标识符)&#xff0c;用于在一个 OSPF 域中唯一地标识一台路由器。Router-ID 的设定可以通过手工配置的方式&#xff0c;或使用系统自…

RuoYi 自定义字典列表页面编码翻译

“字典数据”单独维护&#xff0c;而不是使用系统自带的字典表&#xff0c;应该如何使用这样的字典信息呢&#xff1f; 系统字典的使用&#xff0c;请参考&#xff1a; 《RuoYi列表页面字典翻译的实现》 https://blog.csdn.net/lxyoucan/article/details/136877238 需求说明…

IPC网络摄像头媒体视屏流MI_VIF结构体

一个典型的IPC数据流 下图是一个典型的IPC数据流模型&#xff0c;流动过程如下&#xff1a; 1. 建立Vif->Vpe->Venc的绑定关系&#xff1b; 2. Sensor 将数据送入vif处理&#xff1b; 3. Vif 将处理后的数据写入Output Port申请的内存&#xff0c;送入下一级&#xff1b;…

CI/CD环境搭建

服务简介 Gitlab 官网&#xff1a;https://about.gitlab.com/ GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c;并在此基础上搭建起来的Web服务。安装方法是参考GitLab在GitHub上的Wiki页面。Gitlab是被广泛使用的基于git的开源代码管…

设计数据库之内部模式:SQL基本操作

Chapter4&#xff1a;设计数据库之内部模式&#xff1a;SQL基本操作 笔记来源&#xff1a; 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤&#xff1a; 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…

golang学习网址

.1LearnKu 终身编程者的知识社区 https://learnku.com/