SequenceFile文件

    SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有不少人在该文件的基础之上提出了一些HDFS中小文件存储的解决方案,他们的基本思路就是将小文件进行合并成一个大文件,同时对这些小文件的位置信息构建索引。不过,这类解决方案还涉及到Hadoop的另一种文件格式——MapFile文件。SequenceFile文件并不保证其存储的key-value数据是按照key的某个顺序存储的,同时不支持append操作。

      在SequenceFile文件中,每一个key-value被看做是一条记录(Record),因此基于Record的压缩策略,SequenceFile文件可支持三种压缩类型(SequenceFile.CompressionType):

NONE: 对records不进行压缩;

RECORD: 仅压缩每一个record中的value值;

BLOCK: 将一个block中的所有records压缩在一起;

那么,基于这三种压缩类型,Hadoop提供了对应的三种类型的Writer:

SequenceFile.Writer  写入时不压缩任何的key-value对(Record);

 

[java] view plaincopy
  1. public static class Writer implements java.io.Closeable {  
  2.   
  3. ...  
  4.    //初始化Writer  
  5.    void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata) throws IOException {  
  6.       this.conf = conf;  
  7.       this.out = out;  
  8.       this.keyClass = keyClass;  
  9.       this.valClass = valClass;  
  10.       this.compress = compress;  
  11.       this.codec = codec;  
  12.       this.metadata = metadata;  
  13.         
  14.       //创建非压缩的对象序列化器  
  15.       SerializationFactory serializationFactory = new SerializationFactory(conf);  
  16.       this.keySerializer = serializationFactory.getSerializer(keyClass);  
  17.       this.keySerializer.open(buffer);  
  18.       this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);  
  19.       this.uncompressedValSerializer.open(buffer);  
  20.         
  21.       //创建可压缩的对象序列化器  
  22.       if (this.codec != null) {  
  23.         ReflectionUtils.setConf(this.codec, this.conf);  
  24.         this.compressor = CodecPool.getCompressor(this.codec);  
  25.         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);  
  26.         this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));  
  27.         this.compressedValSerializer = serializationFactory.getSerializer(valClass);  
  28.         this.compressedValSerializer.open(deflateOut);  
  29.       }  
  30.     }  
  31.       
  32.   
  33.   //添加一条记录(key-value,对象值需要序列化)  
  34.   public synchronized void append(Object key, Object val) throws IOException {  
  35.       if (key.getClass() != keyClass)  
  36.         throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);  
  37.         
  38.       if (val.getClass() != valClass)  
  39.         throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);  
  40.   
  41.       buffer.reset();  
  42.   
  43.       //序列化key(将key转化为二进制数组),并写入缓存buffer中  
  44.       keySerializer.serialize(key);  
  45.       int keyLength = buffer.getLength();  
  46.       if (keyLength < 0)  
  47.         throw new IOException("negative length keys not allowed: " + key);  
  48.   
  49.       //compress在初始化是被置为false   
  50.       if (compress) {  
  51.         deflateFilter.resetState();  
  52.         compressedValSerializer.serialize(val);  
  53.         deflateOut.flush();  
  54.         deflateFilter.finish();  
  55.       } else {  
  56.         //序列化value值(不压缩),并将其写入缓存buffer中  
  57.         uncompressedValSerializer.serialize(val);  
  58.       }  
  59.   
  60.       //将这条记录写入文件流  
  61.       checkAndWriteSync();                                // sync  
  62.       out.writeInt(buffer.getLength());                   // total record length  
  63.       out.writeInt(keyLength);                            // key portion length  
  64.       out.write(buffer.getData(), 0, buffer.getLength()); // data  
  65.     }  
  66.   
  67.     //添加一条记录(key-value,二进制值)  
  68.     public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {  
  69.       if (keyLength < 0)  
  70.         throw new IOException("negative length keys not allowed: " + keyLength);  
  71.   
  72.       int valLength = val.getSize();  
  73.   
  74.       checkAndWriteSync();  
  75.         
  76.       //直接将key-value写入文件流  
  77.       out.writeInt(keyLength+valLength);          // total record length  
  78.       out.writeInt(keyLength);                    // key portion length  
  79.       out.write(keyData, keyOffset, keyLength);   // key  
  80.       val.writeUncompressedBytes(out);            // value  
  81.     }  
  82.   
  83. ...  
  84.   
  85. }  

 

