flume mysql hdfs_利用Flume将MySQL表数据准实时抽取到HDFS

一、为什么要用到Flume

在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问。这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性。Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题。就像实验中所做的,每天定时增量抽取数据一次。

Flume是一个海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。Flume以流方式处理数据,可作为代理持续运行。当新的数据可用时,Flume能够立即获取数据并输出至目标,这样就可以在很大程度上解决实时性问题。

Flume是最初只是一个日志收集器,但随着flume-ng-sql-source插件的出现,使得Flume从关系数据库采集数据成为可能。下面简单介绍Flume,并详细说明如何配置Flume将MySQL表数据准实时抽取到HDFS。

二、Flume简介

1. Flume的概念

Flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到HDFS,简单来说flume就是收集日志的,其架构如图1所示。

e0c978d15c420c869e0e4865bc71587a.png

图1

2. Event的概念

在这里有必要先介绍一下Flume中event的相关概念:Flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,Flume再删除自己缓存的数据。

在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?Event将传输的数据进行封装,是Flume传输数据的基本单位,如果是文本文件,通常是一行记录。Event也是事务的基本单位。Event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。Event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

3. Flume架构介绍

Flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent。Agent本身是一个Java进程,运行在日志收集节点——所谓日志收集节点就是服务器节点。 Agent里面包含3个核心的组件:source、channel和sink,类似生产者、仓库、消费者的架构。

Source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。

Channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。

Sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。

4. Flume的运行机制

Flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据输入的source,一个是数据输出的sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方,例如HDFS等。注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。

三、安装Hadoop和Flume

我的实验在HDP 2.5.0上进行,HDP安装中包含Flume,只要配置Flume服务即可。HDP的安装步骤参见“HAWQ技术解析(二) —— 安装部署”

四、配置与测试

1. 建立MySQL数据库表

建立测试表并添加数据。

use test;

create table  wlslog

(id         int not null,

time_stamp varchar(40),

category   varchar(40),

type       varchar(40),

servername varchar(40),

code       varchar(40),

msg        varchar(40),

primary key ( id )

);

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,'apr-8-2014-7:06:16-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to standby');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,'apr-8-2014-7:06:17-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to starting');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,'apr-8-2014-7:06:18-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to admin');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,'apr-8-2014-7:06:19-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to resuming');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,'apr-8-2014-7:06:20-pm-pdt','notice','weblogicserver','adminserver','bea-000361','started weblogic adminserver');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,'apr-8-2014-7:06:21-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to running');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

commit;

2. 建立相关目录与文件

(1)创建本地状态文件

mkdir -p /var/lib/flume

cd /var/lib/flume

touch sql-source.status

chmod -R 777 /var/lib/flume

(2)建立HDFS目标目录

hdfs dfs -mkdir -p /flume/mysql

hdfs dfs -chmod -R 777 /flume/mysql

3. 准备JAR包

从http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。

cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/

将MySQL JDBC驱动JAR包也复制到Flume库目录。

cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar

4. 建立HAWQ外部表

create external table ext_wlslog

(id         int,

time_stamp varchar(40),

category   varchar(40),

type       varchar(40),

servername varchar(40),

code       varchar(40),

msg        varchar(40)

) location ('pxf://mycluster/flume/mysql?profile=hdfstextmulti') format 'csv' (quote=e'"');

5. 配置Flume

在Ambari -> Flume -> Configs -> flume.conf中配置如下属性:

agent.channels.ch1.type = memory

agent.sources.sql-source.channels = ch1

agent.channels = ch1

agent.sinks = HDFS

agent.sources = sql-source

agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test

agent.sources.sql-source.user = root

agent.sources.sql-source.password = 123456

agent.sources.sql-source.table = wlslog

agent.sources.sql-source.columns.to.select = *

agent.sources.sql-source.incremental.column.name = id

agent.sources.sql-source.incremental.value = 0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path = /var/lib/flume

agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.HDFS.channel = ch1

agent.sinks.HDFS.type = hdfs

agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql

agent.sinks.HDFS.hdfs.fileType = DataStream

agent.sinks.HDFS.hdfs.writeFormat = Text

agent.sinks.HDFS.hdfs.rollSize = 268435456

agent.sinks.HDFS.hdfs.rollInterval = 0

agent.sinks.HDFS.hdfs.rollCount = 0

Flume在flume.conf文件中指定Source、Channel和Sink相关的配置,各属性描述如表1所示。

属性

描述

agent.channels.ch1.type

Agent的channel类型

agent.sources.sql-source.channels

Source对应的channel名称

agent.channels

Channel名称

agent.sinks

Sink名称

agent.sources

Source名称

agent.sources.sql-source.type

Source类型

agent.sources.sql-source.connection.url

数据库URL

agent.sources.sql-source.user

数据库用户名

agent.sources.sql-source.password

数据库密码

agent.sources.sql-source.table

数据库表名

agent.sources.sql-source.columns.to.select

