flume介绍

flume

1.flume是什么

Flume:** Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、传输、聚合的系统。** Flume仅仅运行在linux环境下** flume.apache.org(Documentation--Flume User Guide)
Flume体系结构(Architecture):
Source: 用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel
Channel:连接 source 和 sink的数据传输通道
Sink:     从Channel收集数据,将数据写到目标源,可以是下一个Source也可以是HDFS或者HBase

2.flume安装

----flume安装-----------------------------1、解压(建议安装到cdh目录里)2、改名,并修改flume-env.sh
$ mv flume-env.sh.template flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.7.0_673、使用flume-ng命令
$ bin/flume-ng 
--conf         指定配置目录
--name         指定Agent的名称
--conf-file    指定具体的配置文件

3.案例:

需求:使用flume监控某个端口,把从端口写入的数据输出为logger1、复制
$ cp -a flume-conf.properties.template flume-telnet.conf2、修改flume-telnet.conf# Name the components on this agent
# a1为代理(中介)实例名,任意命名,agent分三部分
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
# netcat是用于调试和检查网络的工具包,windows和linux(redhat)均可用,需要安装
a1.sources.r1.type = netcat    
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
# 可以在文档Flume Sinks--Logger Sink部分查找
# 往日志文件里面写
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
# 内存channel
a1.channels.c1.type = memory
# channel里存放的最大event数
a1.channels.c1.capacity = 1000
# 每个事务支持的最大event数
a1.channels.c1.transactionCapacity = 100# 绑定source和sink到channel
# 注意:这里有's'
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1*** 配置文件的使用:
a) 命名
b) 配置source、sink、channel
c) 关联---------------------

测试:
*** 安装telnet
$ su -
# yum -y install telnet*** 启动flume,'-D'设置日志级别和输出源
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/flume-telnet.conf -Dflume.root.logger=INFO,console    //把日志结果输出到控制台*** 打开另外一个窗口
$ netstat -an|grep 44444    --检查是否有程序(flume)在监听44444端口
$ telnet localhost 44444    --连接本机的44444端口,telnet是访问这个端口的客户端
然后随意输入字符串...PS:
a) 退出telnet:'ctrl+]',然后输入quit。
b) 若flume-ng无法退出,则打开一个新的窗口,jps(或netstat -antp|grep 44444)查找pid,使用 kill -9
需求:实时抽取新生成的日志文件内容 -->  追加到HDFS上对应文件的末尾
      本例使用flume去监控某个文件,将新增添的内容抽取到其他地方,如HDFS本例监控的是apache的日志文件 /var/log/httpd/access_log----安装Apache服务器-------

$ su -
# yum -y install httpd
# service httpd start
# service httpd status
** 编辑主页,/var/www/html是Apache web服务器根目录
# vi /var/www/html/index.html
随意输入内容...
** 打开浏览器,http://192.168.2.200访问网页** 授权
# chmod 755 /var/log/httpd/** 动态监看日志变化,刷新页面可以触发日志生成
# su - tom
$ tail -f /var/log/httpd/access_log    --'-F'和'-f'效果相同----------------------------

