【大数据学习 | flume】flume Sink Processors与拦截器Interceptor

1. Failover Sink Processor

故障转移处理器可以同时指定多个sink输出,按照优先级高低进行数据的分发,并具有故障转移能力。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=loggera1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,由于第三台节点的优先级高,所以第三台会打印数据到控制台

如果此时第三台flume宕机,则会将数据发送到优先级略低的第二台服务器上

2. Load balancing Sink Processor

负载平衡处理器提供了在多个sink负载平衡流量的能力。支持两种模式:round_robin and random 。round_robin 可以将数据负载均衡到多个sink上,random支持随机分发到不同的sink上。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = randoma1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=loggera1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第二台和第一台会随机收集数据

还支持轮询分发数据到两个sink中,这里的轮询是的是sink的轮询,不是event的轮询。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robina1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

3. Multiplexing Channel Selector

多路复用信道选择器,source是通过 event header 来决定传输到哪一个 channel。

比如:一个日志文件(多个系统的日志都在该文件中),根据日志中某个字段值,比如type=1,是系统A日志,sink to hdfs;type=2,是系统B日志,sink to kafka,此时就可以使用Flume多路复用,通过event header 来决定传输到哪个Channel

a1.sources=r1
a1.sinks=k1 k2 
a1.channels=c1  c2a1.sources.r1.type=http
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.1= c1
a1.sources.r1.selector.mapping.2 = c2
a1.sources.r1.selector.default = c2a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k2.type=avro
a1.sinks.k2.hostname = worke-2
a1.sinks.k2.port = 55555a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

测试:

通过http协议并携带type头信息,测试type=1,type=2,type=3去往哪一台服务器

第二台服务器接收:

4. Interceptor拦截器

拦截器可以将flume收集到的event进行拦截,并使用对应的拦截器,对event进行简单修改,过滤。同时可以配置多个拦截器实现不同的功能,按照配置的先后顺序进行拦截处理。

常见的 Interceptor描述
timestamp Interceptor给event的头信息中添加时间戳
Static Interceptor给event的头信息中添加自定义键值
Host Interceptor给event的头信息中添加主机名或者ip信息
Search and Replace Interceptor拦截信息进行匹配和替换
Regex Filtering Interceptor拦截信息进行过滤

5. Timestamp Interceptor

此拦截器将插入事件标头,即它处理事件的时间(毫秒)到event中。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.142.160
a1.sources.r1.port = 22222
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

6. Host Interceptor

此拦截器器插入主机的主机名或IP地址。插入带有key为host标头,值是主机的主机名称或IP地址。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.142.160
a1.sources.r1.port = 22222
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

7. Static Interceptor

静态拦截器允许用户将带有静态值的静态标头附加到所有事件。

当前实现不允许同时指定多个标头。相反,用户可以使用多个静态拦截器,每个拦截器定义一个静态标头。

# Static Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = name
a1.sources.r1.interceptors.i1.value = zhangsan#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

8. Search and Replace Interceptor

这个拦截器基于Java正则表达式提供了简单的基于字符串的搜索和替换功能

# Search and Replace Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = [a-z]
a1.sources.r1.interceptors.i1.replaceString =*#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

9. Regex Filtering Interceptor

该拦截器通过将event解释为文本并将文本与配置的正则表达式匹配来选择性地过滤事件。提供的正则表达式可用于包含事件或排除事件。

# Regex Filtering Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=^jp.*
a1.sources.r1.interceptors.i1.excludeEvents=true#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

10. Regex Extractor Interceptor

此拦截器使用指定的正则表达式提取正则表达式匹配组,并将匹配组作为标头附加到事件上。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555
a1.sources.r1.interceptors = i1  
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (^[a-zA-Z]*)\\s([0-9]*$)
a1.sources.r1.interceptors.i1.serializers = s1 s2
# key name
a1.sources.r1.interceptors.i1.serializers.s1.name = word
a1.sources.r1.interceptors.i1.serializers.s2.name = digital 
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

