3-Flume之拦截器与GangLia监控

Flume

Interceptor

概述

  1. Interceptor(拦截器)本身是Source的子组件之一,可以对数据进行拦截、过滤、替换等操作
  2. 不同于Selector,一个Source上可以配置多个Interceptor,构成拦截器链。需要注意的是,后一个拦截器不能和前一个拦截器的规则相反!

Timestamp Interceptor

  1. 在Event的headers中添加一个timestamp字段来表示数据被收集的时间戳(单位是毫秒!)

  2. 案例:Event的header中自动添加上时间戳

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名
    a1.sources.s1.interceptors = i1
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestampa1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = loggera1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    
  3. Timestamp Interceptor结合HDFS Sink可以实现数据的按时间段存放。文件名后会添加上年月日,并且每日的Event输出到当天对应的文件中。

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = timestampa1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume_data/logdate=%Y-%m-%d
    a1.sinks.k1.hdfs.rollInterval = 3600
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 1000000000
    a1.sinks.k1.hdfs.fileType = DataStreama1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Host Interceptor

  1. 在Event的headers中添加一个host字段,用于标记这个数据的来源主机

  2. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名。多个拦截器的命名必须在一行。不能写在两行
    a1.sources.s1.interceptors = i1 i2
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestamp
    # 配置Host Interceptor
    # 类型必须是host
    a1.sources.s1.interceptors.i2.type = host
    # 使用IP还是主机名
    a1.sources.s1.interceptors.i2.useIP = falsea1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = loggera1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Static Interceptor

  1. 在Event的headers中添加指定的字段以及指定的值,实际过程中用于做标记

  2. 案例 Event的header中自动添加 class:flume这一key value

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名
    a1.sources.s1.interceptors = i1 i2 i3
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestamp
    # 配置Host Interceptor
    # 类型必须是host
    a1.sources.s1.interceptors.i2.type = host
    # 使用IP还是主机名
    a1.sources.s1.interceptors.i2.useIP = true
    # 配置Static Interceptor
    # 类型必须是static
    a1.sources.s1.interceptors.i3.type = static
    # 指定键
    a1.sources.s1.interceptors.i3.key = class
    # 指定值
    a1.sources.s1.interceptors.i3.value = flumea1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = loggera1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

UUID Interceptor

  1. UUID计算产生一串编号,由于编号位数比较多,因此几乎不太可能产生重复的编号,因此实际过程中经常使用UUID作为唯一标记

  2. UUID Interceptor是在Event的headers中添加一个id字段来标记这个数据的唯一性

  3. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名 
    a1.sources.s1.interceptors = i1 i2 i3 i4
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestamp
    # 配置Host Interceptor
    # 类型必须是host
    a1.sources.s1.interceptors.i2.type = host
    # 使用IP还是主机名
    a1.sources.s1.interceptors.i2.useIP = true
    # 配置Static Interceptor
    # 类型必须是static
    a1.sources.s1.interceptors.i3.type = static
    # 指定键
    a1.sources.s1.interceptors.i3.key = class
    # 指定值
    a1.sources.s1.interceptors.i3.value = flume
    # 配置UUID Interceptor
    # 类型是UUIDInterceptor$Builder
    a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Buildera1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = loggera1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Search And Replace Interceptor

  1. 在使用的时候需要指定正则表达式,会将满足正则表达式的数据替换为指定形式的数据。在替换的时候,只替换body中的数据,不替换headers中的数据!!!

  2. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = http
    a1.sources.s1.port = 8888
    a1.sources.s1.interceptors = i1
    # 类型必须是search_replace
    a1.sources.s1.interceptors.i1.type = search_replace
    # 指定正则表达式
    a1.sources.s1.interceptors.i1.searchPattern = [0-9]
    # 指定替换形式
    a1.sources.s1.interceptors.i1.replaceString = *a1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Regex Filtering Interceptor

  1. 在使用的时候需要指定正则表达式。通过属性excludeEvents来决定过滤方式。如果excludeEvents的值为true,表示符合正则表达式形式的数据会被过滤掉;如果excludeEvents的值为false,那么表示不符合正则表达式形式的数据会被过滤掉

  2. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8888
    a1.sources.s1.interceptors = i1
    # 类型必须是regex_filter
    a1.sources.s1.interceptors.i1.type = regex_filter
    # 指定正则表达式
    a1.sources.s1.interceptors.i1.regex = .*[0-9].*
    # 指定过滤方式
    a1.sources.s1.interceptors.i1.excludeEvents = truea1.channels.c1.type = memorya1.sinks.k1.type = loggera1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Custom Interceptor

  1. 定义一个类实现Interceptor接口,同时还需要提供内部类Builder
