Flume 的安装和使用方法(Spark-2.1.0)

一、Flume的安装

1.下载压缩包

https://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2.上传到linux中

3.解压安装包
cd
#进入加载压缩包目录sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local
# 将 apache-flume-1.7.0-bin.tar.gz 解压到/usr/local 目录下sudo mv /usr/local/apache-flume-1.7.0-bin /usr/local/flume-1.7.0
将解压后的文件夹名称从 apache-flume-1.7.0-bin 改为 flume-1.7.0cd /usr/local/sudo chown -R qiangzi:qiangzi ./flume-1.7.0
#把/usr/local/flume-1.7.0 目录的权限赋予当前登录 Linux 系统的用户,这里假设是 qiangzi 用户
4.配置环境变量
cd /usr/local/sudo vim ~/.bashrc

然后在首行加入如下代码:

export JAVA_HOME=/usr/local/jdk1.8.0/;
export FLUME_HOME=/usr/local/flume-1.7.0 
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin

注意,上面的 JAVA_HOME,如果以前已经在.bashrc 文件中设置过,就不要重复添加 了,使用以前的设置即可。比如,以前设置得 JAVA_HOME 可能是“export JAVA_HOME=/usr/lib/jvm/default-java”,则使用原来的设置即可。

使环境变量生效:

[qiangzi@master local]$ source ~/.bashrc
5.修改 flume-env.sh 配置文件
cd /usr/local/flume-1.7.0/confsudo cp ./flume-env.sh.template ./flume-env.shsudo vim ./flume-env.sh

打开 flume-env.sh 文件以后,在文件的最开始位置增加一行内容,用于设置 JAVA_HOME 变量

export JAVA_HOME=/usr/local/jdk1.8.0/;
6.查看 flume 版本信息
cd /usr/local/flume-1.7.0./bin/flume-ng version #查看 flume 版本信息;

如果安装成功,出现如下图片

二、Flume的使用

1.使用 Avro 数据源测试 Flume

Avro 可以发送一个给定的文件给 Flume,Avro 源使用 AVRO RPC 机制。请对 Flume 的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件 helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动 Flume 以后, 可以把 helloworld.txt 中的文本内容显示出来。

(1)agent 配置文件
cd /usr/local/flumesudo vim ./conf/avro.conf #在 conf 目录下编辑一个 avro.conf 空文件

然后,在 avro.conf 写入以下内容

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面 Avro Source 参数说明如下: Avro Source 的别名是 avro,也可以使用完整类别名 称 org.apache.flume.source.AvroSource,因此,上面有一行设置是 a1.sources.r1.type = avro,表示数据源的类型是 avro。bind 绑定的 ip 地址或主机名,使用 0.0.0.0 表示绑定机 器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。port 表示绑定 的端口。a1.sources.r1.port = 4141,表示绑定的端口是 4141。a1.sinks.k1.type = logger, 表示 sinks 的类型是 logger。

(2)启动 flume agent a1
/usr/local/flume-1.7.0/bin/flume-ng agent -c /usr/local/flume-1.7.0/conf/ -f /usr/local/flume-1.7.0/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

agent 窗口

(3)创建指定文件

先打开另外一个终端,在/usr/local/flume 下写入一个文件 log.00,内容为 hello,world

echo "hello, world" > /usr/local/flume-1.7.0/log.00

前面已经启动日志控制台,这里不再启动

再打开另外一个终端,执行:

cd /usr/local/flume-1.7.0bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /usr/local/flume-1.7.0/log.00

此时我们可以看到第一个终端(agent 窗口)下的显示,也就是在日志控制台,就会把 log.00 文件的内容打印出来:

avro source 执行成功!

2. 使用 netcat 数据源测试 Flume
(1)创建 agent 配置文件
cd /usr/local/flume-1.7.0
sudo vim ./conf/example.conf   #在 conf 目录创建 example.conf

在 example.conf 里写入以下内容:

#example.conf: A single-node Flume configuration 
# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 
#同上,记住该端口名
# Describe the sink 
a1.sinks.k1.type = logger 
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1
(2)启动 flume agent (即打开日志控制台)
/usr/local/flume-1.7.0/bin/flume-ng agent --conf /usr/local/flume-1.7.0/conf --conf-file /usr/local/flume-1.7.0/conf/example.conf --name a1 -Dflume.root.logger=INFO,console

