flume channel和interceptor简介及官方用例

一、Flume Channels

channel是在代理上暂存事件的存储库。Source 添加事件,Sink 将其删除。

1、Memory Channel

事件存储在具有可配置最大大小的内存中队列中。它非常适合需要更高吞吐量的流,但在agent发生故障时会丢失暂存数据

Property Name

Default

Description

type

The component type name, needs to be memory

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

2、JDBC Channel

事件存储在由数据库支持的持久性存储中。JDBC 通道目前支持嵌入式 Derby。这是一个持久的通道,非常适合可恢复性很重要的流。

Property Name

Default

Description

type

The component type name, needs to be jdbc

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = jdbc

3、Kafka Channel

这些事件存储在 Kafka 集群中(必须单独安装)。Kafka 提供高可用性和复制,因此,如果agent或 Kafka 崩溃,这些事件会立即提供给其他sink

Kafka 通道可用于多种方案:

1. 使用 Flume source和sink - 它为事件提供了一个可靠且高度可用的通道

2. 使用 Flume source和interceptor,但没有sink - 它允许将 Flume 事件写入 Kafka topic,供其他应用程序使用

3. 使用 Flume sink,但没有source - 这是一种低延迟、高容错的方式,可以将事件从 Kafka 发送到 Flume sink,例如 HDFS、HBase 或 Solr

目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。测试完成到 2.0.1,这是发布时最高的可用版本。

配置参数的组织方式如下

1. 与通道相关的配置值一般应用于通道配置级别,例如:a1.channel.k1.type =

2. 与 Kafka 或通道运行方式相关的配置值以“kafka”为前缀(这与 CommonClient 配置无关),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。这与 hdfs 接收器的运行方式没有什么不同

3. 特定于生产者/消费者的属性以 kafka.producer 或 kafka.consumer 为前缀

4. 在可能的情况下,使用 Kafka 参数名称,例如:bootstrap.servers 和 acks

Property Name

Default

Description

type

The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel

kafka.bootstrap.servers

List of brokers in the Kafka cluster used by the channel This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port

Example for agent named a1:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092

a1.channels.channel1.kafka.topic = channel1

a1.channels.channel1.kafka.consumer.group.id = flume-consumer

4、File Channel

 默认情况下,文件通道使用用户主目录内的检查点和数据目录的路径。因此,如果代理中有多个文件通道实例处于活动状态,则只有一个实例能够锁定目录并导致另一个通道初始化失败。因此,有必要提供所有已配置通道的显式路径,最好是在不同的磁盘上。此外,由于文件通道将在每次提交后同步到磁盘,因此可能需要将其与将事件批处理在一起的sink/source耦合,以便在多个磁盘不可用于检查点和数据目录的情况下提供良好的性能。

Property Name Default

Description

 

type

The component type name, needs to be file.

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /mnt/flume/checkpoint

a1.channels.c1.dataDirs = /mnt/flume/data

二、Flume Channel Selectors¶

通道选择器,如果未指定类型,则默认为“复制”。

1、Replicating Channel Selector (default)

Property Name

Default

Description

selector.type

replicating

The component type name, needs to be replicating

selector.optional

Set of channels to be marked as optional

Example for agent named a1 and it’s source called r1:

a1.sources = r1

a1.channels = c1 c2 c3

a1.sources.r1.selector.type = replicating

a1.sources.r1.channels = c1 c2 c3

a1.sources.r1.selector.optional = c3

在上面的配置中,c3 是可选通道。写入 c3 失败不会对事务产生影响。而 c1 和 c2 未标记为可选(optional),写入c1 和 c2失败将导致事务失败。

2、Load Balancing Channel Selector

负载平衡通道选择器提供了在多个通道上对流量进行负载均衡的能力。这 有效地允许在多个线程上处理传入数据。它维护一个索引列表,该列表必须在其上分配负载。实现支持使用round_robin或随机选择机制分配负载。选择机制的选择默认为round_robin类型,但可以通过配置进行覆盖。

Property Name

Default

Description

selector.type

replicating

The component type name, needs to be load_balancing

selector.policy

round_robin

Selection mechanism. Must be either round_robin or random.

Example for agent named a1 and it’s source called r1:

a1.sources = r1

a1.channels = c1 c2 c3 c4

a1.sources.r1.channels = c1 c2 c3 c4

a1.sources.r1.selector.type = load_balancing

a1.sources.r1.selector.policy = round_robin

3、Multiplexing Channel Selector

多路复用通道选择器

Property Name

Default

Description

selector.type

replicating

The component type name, needs to be multiplexing

selector.header

flume.selector.header

selector.default

selector.mapping.*

Example for agent named a1 and it’s source called r1:

a1.sources = r1

a1.channels = c1 c2 c3 c4

a1.sources.r1.selector.type = multiplexing

#header的值对应自定义Interceptor中header的key

a1.sources.r1.selector.header = state

# CZ、US对应自定义Interceptor中header的value

a1.sources.r1.selector.mapping.CZ = c1

