【数据采集与预处理】流数据采集工具Flume

目录

一、Flume简介

(一)Flume定义

(二)Flume作用

二、Flume组成架构

三、Flume安装配置

(一)下载Flume

(二)解压安装包

(三)配置环境变量

(四)查看Flume版本信息

四、Flume的运行

(一)Telnet准备工作

(二)使用Avro数据源测试Flume

(三)使用netcat数据源测试Flume

五、Flume作为Spark Streaming数据源

(一)Spark准备工作

(二)使用Flume作为Spark Streaming数据源


一、Flume简介

数据流 :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。

(一)Flume定义

        Apache Flume是一种分布式、具有高可靠和高可用性的数据采集系统,可从多个不同类型、不同来源的数据流汇集到集中式数据存储系统中。Flume 基于流式架构,灵活简单。

(二)Flume作用

        Flume最主要的作用就是,实时读取服务器本地磁盘的数据,可将日志采集后传输到HDFS、Hive、HBase、Kafka等大数据组件。

二、Flume组成架构

1、Agent
        Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的,是 Flume 数据传输的基本单元。Agent 主要有 3 个部分组成,Source、Channel、Sink。

2、Source
        Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

3、Channel
        Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

4、 Sink
        Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
        Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
        Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

5、Event
        传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。

Flume Agent 内部原理:

三、Flume安装配置

(一)下载Flume

到Flume官网下载Flume1.7.0安装文件,下载地址如下:

http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

下载完成后上传到虚拟机的“/usr/local/uploads”目录下。

(二)解压安装包

首先进入到“uploads”目录下。将压缩包解压到“/usr/local”目录下

[root@bigdata zhc]# cd /usr/local/uploads
[root@bigdata uploads]# tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local

将解压的文件修改名字为flume,简化操作。把/usr/local/flume目录的权限赋予当前登录Linux系统的用户。

[root@bigdata uploads]# cd /usr/local
[root@bigdata local]# mv apache-flume-1.7.0-bin flume
[root@bigdata local]# chown -R zhc:zhc ./flume

 

(三)配置环境变量

首先,修改/etc/profile配置文件:

[root@bigdata local]# vi /etc/profile

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin
export FLUME_CONF_DIR=$FLUME_HOME/conf

使文件生效:

[root@bigdata local]# source /etc/profile

下面修改 flume-env.sh 配置文件:

[root@bigdata local]# cd /usr/local/flume/conf
[root@bigdata conf]# cp flume-env.sh.template flume-env.sh
[root@bigdata conf]# vi flume-env.sh

在文件中增加一行内容,用于设置JAVA_HOME变量:

export JAVA_HOME=/usr/local/servers/jdk

然后,保存flume-env.sh文件,并退出vim编辑器。

(四)查看Flume版本信息

[root@bigdata conf]# cd /usr/local/flume
[root@bigdata flume]# ./bin/flume-ng version

然后就会发现如下报错: “错误: 找不到或无法加载主类”

原因分析:
(1)jdk 冲突
(2)安装了HBase就会报着个错

解决方法:

到“/usr/local/flume/bin”目录下修改flume-ng文件。

[root@bigdata flume]# cd /usr/local/flume/bin
[root@bigdata bin]# vi flume-ng

在文件中加入以下内容:

2>/dev/null | grep hbase

再次查看flume版本信息。

四、Flume的运行

(一)Telnet准备工作

后面的步骤中要用到telnet,在这里先安装:

[root@bigdata zhc]# yum install telnet

(二)使用Avro数据源测试Flume

        Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动Flume以后,可以把helloworld.txt中的文本内容显示出来。

1、创建agent配置文件

[root@bigdata zhc]# cd /usr/local/flume/conf
[root@bigdata conf]# vi avro.conf

在文件中加入以下内容: 

#/usr/local/flume/conf/avro.confa1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = avroa1.sources.r1.channels = c1a1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 4141#注意这个端口名,在后面的教程中会用得到# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

上面Avro Source参数说明如下:
        Avro Source的别名是avro,也可以使用完整类别名称org.apache.flume.source.AvroSource,因此,上面有一行设置是a1.sources.r1.type = avro,表示数据源的类型是avro。
        bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。
        port表示绑定的端口。a1.sources.r1.port = 4141,表示绑定的端口是4141。
        a1.sinks.k1.type = logger,表示sinks的类型是logger。

2、启动flume agent a1

[root@bigdata conf]# /usr/local/flume/bin/flume-ng agent -c . -f /usr/local/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console 

这个终端不要关闭。

3、创建指定文件

新建一个终端,输入以下命令:

