flume mysql hdfs_利用Flume将MySQL表数据准实时抽取到HDFS

一、为什么要用到Flume

在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问。这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性。Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题。就像实验中所做的,每天定时增量抽取数据一次。

Flume是一个海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。Flume以流方式处理数据,可作为代理持续运行。当新的数据可用时,Flume能够立即获取数据并输出至目标,这样就可以在很大程度上解决实时性问题。

Flume是最初只是一个日志收集器,但随着flume-ng-sql-source插件的出现,使得Flume从关系数据库采集数据成为可能。下面简单介绍Flume,并详细说明如何配置Flume将MySQL表数据准实时抽取到HDFS。

二、Flume简介

1. Flume的概念

Flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到HDFS,简单来说flume就是收集日志的,其架构如图1所示。

e0c978d15c420c869e0e4865bc71587a.png

图1

2. Event的概念

在这里有必要先介绍一下Flume中event的相关概念:Flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,Flume再删除自己缓存的数据。

在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?Event将传输的数据进行封装,是Flume传输数据的基本单位,如果是文本文件,通常是一行记录。Event也是事务的基本单位。Event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。Event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

3. Flume架构介绍

Flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent。Agent本身是一个Java进程,运行在日志收集节点——所谓日志收集节点就是服务器节点。 Agent里面包含3个核心的组件:source、channel和sink,类似生产者、仓库、消费者的架构。

Source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。

Channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。

Sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。

4. Flume的运行机制

Flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据输入的source,一个是数据输出的sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方,例如HDFS等。注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。

三、安装Hadoop和Flume

我的实验在HDP 2.5.0上进行,HDP安装中包含Flume,只要配置Flume服务即可。HDP的安装步骤参见“HAWQ技术解析(二) —— 安装部署”

四、配置与测试

1. 建立MySQL数据库表

建立测试表并添加数据。

use test;

create table  wlslog

(id         int not null,

time_stamp varchar(40),

category   varchar(40),

type       varchar(40),

servername varchar(40),

code       varchar(40),

msg        varchar(40),

primary key ( id )

);

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,'apr-8-2014-7:06:16-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to standby');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,'apr-8-2014-7:06:17-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to starting');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,'apr-8-2014-7:06:18-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to admin');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,'apr-8-2014-7:06:19-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to resuming');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,'apr-8-2014-7:06:20-pm-pdt','notice','weblogicserver','adminserver','bea-000361','started weblogic adminserver');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,'apr-8-2014-7:06:21-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to running');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

commit;

2. 建立相关目录与文件

(1)创建本地状态文件

mkdir -p /var/lib/flume

cd /var/lib/flume

touch sql-source.status

chmod -R 777 /var/lib/flume

(2)建立HDFS目标目录

hdfs dfs -mkdir -p /flume/mysql

hdfs dfs -chmod -R 777 /flume/mysql

3. 准备JAR包

从http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。

cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/

将MySQL JDBC驱动JAR包也复制到Flume库目录。

cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar

4. 建立HAWQ外部表

create external table ext_wlslog

(id         int,

time_stamp varchar(40),

category   varchar(40),

type       varchar(40),

servername varchar(40),

code       varchar(40),

msg        varchar(40)

) location ('pxf://mycluster/flume/mysql?profile=hdfstextmulti') format 'csv' (quote=e'"');

5. 配置Flume

在Ambari -> Flume -> Configs -> flume.conf中配置如下属性:

agent.channels.ch1.type = memory

agent.sources.sql-source.channels = ch1

agent.channels = ch1

agent.sinks = HDFS

agent.sources = sql-source

agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test

agent.sources.sql-source.user = root

agent.sources.sql-source.password = 123456

agent.sources.sql-source.table = wlslog

agent.sources.sql-source.columns.to.select = *

agent.sources.sql-source.incremental.column.name = id

agent.sources.sql-source.incremental.value = 0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path = /var/lib/flume

agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.HDFS.channel = ch1

agent.sinks.HDFS.type = hdfs

agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql

agent.sinks.HDFS.hdfs.fileType = DataStream

agent.sinks.HDFS.hdfs.writeFormat = Text

agent.sinks.HDFS.hdfs.rollSize = 268435456

agent.sinks.HDFS.hdfs.rollInterval = 0

agent.sinks.HDFS.hdfs.rollCount = 0

Flume在flume.conf文件中指定Source、Channel和Sink相关的配置,各属性描述如表1所示。

属性

描述

agent.channels.ch1.type

Agent的channel类型

agent.sources.sql-source.channels

Source对应的channel名称

agent.channels

Channel名称

agent.sinks

Sink名称

agent.sources

Source名称

agent.sources.sql-source.type

Source类型

agent.sources.sql-source.connection.url

数据库URL

agent.sources.sql-source.user

数据库用户名

agent.sources.sql-source.password

数据库密码

agent.sources.sql-source.table

数据库表名

agent.sources.sql-source.columns.to.select

查询的列

agent.sources.sql-source.incremental.column.name

增量列名

agent.sources.sql-source.incremental.value