//模拟TimeStamp
public class AuthInterceptor implements Interceptor {@Overridepublic void initialize() {}//单条处理@Overridepublic Event intercept(Event event){Map<String, String> headers = event.getHeaders();if(headers.containsKey("time")||headers.containsKey("timestamp"))  return event;//时间格式可自定义headers.put("time", String.valueOf(System.currentTimeMillis()));return event;}//按批处理@Overridepublic List<Event> intercept(List<Event> list) {// 定义集合存储处理之后的数据List<Event> es = new ArrayList<>();for (Event event : es) {Event e = intercept(event);es.add(e);}return es;}@Overridepublic void close() {}//这个权限修饰符手动改为public  。  默认为default,外部程序访问不到
public static class AuthBuilder implements Builder {// 通过这个函数来获取当前拦截器对象@Overridepublic Interceptor build() {return new AuthInterceptor();}// 获取配置@Overridepublic void configure(Context context) {}
}
}
  1. 打成jar包,然后放到Flume的lib目录下

    cd /opt/software/flume-1.11.0/lib/
    rz
    
  2. 格式文件

    cd ../data
    

    在文件中添加

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = com.fesco.in.AuthInterceptor$AuthBuildera1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000a1.sinks.k1.type = loggera1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

**拦截器的功能:**自动给headers加时间戳。将events按时间在channel中存储在不同文件。给Headers中自动添加该条events的来源主机号;自定义一对kv添加到headers中。将body中的内容按正则匹配并调换。 将body中的内容过滤。 生成UUID唯一标识。

其他

执行流程

Flume执行流程
  1. Source采集数据,数据被采集之后,会交给ChannelProcessor来处理
  2. ChannelProcessor收到数据之后,会将数据交给Interceptor来进行过滤、拦截、替换等操作。需要注意的是,可以存在多个Interceptor,构成拦截器链
  3. Interceptor处理完数据之后,会将数据交给Selector来进行分发。Selector有3种模式:replicatingmultipexingload balancing。根据指定的模式,将数据分发给对应的Channel
  4. Channel收到数据之后,会将数据推送给SinkProcessor。SinkProcessor本质上是一个SinkGroup,需要将一个或者多个Sink绑定到一个组中,支持三种模式:defaultfailoverload balancing
  5. SinkProcessor收到数据之后,将数据按照指定模式推送给Sink,Sink将数据写到目的地

扩展:Flume监控 - Ganglia

概述
  1. 实际过程中,可以使用Ganglia监控Flume的数据流。Ganglia是Berkeley发起的一个开源的集群监控项目,可以检测数以千计的节点的性能
  2. Ganglia包含三个模块
    1. gmond(Ganglia Monitoring Daemon):轻量级的监控服务,需要监控哪一个节点的性能,就在这个节点上安装gmond服务,可以监控当前节点(系统)的各种指标数据:CPU、内存、磁盘、网络等信息
    2. gmetad(Ganglia Meta Daemon):轻量级的汇合服务,可以将监控信息以RRD格式来存储到磁盘上
    3. gweb(Ganglia Web):Ganglia提供的轻量级的可视化服务,本身是使用PHP来开发的,提供了WEB页面,能够使得用户较为直观和简便的查看到节点的性能
