Hadoop生态Flume(三)拦截器(Interceptor)介绍与使用(1)

转载自 Flume中的拦截器(Interceptor)介绍与使用(一)

Flume中的拦截器(interceptor)

用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用.

Flume-ng 1.6中目前提供了以下拦截器:

Timestamp Interceptor;
Host Interceptor;
Static Interceptor;
UUID Interceptor;
Morphline Interceptor;
Search and Replace Interceptor;
Regex Filtering Interceptor;
Regex Extractor Interceptor;

本文对常用的几种拦截器进行学习和介绍,并附上使用示例。

本文中使用的Source为TaildirSource,就是监控一个文件的变化,将内容发送给Sink,具体可参考《Flume中的TaildirSource》.

Source配置如下:

#-->设置sources名称
agent_lxw1234.sources = sources1
#--> 设置channel名称
agent_lxw1234.channels = fileChannel
#--> 设置sink 名称
agent_lxw1234.sinks = sink1# source 配置
agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource
agent_lxw1234.sources.sources1.positionFile = /tmp/flume/agent_lxw1234_position.json
agent_lxw1234.sources.sources1.filegroups = f1
agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234_.*.log
agent_lxw1234.sources.sources1.batchSize = 100
agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000
agent_lxw1234.sources.sources1.maxBackoffSleep = 5000
agent_lxw1234.sources.sources1.channels = fileChannel

Flume Source中使用拦截器的相关配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1 i2
agent_lxw1234.sources.sources1.interceptors.i1.type = host
agent_lxw1234.sources.sources1.interceptors.i1.useIP = false
agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost
agent_lxw1234.sources.sources1.interceptors.i2.type = timestamp

对一个Source可以使用多个拦截器。

 

一、Timestamp Interceptor

时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多。比如在使用HDFS Sink时候,根据events的时间戳生成结果文件,hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d

hdfs.filePrefix = log_%Y%m%d_%H

会根据时间戳将数据写入相应的文件中。

但可以用其他方式代替(设置useLocalTimeStamp = true)。

 

二、Host Interceptor

主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)。

根据上面的Source,拦截器的配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = host
agent_lxw1234.sources.sources1.interceptors.i1.useIP = false
agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost# sink 1 配置
agent_lxw1234.sinks.sink1.type = hdfs
agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d
agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{agentHost}
agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log
agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream
agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
agent_lxw1234.sinks.sink1.hdfs.rollCount = 0
agent_lxw1234.sinks.sink1.hdfs.rollSize = 0
agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600
agent_lxw1234.sinks.sink1.hdfs.batchSize = 500
agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10
agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0
agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1
agent_lxw1234.sinks.sink1.channel = fileChannel

该配置用于将source的events保存到HDFS上hdfs://cdh5/tmp/lxw1234的目录下,文件名为lxw1234_<主机名>.log

 

三、Static Interceptor

静态拦截器,用于在events header中加入一组静态的key和value。

根据上面的Source,拦截器的配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = static
agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true
agent_lxw1234.sources.sources1.interceptors.i1.key = static_key
agent_lxw1234.sources.sources1.interceptors.i1.value = static_value# sink 1 配置
agent_lxw1234.sinks.sink1.type = hdfs
agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234
agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{static_key}
agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log
agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream
agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true
agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text
agent_lxw1234.sinks.sink1.hdfs.rollCount = 0
agent_lxw1234.sinks.sink1.hdfs.rollSize = 0
agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600
agent_lxw1234.sinks.sink1.hdfs.batchSize = 500
agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10
agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0
agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1
agent_lxw1234.sinks.sink1.channel = fileChannel

看看最终Sink在HDFS上生成的文件结构:

flume interceptor

 

四、UUID Interceptor

UUID拦截器,用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。根据上面的source,拦截器的配置如下:

## source 拦截器
agent_lxw1234.sources.sources1.interceptors = i1
agent_lxw1234.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent_lxw1234.sources.sources1.interceptors.i1.headerName = uuid
agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true
agent_lxw1234.sources.sources1.interceptors.i1.prefix = UUID_# sink 1 配置
agent_lxw1234.sinks.sink1.type = logger
agent_lxw1234.sinks.sink1.channel = fileChannel