增量初始值

agent.sources.sql-source.run.query.delay

发起查询的时间间隔,单位是毫秒

agent.sources.sql-source.status.file.path

状态文件路径

agent.sources.sql-source.status.file.name

状态文件名称

agent.sinks.HDFS.channel

Sink对应的channel名称

agent.sinks.HDFS.type

Sink类型

agent.sinks.HDFS.hdfs.path

Sink路径

agent.sinks.HDFS.hdfs.fileType

流数据的文件类型

agent.sinks.HDFS.hdfs.writeFormat

数据写入格式

agent.sinks.HDFS.hdfs.rollSize

目标文件轮转大小,单位是字节

agent.sinks.HDFS.hdfs.rollInterval

hdfs sink间隔多长将临时文件滚动成最终目标文件,单位是秒;如果设置成0,则表示不根据时间来滚动文件

agent.sinks.HDFS.hdfs.rollCount

当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件

表1

6. 运行Flume代理

保存上一步的设置,然后重启Flume服务,如图2所示。

2b240c9593645f13c33234f14a05f3fb.png

图2

重启后,状态文件已经记录了将最新的id值7,如图3所示。

d5e813d00d6f6ebf5aad2569a0bc95a3.png

图3

查看目标路径,生成了一个临时文件,其中有7条记录,如图4所示。

5420277b577be18363c1b9ff65cdd82b.png

图4

查询HAWQ外部表,结果也有全部7条数据,如图5所示。

74fd2ac4e9c81f160636fc31a2e5c3b6.png

图5

至此,初始数据抽取已经完成。

7. 测试准实时增量抽取

在源表中新增id为8、9、10的三条记录。

use test;

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

commit;

5秒之后查询HAWQ外部表,从图6可以看到,已经查询出全部10条数据,准实时增量抽取成功。

65d3aa6d6005a8ee1116077e1664ae3f.png

图6

五、方案优缺点

利用Flume采集关系数据库表数据最大的优点是配置简单,不用编程。相比tungsten-replicator的复杂性,Flume只要在flume.conf文件中配置source、channel及sink的相关属性,已经没什么难度了。而与现在很火的canal比较,虽然不够灵活,但毕竟一行代码也不用写。再有该方案采用普通SQL轮询的方式实现,具有通用性,适用于所有关系库数据源。

这种方案的缺点与其优点一样突出,主要体现在以下几方面。

在源库上执行了查询,具有入侵性。

通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。

只能识别新增数据,检测不到删除与更新。

要求源库必须有用于表示增量的字段。

即便有诸多局限,但用Flume抽取关系库数据的方案还是有一定的价值,特别是在要求快速部署、简化编程,又能满足需求的应用场景,对传统的Sqoop方式也不失为一种有效的补充。

参考:

Flume架构以及应用介绍

Streaming MySQL Database Table Data to HDFS with Flume

how to read data from oracle using FLUME to kafka broker

https://github.com/keedio/flume-ng-sql-source

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/432594.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Gdiplus::GdiplusBase::operator new 函数不接受3 个参数问题的处理

微软的 DEBUG_NEW 和 GDI 不匹配造成的。 方法: 注释掉: #ifdef _DEBUG #define new DEBUG_NEW #endif

Careercup - Microsoft面试题 - 5428361417457664