SequenceFile.RecordCompressWriter写入时只压缩key-value对(Record)中的value;

 

[java] view plaincopy
  1. static class RecordCompressWriter extends Writer {  
  2. ...  
  3.   
  4.    public synchronized void append(Object key, Object val) throws IOException {  
  5.       if (key.getClass() != keyClass)  
  6.         throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);  
  7.         
  8.       if (val.getClass() != valClass)  
  9.         throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);  
  10.   
  11.       buffer.reset();  
  12.   
  13.       //序列化key(将key转化为二进制数组),并写入缓存buffer中  
  14.       keySerializer.serialize(key);  
  15.       int keyLength = buffer.getLength();  
  16.       if (keyLength < 0)  
  17.         throw new IOException("negative length keys not allowed: " + key);  
  18.   
  19.       //序列化value值(不压缩),并将其写入缓存buffer中  
  20.       deflateFilter.resetState();  
  21.       compressedValSerializer.serialize(val);  
  22.       deflateOut.flush();  
  23.       deflateFilter.finish();  
  24.   
  25.       //将这条记录写入文件流  
  26.       checkAndWriteSync();                                // sync  
  27.       out.writeInt(buffer.getLength());                   // total record length  
  28.       out.writeInt(keyLength);                            // key portion length  
  29.       out.write(buffer.getData(), 0, buffer.getLength()); // data  
  30.     }  
  31.   
  32.     /** 添加一条记录(key-value,二进制值,value已压缩) */  
  33.     public synchronized void appendRaw(byte[] keyData, int keyOffset,  
  34.         int keyLength, ValueBytes val) throws IOException {  
  35.   
  36.       if (keyLength < 0)  
  37.         throw new IOException("negative length keys not allowed: " + keyLength);  
  38.   
  39.       int valLength = val.getSize();  
  40.         
  41.       checkAndWriteSync();                        // sync  
  42.       out.writeInt(keyLength+valLength);          // total record length  
  43.       out.writeInt(keyLength);                    // key portion length  
  44.       out.write(keyData, keyOffset, keyLength);   // 'key' data  
  45.       val.writeCompressedBytes(out);              // 'value' data  
  46.     }  
  47.       
  48.   } // RecordCompressionWriter  
  49.   
  50.   
  51. ...  
  52. }  

SequenceFile.BlockCompressWriter 写入时将一批key-value对(Record)压缩成一个Block;

 