运行后在日志中查看header信息:

flume interceptor

 

五、Morphline Interceptor

Morphline拦截器,该拦截器使用Morphline对每个events数据做相应的转换。关于Morphline的使用,可参考

http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html

后续再研究这块。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322858.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SOA对微服务的残余影响

近日&#xff0c;Tareq Abedrabbo在伦敦2017 Con微服务大会上说&#xff0c;SOA对微服务架构设计的残余影响仍然存在&#xff0c;包括技术选型和组织方面的问题。最直接的一个例子就是大多数企业仍然区分对待架构师和开发人员&#xff0c;架构师负责出规范&#xff0c;开发人员…

Hadoop生态Flume(四)拦截器(Interceptor)介绍与使用(2)

转载自 Flume中的拦截器&#xff08;Interceptor&#xff09;介绍与使用&#xff08;二&#xff09; lume中的拦截器&#xff08;interceptor&#xff09;&#xff0c;用户Source读取events发送到Sink的时候&#xff0c;在events header中加入一些有用的信息&#xff0c;或者对…

协作更进一步:微软隆重介绍Visual Studio动态分享功能

微软刚刚在 Visual Studio Code 网站上宣布了“动态分享”&#xff08;Live Share&#xff09;功能&#xff0c;开发者们可以在 VS 2017 或 VS Code 中体验全新的实施协作。微软表示&#xff0c;Live Share 可让团队在相同的代码库上启用快速协作&#xff0c;而无需同步代码或配…

python打包exe文件

首先安装pyinstaller pip3 install pyinstaller接着导报指定文件 pyinstaller.exe -F 文件路径文件名 举例 pyinstaller.exe -F C:\Users\Tecna1205\Desktop\工作目录\Python工作目录\测试\3.3\test\tk.py 如果有图形界面&#xff0c;不想打开命令行&#xff0c;可在打包命令…

使用MS Test做单元测试

声明&#xff1a;本篇博客翻译自&#xff1a;http://www.c-sharpcorner.com/article/unit-testing-with-ms-tests-in-c-sharp/ 写在翻译之前&#xff1a; 依然清晰的记得刚工作的第一个项目中&#xff0c;在完成一个功能模块开发后&#xff0c;师傅让我把代码做一下单元测试。当…

Vue组件传参

父组件向子组件传参 数据&#xff1a; 父组件 <test :message"msg"></test>msg: Hello uniApp子组件&#xff08;test.vue&#xff09; <text>{{message}}</text>props:["message"]子组件向父组件传参 子组件&#xff1a; &…

如果不懂Service mesh,就不要谈微服务了

提到微服务&#xff0c;spring cloud等经典框架被使用的最为广泛&#xff0c;但是在2016年才被提起的Service Mesh&#xff0c;已经被Paypal、Lyft、Ticketmaster和Credit Karma等等一些大流量平台所使用&#xff0c;在生产应用中添加了Service mesh。今年随着Linkerd传入国内&…

使用AspectCore动态代理

前言 最近越来越多的同学关注到AspectCore&#xff0c;并且提出不少中肯的建议&#xff0c;其中最多的提议是希望能够看到更多的关于AspectCore使用方式的文章和Demo。那么在这篇文章里&#xff0c;我们就来聊聊AspectCore核心之一的动态代理。 动态代理 在.NET平台中&#xff…

已经安装完成mysql后wamp怎么配置

如果之前安装过mysql&#xff0c;然后想要安装wamp&#xff0c;那么怎么配置呢 先安装好wamp&#xff0c;然后在以下目录中修改my.ini 将密码改为自己的mysql密码即可 这时你发现启动wamp还是黄的 不要慌&#xff0c;因为你已经安装过了wamp&#xff0c;所以wamp自己的mys…

通过 Visual Studio 的“代码度量值”来改进代码质量