安装
  1. 三个节点上都需要安装httpd和php服务

    yum -y install httpd php
    
  2. 三个节点上都需要安装rrd服务

    yum -y install rrdtool perl-rrdtool rrdtool-devel apr-devel
    
  3. 三个节点依赖Epel

    yum -y install epel-release
    
  4. 第一个节点上安装Ganglia

    yum -y install ganglia-gmetad ganglia-gmond ganglia-web
    
  5. 其他两个节点上安装gmond服务

    yum -y install ganglia-gmond
    
  6. 第一个节点上修改ganglia.conf

    vim /etc/httpd/conf.d/ganglia.conf
    

    文件修改如下

    <Location /ganglia># Require local# Require ip 10.1.2.3# Require host example.orgRequire all granted
    </Location>
    
  7. 第一个节点上修改gmetad.conf

    vim /etc/ganglia/gmetad.conf
    

    修改data_source属性的值

    data_source "flume_cluster" hadoop01
    
  8. 三个节点上修改gmond.conf

    vim /etc/ganglia/gmond.conf
    

    修改cluster中的属性值

    cluster {name = "flume_cluster"owner = "unspecified"latlong = "unspecified"url = "unspecified"
    }
    

    修改udp_send_channel中的属性值

    udp_send_channel {#bind_hostname = yes # Highly recommended, soon to be default.# This option tells gmond to use a source address# that resolves to the machine's hostname.  Without# this, the metrics may appear to come from any# interface and the DNS names associated with# those IPs will be used to create the RRDs.# mcast_join = 239.2.11.71# 将监控的信息发送到指定的节点收集host = hadoop01port = 8649ttl = 1
    }
    

    修改udp_recv_channel中的属性值

    udp_recv_channel {# mcast_join = 239.2.11.71port = 8649# 接收任意主机的连接bind = 0.0.0.0retry_bind = true# Size of the UDP buffer. If you are handling lots of metrics you really# should bump it up to e.g. 10MB or even higher.# buffer = 10485760
    }
    
  9. 三个节点启动gmond服务

    systemctl start gmond
    # 查看进程是否启动
    ps -ef | grep -i gmond
    
  10. 在一个节点上启动gmetad和httpd服务

    :systemctl start gmetad
    systemctl start httpd
    
  11. 在浏览器中输入http://IP/ganglia/

监控Flume
  1. 修改Flume的配置

    # 进入Flume的配置目录
    cd /opt/software/flume-1.11.0/conf/
    # 复制文件
    cp flume-env.sh.template flume-env.sh
    # 编辑文件
    vim flume-env.sh
    # 在文件尾部添加
    export JAVA_HOME=/opt/software/jdk1.8
    export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.root.monitoring.hosts=hadoop01:8649 -Xms100m -Xmx200m"
    # 保存退出,生效
    source flume-env.sh
    
  2. 启动Flume

    cd ../data//type =ganglia      type和=之间有空格会报错//开启监控后,在执行之间的格式文件,就要用这个格式的指令
    flume-ng agent -n a1 -c $FLUME_HOME/conf -f basic.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=hadoop01:8649
    
  3. 属性解释

    属性解释
    ChannelCapacityChannel的容量
    ChannelFillPercentageChannel的利用率
    ChannelSizeChannel的大小
    EventPutAttemptCountPutList向Channel尝试推送数据的次数
    EventPutSuccessCountPutList向Channel推送数据成功的次数
    EventTakeAttemptCountTakeList向Sink推送数据的次数
    EventTakeSuccessCountTakeList向Sink推送数据成功的次数
    StartTime起始时间
    StopTime结束时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/771939.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

芒果YOLOv8改进130:Neck篇,即插即用,CCFM重构跨尺度特征融合模块,构建CCFM模块,助力小目标检测涨点

芒果专栏 基于 CCFM 的改进结构,改进源码教程 | 详情如下🥇 💡本博客 改进源代码改进 适用于 YOLOv8 按步骤操作运行改进后的代码即可 即插即用 结构。博客 包括改进所需的 核心结构代码 文件 YOLOv8改进专栏完整目录链接:👉 芒果YOLOv8深度改进教程 | 🔥 订阅一个…