[java] view plaincopy
  1. static class BlockCompressWriter extends Writer {  
  2. ...  
  3.   
  4.    void init(int compressionBlockSize) throws IOException {  
  5.       this.compressionBlockSize = compressionBlockSize;  
  6.       keySerializer.close();  
  7.       keySerializer.open(keyBuffer);  
  8.       uncompressedValSerializer.close();  
  9.       uncompressedValSerializer.open(valBuffer);  
  10.     }  
  11.       
  12.     /** Workhorse to check and write out compressed data/lengths */  
  13.     private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {  
  14.       deflateFilter.resetState();  
  15.       buffer.reset();  
  16.       deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());  
  17.       deflateOut.flush();  
  18.       deflateFilter.finish();  
  19.         
  20.       WritableUtils.writeVInt(out, buffer.getLength());  
  21.       out.write(buffer.getData(), 0, buffer.getLength());  
  22.     }  
  23.       
  24.     /** Compress and flush contents to dfs */  
  25.     public synchronized void sync() throws IOException {  
  26.       if (noBufferedRecords > 0) {  
  27.         super.sync();  
  28.           
  29.         // No. of records  
  30.         WritableUtils.writeVInt(out, noBufferedRecords);  
  31.           
  32.         // Write 'keys' and lengths  
  33.         writeBuffer(keyLenBuffer);  
  34.         writeBuffer(keyBuffer);  
  35.           
  36.         // Write 'values' and lengths  
  37.         writeBuffer(valLenBuffer);  
  38.         writeBuffer(valBuffer);  
  39.           
  40.         // Flush the file-stream  
  41.         out.flush();  
  42.           
  43.         // Reset internal states  
  44.         keyLenBuffer.reset();  
  45.         keyBuffer.reset();  
  46.         valLenBuffer.reset();  
  47.         valBuffer.reset();  
  48.         noBufferedRecords = 0;  
  49.       }  
  50.         
  51.     }  
  52.   
  53.   
  54.    //添加一条记录(key-value,对象值需要序列化)  
  55.    public synchronized void append(Object key, Object val) throws IOException {  
  56.       if (key.getClass() != keyClass)  
  57.         throw new IOException("wrong key class: "+key+" is not "+keyClass);  
  58.         
  59.       if (val.getClass() != valClass)  
  60.         throw new IOException("wrong value class: "+val+" is not "+valClass);  
  61.   
  62.       //序列化key(将key转化为二进制数组)(未压缩),并写入缓存keyBuffer中  
  63.       int oldKeyLength = keyBuffer.getLength();  
  64.       keySerializer.serialize(key);  
  65.       int keyLength = keyBuffer.getLength() - oldKeyLength;  
  66.       if (keyLength < 0)  
  67.         throw new IOException("negative length keys not allowed: " + key);  
  68.       WritableUtils.writeVInt(keyLenBuffer, keyLength);  
  69.   
  70.       //序列化value(将value转化为二进制数组)(未压缩),并写入缓存valBuffer中  
  71.       int oldValLength = valBuffer.getLength();  
  72.       uncompressedValSerializer.serialize(val);  
  73.       int valLength = valBuffer.getLength() - oldValLength;  
  74.       WritableUtils.writeVInt(valLenBuffer, valLength);  
  75.         
  76.       // Added another key/value pair  
  77.       ++noBufferedRecords;  
  78.         
  79.       // Compress and flush?  
  80.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();  
  81.       //block已满,可将整个block进行压缩并写入文件流  
  82.       if (currentBlockSize >= compressionBlockSize) {  
  83.         sync();  
  84.       }  
  85.     }  
  86.       
  87.     /**添加一条记录(key-value,二进制值,value已压缩). */  
  88.     public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {  
  89.         
  90.       if (keyLength < 0)  
  91.         throw new IOException("negative length keys not allowed");  
  92.   
  93.       int valLength = val.getSize();  
  94.         
  95.       // Save key/value data in relevant buffers  
  96.       WritableUtils.writeVInt(keyLenBuffer, keyLength);  
  97.       keyBuffer.write(keyData, keyOffset, keyLength);  
  98.       WritableUtils.writeVInt(valLenBuffer, valLength);  
  99.       val.writeUncompressedBytes(valBuffer);  
  100.   
  101.       // Added another key/value pair  
  102.       ++noBufferedRecords;  
  103.   
  104.       // Compress and flush?  
  105.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();   
  106.       if (currentBlockSize >= compressionBlockSize) {  
  107.         sync();  
  108.       }  
  109.     }  
  110.       
  111.   } // RecordCompressionWriter  
  112.   
  113.   
  114. ...  
  115. }  

     源码中,block的大小compressionBlockSize默认值为1000000,也可通过配置参数io.seqfile.compress.blocksize来指定。

 

   根据三种压缩算法,共有三种类型的SequenceFile文件格式:

1). Uncompressed SequenceFile

    

 

 

2). Record-Compressed SequenceFile

3). Block-Compressed SequenceFile

 

