一、Flume的安装
1.下载压缩包
https://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
2.上传到linux中
3.解压安装包
cd
#进入加载压缩包目录sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local
# 将 apache-flume-1.7.0-bin.tar.gz 解压到/usr/local 目录下sudo mv /usr/local/apache-flume-1.7.0-bin /usr/local/flume-1.7.0
将解压后的文件夹名称从 apache-flume-1.7.0-bin 改为 flume-1.7.0cd /usr/local/sudo chown -R qiangzi:qiangzi ./flume-1.7.0
#把/usr/local/flume-1.7.0 目录的权限赋予当前登录 Linux 系统的用户,这里假设是 qiangzi 用户
4.配置环境变量
cd /usr/local/sudo vim ~/.bashrc
然后在首行加入如下代码:
export JAVA_HOME=/usr/local/jdk1.8.0/;
export FLUME_HOME=/usr/local/flume-1.7.0
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
注意,上面的 JAVA_HOME,如果以前已经在.bashrc 文件中设置过,就不要重复添加 了,使用以前的设置即可。比如,以前设置得 JAVA_HOME 可能是“export JAVA_HOME=/usr/lib/jvm/default-java”,则使用原来的设置即可。
使环境变量生效:
[qiangzi@master local]$ source ~/.bashrc
5.修改 flume-env.sh 配置文件
cd /usr/local/flume-1.7.0/confsudo cp ./flume-env.sh.template ./flume-env.shsudo vim ./flume-env.sh
打开 flume-env.sh 文件以后,在文件的最开始位置增加一行内容,用于设置 JAVA_HOME 变量
export JAVA_HOME=/usr/local/jdk1.8.0/;
6.查看 flume 版本信息
cd /usr/local/flume-1.7.0./bin/flume-ng version #查看 flume 版本信息;
如果安装成功,出现如下图片
二、Flume的使用
1.使用 Avro 数据源测试 Flume
Avro 可以发送一个给定的文件给 Flume,Avro 源使用 AVRO RPC 机制。请对 Flume 的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件 helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动 Flume 以后, 可以把 helloworld.txt 中的文本内容显示出来。
(1)agent 配置文件
cd /usr/local/flumesudo vim ./conf/avro.conf #在 conf 目录下编辑一个 avro.conf 空文件
然后,在 avro.conf 写入以下内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
上面 Avro Source 参数说明如下: Avro Source 的别名是 avro,也可以使用完整类别名 称 org.apache.flume.source.AvroSource,因此,上面有一行设置是 a1.sources.r1.type = avro,表示数据源的类型是 avro。bind 绑定的 ip 地址或主机名,使用 0.0.0.0 表示绑定机 器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。port 表示绑定 的端口。a1.sources.r1.port = 4141,表示绑定的端口是 4141。a1.sinks.k1.type = logger, 表示 sinks 的类型是 logger。
(2)启动 flume agent a1
/usr/local/flume-1.7.0/bin/flume-ng agent -c /usr/local/flume-1.7.0/conf/ -f /usr/local/flume-1.7.0/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
agent 窗口
(3)创建指定文件
先打开另外一个终端,在/usr/local/flume 下写入一个文件 log.00,内容为 hello,world
echo "hello, world" > /usr/local/flume-1.7.0/log.00
前面已经启动日志控制台,这里不再启动
再打开另外一个终端,执行:
cd /usr/local/flume-1.7.0bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /usr/local/flume-1.7.0/log.00
此时我们可以看到第一个终端(agent 窗口)下的显示,也就是在日志控制台,就会把 log.00 文件的内容打印出来:
avro source 执行成功!
2. 使用 netcat 数据源测试 Flume
(1)创建 agent 配置文件
cd /usr/local/flume-1.7.0
sudo vim ./conf/example.conf #在 conf 目录创建 example.conf
在 example.conf 里写入以下内容:
#example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#同上,记住该端口名
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(2)启动 flume agent (即打开日志控制台)
/usr/local/flume-1.7.0/bin/flume-ng agent --conf /usr/local/flume-1.7.0/conf --conf-file /usr/local/flume-1.7.0/conf/example.conf --name a1 -Dflume.root.logger=INFO,console
如图:
再打开一个终端,输入命令
telnet localhost 44444 #上面不能使用,使用nc
nc localhost 44444
然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如 我们输入”hello,world”或者其他内容,得出
第一个终端的日志控制台显示:
netcatsource 运行成功!
这里补充一点,flume 只能传递英文和字符,不能用中文,我们先可以在第二个终端输 入“中国河南省”等字样:
第一个终端的日志控制台显示:
3.使用 Flume 作为 Spark Streaming 数据源
Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集 到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写 的 Spark Streaming 应用程序对消息进行处理。
(1)配置 Flume 数据源
登录 Linux 系统,打开一个终端,执行如下命令新建一个 Flume 配置文件 flume-to-spark.conf
cd /usr/local/flume-1.7.0
cd conf
vim flume-to-spark.conf
flume-to-spark.conf 文件中写入如下内容:
在上面的配置文件中,我们把 Flume Source 类别设置为 netcat,绑定到 localhost 的 33333 端口,这样,我们后面就可以通过“telnet localhost 33333”命令向 Flume Source 发 送消息。
同时,我们把 Flume Sink 类别设置为 avro,绑定到 localhost 的 44444 端口,这样, Flume Source 把采集到的消息汇集到 Flume Sink 以后,Sink 会把消息推送给 localhost 的 44444 端口,而我们编写的 Spark Streaming 程序一直在监听 localhost 的 44444 端口,一 旦有消息到达,就会被 Spark Streaming 应用程序取走进行处理。
特别要强调的是,上述配置文件完成以后,暂时“不要”启动 Flume Agent,如果这个时 候使用“flume-ng agent”命令启动 agent,就会出现错误提示“localhost:44444 拒绝连接”,也 就是 Flume Sink 要发送消息给 localhost 的 44444 端口,但是,无法连接上 localhost 的 44444 端口。为什么会出现这个错误呢?因为,这个时候我们还没有启动 Spark Streaming 应用程序,也就没有启动 localhost 的 44444 端口,所以,Sink 是无法向这个端口发送消息 的。
(2)Spark 的准备工作(版本2.4.8以下,新版本没有相关jar包)
Kafka 和 Flume 等高级输入源,需要依赖独立的库(jar 文件)。按照我们前面安装好 的 Spark 版本,这些 jar 包都不在里面,为了证明这一点,我们现在可以测试一下。请打开 一个新的终端,然后启动 spark-shell:
cd /usr/local/spark-2.1.0
./bin/spark-shell
启动成功后,在 spark-shell 中执行下面 import 语句:
你可以看到,马上会报错,因为找不到相关的 jar 包。所以,现在我们就需要下载 spark-streaming-flume_2.11-2.1.0.jar,其中2.11表示对应的Scala版本号,2.1.0表示Spark 版本号。打开下方的网址 http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.1.0,里 面有提供 spark-streaming-flume_2.11-2.1.0.jar 文件的下载。
查看Spark和Scala版本(Spark安装版本最好在2.4.8版本之下,高版本没有相关jar包)
Spark下载网址:Index of /dist/spark (apache.org)
进入网站,点击红框内的jar下载
下载好的文件上传到 Linux 登录用户目录下
现在,我们在“/usr/local/spark-2.1.0/jars”(由于版本不匹配)目录下新建一个“flume”目录,就把这个文件复制到 Spark 目录的“/usr/local/spark-2.1.0/jars/flume”目录下。请新打开一个终端,输入下面命令:
cd /usr/local/spark-2.1.0/jars
mkdir flume
cd ~
mv ./spark-streaming-flume_2.12-2.4.8.jar /usr/local/spark-2.1.0/jars/flume
我们就成功地把 spark-streaming-flume_2.11-2.1.0.jar 文件拷贝到了 “/usr/local/spark/jars/flume”目录下。
下面还要继续把 Flume 安装目录的 lib 目录下的所有 jar 文件复制到 “/usr/local/spark-2.4.8/jars/flume”目录下,请在终端中执行下面命令:
cd /usr/local/flume-1.7.0/lib
ls
cp ./* /usr/local/spark-2.1.0/jars/flume
(3)编写 Spark 程序使用 Flume 数据源
新打开一个终端,然后,执行命令创建代码目录:
cd /usr/local/spark-2.1.0/mycode
mkdir flume
cd flume
mkdir -p src/main/scala
cd src/main/scala
vim FlumeEventCount.scala
在 FlumeEventCount.scala 代码文件中输入以下代码:
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: FlumeEventCount <host> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(host, IntParam(port)) = argsval batchInterval = Milliseconds(2000)// Create the context and set the batch sizeval sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, batchInterval)// Create a flume streamval stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)// Print out the count of events received from this server in each batchstream.count().map(cnt => "Received " + cnt + " flume events." ).print()ssc.start()ssc.awaitTermination()}
}
保存 FlumeEventCount.scala 文件并退出 vim 编辑器。FlumeEventCount.scala 程序在 编译后运行时,需要我们提供 host 和 port 两个参数,程序会对指定的 host 和指定的 port 进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定 的端口中获取由 Flume Sink 发给该端口的消息,然后进行处理,对消息进行统计,打印出 “Received 0 flume events.”这样的信息。
然后再使用 vim 编辑器新建 StreamingExamples.scala 文件,输入如下代码,用于控 制日志输出格式:
package org.apache.spark.examples.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}}
}
保存 StreamingExamples.scala 文件并退出 vim 编辑器。
在“/usr/local/spark-2.4.8/mycode/flume/src/main/scala”目录下,就有了如下两个 代码文件:
然后,执行下面命令新建一个 simple.sbt 文件:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0"
查看目录
保存文件退出 vim 编辑器。然后执行下面命令,进行打包编译:
cd /usr/local/spark-2.1.0/mycode/flume/
/usr/local/sbt/sbt package #执行这一步之前需要先安装 sbt 插件
如果看到类似如下的屏幕信息,就表示打包成功了:
打包成功后,就可以执行程序测试效果了。
(4)测试程序效果
关闭之前打开的所有终端。首先,请新建第 1 个 Linux 终端,启动 Spark Streaming 应 用程序,命令如下:
cd /usr/local/spark-2.1.0./bin/spark-submit --driver-class-path /usr/local/spark-2.1.0/jars/*:/usr/local/spark-2.1.0/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark-2.1.0/mycode/flume/target/scala-2.11/simple-project_2.11-1.0.jar localhost 44444
通过上面命令,我们为应用程序提供 host 和 port 两个参数的值分别为 localhost 和 44444,程序会对 localhost 的 44444 端口进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定的端口中获取由 Flume Sink 发给该端口的消息, 然后进行处理,对消息进行统计,打印出“Received 0 flume events.”这样的信息。
执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔 2 秒钟刷新一次信息, 大量信息中会包含如下重要信息:
因为目前 Flume 还没有启动,没有给 FlumeEventCount 发送任何消息,所以 Flume Events 的数量是 0。第 1 个终端不要关闭,让它一直处于监听状态。
现在,另外新建第 2 个终端,在这个新的终端中启动 Flume Agent,命令如 下:
cd /usr/local/flume-1.7.0
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console
启动 agent 以后,该 agent 就会一直监听 localhost 的 33333 端口,这样,我们下面就 可以通过“telnet localhost 33333”命令向 Flume Source 发送消息。第 2 个终端也不要关闭, 让它一直处于监听状态。
另外新建第 3 个终端,执行如下命令:
nc localhost 33333
执行该命令以后,就可以在这个窗口里面随便敲入若干个字符和若干个回车,这些消息 都会被 Flume 监听到,Flume 把消息采集到以后汇集到 Sink,然后由 Sink 发送给 Spark 的 FlumeEventCount 程序进行处理。然后,你就可以在运行 FlumeEventCount 的前面那个 终端窗口内看到类似如下的统计结果:
从屏幕信息中可以看出,我们在 telnet 那个终端内发送的消息,都被成功发送到 Spark 进行处理了。