a1.sources.r1.selector.mapping.US = c2 c3

a1.sources.r1.selector.default = c4

三、Flume Interceptors

Flume 能够在数据传输中修改/删除事件。这是在拦截器Interceptors的帮助下完成的。拦截器是实现 org.apache.flume.interceptor.Interceptor 接口的类。拦截器可以根据拦截器开发人员选择的任何条件修改甚至删除事件。Flume 支持拦截器的链接。这是通过在配置中指定拦截器生成器类名列表来实现的。拦截器在源配置中被指定为空格分隔列表。配置中的拦截器的顺序即是调用拦截器的顺序。一个拦截器返回的事件列表将传递给链中的下一个拦截器。拦截器可以修改或删除事件。如果拦截器需要删除事件,则它不会在返回的列表中返回该事件。如果要删除所有事件,则仅返回一个空列表。拦截器是需要自编的组件,下面是一个示例

1、自定义interceptors

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 先调用i1再调用i2

a1.sources.r1.interceptors = i1 i2

#这里自编的拦截器名为HostInterceptor

a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder

a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i1.hostHeader = hostname

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d

a1.sinks.k1.channel = c1

请注意,拦截器生成器将传递给 type config 参数。拦截器本身就是 可配置,并且可以传递配置值,就像传递给任何其他可配置组件一样。 在上面的示例中,事件首先传递给 HostInterceptor,然后由 HostInterceptor 返回事件 然后传递给 TimestampInterceptor。可以指定完全限定的类名 (FQCN) 或别名时间戳。如果您有多个收集器写入同一 HDFS 路径,则还可以使用 HostInterceptor。

2、Timestamp Interceptor

此拦截器将处理事件的时间(以毫秒)插入到事件标头中。这个拦截器 插入一个具有键 timestamp (或由 header 属性指定) 的标头,其值为相关时间戳。 如果配置中已存在现有时间戳,则此侦听器可以保留该时间戳。

Property Name

Default

Description

type

The component type name, has to be timestamp or the FQCN

headerName

timestamp

The name of the header in which to place the generated timestamp.

preserveExisting

false

If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

a1.sources = r1

a1.channels = c1

a1.sources.r1.channels =  c1

a1.sources.r1.type = seq

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

3、Host Interceptor

此侦听器插入运行此代理的主机的主机名或 IP 地址。它插入一个标题 使用密钥主机或已配置的密钥,其值为主机的主机名或 IP 地址(基于配置)

Property Name

Default

Description

type

The component type name, has to be host

preserveExisting

false

If the host header already exists, should it be preserved - true or false

useIP

true

Use the IP Address if true, else use hostname.

hostHeader

host

The header key to be used.

Example for agent named a1:

a1.sources = r1

a1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = host

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/14218.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k近邻和kd树

K近邻 选取k值的时候可以采用交叉验证的方法 一般采用欧氏距离 kd树 采用树这个特殊的数据结构来实现k近邻算法 先假设是二维的情况 下面讲解kd树的完整构造过程 找这个中位数是按照每棵子树来创建的 前提是已经有了一棵kd树,然后来一个实例点

java组合设计模式Composite Pattern

组合设计模式(Composite Pattern)是一种结构型设计模式,它允许你将对象组合成树形结构来表示“部分-整体”的层次结构。组合模式使得客户端对单个对象和组合对象的使用具有一致性。 // Component - 图形接口 interface Graphic {void draw()…

Python UDP编程简单实例

TCP是建立可靠的连接,并且通信双方都可以以流的形式发送数据。 相对于TCP,UDP则是面向无连接的协议,不需要建立连接,只需要知道对方IP地址和端口号,就可以直接发送数据包。但是只管发送不保证到达。 虽然UDP传输数据…

Docker快速部署Seata的TC服务以及微服务引入Seata教程

目录 一、使用docker部署Seata的TC服务 1、拉取TC服务镜像 2、创建并运行容器 ​3、修改配置文件 4、在Nacos中添加TC服务的配置 5、重启TC服务 二、微服务集成Seata 1、引入依赖 2、修改配置文件 Seata是阿里的一个开源的分布式事务解决方案,能够为分布…

STM32学习和实践笔记(31):输入捕获实验

1.输入捕获介绍 在定时器中断实验章节中我们介绍了通用定时器具有多种功能,输入捕获就是其中一种。STM32F1除了基本定时器TIM6和TIM7,其他定时器都具有输入捕获功能。输入捕获可以对输入的信号的上升沿,下降沿或者双边沿进行捕获,…

【博客主页】博客主旨 精华

前言 与博客园不同, 最近CSDN在进行资本化的转型.其一部分的VIP代码和小册我也有相关消费, 个人认为是一部分做的比较成过, 另一部分又不是特别成功. 其CSDN博客已经失去其原本技术交流的意义, 变成一种免费的知识引流和收费交流. 这其实与我们的开源社区背道而驰, 但是又吸引…

世界电信日 | 紫光展锐以科技创新支撑数字经济可持续发展