转载于:https://www.cnblogs.com/mfryf/p/7072446.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/468332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

win10右键闪退到桌面_WIN10设置闪退,桌面右键个性化显示设置等均无效

开始菜单点击设置&#xff0c;会报错&#xff0c;详细信息记不清了&#xff0c;提示路径C:\Windows\ImmersiveControlPanel\SystemSettings.exe&#xff0c;使用fix it修复工具无效尝试更新系统&#xff0c;更新至最新版本后仍然存在问题事件管理器中找到了这个错误应用程序名称…

Linux 内核红黑树分析

Android binder 内核实现是用红黑树的&#xff0c;理解红黑树我觉得是每一个Linux er的重中之重&#xff0c;感谢格子森同学的投稿&#xff0c;周末愉快。内核版本为 linux4.2.1 本文主要从红黑树的代码实现入手&#xff0c;来讨论linux内核中是如何实现红黑树的(主要是插入和删…

postgresql数据库安装及简单操作

自从MySQL被Oracle收购以后&#xff0c;PostgreSQL逐渐成为开源关系型数据库的首选。 本文介绍PostgreSQL的安装和基本用法&#xff0c;供初次使用者上手。以下内容基于Debian操作系统&#xff0c;其他操作系统实在没有精力兼顾&#xff0c;但是大部分内容应该普遍适用。 一、安…

周末随想,野路子

焦虑不知道是不是因为科技太发达的原因&#xff0c;晚上睡觉之前总是要看看手机&#xff0c;现在写公众号之后&#xff0c;也经常有读者问问题&#xff0c;总是担心错过哪条消息&#xff0c;所以时刻想看手机&#xff0c;而且因为太过于焦虑的原因&#xff0c;我把微信设置为静…

sizeof你真的弄明白了吗?

sizeof基础在C语言中&#xff0c;sizeof是一个操作符&#xff08;operator&#xff09;&#xff0c;而不是函数&#xff01;其用于判断数据类型或者表达式长度&#xff08;所占的内存字节数&#xff09;。其有两种表达形式&#xff1a;&#xff08;1&#xff09;sizeof(类型说明…

一道90%都会做错的指针题

今天&#xff0c;在我们的一个小群里&#xff0c;一个同学发了一道题目给我看&#xff0c;这道题目应该是C语言面试的一股清流了&#xff0c;各种招聘笔试上都可以看到&#xff0c;我试着发到我的大群里去&#xff0c;发现有人对这个理解不是很深刻&#xff0c;所以再发出来&am…

python调用arcgis_arcgis python 调用工具两种两种方法

原博文 2019-09-20 11:26 − arcpy.Select_analysis("p","kk") arcpy.analysis.Select("p","kk1") ... 相关推荐 2019-12-18 20:28 − import time import wmi, zlib def get_cpu_info(): tmpdict {} tmpdict["CpuCores"] …

ibatis mysql 同时删多个表报错_MySQL中Multiple primary key defined报错的解决办法

MySQL中Multiple primary key defined报错的解决办法创建主键可以有两种方式&#xff1a;create table 表名(字段名 类型&#xff0c;字段名 类型&#xff0c;……primary key(name));或者是create table 表名(字段名 类型 primary key&#xff0c;字段名 类型&#xff0c;………

Android ANR视角InputDispatcher

作者&#xff1a;王小二前言有好多人向我咨询过Input ANR问题&#xff0c;说实话&#xff0c;我也是一直无法彻底的解释清楚&#xff0c;我下决心要彻底搞懂这块知识点。话不多说先上图一个event的正常流程InputReader线程1.InputReader线程一旦发现有新的event&#xff0c;判断…

频繁跳槽,这谁顶得住~

最近应该是校招的时候&#xff0c;相信很多人都面临择业的问题&#xff0c;正念同学的文章&#xff0c;记录了自己一个嵌入式工程师这几年找工作换工作的经历。加我好友的都知道&#xff0c;我这几天发了一个朋友圈&#xff0c;说不要乱跳槽&#xff0c;我想表达的是&#xff0…