$ cp -a flume-telnet.conf flume-apache.conf a2.sources = r2
a2.channels = c2
a2.sinks = k2# define sources
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /var/log/httpd/access_log
# '-c'表示命令行,必需写
a2.sources.r2.shell = /bin/bash -c# define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# define sinks
#启用设置多级目录,这里按"年月日/时"2级目录,每1小时生成一个文件夹
a2.sinks.k2.type = hdfs
#目录会自动生成
a2.sinks.k2.hdfs.path=hdfs://192.168.2.200:8020/flume/%Y%m%d/%H
# 文件前缀
a2.sinks.k2.hdfs.filePrefix = accesslog
#启用按时间生成文件夹
a2.sinks.k2.hdfs.round=true
#设置round值:1,单位:小时  
a2.sinks.k2.hdfs.roundValue=1
a2.sinks.k2.hdfs.roundUnit=hour
#使用本地时间戳,如:用来命名文件
a2.sinks.k2.hdfs.useLocalTimeStamp=true# 缓冲到hdfs之前,用以写文件的事件的最大数
a2.sinks.k2.hdfs.batchSize=1000
a2.sinks.k2.hdfs.fileType=DataStream
a2.sinks.k2.hdfs.writeFormat=Text#解决文件过多过小的问题(若是使用默认配置,会生成很多个小文件)
#每600秒生成一个文件
a2.sinks.k2.hdfs.rollInterval=600
#当文件达到128000000字节时,会创建一个新文件
#实际环境中如果一个文件块128M,那么这里一般设置成127M(127*1024*1024)
a2.sinks.k2.hdfs.rollSize=128000000
#设置文件的生成和events数无关
a2.sinks.k2.hdfs.rollCount=0
#需要设置为1,否则当有副本复制时,就重新生成文件,上面三条则会失效
a2.sinks.k2.hdfs.minBlockReplicas=1# bind the sources and sinks to the channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2测试:
a) 启动CDH Hadoop
$ sbin/start-dfs.sh ; sbin/start-yarn.sh ; sbin/mr-jobhistory-daemon.sh start historyserver
b) 启动Apache
# service httpd start
c) 启动flume
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file conf/flume-apache.conf
d) 刷新http://192.168.2.200监看web日志:$ tail -f /var/log/httpd/access_log 监看HDFS:   $ bin/hdfs dfs -tail -f /flume/20170519/10/accesslog.1495161507253.tmp
利用flume监控某个目录(/home/tom/log),把里面回滚好的文件实时抽取到HDFS平台。$ mkdir /home/hadoop/log
$ cd log
$ cp /var/log/httpd/access_log access_log.1
$ cp /var/log/httpd/access_log access_log.2
需求:抽取文件access_log.1和access_log.2$ mkdir /opt/cdh-5.3.6/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
$ mkdir /opt/cdh-5.3.6/apache-flume-1.5.0-cdh5.3.6-bin/checkdata$ cp -a flume-apache.conf  flume-dir.confa3.sources = r3
a3.channels = c3
a3.sinks = k3# define sources
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/tom/log
# 使用正则表达式指定忽略的文件
# '.'表示除了'\r\n'以外的任意字符,'*'表示0-n个
a3.sources.r3.ignorePattern = ^.*\_log$# define channels
# 通过临时文件进行转存(即把数据缓存到一个临时文件中,然后一起flush),速度慢,但数据相对安全
# 这里使用memory channel也可以
a3.channels.c3.type = file
# checkpoint文件存放的地方,checkpoint里存储着元数据信息,比如哪些文件被抽取过,哪些还没有...
a3.channels.c3.checkpointDir = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkpoint
# 临时文件存放的地方
a3.channels.c3.dataDirs = /opt/modules/cdh/apache-flume-1.5.0-cdh5.3.6-bin/checkdata# define sinks
#启用设置多级目录,这里按"年月日/时"2级目录,每1小时生成一个文件夹
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path=hdfs://192.168.122.128:8020/flume2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = accesslog
#启用按时间生成文件夹
a3.sinks.k3.hdfs.round=true
a3.sinks.k3.hdfs.roundValue=1
a3.sinks.k3.hdfs.roundUnit=hour
#使用本地时间戳  
a3.sinks.k3.hdfs.useLocalTimeStamp=truea3.sinks.k3.hdfs.batchSize=1000
a3.sinks.k3.hdfs.fileType=DataStream
a3.sinks.k3.hdfs.writeFormat=Text#解决文件过多过小问题
#每600秒生成一个文件
a3.sinks.k3.hdfs.rollInterval=600
a3.sinks.k3.hdfs.rollSize=128000000
#设置文件的生成和events数无关
a3.sinks.k3.hdfs.rollCount=0
#设置成1,否则当有副本复制时就重新生成文件,上面三条则会失去效果
a3.sinks.k3.hdfs.minBlockReplicas=1# bind the sources and sinks to the channels
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3测试:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file conf/flume-dir.conf
去http://192.168.2.200:50070查看即可
** 进入log/,可以看到,带后缀的表示抽取完成
$ ls
access_log.1.COMPLETED  access_log.2.COMPLETED再次生成一个日志文件,会发现其会被立即抽取
$ cp access_log.1.COMPLETED access_log.3
$ ls
access_log.1.COMPLETED  access_log.3.COMPLETED    access_log.2.COMPLETED
在同一个服务器启动三个agent:
agent1:用于实时监控/var/log/httpd/access_log** flume-apache.conf# 配置agent1
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1# define sources
agent1.sources.r1.type = exec
# 注意:执行flume命令的用户对/var/log/httpd/access_log文件一定要有可读权限
agent1.sources.r1.command = tail -F /var/log/httpd/access_log
agent1.sources.r1.shell = /bin/bash -c# define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100# define sinks
# 一种序列号技术
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = 192.168.2.200
agent1.sinks.k1.port = 4545# bind the sources and sinks to the channels
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1测试:
启动Apache启动agent1:
$ bin/flume-ng agent --conf conf/ --name agent1 --conf-file conf/flume-apache.conf
$ tail -F /var/log/httpd/access_log
刷新网页,查看变化------------------