查询的列

agent.sources.sql-source.incremental.column.name

增量列名

agent.sources.sql-source.incremental.value

增量初始值

agent.sources.sql-source.run.query.delay

发起查询的时间间隔,单位是毫秒

agent.sources.sql-source.status.file.path

状态文件路径

agent.sources.sql-source.status.file.name

状态文件名称

agent.sinks.HDFS.channel

Sink对应的channel名称

agent.sinks.HDFS.type

Sink类型

agent.sinks.HDFS.hdfs.path

Sink路径

agent.sinks.HDFS.hdfs.fileType

流数据的文件类型

agent.sinks.HDFS.hdfs.writeFormat

数据写入格式

agent.sinks.HDFS.hdfs.rollSize

目标文件轮转大小,单位是字节

agent.sinks.HDFS.hdfs.rollInterval

hdfs sink间隔多长将临时文件滚动成最终目标文件,单位是秒;如果设置成0,则表示不根据时间来滚动文件

agent.sinks.HDFS.hdfs.rollCount

当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件

表1

6. 运行Flume代理

保存上一步的设置,然后重启Flume服务,如图2所示。

2b240c9593645f13c33234f14a05f3fb.png

图2

重启后,状态文件已经记录了将最新的id值7,如图3所示。

d5e813d00d6f6ebf5aad2569a0bc95a3.png

图3

查看目标路径,生成了一个临时文件,其中有7条记录,如图4所示。

5420277b577be18363c1b9ff65cdd82b.png

图4

查询HAWQ外部表,结果也有全部7条数据,如图5所示。

74fd2ac4e9c81f160636fc31a2e5c3b6.png

图5

至此,初始数据抽取已经完成。

7. 测试准实时增量抽取

在源表中新增id为8、9、10的三条记录。

use test;

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

commit;

5秒之后查询HAWQ外部表,从图6可以看到,已经查询出全部10条数据,准实时增量抽取成功。

65d3aa6d6005a8ee1116077e1664ae3f.png

图6

五、方案优缺点

利用Flume采集关系数据库表数据最大的优点是配置简单,不用编程。相比tungsten-replicator的复杂性,Flume只要在flume.conf文件中配置source、channel及sink的相关属性,已经没什么难度了。而与现在很火的canal比较,虽然不够灵活,但毕竟一行代码也不用写。再有该方案采用普通SQL轮询的方式实现,具有通用性,适用于所有关系库数据源。

这种方案的缺点与其优点一样突出,主要体现在以下几方面。

在源库上执行了查询,具有入侵性。

通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。

只能识别新增数据,检测不到删除与更新。

要求源库必须有用于表示增量的字段。

即便有诸多局限,但用Flume抽取关系库数据的方案还是有一定的价值,特别是在要求快速部署、简化编程,又能满足需求的应用场景,对传统的Sqoop方式也不失为一种有效的补充。

参考:

Flume架构以及应用介绍

Streaming MySQL Database Table Data to HDFS with Flume

how to read data from oracle using FLUME to kafka broker

https://github.com/keedio/flume-ng-sql-source

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/432594.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一种解决运行程序报“应用程序配置不正确”的问题

在我们开发工程中,可能有些情况下,不能在本机进行调试。这个时候我们一般会使用VM(vmware)建立一个虚拟机环境,然后把编译过的程序放在该虚拟机环境下执行调试。可是在某些情况下,不管我们编译的是debug还是release版本…

mysql+ubunt+绿色安装_Mysql在ubuntu18上的安装及简单使用

数据相关行业都离不开数据库,mysql在ubuntu上的安装比在windows上安装简单多了,下面我记录一下自己成功安装的步骤和使用。1.安装软件首先更新一下源:sudo apt-get update然后安装mysql服务器端:sudo apt-get install mysql-serve…

sql字符串拼接_Mybatis的SqlSession执行sql过程

上一篇分析了SqlSession执行sql的过程,其中并没有分析sql是从哪里来的,今天就来仔细分析下。Sql来源从上一篇的最后一步执行sql那里倒推sql的来源,源码主要过程如下图:可以看到最后是通过BoundSql直接获取的sql,然后往…

深入浅出FSUIPC的作用以及使用方法

看此贴前您需要掌握的技能或知识:1. 有FSX或FS2004并正确安装了FSUIPC 2. 具备一定的C语言理解能力,C语言是一切高级语言的基础,单片机主要也用的C语言。 3. 掌握以下几种编程语言之一即可: VC\VB\C#\DELPHI\JAVA\CMFC\.NET版的C或…

在 VC6 中使用 GdiPlus-安装

安装三部曲: Step1:下载 GdiPlus SDK 文件包; 链接地址1:http://www.codeguru.com/code/legacy/gdi/GDIPlus.zip 链接地址2:http://www.codersource.net/samples/mfcgdiplus.zip Step2:安装; &a…

ASP.NET MVC 的多国语系支持