java script object_javascript Object与Array用法

引用类型&#xff1a;引用类型是一种数据结构&#xff0c;用于将数据和功能组织在一起。引用类型的值是引用类型的一个实例。一、ObjectECMAScript中的对象其实就是一组数据和功能的结合。Object类型其实是所有它的实例的基础&#xff0c;换句话说&#xff0c;Object类型所有具…

Linux内核编程广泛使用的前向声明(Forward Declaration)

前向声明编程定律先强调一点&#xff1a;在一切可能的场景&#xff0c;尽可能地使用前向声明(Forward Declaration)。这符合信息隐蔽的原则。一个例子regmap那么前向声明究竟是个什么鬼&#xff1f;在内核写代码和看代码的童鞋&#xff0c;经常发现Linux内核里面充斥着这样的代…

Top 10 Project Management Software

转载于:https://www.cnblogs.com/shy1766IT/p/7082910.html

java 二维数组 floyd_Floyd算法(一)之 C语言详解

本章介绍弗洛伊德算法。和以往一样&#xff0c;本文会先对弗洛伊德算法的理论论知识进行介绍&#xff0c;然后给出C语言的实现。后续再分别给出C和Java版本的实现。弗洛伊德算法介绍和Dijkstra算法一样&#xff0c;弗洛伊德(Floyd)算法也是一种用于寻找给定的加权图中顶点间最短…

南拳北腿

昨晚&#xff0c;熬夜看了篮球综艺节目&#xff0c;《我要打篮球》&#xff0c;实在话&#xff0c;我是林书豪的球迷&#xff0c;所以我肯定是希望林书豪球队能获胜&#xff0c;最后也如我所愿&#xff0c;两场3v3&#xff0c;电光飞侠都是在处于被动的情况下完成自我救赎&…

C语言const 关键字

面试的时候&#xff0c;应该有遇到const相关的&#xff0c;毕竟也是学习中的一个知识点&#xff0c;看完我们这篇文章&#xff0c;我觉得你应该可以在面试中完完全全的吃透const这个点。const和变量const uint32_t hello 3;编译的时候&#xff0c;编译器就知道了 hello 这个变…

pandas 第一行_Pandas数据预处理相关经验

在这里记录一些平常用的pandas操作以供参考。学习相关操作的最好方法还是找官方的文档最好&#xff0c;否则就会产生百度1小时&#xff0c;查文档3分钟的尴尬处境&#xff0c;之前为了找python里类似 in 的操作搜了半天资料也没找到&#xff0c;结果文档里就是 isin 函数就好了…

单机 amp; 弱联网手游 防破解、金币改动 简单措施

单机 &amp; 弱联网手游 防破解、金币改动 简单措施 手游经常使用破解方法 对于一个弱联网或者单机游戏&#xff0c;能够从下面方面去破解&#xff1a; 1、找得到存档文件的&#xff0c;直接破解改动存档文件。 2、找不到存档文件&#xff0c;就在游戏执行时借助一些软件来改…

Linux 内核通知链和例程代码

概念大多数内核子系统都是相互独立的&#xff0c;因此某个子系统可能对其它子系统产生的事件感兴趣。为了满足这个需求&#xff0c;也即是让某个子系统在发生某个事件时通知其它的子系统&#xff0c;Linux内核提供了通知链的机制。通知链表只能够在内核的子系统之间使用&#x…

faster rcnn resnet_RCNN系列、Fast-RCNN、Faster-RCNN、R-FCN检测模型对比

RCNN系列、Fast-RCNN、Faster-RCNN、R-FCN检测模型对比一&#xff0e;RCNN问题一&#xff1a;速度经典的目标检测算法使用滑动窗法依次判断所有可能的区域。本文则预先提取一系列较可能是物体的候选区域&#xff0c;之后仅在这些候选区域上提取特征&#xff0c;进行判断。问题二…