[root@bigdata mycode]# cd /home/zhc
[root@bigdata zhc]# cd /home/zhc/mycode
[root@bigdata mycode]# mkdir flume
[root@bigdata mycode]# cd flume
[root@bigdata flume]# echo "Hello World">> ./helloworld.txt
[root@bigdata flume]# /usr/local/flume/bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /home/zhc/mycode/flume/helloworld.txt

执行之后,我们就可以在前面不让关闭的那个终端看到Hello World了:

(三)使用netcat数据源测试Flume

        请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个Linux终端(这里称为“Flume终端”)中,启动Flume,在另一个终端(这里称为“Telnet终端”)中,输入命令“telnet localhost 44444”,然后,在Telnet终端中输入任何字符,让这些字符可以顺利地在Flume终端中显示出来。

1、创建netcat的agent配置

[root@bigdata conf]# cd /usr/local/flume/conf
[root@bigdata conf]# vi netcat.conf

在文件中加入以下内容:  

#/usr/local/flume/conf/netcat.conf# Name the components on this agent  a1.sources = r1  a1.sinks = k1  a1.channels = c1  # Describe/configure the source  a1.sources.r1.type = netcat  a1.sources.r1.bind = localhost  a1.sources.r1.port = 44444 #同上,记住该端口名# Describe the sink  a1.sinks.k1.type = logger  # Use a channel which buffers events in memory  a1.channels.c1.type = memory  a1.channels.c1.capacity = 1000  a1.channels.c1.transactionCapacity = 100  # Bind the source and sink to the channel  a1.sources.r1.channels = c1  a1.sinks.k1.channel = c1  

2、启动flume agent

[root@bigdata conf]# /usr/local/flume/bin/flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/netcat.conf --name a1 -Dflume.root.logger=INFO,console

这个终端不要关闭。

3、新建一个终端输入

[root@bigdata flume]# telnet localhost 44444

在这个终端输入字符串就可以显示在前面那个终端里了,但是中文是不支持的,显示长度也有限。

五、Flume作为Spark Streaming数据源

(一)Spark准备工作

1、下载spark-streaming-flume_2.11-2.3.4.jar

首先,到官网下载spark-streaming-flume_2.11-2.3.4.jar:

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume

上面的网址要是打不开,可以用下面的这个网址:

https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11

2、把这个jar文件放到“/usr/local/spark/jars/flume”目录下

[root@bigdata flume]# cd /usr/local/spark/jars
[root@bigdata jars]# mkdir flume
[root@bigdata jars]# cd flume
[root@bigdata flume]# cp /usr/local/uploads/spark-streaming-flume_2.11-2.3.4.jar .

注意:此处不要将“/usr/local/flume/lib”目录下的所有jar包都拷贝到“/usr/local/spark/jars/flume” 目录下,不然会使Spark和Hadoop版本与Guava库的版本不兼容,从而导致后面运行程序时会报错!

错误如下图所示:

3、修改spark-env.sh文件

修改spark目录下conf/spark-env.sh文件中的SPARK_DIST_CLASSPATH变量。把flume的相关jar包添加到此文件中。

[root@bigdata flume]# cd /usr/local/spark/conf
[root@bigdata conf]# vi spark-env.sh

将如下内容加到文件中: 