2014-05-11 03:37 题目链接 原题: You have three jars filled with candies. One jar is filled with banana candies, one jar is filled with lemon candies and one jar has a mix of both. All the jars are mislabelled (i.e. all the jars have wrong labels…

erpnext mysql_windows7+docker+erpnext部署

erpnext 为开源的erp系统,采用python语言编写,本次部署采用windows7docker方式进行,erpnext需要运行python、redis、mariadb或者mysql、nginx。参考资料docker镜像erpnext基于ubuntu操作系统构建。pull erpnext镜像docker pull lukptr/erpnex…

一种解决运行程序报“应用程序配置不正确”的问题

在我们开发工程中,可能有些情况下,不能在本机进行调试。这个时候我们一般会使用VM(vmware)建立一个虚拟机环境,然后把编译过的程序放在该虚拟机环境下执行调试。可是在某些情况下,不管我们编译的是debug还是release版本…

mysql group by自定义_mysql – GROUP BY和自定义顺序

我已经阅读了MySQL order by before group by的答案,但是将它应用于我的查询最后会在子查询中找到一个子查询,这是一个相当简单的案例,所以我想知道这是否可以简化:带有样本数据的模式为简洁起见,我省略了成员表中的其他字段.此外,在实际应用程序中还有更多表加入,但…

SharePoint 2010 RBS 安装和配置遇到的一个问题

在按照微软官方的文档按照配置的时候遇到下面问题&#xff1a; <Event xmlns"http://schemas.microsoft.com/win/2004/08/events/event"> - <System><Provider Name"RBS" /> <EventID Qualifiers"0">18639</EventID&…

X210烧写linux系统

准备&#xff1a; x210开发板USB OTG线串口线一张SD卡windows PC 说明&#xff1a; 开发板里面已经有系统&#xff0c;为wince系统&#xff0c;wince系统的调试串口是UART0准备烧写为linux系统&#xff0c;调试串口是UART2 操作及原理&#xff1a; 1. 破坏EMMC里面的bootloader…

mysql+ubunt+绿色安装_Mysql在ubuntu18上的安装及简单使用

数据相关行业都离不开数据库&#xff0c;mysql在ubuntu上的安装比在windows上安装简单多了&#xff0c;下面我记录一下自己成功安装的步骤和使用。1.安装软件首先更新一下源&#xff1a;sudo apt-get update然后安装mysql服务器端&#xff1a;sudo apt-get install mysql-serve…

小组互评(杨波组)

我组经过认真使用杨波组的软件有以下心得&#xff1a; 刘铸辉&#xff1a;界面挺美观的&#xff0c;功能实现也不错&#xff0c;对学生查询教室这方面实用性很好&#xff0c;就是功能布局界面有点小&#xff0c;希望下一个版本可以改善下 解凤娇&#xff1a;首先此软件的名字ji…

矩阵运算——平移,旋转,缩放

平时开发程序&#xff0c;免不了要对图像做各种变换处理。有的时候变换可能比较复杂&#xff0c;比如平移之后又旋转&#xff0c;旋转之后又平移&#xff0c;又缩放。 直接用公式计算&#xff0c;不但复杂&#xff0c;而且效率低下。这时可以借助变换矩阵和矩阵乘法&#xff0c…

sql字符串拼接_Mybatis的SqlSession执行sql过程

上一篇分析了SqlSession执行sql的过程&#xff0c;其中并没有分析sql是从哪里来的&#xff0c;今天就来仔细分析下。Sql来源从上一篇的最后一步执行sql那里倒推sql的来源&#xff0c;源码主要过程如下图&#xff1a;可以看到最后是通过BoundSql直接获取的sql&#xff0c;然后往…

KMP算法----java实现

字符串的模式匹配本文先实现最基本的回溯实现的已经KMP算法&#xff0c;BM算法后面博文继续实现。ps:本篇博文强烈参考了July大神的作品&#xff0c;地址http://blog.csdn.net/v_july_v/article/details/6545192.再次感激大神~~ 1.最基本的回溯实现字符串模式匹配 package com.…

深入浅出FSUIPC的作用以及使用方法

看此贴前您需要掌握的技能或知识&#xff1a;1. 有FSX或FS2004并正确安装了FSUIPC 2. 具备一定的C语言理解能力&#xff0c;C语言是一切高级语言的基础&#xff0c;单片机主要也用的C语言。 3. 掌握以下几种编程语言之一即可&#xff1a; VC\VB\C#\DELPHI\JAVA\CMFC\.NET版的C或…

PHP面向对象2之变量、方法

1 <?php2 /**3 * PHP面向对象基础 调用变量4 */5 class Computer{6 //字段成员的声明格式&#xff1a;修饰符 变量名[xxx]7 public $_name;//public 表示类外可以访问&#xff0c;为公有变量8 public $_model;9 } 10 //创建一个对象…

在 VC6 中使用 GdiPlus-安装

安装三部曲&#xff1a; Step1&#xff1a;下载 GdiPlus SDK 文件包&#xff1b; 链接地址1&#xff1a;http://www.codeguru.com/code/legacy/gdi/GDIPlus.zip 链接地址2&#xff1a;http://www.codersource.net/samples/mfcgdiplus.zip Step2&#xff1a;安装&#xff1b; &a…

算法训练|实现 Trie (前缀树)

208. 实现 Trie (前缀树) - 力扣&#xff08;LeetCode&#xff09; 总结&#xff1a; Trie&#xff0c;又称前缀树或字典树&#xff0c;是一棵有根树&#xff0c;其每个节点包含以下字段&#xff1a; 指向子节点的指针数组 children。对于本题而言&#xff0c;数组长度为 26…

ASP.NET MVC 的多国语系支持

ASP.NET MVC 的多国语系支持 posted on 2014-05-14 11:31 stickout 阅读(...) 评论(...) 编辑 收藏 转载于:https://www.cnblogs.com/linhui/p/3727364.html

airtest web 录制滑块_Airtest之web自动化(一)

Airtest之web自动化(一)[此文档有许多涉及到gif动图的地方&#xff0c;请全屏观看]了解Airtest&#xff1a;简介&#xff1a;Airtest是由网易团队开发的一款自动化框架&#xff0c;前期运用与游戏测试(通过截图识别)&#xff0c;后来又被运用到安卓测试以及web测试。这款自动化…

error C2065: 'ULONG_PTR' : undeclared identifier

处理方法: 把#define ULONG_PTR ULONG 加到 stdafx文件 靠前面的位置 原因&#xff1a; Visual C 6.0 开发环境, gdi 的头文件和库文件并没有被包含在环境中, 需要您手工安装 VC6 中没有 ULONG_PTR 类型

重构价格日历

重构价格日历转载于:https://www.cnblogs.com/usual2013blog/p/3728655.html