专注科技创新,打造全球数字经济技术基石 紫光展锐坚持科技创新,为数字经济蓬勃发展提供基石力量。 面对5G-A技术的巨大潜力,紫光展锐与众多生态伙伴紧密合作,积极推动5G-A的商用进程。紫光展锐提出的两项R18 eRedCap演进方案已被3GPP标准采…

为什么要实现设备之间的互通?

设备之间的互通是电信设备的普遍性要求,特别是在接入网领域中,不同厂商的局端设备与用户端(终端)设备之间的互通显得尤其重要。 一、互通能为产业链的各个环节带宽积极影响。 (1)对用户而言,互…

安装新版的Ubuntu WSL以使能BBR拥塞控制算法

【多次尝试成功的方案】通过> wsl - -list -online列出可以安装的版本,用命令> wsl --install -d Ubuntu-24.04 安装。 【未成功的方案】通过挂在ubuntu24.04.iso到E盘后,用命令> wsl --import Ubuntu24.04 C:\WSL\Ubuntu24.04\ E:\ --versio…

Redis系统架构中各个处理模块是干什么的?no.19

Redis 系统架构 通过前面的学习,相信你已经掌握了 Redis 的原理、数据类型及访问协议等内容。本课时,我将进一步分析 Redis 的系统架构,重点讲解 Redis 系统架构的事件处理机制、数据管理、功能扩展、系统扩展等内容。 事件处理机制 Redis…

API-BigInteger、BigDecimal

BigInteger: demo1: package BigInteger;import java.math.BigInteger; import java.util.Random;public class demo1 {public static void main(String[] args) {//获取一个随机最大整数BigInteger bd1 new BigInteger(5, new Random());System.out.println(bd1…

SSMP整合案例第一步 制作分析模块创建与开发业务实体类

制作分析 我们要实现一个模块的增删改查 实际开发中mybatisplus用的不多,他只能对没有外键的单表进行简单的查询 但在这个案例中我们还是选择mybatisplus开发 模块创建 我们把所有服务器都放在一起 就不用前后端分离 我们尝试用后端开发进行全栈开发 新建项目添…

macos brew安装多版本protobuf,切换指定版本protobuf 为默认版本方法

protobuf 不同的版本语法相差很大, 而在不同的项目中可能使用的protobuf版本也不同,所以我们的电脑就可能需要安装多个版本的protobuf, 下面介绍macos下如何通过brew安装多版本和设置想要的默认版本的方法 安装,则可以先执行 bre…

Thinkphp3.2.3网站后台不能访问如何修复

我是使用Thinkphp3.2.3新搭建的PHP网站,但是网站前台可以访问,后台访问出现如图错误: 由于我使用的Hostease的Linux虚拟主机产品默认带普通用户权限的cPanel面板,对于上述出现的问题不清楚如何处理,因此联系Hostease的…

(3)医疗图像处理:MRI磁共振成像-快速采集--(杨正汉)

目录 一、磁共振快速采集技术基础 1.K空间的基本特点 2.快速成像的理由: 3.快速成像的硬件要求: 二、磁共振快速采集技术 1.采集更少的相位编码线 2.平行采集技术PAT 3.其他与快速采集有关的技术 1)部分回波技术 2)频率…

java实现一个动态监控系统,监控接口请求超时的趋势

目录 整体思路案例实现1. 数据收集2. 数据聚合3. 趋势分析4. 异常检测5. 异常处理定时任务 整体思路 理想情况下,你可以实现一个简单的动态监控算法来检测渠道请求的响应时间趋势,并在发现频繁超时的情况下进行处理。以下是一个可能的算法框架&#xff…

Oracle表关联更新几种方法

1、测试表及数据准备 create table T_update01(ID int ,infoname varchar2(32),sys_guid varchar2(36)); create table T_update02(ID int ,infoname varchar2(32),sys_guid varchar2(36));insert into T_update01 select 1,N1_updateName,sys_guid() from dual union select …

java如何获取IP和IP的归属地?

在Java中,获取IP地址通常指的是获取本地机器的IP地址或者通过某种方式(如HTTP请求)获取的远程IP地址。代码案例如下: 而要获取IP的归属地(地理位置信息),则通常需要使用第三方IP地址查询服务,我…

c++ 排序算法merge使用要求

在C中&#xff0c;std::merge是一个算法&#xff0c;它用于合并两个已排序的范围&#xff08;例如数组或容器中的一部分&#xff09;到一个新的范围中。这个函数在<algorithm>头文件中定义。 输入范围必须已排序 std::merge要求输入的两个范围都必须是已排序的&#xf…

23种设计模式顺口溜

口诀&#xff1a; 原型 抽风 &#xff0c;单独 建造 工厂 &#xff08;寓意&#xff1a;&#xff08;这里代指本来很简单的东西&#xff0c;却要干工厂这里复杂的业务&#xff09; 抽风&#xff1a;抽象工厂单独&#xff1a;单例桥代理组合享元适配器&#xff0c;&#xff0…