HarmonyOS 健康系统联系案例 创建项目

上文 HarmonyOS 健康系统联系案例 整体原型图介绍 我们 介绍了健康系统的整体 UI 然后 我们一点一点来 今天先搭个环境 首先 我们打开开发工具首页 创建项目 一个非常令人怀念的步骤啊 我们点击 Create Project 创建一个新的工程 模板 还是选最基础的 Empty Ability 然后 …

Docker系列

目录 练习&#xff1a;去DockerHub搜索并拉取一个Redis镜像 docker下载nacos 练习&#xff1a;去DockerHub搜索并拉取一个Redis镜像 目标&#xff1a; 1&#xff09;去DockerHub搜索Redis镜像 2&#xff09;查看Redis镜像的名称和版本 3&#xff09;利用docker pull命令…

Java异常知识点详解

目录 1. 异常的概念与体系结构 1.1 异常的概念 1. 算术异常 2. 数组越界异常 3. 空指针异常 1.2 异常的体系结构 1.3 异常的分类 2. 异常的处理 2.1 防御式编程 2.2 异常的抛出 2.3 异常声明throws 2.4 try-catch捕获并处理 2.5 finally 2.4 异常的处理流程 3. 自…

Linux基本指令解析二

Linux基本指令解析二 常见指令1.date指令2.find指令3.grep指令4.zip/unzip指令5.tar指令6.bc指令7.uname –r指令 重要的几个热键关机 常见指令 1.date指令 date 指定格式显示时间&#xff1a; date %Y:%m:%d date 用法&#xff1a;date [OPTION]... [FORMAT] 1.在显示方面…

【Docker】Docker安全与最佳实践:保护你的容器化应用程序

欢迎来到英杰社区&#xff1a; https://bbs.csdn.net/topics/617804998 欢迎来到阿Q社区&#xff1a; https://bbs.csdn.net/topics/617897397 &#x1f4d5;作者简介&#xff1a;热爱跑步的恒川&#xff0c;致力于C/C、Java、Python等多编程语言&#xff0c;热爱跑步&#xff…

PostgreSQL数据库中表的物理大小, 妙懂

数据库中表的物理大小 这是一个很有意思的话题。尤其是在我们做物理设计和空间大小评估的时候。 PostgreSQL中对于稍长一点的列&#xff0c;直接使用了TOAST表来存储&#xff0c;默认是会对表中的数据进行压缩的。关于TOAST, 以后或有时间专门做简单介绍。 先看看相关函数的定…

javaWeb学生宿舍管理系统

一、摘要 本博客介绍了如何使用Spring Boot和MySQL构建一个功能完善的JavaWeb学生宿舍管理系统。该系统分为三个角色&#xff1a;管理员、宿管和学生。管理员拥有对整个系统的全面管理权限&#xff0c;包括学生管理、宿舍管理、入住管理和管理员管理&#xff1b;宿管负责宿舍的…

3.Python数据分析—数据分析入门知识图谱索引(知识体系中篇)

3.Python数据分析—数据分析入门知识图谱&索引-知识体系中篇 一个人简介二数据获取和处理2.1 数据来源&#xff1a;2.2 数据清洗&#xff1a;2.2.1 缺失值处理&#xff1a;2.2.2 异常值处理&#xff1a; 2.3 数据转换&#xff1a;2.3.1 数据类型转换&#xff1a;2.3.2 数据…

【Java程序设计】【C00360】基于Springboot的考研互助交流平台(有论文)

基于Springboot的考研互助交流平台&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 项目获取 &#x1f345;文末点击卡片获取源码&#x1f345; 开发环境 运行环境&#xff1a;推荐jdk1.8&#xff1b; 开发工具&#xff1a;eclipse以及i…

java常用应用程序编程接口(API)——IO流概述及字节流的使用

前言&#xff1a; IO流和File是用于把数据存放在硬盘里的工具。File可以把文件存至硬盘&#xff0c;但不能更改里面的数据。通过IO流可以改写硬盘里的数据。整理下学习笔记&#xff0c;打好基础&#xff0c;daydayup!!! IO流 I指Input&#xff0c;称为输入流&#xff1a;负责把…