测试:

收到:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59749.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 字典应用

using System;using System.Collections.Generic;class Program{static void Main(){// 创建一个字典&#xff0c;键是字符串类型&#xff0c;值是整数类型Dictionary<string, int> studentScores new Dictionary<string, int>();// 向字典中添加键值对// 原理&am…

如何从头开始构建神经网络?(附教程)

随着流行的深度学习框架的出现&#xff0c;如 TensorFlow、Keras、PyTorch 以及其他类似库&#xff0c;学习神经网络对于新手来说变得更加便捷。虽然这些框架可以让你在几分钟内解决最复杂的计算任务&#xff0c;但它们并不要求你理解背后所有需求的核心概念和直觉。如果你知道…

Conda安装与使用中的若干问题记录

Conda安装与使用中的若干问题记录 1.Anaconda 安装失败1.1.问题复述1.2.问题解决&#xff08;安装建议&#xff09; 2.虚拟环境pip install未安装至本虚拟环境2.1.问题复述2.2.问题解决 3.待补充 最近由于工作上的原因&#xff0c;要使用到Conda进行虚拟环境的管理&#xff0c;…

『OpenCV-Python』视频的读取和保存

点赞 + 关注 + 收藏 = 学会了 推荐关注 《OpenCV-Python专栏》 上一讲介绍了 OpenCV 的读取图片的方法,这一讲简单聊聊 OpenCV 读取和保存视频。 视频的来源主要有2种,一种是本地视频文件,另一种是实时视频流,比如手机和电脑的摄像头。 要读取这两种视频的方法都是一样的…

python关键字和内置函数有哪些?

Python关键字 Python 是一种高级编程语言&#xff0c;具有许多关键字。关键字是语言的保留字&#xff0c;它们在语法上具有特殊的含义&#xff0c;不能用作变量名、函数名或其他标识符。以下是 Python 的一些主要关键字&#xff1a; False - 布尔值假None - 空值或无值True -…

docker构建多平台容器

1.创建builder配置文件 buildkitd.toml debug true [registry."docker.io"] #mirrors ["hub.dvcloud.xin"] http true insecure true2.定义需要构建的平台 platform"linux/amd64,linux/arm64" 3.创建builder if ! docker buildx ls |g…

SQL 中 BETWEEN AND 用于字符串的理解

SQL 中 BETWEEN AND 用于字符串的理解 在 SQL 中&#xff0c;BETWEEN AND 关键字可以用在数值和日期类型上&#xff0c;非常好理解。同时也可以用于字符串类型&#xff0c;它用于选择在两个指定值之间的数据&#xff0c;包括边界值。本文主要总结一下BETWEEN AND用于string类型…

字节青训-字符串字符类型排序问题、小C点菜问题

目录 一、字符串字符类型排序问题 题目 样例 输入&#xff1a; 输出&#xff1a; 输入&#xff1a; 输出&#xff1a; 输入&#xff1a; 输出&#xff1a; 解题思路&#xff1a; 问题理解 数据结构选择 算法步骤 最终代码&#xff1a; 运行结果&#xff1a; ​…

ES数据迁移方式

elasticdump 需要安装elasticdump &#xff0c;node插件 #!/bin/bashindexes("index1" "index2")for index in "${indexes[]}" doecho "backup ${index} start"#--type: 迁移类型&#xff0c;默认为 data&#xff0c;表明只迁移数据…

深入理解接口测试:实用指南与最佳实践5.0(二)

✨博客主页&#xff1a; https://blog.csdn.net/m0_63815035?typeblog &#x1f497;《博客内容》&#xff1a;.NET、Java.测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识 &#x1f4e2;博客专栏&#xff1a; https://blog.csdn.net/m0_63815035/cat…

CSS基础知识05(弹性盒子、布局详解,动画,3D转换,calc)

目录 0、弹性盒子、布局 0.1.弹性盒子的基本概念 0.2.弹性盒子的主轴和交叉轴 0.3.弹性盒子的属性 flex-direction row row-reverse column column-reverse flex-wrap nowrap wrap wrap-reverse flex-dirction和flex-wrap的组合简写模式 justify-content flex-s…

任务调度工具Spring Test

Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 作用&#xff1a;定时自动执行某段Java代码 应用场景&#xff1a; 信用卡每月还款提醒 银行贷款每月还款提醒 火车票售票系统处理未支付订单 入职纪念日为用户发送通知 一.…

微信小程序实战篇-分类页面制作

一、项目背景与目标 在微信小程序开发中&#xff0c;分类页面是一个常见且重要的功能模块。它能够帮助用户快速定位和浏览不同类别的商品或信息&#xff0c;提升用户体验和操作效率。今天&#xff0c;我们将深入探讨如何制作一个实用的微信小程序分类页面&#xff0c;先来看一下…

.NET 9 中 IFormFile 的详细使用讲解

在.NET应用程序中&#xff0c;处理文件上传是一个常见的需求。.NET 9 提供了 IFormFile 接口&#xff0c;它可以帮助我们轻松地处理来自客户端的文件上传。以下是 IFormFile 的详细使用讲解。 IFormFile 接口简介 IFormFile 是一个表示上传文件的接口&#xff0c;它提供了以下…

嵌入式硬件杂谈(二)-芯片输入接入0.1uf电容的本质(退耦电容)

引言&#xff1a;对于嵌入式硬件这个庞大的知识体系而言&#xff0c;太多离散的知识点很容易疏漏&#xff0c;因此对于这些容易忘记甚至不明白的知识点做成一个梳理&#xff0c;供大家参考以及学习&#xff0c;本文主要针对芯片输入接入0.1uf电容的本质的知识点的进行学习。 目…

数据结构(单向链表——c语言实现)

链式存储的优缺点&#xff1a; 优点&#xff1a; 1、动态分配内存&#xff1a; 链式存储不需要在数据插入之前分配固定大小的数组或内存块&#xff0c;因此它更适合存储动态变化的数据 2、高效的插入和删除操作&#xff1a; 在链表中插入或删除元素只需要调整相邻节点的指…

基于Spring Boot的电子商务平台架构

2 相关技术 2.1 SpringBoot框架介绍 Spring Boot是一种不需要代码生成的一种框架&#xff0c;并且可以不需要配置任何的XML文件就可以&#xff0c;因为Spring Boot里面自带了很多接口&#xff0c;只需要配置不同的接口就会自动的应用并且识别需要的依赖&#xff0c;在配置方面非…

高斯数据库Postgresql死锁和锁表解决方法

解决死锁进方法&#xff1a; 查询死锁进程列表 select * from pg_stat_activity where waiting‘t’ 发现有好几条挂起的记录&#xff0c;记录下所有或需要解锁的pid 解决死锁进程 select pg_cancel_backend(‘pid值’) 解决完后&#xff0c;刷新后测试&#xff0c;恢复正…

Level DB --- Block

class Block class Block是Level DB里面的重要数据结构&#xff0c;该数据结构用来承载已经存储到文件中的数据。已经被存储的数据当需要再次加载、应用&#xff08;例如搜索&#xff09;&#xff0c;这时首先要把数据加载、初始化到class Block里面。 数据的组织形式&#x…

梯度下降和梯度上升的区别

目录 梯度下降梯度上升总结 梯度下降 定义和目的&#xff1a; 梯度下降是一种优化算法&#xff0c;用于最小化一个目标函数 J ( θ ) J(\theta) J(θ)。常用于监督学习&#xff0c;表示模型预测和实际结果之间的误差。梯度下降的目的是找到使损失函数最小化的参数 θ \theta…