:/usr/local/spark/jars/flume/*:/usr/local/flume/lib/*

这样,Spark环境就准备好了。

(二)使用Flume作为Spark Streaming数据源

        Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。

1、创建flume-to-spark.conf

[root@bigdata conf]# cd /usr/local/flume/conf
[root@bigdata conf]# vi flume-to-spark.conf

输入以下内容: 

#/usr/local/flume/conf/flume-to-spark.conf
#flume-to-spark.conf: A single-node Flume configuration# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 33333# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = localhosta1.sinks.k1.port =44444# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000000a1.channels.c1.transactionCapacity = 1000000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

#说明:
1、Flume suorce类为netcat,绑定到localhost的33333端口,消息可以通过telnet localhost 33333 发送到flume suorce
2、Flume Sink类为avro,绑定44444端口,flume sink通过localhost 44444端口把消息发送出来。而spark streaming程序一直监听44444端口。

#注意!!先不要启动Flume agent,因为44444端口还没打开,sink的消息无处可去,44444端口由spark streaming程序打开。

2、编写Spark程序使用Flume数据源

(1)创建python文件

[root@bigdata flume]# cd /home/zhc/mycode/flume
[root@bigdata flume]# vi FlumeEventCount.py

在FlumeEventCount.py中输入以下代码:  

#/home/zhc/mycode/flume/FlumeEventCount.pyfrom __future__ import print_functionimport sysfrom pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import pyspark
if __name__ == "__main__":if len(sys.argv) != 3:print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)exit(-1)sc = SparkContext(appName="FlumeEventCount")ssc = StreamingContext(sc, 2)hostname= sys.argv[1]port = int(sys.argv[2])stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)stream.count().map(lambda cnt : "Recieve " + str(cnt) +" Flume events!!!!").pprint()ssc.start()ssc.awaitTermination()
~                               

注意:可能需要安装pyspark,命令为:

[root@bigdata flume]# pip3 install pyspark

(2)测试实际效果

 首先,启动Spark streaming程序:

[root@bigdata flume]# cd /usr/local/spark
[root@bigdata spark]# ./bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/flume/* /home/zhc/mycode/flume/FlumeEventCount.py localhost 44444

然后,启动一个新的终端,启动Flume Agent:

[root@bigdata zhc]# cd /usr/local/flume
[root@bigdata flume]# bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

最后,再启动一个新的终端连接33333端口:

现在你可以在最后这个终端里输入一些字符了。在你输入字符后可以看到第一个终端会显示如下的信息:

-------------------------------------------
Time: 1488029430000 ms
-------------------------------------------
Received 1 flume events!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617489.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

环形链表[简单]

优质博文&#xff1a;IT-BLOG-CN 一、题目 给你一个链表的头节点head&#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪next指针再次到达&#xff0c;则链表中存在环。为了表示给定链表中的环&#xff0c;评测系统内部使用整数pos来表示链…

数据结构中的一棵树

一、树是什么&#xff1f; 有根有枝叶便是树&#xff01;根只有一个&#xff0c;枝叶可以有&#xff0c;也可以没有&#xff0c;可以有一个&#xff0c;也可以有很多。 就像这样&#xff1a; 嗯&#xff0c;应该是这样&#xff1a; 二、一些概念 1、高度 树有多高&#x…

MySQL之导入导出远程备份(详细讲解)

文章目录 一、Navicat导入导出二、mysqldump命令导入导出2.1导出2.2导入&#xff08;使用mysqldump导入 包含t_log表的整个数据库&#xff09; 三、LOAD DATA INFILE命令导入导出3.1设置;3.2导出3.3导入(使用单表数据导入load data infile的方式) 四、远程备份4.1导出4.2导入 一…

redis系列:01 数据类型及操作

redis的数据类型有哪些 string,list,set,sorted_set,hash 操作 sting: set name maliao get name exists name expire name 5 ttl name del name setex name 10 maliao 设置key和过期时间 setnx name maliao 当key不存在时才添加list&#xff1a; lpush letter a lpush le…

OpenCV-22高斯滤波

一、高斯函数的基础 要理解高斯滤波首先要直到什么是高斯函数&#xff0c;高斯函数是符合高斯分布的&#xff08;也叫正态分布&#xff09;的数据的概率密度函数。 高斯函数的特点是以x轴某一点&#xff08;这一点称为均值&#xff09;为对称轴&#xff0c;越靠近中心数据发生…

【Linux实用篇】Linux常用命令(1)

目录 1.1 Linux命令初体验 1.1.1 常用命令演示 1.1.2 Linux命令使用技巧 1.1.3 Linux命令格式 1.2 文件目录操作命令 1.2.1 ls 1.2.2 cd 1.2.3 cat 1.2.4 more 1.2.5 tail 1.2.6 mkdir 1.2.7 rmdir 1.2.8 rm 1.1 Linux命令初体验 1.1.1 常用命令演示 在这一部分中…

遥感影像-语义分割数据集:Landsat8云数据集详细介绍及训练样本处理流程

原始数据集详情 简介&#xff1a;该云数据集包括RGB三通道的高分辨率图像&#xff0c;在全球不同区域的分辨率15米。这些图像采集自Lansat8的五种主要土地覆盖类型&#xff0c;即水、植被、湿地、城市、冰雪和贫瘠土地。 KeyValue卫星类型landsat8覆盖区域未知场景水、植被、…

uniapp中按钮点击跳转页面失效,纠正错误(亲测可用)

不知道伙伴你的错误和我是否一致&#xff1f; 我当时为了点击跳转按钮发现跳转不了&#xff0c;如下错误提示&#xff1a; worker.js?libNameWAAccelerateWorker.js:1 [Deprecation] SharedArrayBuffer will require cross-origin isolation as of M92, around July 2021. S…

【Java SE语法篇】6.数组

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ 文章目录 1.数组的基本概念1.1 为什么使用数组&#xff1f;1.…

MATLAB - 四旋翼飞行器动力学方程

系列文章目录 前言 本例演示了如何使用 Symbolic Math Toolbox™&#xff08;符号数学工具箱&#xff09;推导四旋翼飞行器的连续时间非线性模型。具体来说&#xff0c;本例讨论了 getQuadrotorDynamicsAndJacobian 脚本&#xff0c;该脚本可生成四旋翼状态函数及其雅各布函数…

streamlit中文开发手册(详细版)

目录 一、安装与配置 1.1 安装 Streamlit 1.2 配置文件 1.3 运行Streamlit应用 二、streamlit显示数据 2.1 显示标题 2.2 显示文本 2.3 显示代码段 2.4 通用显示方法 2.5 显示表格 2.6 显示JSON 2.7 显示pyplot图表 2.8 显示地图 2.9 显示图像 2.10 显示视频 三…

2024年腾讯云新用户专属优惠活动及代金券活动汇总

腾讯云作为国内领先的云计算服务提供商&#xff0c;一直致力于为用户提供优质、高效的服务。为了更好地满足新用户的需求&#xff0c;腾讯云在2024年推出了一系列新用户专属优惠活动和代金券活动。本文将为大家详细介绍这些活动&#xff0c;帮助大家更好地了解和利用这些优惠。…

Gogs - 管理协作者

Gogs - 管理协作者 References 仓库设置 管理协作者 权限设置 References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

Android 13(T) - Media框架(2)- libmedia

这一节学习有两个目标&#xff1a; 1 熟悉Android Media API的源码路径与调用层次 2 从MediaPlayer的创建与销毁了解与native的串接 1、源码路径 Media相关的API位于&#xff1a;frameworks/base/media/java/android/media&#xff0c;里面提供有MediaPlayer MediaCodecList M…

代币合约 ERC20 Token接口

代币合约 在以太坊上发布代币就要遵守以太坊的规则&#xff0c;那么以太坊有什么规则呢?以太坊的精髓就是利用代码规定如何运作&#xff0c;由于在以太坊上发布智能合约是不能修改和删除的&#xff0c;所以智能合约一旦发布&#xff0c;就意味着永久有效&#xff0c;不可篡改…

如何解决NAND系统性能问题?-- NAND接口分类

三、NAND接口 NAND闪存接口是连接主机控制器与NAND存储芯片的通信桥梁&#xff0c;负责命令、地址和数据的传输。典型的NAND闪存接口包括一组I/O线&#xff08;通常为8条或更多&#xff09;用于数据传输&#xff0c;以及若干控制信号线。 基本接口信号&#xff1a; Chip Enable…

吲哚及其衍生物:连接肠道炎症与神经健康的隐秘调节剂

谷禾健康 你敢相信吗&#xff1f;从粪便中提取出具有强烈粪臭味的物质&#xff0c;当用酒精稀释上千倍后&#xff0c;脱胎换骨变成了一种香味。这就是一种吲哚衍生物——3-甲基吲哚(又名粪臭素) 吲哚&#xff0c;是所有花香类原精的关键成分&#xff0c;这种物质在低剂量1-3%浓…

如何利用RPA做UI自动化测试对传统自动化的降维打击

写在前面 RPA软件一开始的目的并不是自动化测试&#xff0c;而是要把电脑上面几十个、上百个常用的软件&#xff0c;通过机器人流程自动化来打通&#xff0c;通过一个软件来控制几十个、上百个软件。而这个过程&#xff0c;其实覆盖了软件自动化测试。 所谓降维打击&#xff0c…

【第二课课后作业】书生·浦语大模型实战营-轻松玩转书生·浦语大模型趣味Demo

目录 轻松玩转书生浦语大模型趣味Demo课后作业1. 基础作业1.1 使用 InternLM-Chat-7B 模型生成 300 字的小故事&#xff1a;1.2 熟悉 hugging face 下载功能&#xff0c;使用 huggingface_hub python 包&#xff0c;下载 InternLM-20B 的 config.json 文件到本地 2. 进阶作业2.…

强化学习应用(三):基于Q-learning的无人机物流路径规划研究(提供Python代码)

一、Q-learning简介 Q-learning是一种强化学习算法&#xff0c;用于解决基于马尔可夫决策过程&#xff08;MDP&#xff09;的问题。它通过学习一个价值函数来指导智能体在环境中做出决策&#xff0c;以最大化累积奖励。 Q-learning算法的核心思想是通过不断更新一个称为Q值的…