智慧医疗包括哪些方面?智慧医疗发展前景如何?

近年来&#xff0c;随着云计算、物联网&#xff08;internet of things&#xff0c;IOT&#xff09;、移动互联网、大数据、人工智能&#xff08;artificial intelligence&#xff0c;AI&#xff09;、5G网络、区块链等新一代信息技术的逐步成熟和广泛应用&#xff0c;信息化已…

【码云Git提交】Windows

一、第一次提交 1.登录码云创仓库 2.观察创建后的提示&#xff0c;就有步骤命令了 3.我们在系统中打开一个测试文件夹窗口打开GitBash PS&#xff1a;&#xff08;你需要提前装一个Node&#xff0c;本章不介绍&#xff09; 我们打开一个创建的test测试文件夹窗口&#xff0c;…

阿里 Modelscope 创空间部署在本地环境操作文档

创建创空间的步骤直接跳过。 备注:我的电脑是Windows 第一步&#xff1a;获取创空间代码&#xff0c;直接下载代码太慢了&#xff0c;建议通过git获取代码 第二步:复制链接,打开cmd 直接粘贴回车下载。下载完之后的到了我的Service-Assistant文件夹。再git clone https://gith…

可编程液冷负载的核心功能

可编程液冷负载核心功能在于根据设备的工作状态和环境温度&#xff0c;自动调整冷却液的流量和温度&#xff0c;以实现精确的散热控制。这种技术以其高效、智能的特性&#xff0c;为多个领域提供了全新的散热解决方案。 可编程液冷负载的核心功能在于其可编程性&#xff0c;这意…

【倪琴仲尼式-雷伴】全新倪诗韵精品杉木古琴

试音中的用弦&#xff1a;梦音&#xff0c;视频录音无任何处理&#xff0c;所见即所得。 现琴比照片更好看。倪琴吊牌、琴额后面的编码和倪琴官网上的序列号是一一对应的&#xff0c;可查。 雷伴&#xff0c;“伴”字取意陪伴、相伴、依随。栗壳色&#xff0c;纯鹿角霜生漆工艺…

解决“ModuleNotFoundError: No module named ‘transformers’”错误的全面指南

一、问题背景与原因 在Python编程中&#xff0c;ModuleNotFoundError是一个常见的错误&#xff0c;表明解释器无法在指定的路径或Python环境中找到所需的模块。特别是当我们尝试导入像transformers这样的第三方库时&#xff0c;如果库没有被正确安装&#xff0c;就会遇到这样的…

稀碎从零算法笔记Day28-LeetCode:零钱兑换

前言&#xff1a;鸽了好多天了哈哈哈&#xff0c;虽然C站没更但是LC还是坚持刷的&#xff0c;任重道远啊&#xff01;(可恶的寝室熄灯) 题型&#xff1a;动态规划 链接&#xff1a;322. 零钱兑换 - 力扣&#xff08;LeetCode&#xff09; 来源&#xff1a;LeetCode 题目描述…

紫鸾5.0:紫光云新一代敏捷应用开发平台全家桶

曾几何时&#xff0c;“瀑布式”占据了二十世纪软件开发的主流&#xff0c;开发时间往往以年计&#xff0c;一款软件应用动辄几年才能交付。而随着社会生产力的跃升&#xff0c;“瀑布式”已严重跟不上时代的节奏&#xff0c;2001年&#xff0c;“敏捷宣言”的发布&#xff0c;…

微信小程序使用Vant组件库流程

目前 Vant 官方提供了 Vue 2 版本、Vue 3 版本和微信小程序版本&#xff0c;并由社区团队维护 React 版本和支付宝小程序版本。这样开发原生微信小程序的会方便很多。 官方网址&#xff1a;Vant Weapp - 轻量、可靠的小程序 UI 组件库 步骤一 通过 npm 安装 npm i vant/weap…