如图:

再打开一个终端,输入命令

telnet localhost 44444   #上面不能使用,使用nc
nc localhost 44444

 然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如 我们输入”hello,world”或者其他内容,得出

       第一个终端的日志控制台显示:

netcatsource 运行成功!

这里补充一点,flume 只能传递英文和字符,不能用中文,我们先可以在第二个终端输 入“中国河南省”等字样:

第一个终端的日志控制台显示:

3.使用 Flume 作为 Spark Streaming 数据源

Flume 是非常流行的日志采集系统,可以作为 Spark Streaming 的高级数据源。请把 Flume Source 设置为 netcat 类型,从终端上不断给 Flume Source 发送各种消息,Flume 把消息汇集 到 Sink,这里把 Sink 类型设置为 avro,由 Sink 把消息推送给 Spark Streaming,由自己编写 的 Spark Streaming 应用程序对消息进行处理。

(1)配置 Flume 数据源

登录 Linux 系统,打开一个终端,执行如下命令新建一个 Flume 配置文件 flume-to-spark.conf

cd /usr/local/flume-1.7.0
cd conf
vim flume-to-spark.conf

flume-to-spark.conf 文件中写入如下内容:

在上面的配置文件中,我们把 Flume Source 类别设置为 netcat,绑定到 localhost 的 33333 端口,这样,我们后面就可以通过“telnet localhost 33333”命令向 Flume Source 发 送消息。

同时,我们把 Flume Sink 类别设置为 avro,绑定到 localhost 的 44444 端口,这样, Flume Source 把采集到的消息汇集到 Flume Sink 以后,Sink 会把消息推送给 localhost 的 44444 端口,而我们编写的 Spark Streaming 程序一直在监听 localhost 的 44444 端口,一 旦有消息到达,就会被 Spark Streaming 应用程序取走进行处理。

特别要强调的是,上述配置文件完成以后,暂时“不要”启动 Flume Agent,如果这个时 候使用“flume-ng agent”命令启动 agent,就会出现错误提示“localhost:44444 拒绝连接”,也 就是 Flume Sink 要发送消息给 localhost 的 44444 端口,但是,无法连接上 localhost 的 44444 端口。为什么会出现这个错误呢?因为,这个时候我们还没有启动 Spark Streaming 应用程序,也就没有启动 localhost 的 44444 端口,所以,Sink 是无法向这个端口发送消息 的。