agent2:用于实时监控/opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
$ mkdir logs
$ vi conf/hive-log4j.properties
hive.log.dir=/opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs** flume-hive.conf# 配置agent2
agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2# define sources
agent2.sources.r2.type = exec
agent2.sources.r2.command = tail -F /opt/modules/cdh/hive-0.13.1-cdh5.3.6/logs/hive.log
agent2.sources.r2.shell = /bin/bash -c# define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100# define sinks
agent2.sinks.k2.type = avro
agent2.sinks.k2.hostname = 192.168.2.200
agent2.sinks.k2.port = 4545# bind the sources and sinks to the channels
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2测试:
启动agent2:
$ bin/flume-ng agent --conf conf/ --name agent2 --conf-file conf/flume-hive.conf
$ tail -F /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
进入hive,随便执行几条语句,查看日志变化
hive> show databases;
...-------------------

agent3:用于实时监控收集agent1和agent2传递过来的数据** flume-collector.conf# 配置agent3
agent3.sources = r3
agent3.channels = c3
agent3.sinks = k3# define sources
agent3.sources.r3.type = avro
agent3.sources.r3.bind = 192.168.2.200
agent3.sources.r3.port = 4545# define channels
agent3.channels.c3.type = memory
agent3.channels.c3.capacity = 1000
agent3.channels.c3.transactionCapacity = 100# define sinks
# 启用设置多级目录,这里按"年月日"时 2级目录,每个小时生成一个文件夹
agent3.sinks.k3.type = hdfs
agent3.sinks.k3.hdfs.path=hdfs://192.168.2.200:8020/flume3/%Y%m%d/%H
agent3.sinks.k3.hdfs.filePrefix = accesslog# 启用按小时生成文件夹
agent3.sinks.k3.hdfs.round=true 
agent3.sinks.k3.hdfs.roundValue=1
agent3.sinks.k3.hdfs.roundUnit=hour  
agent3.sinks.k3.hdfs.useLocalTimeStamp=trueagent3.sinks.k3.hdfs.batchSize=1000
agent3.sinks.k3.hdfs.fileType=DataStream
agent3.sinks.k3.hdfs.writeFormat=Text# 解决文件过多过小的问题
# 每600秒生成一个文件
agent3.sinks.k3.hdfs.rollInterval=600
agent3.sinks.k3.hdfs.rollSize=128000000
# 设置文件的生成和events数无关
agent3.sinks.k3.hdfs.rollCount=0
# 设置成1,否则当有副本复制时就会重新生成文件,上面三条则会失效
agent3.sinks.k3.hdfs.minBlockReplicas=1# bind the sources and sinks to the channels
agent3.sources.r3.channels = c3
agent3.sinks.k3.channel = c3测试:
启动agent3:
$ bin/flume-ng agent --conf conf/ --name agent3 --conf-file conf/flume-collector.conf
进入CDH Hadoop,监控日志变化,注意:路径要修改(监控.temp文件效果会明显点)
$ bin/hdfs dfs -tail -f /flume3/20161220/11/accesslog.1482203839459

 