1 软件度量值指标 1.1 可维护性指数 表示源代码的可维护性&#xff0c;数值越高可维护性越好。该值介于0到100之间。绿色评级在20到100之间&#xff0c;表明该代码具有高度的可维护性&#xff1b;黄色评级在10到19之间&#xff0c;表示该代码适度可维护&#xff1b;红色评级在0…

php如何接收前端返回的各种类型的数据

之前学习node后端的时候&#xff0c;因为始终无法在网上找到接收json数据的函数&#xff0c;所以后来就放弃了。最近又心血来潮&#xff0c;想学习php. 这次已经有了之前学习php的基础&#xff0c;所以直接入手thinkphp5.0 这次php的学习&#xff0c;主要是为了解决之前遗留的问…

ASP.NET Core 认证与授权[5]:初识授权

经过前面几章的姗姗学步&#xff0c;我们了解了在 ASP.NET Core 中是如何认证的&#xff0c;终于来到了授权阶段。在认证阶段我们通过用户令牌获取到用户的Claims&#xff0c;而授权便是对这些的Claims的验证&#xff0c;如&#xff1a;是否拥有Admin的角色&#xff0c;姓名是否…

uni-app打包h5

如果我们想打包成直接浏览的h5&#xff0c;我们需要配置manifest.json这个文件&#xff0c;在其中的h5配置中加入publicPath配置&#xff0c;配置如下&#xff1a; 代码为&#xff1a; "h5" : {"publicPath": "./"},配置好这个后&#xff0c;以后…

[52ABP实战系列] .NET CORE实战入门视频课程出来啦

“ .NET CORE实战入门视频&#xff0c;要是有讲的不好的地方&#xff0c;还请留言。” 早安&#xff01; 各位道友好&#xff0c;.NET CORE入门视频的第一章已经录制完毕了。视频会放在传课网、网易云课堂及segment fault。 本来想的是第一章合计6个小节就可以播放完毕的&#…

ASP.NET Core缓存静态资源

背景 缓存样式表&#xff0c;JavaScript或图像文件等静态资源可以提高您网站的性能。在客户端&#xff0c;总是从缓存中加载一个静态文件&#xff0c;这样可以减少对服务器的请求数量&#xff0c;从而减少获取页面及其资源的时间。在服务器端&#xff0c;由于它们的请求较少&am…

【程序员】保持一颗虚心好学的心态去敲代码

最近&#xff0c;我感觉是自己突破最大的一段时间&#xff0c;为什么呢&#xff1f;主要是打通了接口这一块&#xff0c;就是用postman发送各种数据&#xff0c;我都能用后端接受到相关数据&#xff0c;并且解析出来。 在这之前我尝试过spring boot 和node&#xff0c;前者是太…

跟着老桂学ASP.NET Core 2.0

.net core作为微软开发技术中跨平台的利器&#xff0c;2.0的发布已经有一段时间了&#xff0c;asp.net core是新一代微软的BS开发框架&#xff0c;同时兼容.net core和.net framework&#xff0c;它的出现&#xff0c;使基于微软体系的BS开发迎来新的契机&#xff0c;开源&…

如何安装并启动django

这里我用的是pip3&#xff0c;一般没装两个版本的用pip就行了 安装 pip3 install django如何检测 python3 -m django --version显示版本号即可 如何创建并启动项目 创建 django-admin startproject HelloWorld启动 然后cd到HelloWorld目录里 python manage.py runserver…

ASP.NET Core 认证与授权[6]:授权策略是怎么执行的?

在上一章中ASP.NET Core 认证与授权[5]:初识授权&#xff0c;详细介绍了 ASP.NET Core 中的授权策略&#xff0c;在需要授权时&#xff0c;只需要在对应的Controler或者Action上面打上[Authorize]特性&#xff0c;并指定要执行的策略名称即可&#xff0c;但是&#xff0c;授权策…

.net core2.0下使用Identity改用dapper存储数据

前言、 已经好多天没写博客了&#xff0c;鉴于空闲无聊之时又兴起想写写博客&#xff0c;也当是给自己做个笔记。过了这么些天&#xff0c;我的文笔还是依然那么烂就请多多谅解了。今天主要是分享一下在使用.net core2.0下的实际遇到的情况。在使用webapi时用了identity做用户验…