Flume数据传输事务分析[转]

本文基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。一般情况下,用MemoryChannel就好了,我们公司用的就是这个,FileChannel速度慢,虽然提供日志级别的数据恢复,但是一般情况下,不断电MemoryChannel是不会丢数据的。

Flume提供事物操作,保证用户的数据的可靠性,主要体现在:

  • 数据在传输到下个节点时(通常是批量数据),如果接收节点出现异常,比如网络异常,则回滚这一批数据。因此有可能导致数据重发
  • 同个节点内,Source写入数据到Channel,数据在一个批次内的数据出现异常,则不写入到Channel。已接收到的部分数据直接抛弃,靠上一个节点重发数据。

编程模型

Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
//事物开始
txn.begin();
try {Event eventToStage = EventBuilder.withBody("Hello Flume!",Charset.forName("UTF-8")); //往临时缓冲区Put数据 ch.put(eventToStage); //或者ch.take() //将这些数据提交到channel中 txn.commit(); } catch (Throwable t) { txn.rollback(); if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } 

Put事务流程

Put事务可以分为以下阶段:

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,抛弃数据

我们从Source数据接收到写入Channel这个过程对Put事物进行分析。


ThriftSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

    @Overridepublic Status appendBatch(List<ThriftFlumeEvent> events) throws TException {List<Event> flumeEvents = Lists.newArrayList();for(ThriftFlumeEvent event : events) {flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); } //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel getChannelProcessor().processEventBatch(flumeEvents); ... return Status.OK; } 

事务逻辑都在processEventBatch这个方法里:

public void processEventBatch(List<Event> events) {...//预处理每行数据,有人用来做ETL嘛events = interceptorChain.intercept(events);...//分类数据,划分不同的channel集合对应的数据 // Process required channels Transaction tx = reqChannel.getTransaction(); ... //事务开始,tx即MemoryTransaction类实例 tx.begin(); List<Event> batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { // 这个put操作实际调用的是transaction.doPut reqChannel.put(event); } //提交,将数据写入Channel的队列中 tx.commit(); } catch (Throwable t) { //回滚 tx.rollback(); ... } } ... } 

每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal变量currentTransaction.

那么,事务到底做了什么?

实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

  • putList
  • takeList

对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。

channel.put -> transaction.doPut:

    protected void doPut(Event event) throws InterruptedException {//计算数据字节大小 int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); //写入临时缓冲区putList if (!putList.offer(event)) { throw new ChannelException( "Put queue for MemoryTransaction of capacity " + putList.size() + " full, consider committing more frequently, " + "increasing capacity or increasing thread count"); } putByteCounter += eventByteSize; } 

transaction.commit:

@Overrideprotected void doCommit() throws InterruptedException { //检查channel的队列剩余大小是否足够 ... int puts = putList.size(); ... synchronized(queueLock) { if(puts > 0 ) { while(!putList.isEmpty()) { //写入到channel的队列 if(!queue.offer(putList.removeFirst())) { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } //清除临时队列 putList.clear(); ... } ... } 

如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

@Overrideprotected void doRollback() {...//抛弃数据,没合并到channel的内存队列 putList.clear(); ... } 

Take事务

Take事务分为以下阶段:

  • doTake:先将数据取到临时缓冲区takeList
  • 将数据发送到下一个节点
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。


Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

public Status process() throws EventDeliveryException {...Transaction transaction = channel.getTransaction();...//事务开始transaction.begin();...for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) { //take数据到临时缓冲区,实际调用的是transaction.doTake Event event = channel.take(); if (event == null) { break; } ... //写数据到HDFS bucketWriter.append(event); ... // flush all pending buckets before committing the transaction for (BucketWriter bucketWriter : writers) { bucketWriter.flush(); } //commit transaction.commit(); ... } catch (IOException eIO) { transaction.rollback(); ... } finally { transaction.close(); } } 

大致流程图:

接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

protected Event doTake() throws InterruptedException {...//从channel内存队列取数据synchronized(queueLock) {event = queue.poll();}...//将数据放到临时缓冲区 takeList.put(event); ... return event; } 

接着,HDFS写线程bucketWriter将take到的数据写到HDFS,如果批数据都写完了,则要commit了:

protected void doCommit() throws InterruptedException {...takeList.clear();...
}

很简单,其实就是清空takeList而已。如果bucketWriter在写数据到HDFS的时候出现异常,则要rollback:

protected void doRollback() {int takes = takeList.size();//检查内存队列空间大小,是否足够takeList写回去 synchronized(queueLock) { Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } ... } ... }

转载于:https://www.cnblogs.com/whtydn/p/4384199.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/259134.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

yii2中的rules验证规则

2019独角兽企业重金招聘Python工程师标准>>> Rules验证规则&#xff1a;required : 必须值验证属性||CRequiredValidator 的别名, 确保了特性不为空.[[字段名],required,requiredValue>必填值,message>提示信息];email : 邮箱验证||CEmailValidator 的别名,确…

opengl 如何加阴影_动漫嘴唇厚涂如何绘制?厚涂嘴唇正确画法

动漫嘴唇厚涂如何绘制&#xff1f;厚涂嘴唇正确画法&#xff01;嘴巴怎么画&#xff1f;画嘴巴真的很考验一个画师功力&#xff0c;好看的嘴巴生动而丰满&#xff0c;可以给整幅画作添上亮点&#xff0c;而画的不好的嘴巴呢&#xff0c;就容易把画面整体的风格打破。那么零基础…

详解JMeter函数和变量

详解JMeter函数和变量&#xff08;1&#xff09; JMeter函数可以被认为是某种特殊的变量&#xff0c;它们可以被采样器或者其他测试元件所引用。函数调用的语法如下&#xff1a; ${__functionName(var1,var2,var3)} 其中&#xff0c;__functionName匹配被调用的函数名称。用圆括…

hdu 5199 map或二分或哈希

题目描述&#xff1a;给出n棵树的高度&#xff0c;每棵树上都站着一只鸟&#xff0c;枪手Jack站在最左边那棵树的左边对鸟进行射击&#xff0c;当Jack在高度为H的地方向右发射一颗子弹的时候&#xff0c;高度为H的树上的鸟儿就会掉落&#xff08;注&#xff1a;其他树上的鸟儿不…

数字电路实验怎么接线视频讲解_家庭影院中音箱、功放、投影机、4K播放机不知道怎么连接?手把手教你...

家庭影院中音箱、功放、投影机、4K播放机不知道怎么连接&#xff1f;手把手教你有不少用户收到从家庭影院器材之后&#xff0c;表示完全不会连接。翻看说明书也觉得头大&#xff0c;知识太多&#xff0c;然而却很难找到要点。今天主要跟大家讲讲如何连接音箱、功放、投影机和影…

.NET开发过程中的全文索引使用技巧之Solr

前言&#xff1a;相信许多人都听说过.net开发过程中基于Lucene.net实现的全文索引&#xff0c;而Solr是一个高性能&#xff0c;基于Lucene的全文搜索服务器。同时对其进行了扩展&#xff0c;提供了比Lucene更为丰富的查询语言&#xff0c;同时实现了可配置、可扩展并对查询性能…

auto.js停止所有线程_Java线程与并发编程实践:深入理解volatile和final变量

同步有两种属性&#xff1a;互斥性和可见性。synchronized关键字与两者都有关系。Java同时也提供了一种更弱的、仅仅包含可见性的同步形式&#xff0c;并且只以volatile关键字关联。假设你自己设计了一个停止线程的机制(因为无法使用Thread不安全的stop()方法))。清单1中Thread…

项目实例改编:利用structs2的action 实时显示图片、pdf和其他内容的框架抽取。(转)...

转自&#xff1a;http://www.verydemo.com/demo_c167_i1382.html 针对&#xff1a;预览文件&#xff08;图片&#xff0c;PDF&#xff09;文件来源为action中的inputStream 重点&#xff1a; structs2的action的配置 action的写法和结果类型 resulttype的写法 网页上实…

开始Go开发之旅-Golang架构师之路系列实战

2019独角兽企业重金招聘Python工程师标准>>> 作者: gomaster.me(冯琪超) 系列:Golang架构师之路 巧妇难做无米之炊&#xff0c;golang sdk就是gopher的大米 下载golang 点击 官网下载golang sdk 根据不同系统&#xff0c;官网下载链接会选择相应的平台进行链接跳转&…

安卓9.0官方系统升级包_华为、荣耀公布可升级安卓10.0机型,你的手机在名单之内吗?...

在近两个月以前&#xff0c;美方将华为关进了小黑屋&#xff0c;随后谷歌也将华为旗下的机型移出了安卓10.0升级名单&#xff0c;这一波操作之后&#xff0c;引起了不小的“恐慌”&#xff0c;许多华为用户也在担心是否还能正常使用安卓系统服务&#xff0c;不过&#xff0c;让…

2. Mysql数据库的入门知识

2. Mysql数据库的入门知识 &#xff08;1&#xff09;打开Windows系统提供的服务查看相应的服务。 &#xff08;2&#xff09;在Windows任务管理器的进程中查看 &#xff08;3&#xff09;使用命令行管理windows的Mysql数据库服务。 Net start 服务名 Net stop 服务名 mysql -h…

nginx php-fpm 输出php错误日志(转)

nginx是一个web服务器&#xff0c;因此nginx的access日志只有对访问页面的记录&#xff0c;不会有php 的 error log信息。 nginx把对php的请求发给php-fpm fastcgi进程来处理&#xff0c;默认的php-fpm只会输出php-fpm的错误信息&#xff0c;在php-fpm的errors log里也看不到ph…

win7优化设置_win7蓝牙怎么打开?

当电脑需要连接蓝牙设备的时候&#xff0c;就需要打开蓝牙设置才行。鉴于一些win7的用户还不知道蓝牙功能在哪&#xff0c;win7蓝牙怎么打开&#xff0c;故系统圣地分享本篇教程。1、win7蓝牙怎么打开?首先要你的电脑支持蓝牙功能。如果你的电脑有蓝牙功能的话那么在电脑的右下…

Doxygen从零学起———安装和配置

Doxygen可以为多种语言生成说明文档&#xff08;从程序的源代码中提取其中按照约定格式写的注释中提取信息&#xff09; 例如C, Objective-C, C#, C, PHP, Python, IDL (Corba, Microsoft, and UNO/OpenOffice flavors), Fortran, VHDL, Tcl, D ,从这期开始&#xff0c;我将系…

Java中的ClassLoader

Java中类的加载过程&#xff08;如Dog类&#xff09;&#xff1a; 通过类型信息定位Dog.class文件。载入Dog.class文件&#xff0c;创建相应的Class对象。执行父类的静态字段定义时初始化语句和父类的静态初始化块。执行子类的静态字段定义时初始化语句和子类的静态初始化块。当…

excel删除无尽空白行_excel如何批量删除空白行 巧用 ctrl+G 只需1秒 最常用的技巧...

工作中我们使用excel通常都会遇到这种情况&#xff0c;就是表格中有很多多余的空行。我们需要把多余的空行删除。 如果空行只有一两行的话&#xff0c;可以把鼠标放在空白行上&#xff0c;然后点击鼠标右键&#xff0c;在弹出的菜单中选择删除菜单。 在弹出的删除确定窗口中&am…

DevOps的前世今生

2019独角兽企业重金招聘Python工程师标准>>> 目前在国外&#xff0c;互联网巨头如Google、Facebook、Amazon、LinkedIn、Netflix、Airbnb&#xff0c;传统软件公司如Adobe、IBM、Microsoft、SAP等&#xff0c;亦或是网络业务非核心企业如苹果、沃尔玛、索尼影视娱乐…

【转】最牛B的编码套路

最近&#xff0c;我大量阅读了Steve Yegge的文章。其中有一篇叫“Practicing Programming”&#xff08;练习编程&#xff09;&#xff0c;写成于2005年&#xff0c;读后令我惊讶不已&#xff1a; 与你所相信的恰恰相反&#xff0c;单纯地每天埋头于工作并不能算是真正意义上的…

ecshop 广告设置

最近公司准备做个商城&#xff0c;让我从JAVA转过去&#xff0c;好吧&#xff0c;先看下吧&#xff0c;反正也得做。接到手里的是一套已经成型的模板&#xff0c;但是二次开发必须得了解下机制、文件、响应、设置什么的&#xff0c;也是个新手&#xff0c;写点东西给后面更新的…

linux 信号_Linux信号机制

信号就是一条消息&#xff0c;通知进程系统中发生了什么事&#xff0c;每种信号都对应着某种系统事件。一般的底层硬件异常是由内核的异常处理程序处理的&#xff0c;它对用户进程来说是透明的。而信号机制&#xff0c;提供了一种方法通知用户进程发生了这些异常。例如&#xf…