转载于:https://www.cnblogs.com/yin-fei/p/10778719.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/394037.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

threadx 信号量 应用_操作系统及ThreadX简介.ppt

操作系统及ThreadX简介操作系统及ThreadX简介 软件二部 2006.09 主要内容 多任务操作系统概述 ThreadX简介 关于驱动的交流 操作系统概述 什么是操作系统 管理计算机的所有资源,并为应用程序提供服务的最重要的系统软件 操作系统的目的 为用户编程提供简单的接口&am…

java中同步组件_Java并发编程(自定义同步组件)

并发包结构图:编写一个自定义同步组件来加深对同步器的理解业务要求:* 编写一个自定义同步组件来加深对同步器的理解。* 设计一个同步工具:该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的* 访问将被阻塞…

maven学习资料

maven学习资料maven学习教程:What、How、Whyhttp://www.flyne.org/article/167Maven 那点事儿 https://my.oschina.net/huangyong/blog/194583项目管理工具:Maven教程http://www.flyne.org/article/884转载于:https://www.cnblogs.com/zhao1949/p/634641…

leetcode127. 单词接龙(bfs)

给定两个单词(beginWord 和 endWord)和一个字典,找到从 beginWord 到 endWord 的最短转换序列的长度。转换需遵循如下规则: 每次转换只能改变一个字母。 转换过程中的中间单词必须是字典中的单词。 说明: 如果不存在这样的转换序…

算法之旅 | 快速排序法

HTML5学堂-码匠:前几期“算法之旅”跟大家分享了冒泡排序法和选择排序法,它们都属于时间复杂度为O(n^2)的“慢”排序。今天跟大家分享多种排序算法里使用较广泛,速度快的排序算法—— 快速排序法 [ 平均时间复杂度为O (n logn) ]。Tips 1&…

springmvd接收参数问题

问题描述: 好久不写博客了,今天遇到一个问题,那就是post请求时,参数接收不到,当时我很纳闷,看代码: 就是这样几个参数,我使用postman请求时无法获取参数: 报错信息&#…

figma下载_如何在Figma中创建逼真的3D对象