(2)Spark 的准备工作(版本2.4.8以下,新版本没有相关jar包

Kafka 和 Flume 等高级输入源,需要依赖独立的库(jar 文件)。按照我们前面安装好 的 Spark 版本,这些 jar 包都不在里面,为了证明这一点,我们现在可以测试一下。请打开 一个新的终端,然后启动 spark-shell:

cd /usr/local/spark-2.1.0
./bin/spark-shell

启动成功后,在 spark-shell 中执行下面 import 语句:

你可以看到,马上会报错,因为找不到相关的 jar 包。所以,现在我们就需要下载 spark-streaming-flume_2.11-2.1.0.jar,其中2.11表示对应的Scala版本号,2.1.0表示Spark 版本号。打开下方的网址 http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.11/2.1.0,里 面有提供 spark-streaming-flume_2.11-2.1.0.jar 文件的下载。

查看Spark和Scala版本(Spark安装版本最好在2.4.8版本之下,高版本没有相关jar包

Spark下载网址:Index of /dist/spark (apache.org)

进入网站,点击红框内的jar下载

下载好的文件上传到 Linux 登录用户目录下

现在,我们在“/usr/local/spark-2.1.0/jars”(由于版本不匹配)目录下新建一个“flume”目录,就把这个文件复制到 Spark 目录的“/usr/local/spark-2.1.0/jars/flume”目录下。请新打开一个终端,输入下面命令:      

cd /usr/local/spark-2.1.0/jars
mkdir flume
cd ~
mv ./spark-streaming-flume_2.12-2.4.8.jar /usr/local/spark-2.1.0/jars/flume

我们就成功地把 spark-streaming-flume_2.11-2.1.0.jar 文件拷贝到了 “/usr/local/spark/jars/flume”目录下。

下面还要继续把 Flume 安装目录的 lib 目录下的所有 jar 文件复制到 “/usr/local/spark-2.4.8/jars/flume”目录下,请在终端中执行下面命令:

cd /usr/local/flume-1.7.0/lib
ls
cp ./* /usr/local/spark-2.1.0/jars/flume
(3)编写 Spark 程序使用 Flume 数据源

新打开一个终端,然后,执行命令创建代码目录:

cd /usr/local/spark-2.1.0/mycode
mkdir flume
cd flume
mkdir -p src/main/scala
cd src/main/scala
vim FlumeEventCount.scala

在 FlumeEventCount.scala 代码文件中输入以下代码:

package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
object FlumeEventCount {def main(args: Array[String]) {if (args.length < 2) {System.err.println("Usage: FlumeEventCount <host> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val Array(host, IntParam(port)) = argsval batchInterval = Milliseconds(2000)// Create the context and set the batch sizeval sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")val ssc = new StreamingContext(sparkConf, batchInterval)// Create a flume streamval stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)// Print out the count of events received from this server in each batchstream.count().map(cnt => "Received " + cnt + " flume events." ).print()ssc.start()ssc.awaitTermination()}
}

保存 FlumeEventCount.scala 文件并退出 vim 编辑器。FlumeEventCount.scala 程序在 编译后运行时,需要我们提供 host 和 port 两个参数,程序会对指定的 host 和指定的 port 进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定 的端口中获取由 Flume Sink 发给该端口的消息,然后进行处理,对消息进行统计,打印出 “Received 0 flume events.”这样的信息。

然后再使用 vim 编辑器新建 StreamingExamples.scala 文件,输入如下代码,用于控 制日志输出格式:

package org.apache.spark.examples.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}}
}

保存 StreamingExamples.scala 文件并退出 vim 编辑器。

在“/usr/local/spark-2.4.8/mycode/flume/src/main/scala”目录下,就有了如下两个 代码文件:

然后,执行下面命令新建一个 simple.sbt 文件:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0" 

查看目录

保存文件退出 vim 编辑器。然后执行下面命令,进行打包编译:

cd /usr/local/spark-2.1.0/mycode/flume/
/usr/local/sbt/sbt package  #执行这一步之前需要先安装 sbt 插件

如果看到类似如下的屏幕信息,就表示打包成功了:

打包成功后,就可以执行程序测试效果了

(4)测试程序效果

关闭之前打开的所有终端。首先,请新建第 1 个 Linux 终端,启动 Spark Streaming 应 用程序,命令如下:

cd /usr/local/spark-2.1.0./bin/spark-submit --driver-class-path /usr/local/spark-2.1.0/jars/*:/usr/local/spark-2.1.0/jars/flume/* --class "org.apache.spark.examples.streaming.FlumeEventCount" /usr/local/spark-2.1.0/mycode/flume/target/scala-2.11/simple-project_2.11-1.0.jar localhost 44444

通过上面命令,我们为应用程序提供 host 和 port 两个参数的值分别为 localhost 和 44444,程序会对 localhost 的 44444 端口进行监听,Milliseconds(2000)设置了时间间隔为 2 秒,所以,该程序每隔 2 秒就会从指定的端口中获取由 Flume Sink 发给该端口的消息, 然后进行处理,对消息进行统计,打印出“Received 0 flume events.”这样的信息。

执行该命令后,屏幕上会显示程序运行的相关信息,并会每隔 2 秒钟刷新一次信息, 大量信息中会包含如下重要信息:

因为目前 Flume 还没有启动,没有给 FlumeEventCount 发送任何消息,所以 Flume Events 的数量是 0。第 1 个终端不要关闭,让它一直处于监听状态。

现在,另外新建第 2 个终端,在这个新的终端中启动 Flume Agent,命令如 下:

cd /usr/local/flume-1.7.0
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

启动 agent 以后,该 agent 就会一直监听 localhost 的 33333 端口,这样,我们下面就 可以通过“telnet localhost 33333”命令向 Flume Source 发送消息。第 2 个终端也不要关闭, 让它一直处于监听状态。

另外新建第 3 个终端,执行如下命令:

nc localhost 33333

执行该命令以后,就可以在这个窗口里面随便敲入若干个字符和若干个回车,这些消息 都会被 Flume 监听到,Flume 把消息采集到以后汇集到 Sink,然后由 Sink 发送给 Spark 的 FlumeEventCount 程序进行处理。然后,你就可以在运行 FlumeEventCount 的前面那个 终端窗口内看到类似如下的统计结果:

从屏幕信息中可以看出,我们在 telnet 那个终端内发送的消息,都被成功发送到 Spark 进行处理了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/837450.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

有哪些值得买的开放式耳机推荐?2024年开放式运动耳机选购指南

开放式耳机因其独特设计&#xff0c;能在一定程度上保护听力。相较于传统封闭式耳机&#xff0c;开放式设计允许周围环境声音自然流入耳内&#xff0c;降低了耳内共振和声压&#xff0c;减少了耳道的不适感&#xff0c;从而减轻了对听力的潜在损害。对于追求音质与听力保护并重…

国外新闻媒体推广:多元化媒体分发投放-大舍传媒

前言 &#xff1a;随着全球化的进程&#xff0c;国外新闻市场呈现出快速发展的趋势。在这个趋势下&#xff0c;国外新闻媒体推广成为了各行业企业宣传业务的重要一环。本文将重点介绍大舍传媒的多元化媒体分发投放服务&#xff0c;以及对国外新闻媒体推广的意义。 1. 多元化媒…

阿赵UE引擎C++编程学习笔记——解决中文乱码问题

大家好&#xff0c;我是阿赵。   在UE编写C的时候&#xff0c;可能有些朋友发现&#xff0c;在C里面如果打印输出或者赋值一些中文的字符串的时候&#xff0c;会出现各种的报错&#xff0c;要么乱码&#xff0c;要么直接编译不过。   这个问题&#xff0c;其实和UE本身没什…

OSEK应用模式

1 前言 应用模式&#xff08;Application modes)用于区分不同的场景&#xff0c;以便在系统运行时&#xff0c;组织各自相互独立的OS相关的资源集合&#xff0c;是一种分而治之的思想体现。不同的应用模式是互斥的&#xff0c;即系统当前必须在一种应用模式&#xff08;且只能在…

Java面试八股之反射慢在哪里

Java反射慢在哪里 动态类型检查&#xff1a; 在反射过程中&#xff0c;Java需要在运行时确定类、方法、字段等的类型信息。这与编译时已经确定类型信息的常规对象访问不同&#xff0c;反射需要额外的类型查询和验证&#xff0c;增加了性能开销。 安全检查&#xff1a; 反射…

对文本框做字数限制

效果图 实现步骤 其中绝对布局根据需求自行调整 <!--单文本输入框--> <div class"form-group"><label class"col-sm-2 control-label is-required">面试公司&#xff1a;</label><div class"col-sm-9"><input …

前端崽的java study笔记

文章目录 basic1、sprint boot概述2、sprint boot入门3、yml 配置信息书写和获取 持续更新ing~ basic 1、sprint boot概述 sprint boot特性&#xff1a; 起步依赖&#xff08;maven坐标&#xff09;&#xff1a;解决配置繁琐的问题&#xff0c;只需要引入sprint boot起步依赖的…

对关系型数据库管理系统的介绍

1.数据库的相关介绍 关系型数据库管理系统&#xff1a;&#xff08;英文简称&#xff1a;RDBMS&#xff09; 为我们提供了一种存储数据的特定格式&#xff0c;所谓的数据格式就是表&#xff0c; 在数据库中一张表就称为是一种关系. 在关系型数据库中表由两部分组成&#xf…

大企业总部与分部组网方案

在全球化的经济环境中&#xff0c;大企业往往设有总部和多个地理分散的分部。为了确保信息的快 速流通、资源的优化配置以及管理的高效运作&#xff0c;构建一个稳定、安全且高效的组网方案显 得尤为重要。本文将探讨大企业如何通过技术手段和管理策略&#xff0c;实现总部与分…

学习古琴律学的好东西,帮您从基因里学古琴

《从基因里学懂古琴》是一本关于古琴律学的著作&#xff0c;作者通过基因的角度来解读古琴音乐的奥秘和美妙。古琴作为我国传统文化的瑰宝之一&#xff0c;具有悠久的历史和独特的音乐风格&#xff0c;但其律学原理一直以来都是一个谜。本书从基因的角度探讨了古琴音乐的律学特…

DigitalOcean 的PostgreSQL、MySQL、Redis、Kafka托管数据库,现已支持自定义指标收集功能

近期&#xff0c;我们的几个托管数据库&#xff08;PostgreSQL、MySQL、Redis和Kafka&#xff09;引入了自定义数据指标功能&#xff08;scrapable metrics&#xff09;。这些指标使您更具体、更细致地了解数据库的性能&#xff0c;包括延迟、资源利用率和错误率。然后&#xf…

vuex的基本认知

目录 一、什么是vuex 二、vuex的应用场景 三、vuex的优势 一、什么是vuex Vuex是一个vue的状态管理工具&#xff0c;状态就是数据。 进一步解释&#xff1a;vuex是一个插件&#xff0c;可以帮助我们管理vue通用的数据&#xff08;多组件共享的数据&#xff09; 二、vuex的…

Git 分支命令操作详解

目录 1、分支的特点 2、分支常用操作 3、分支的使用 3.1、查看分支 3.2、创建分支 3.3、修改分支 3.4、切换分支 3.5、合并分支 3.6、产生冲突 3.7、解决冲突 3.8、创建分支和切换分支说明 1、分支的特点 同时并行推进多个功能开发&#xff0c;提高开发效率。各个分…

C# 在Excel中添加筛选器并执行筛选 (日期筛选、文本筛选、数字筛选)

自动筛选器是 Excel 中的一个基本但极其有用的功能&#xff0c;它可以让你根据特定的条件来自动隐藏和显示你的数据。当有大量的数据需要处理时&#xff0c;这个功能可以帮你快速找到你需要的信息&#xff0c;从未更加有效地分析和处理相关数据。 下面将介绍如何使用免费.NET …

传感数据分析——加速度、速度与位移

传感数据分析——加速度、速度与位移 在许多科学和工程应用中&#xff0c;传感器数据的分析是一项至关重要的任务。特别是在运动、运输、结构监测等领域&#xff0c;传感器能够提供有关物体运动和变形的宝贵信息。本文将介绍如何利用Python进行传感器数据分析&#xff0c;重点…

TCP/UDP通信中的部分函数

UDP&#xff08;User Datagram Protocol&#xff0c;用户数据报协议&#xff09;和TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是互联网协议套件中最常用的两种传输层协议&#xff0c;它们负责在互联网中端到端地传输数据。尽管它们服务…

Laravel中使用MinIO进行文件操作及ZIP解压

Laravel中使用MinIO进行文件操作及ZIP解压指南 介绍 在本指南中&#xff0c;我们将详细介绍如何在laravel框架中操作minio&#xff0c;包含方法有&#xff1a;桶列表&#xff0c;创建桶&#xff0c;修改桶&#xff0c;上传文件&#xff0c;删除文件&#xff0c;生成直传链接&…

Linux系统编程:进程控制

1.进程创建 1.1 fork函数 fork&#xff08;&#xff09;通过复制调用进程来创建一个新进程。新进程称为子进程&#xff0c;是调用进程的精确副本 进程&#xff0c;但以下几点除外&#xff1a; 子进程有自己的PID&#xff0c;此PID与任何现有进程组的ID不匹配子进程的父进程ID…

Uncaught InternalError: too much recursion

今天在敲代码的时候偶然间发现项目因为一次操作导致浏览器变得非常卡&#xff0c;而且控制台还报错了 Uncaught InternalError: too much recursior 页面截图如下 &#xff1a; 突如起来的报错和页面异常卡顿给我整不会了ovo&#xff0c;点开报错的地方&#xff0c;直接跳转到对…

HTML满屏漂浮爱心

目录 写在前面 满屏爱心 代码分析 系列推荐 写在最后 写在前面 小编给大家准备了满屏漂浮爱心代码&#xff0c;一起来看看吧~ 满屏爱心 文件heart.svg <svg xmlns"http://www.w3.org/2000/svg" width"473.8px" height"408.6px" view…