ASP.NET MVC 的多国语系支持 posted on 2014-05-14 11:31 stickout 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnblogs.com/linhui/p/3727364.html

aliyun centos6 安装mysql_阿里云CentOS6.8安装MySQL5.6

1、使用SSH Secure Shell工具连接阿里云服务器2、使用SSH Secure File Transfer工具上传MySQL压缩包3、解压MySQL压缩包到指定目录(需要在先/usr/local下创建mysql目录)进入压缩文件存放位置,进行解打包:tar -xvf MySQL-5.6.22-1.el6.i686.rpm-bundle.ta…

用Javascript获取页面元素的位置

制作网页的过程中,你有时候需要知道某个元素在网页上的确切位置。 下面的教程总结了Javascript在网页定位方面的相关知识。 一、网页的大小和浏览器窗口的大小 首先,要明确两个基本概念。 一张网页的全部面积,就是它的大小。通常情况下&#…

[Qt] 利用QtWebKit完成JavaScript访问C++对象

http://blog.csdn.net/longsir_area/article/details/42965565 一. 介绍 在浏览器扩展或者WebApp的项目经常用的脚本语言JavaScript有很多局限性,比如,javascript语言不能够夸窗口访问js对象,不能直接读写磁盘文件(这个…

mysql三大范式_MySQL学习笔记

1、数据库结构设计1、总-总体流程图2、分-【提取属性】业务分析评价的属性:{用户,课程主标题,内容,综合评分,内容实用,简洁易懂,逻辑分析,发布时间} 问答评论属性:{类型,…

QT webkit 各个类之间关系--QWebView-QWebPag

一、QT webkit简介 1.Qt Qt(发音同 cute)是一个跨平台的C应用程式开发框架,有时又被称为C部件工具箱。Qt被用在KDE桌面环境、Opera、Google Earth、Skype、Adobe Photoshop Album和VirtualBox的开发中。它是挪威Qt Software 的产品&#xff0…

vue 文件转换二进制_在vue中使用axios实现post方式获取二进制流下载文件(实例代码)...

需求点击导出下载表格对应的excel文件在 vue 项目中,使用的 axios ,后台 java 提供的 post 接口 api实现第一步,在 axios 请求中加入参数,表示接收的数据为二进制文件流responseType: "blob"第二步,在拿到数据流之后,把流转为指定文件格式并创建a标签,模拟点击下载,实…

vs生成qt moc文件

1. 右键需要生成moc文件的头文件 2. 将生产的moc加入工程中

javascript好文---深入理解定位父级offsetParent及偏移大小

前面的话 偏移量(offset dimension)是javascript中的一个重要的概念。涉及到偏移量的主要是offsetLeft、offsetTop、offsetHeight、offsetWidth这四个属性。当然,还有一个偏移参照——定位父级offsetParent。本文将详细介绍该部分内容 offsetParent定位父级 在理解…

bash中将字符串split成数组的方法

相信编程时,字符串的处理是很频繁被处理的问题,其中大家肯定不陌生各种语言的string.split(sp)将字符串按照某个字符或子串切分成一个数组。 同样,我们在用shell处理文本信息时也可以方便地实现该功能。 这里主要使用了bash中关于字符串变量的…

理解 e.clientX,e.clientY e.pageX

event.clientX、event.clientY 鼠标相对于浏览器窗口可视区域的X,Y坐标(窗口坐标),可视区域不包括工具栏和滚动条。IE事件和标准事件都定义了这2个属性 event.pageX、event.pageY 类似于event.clientX、event.clientY,…

基于FlashPaper的文档播放器

本文主要讨论、描述了使用Adobe公司的Flex与FlashPaper产品完成对发布到网上的文档资料进行只读控制,也就是说只允许浏览操作、对下载、打印进行控制。FlashPaper FlashPaper是Macromedia的一款用于将操作系统所识别的文档的内容通过虚拟打印机制将内容转换为swf文件…

python经纬度转换xy坐标公式 pyqt_EXCEL公式进行经纬度与XY坐标的相互转换

一、用EXCEL进行高斯投影换算从经纬度B、L换算到高斯平面直角坐标X、Y(高斯投影正算),或从X、Y换算成B、L(高斯投影反算),一般需要专用计算机软件完成。在目前流行的换算软件中不足之处,就是灵活性较差,大都需要一个点一个点地进行…

java桥_JAVA 桥模式

桥梁模式的用意是“将抽象化(Abstraction)与实现化(Implementation)脱耦,使得二者可以独立地变化”。这句话很短,但是第一次读到这句话的人很可能都会思考良久而不解其意。这句话有三个关键词,也就是抽象化、实现化和脱耦。理解这三个词所代表…

java utf8 byte_byte以及UTF-8的转码规则

https://www.cnblogs.com/hell8088/p/9184336.html多年来闲麻烦,只记录笔记,不曾编写BLOG,本文为原创,如需转载请标明出处废话不说,直奔主题ascii计算机只接受 “高”、“低”电压,所以使用二进制 1 和 …