figma下载by Gbolahan Taoheed Fawale通过Gbolahan Taoheed Fawale 如何在Figma中创建逼真的3D对象 (How to create realistic 3D objects in Figma) Prior to using Figma, I used Adobe Illustrator for most of my designs (like logos, mockups, illustrations, and so on…

OpenGL中的二维编程——从简单的矩形开始

一、OpenGL的组成 图元函数(primitive function)指定要生成屏幕图像的图元。包括两种类型:可以在二维、三维或者四维空间进行定义的几何图元,如多边形;离散实体;位图。属性函数(attribute funct…

圆与平面的接触面积_如果一个绝对的圆放在绝对的平面上,接触面是不是无限小?...

这种问题其实并不难解答:如果你真的能找到一个绝对的圆还有一个绝对平的平面上,并且保证放上去之后圆和平面不会有任何变化,那么接触面就可以是无限小!如果不能,很抱歉,接触面很显然就不会是无限小&#xf…

leetocde1129. 颜色交替的最短路径(bfs)

在一个有向图中,节点分别标记为 0, 1, …, n-1。这个图中的每条边不是红色就是蓝色,且存在自环或平行边。 red_edges 中的每一个 [i, j] 对表示从节点 i 到节点 j 的红色有向边。类似地,blue_edges 中的每一个 [i, j] 对表示从节点 i 到节点…

第38天:运算符、字符串对象常用方法

一、运算符 一元操作符 &#xff0c; --&#xff0c; &#xff0c; - 5 -6 逻辑操作符 !&#xff0c; &&&#xff0c; || 基本运算符 , -, *, /, % 关系操作符 >, <, >, <, , , !, ! 赋值 判断 全等 条件操作符 &#xff08;三…

Redux Todos Example

此项目模板是使用Create React App构建的&#xff0c;它提供了一种简单的方法来启动React项目而无需构建配置。 使用Create-React-App构建的项目包括对ES6语法的支持&#xff0c;以及几种非官方/尚未最终形式的Javascript语法 先看效果 这个例子可以帮助你深入理解在 Redux 中 …

有效电子邮件地址大全_如何优雅有效地处理介绍电子邮件

有效电子邮件地址大全by DJ Chung由DJ Chung 如何优雅有效地处理介绍电子邮件 (How to handle intro emails gracefully and effectively) 您想帮个忙时不想忘恩负义... (You don’t want to sound ungrateful when asking for a favor…) Let me tell you the story that ins…

notability录音定位_Notability的一些使用技巧?

作为使用了一年Notability的考研狗 今天也来回答回答这个问题&#xff0c;希望可以给考研的同学一点点帮助。这个软件的优点估计大家都知道&#xff0c;我在这里就不多说了。好吧&#xff0c;还有一个原因是我比较懒&#xff01;好了不多说废话了&#xff0c;等会你们要打我了本…

python实现软件的注册功能(机器码+注册码机制)

sklearn实战-乳腺癌细胞数据挖掘 https://study.163.com/course/introduction.htm?courseId1005269003&utm_campaigncommission&utm_sourcecp-400000000398149&utm_mediumshare 一、前言&#xff1a;目的&#xff1a;完成已有python图像处理工具的注册功能功能&am…

leetcode1306. 跳跃游戏 III(bfs)

这里有一个非负整数数组 arr&#xff0c;你最开始位于该数组的起始下标 start 处。当你位于下标 i 处时&#xff0c;你可以跳到 i arr[i] 或者 i - arr[i]。 请你判断自己是否能够跳到对应元素值为 0 的 任一 下标处。 注意&#xff0c;不管是什么情况下&#xff0c;你都无法…

Win10 UWP开发系列:使用VS2015 Update2+ionic开发第一个Cordova App

原文:Win10 UWP开发系列&#xff1a;使用VS2015 Update2ionic开发第一个Cordova App安装VS2015 Update2的过程是非常曲折的。还好经过不懈的努力&#xff0c;终于折腾成功了。 如果开发Cordova项目的话&#xff0c;推荐大家用一下ionic这个框架&#xff0c;效果还不错。对于Cor…

vavr_使用Vavr在Java 8流中更好的异常处理

vavrby Rajasekar Elango由Rajasekar Elango In this post, I will provide tips for better exception handling in Java 8 streams using the Functional Java library Vavr.在这篇文章中&#xff0c;我将提供使用Functional Java库Vavr在Java 8流中更好地处理异常的技巧。 …

Python-strace命令追踪ssh操作

Python-strace命令追踪ssh操作 通过strace 命令追踪ssh的进程ID&#xff0c;记录操作的命令[实际上是内核里面记录的东西]&#xff0c;进行操作日志的Py解析达到效果 追踪进程并写入ssh操作到文件中 Ps: 此时机器A已经ssh登录了机器B&#xff0c;取得它的ssh进程PID 机器A登录后…

java h2 derby_嵌入式H2数据库的Spring配置以进行测试

小编典典由于我不知道是否有任何工具可以检查数据库&#xff0c;我认为一个简单的解决方案是使用支持HSQL&#xff0c;H2和Derby 的Spring嵌入式数据库(3.1.x docs&#xff0c;current docs)。 。使用H2&#xff0c;你的xml配置如下所示&#xff1a;如果你更喜欢基于Java的配置…