用Java处理大文件

最近,我不得不处理一组包含逐笔历史汇率市场数据的文件,并很快意识到使用传统的InputStream都无法将它们读取到内存中,因为每个文件的大小都超过4 GB。 Emacs甚至无法打开它们。

在这种特殊情况下,我可以编写一个简单的bash脚本,将文件分成小块,然后像往常一样读取它们。 但是我不希望这样,因为二进制格式会使这种方法无效。

因此,正确处理此问题的方法是使用内存映射文件逐步处理数据区域。 内存映射文件的优点在于它们不消耗虚拟内存或分页空间,因为它们由磁盘上的文件数据支持。

Okey,让我们看一下这些文件并提取一些数据。 似乎它们包含带有逗号分隔字段的ASCII文本行。

格式: [currency-pair],[timestamp],[bid-price],[ask-price]

例如: EUR/USD,20120102 00:01:30.420,1.29451,1.2949

公平地说,我可以为该格式编写程序。 但是读取和解析文件是正交的概念。 因此,让我们退后一步,考虑一下可以在将来遇到类似问题时可以重用的通用设计。

问题归结为对一组以无限长字节数组编码的条目进行增量解码,而不会耗尽内存。 示例格式以逗号/行分隔的文本编码的事实与一般解决方案无关,因此很明显,需要解码器接口才能处理不同的格式。

同样,在处理完整个文件之前,无法解析每个条目并将其保留在内存中,因此我们需要一种方法来逐步移交可以在其他位置(磁盘或网络)写入的条目块,然后再进行垃圾回收。 迭代器是处理此要求的很好的抽象方法,因为它们的行为就像游标一样,这正是重点。 每次迭代都会转发文件指针,然后让我们对数据进行处理。

所以首先是Decoder接口。 这个想法是从MappedByteBuffer增量解码对象,或者如果缓冲区中没有对象,则返回null。

public interface Decoder<T> {public T decode(ByteBuffer buffer);
}

然后是实现IterableFileReader 。 每次迭代将处理下一个4096字节的数据,并使用Decoder将其Decoder为对象列表。 请注意, FileReader接受文件列表,这很不错,因为它允许遍历数据而无需担心跨文件聚合。 顺便说一下,对于较大的文件,4096个字节的块可能会有点小。

public class FileReader implements Iterable<List<T>> {private static final long CHUNK_SIZE = 4096;private final Decoder<T> decoder;private Iterator<File> files;private FileReader(Decoder<T> decoder, File... files) {this(decoder, Arrays.asList(files));}private FileReader(Decoder<T> decoder, List<File> files) {this.files = files.iterator();this.decoder = decoder;}public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {return new FileReader<T>(decoder, files);}public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {return new FileReader<T>(decoder, files);}@Overridepublic Iterator<List<T>> iterator() {return new Iterator<List<T>>() {private List<T> entries;private long chunkPos = 0;private MappedByteBuffer buffer;private FileChannel channel;@Overridepublic boolean hasNext() {if (buffer == null || !buffer.hasRemaining()) {buffer = nextBuffer(chunkPos);if (buffer == null) {return false;}}T result = null;while ((result = decoder.decode(buffer)) != null) {if (entries == null) {entries = new ArrayList<T>();}entries.add(result);}// set next MappedByteBuffer chunkchunkPos += buffer.position();buffer = null;if (entries != null) {return true;} else {Closeables.closeQuietly(channel);return false;}}private MappedByteBuffer nextBuffer(long position) {try {if (channel == null || channel.size() == position) {if (channel != null) {Closeables.closeQuietly(channel);channel = null;}if (files.hasNext()) {File file = files.next();channel = new RandomAccessFile(file, "r").getChannel();chunkPos = 0;position = 0;} else {return null;}}long chunkSize = CHUNK_SIZE;if (channel.size() - position < chunkSize) {chunkSize = channel.size() - position;}return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);} catch (IOException e) {Closeables.closeQuietly(channel);throw new RuntimeException(e);}}@Overridepublic List<T> next() {List<T> res = entries;entries = null;return res;}@Overridepublic void remove() {throw new UnsupportedOperationException();}};}
}

下一个任务是编写一个Decoder ,我决定为任何逗号分隔的文本文件格式实现一个通用的TextRowDecoder ,接受每行的字段数和一个字段定界符,并返回一个字节数组数组。 然后, TextRowDecoder可以由可能处理不同字符集的格式特定的解码器重用。

public class TextRowDecoder implements Decoder<byte[][]> {private static final byte LF = 10;private final int numFields;private final byte delimiter;public TextRowDecoder(int numFields, byte delimiter) {this.numFields = numFields;this.delimiter = delimiter;}@Overridepublic byte[][] decode(ByteBuffer buffer) {int lineStartPos = buffer.position();int limit = buffer.limit();while (buffer.hasRemaining()) {byte b = buffer.get();if (b == LF) { // reached line feed so parse lineint lineEndPos = buffer.position();// set positions for one row duplicationif (buffer.limit() < lineEndPos + 1) {buffer.position(lineStartPos).limit(lineEndPos);} else {buffer.position(lineStartPos).limit(lineEndPos + 1);}byte[][] entry = parseRow(buffer.duplicate());if (entry != null) {// reset main bufferbuffer.position(lineEndPos);buffer.limit(limit);// set start after LFlineStartPos = lineEndPos;}return entry;}}buffer.position(lineStartPos);return null;}public byte[][] parseRow(ByteBuffer buffer) {int fieldStartPos = buffer.position();int fieldEndPos = 0;int fieldNumber = 0;byte[][] fields = new byte[numFields][];while (buffer.hasRemaining()) {byte b = buffer.get();if (b == delimiter || b == LF) {fieldEndPos = buffer.position();// save limitint limit = buffer.limit();// set positions for one row duplicationbuffer.position(fieldStartPos).limit(fieldEndPos);fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);fieldNumber++;// reset main bufferbuffer.position(fieldEndPos);buffer.limit(limit);// set start after LFfieldStartPos = fieldEndPos;}if (fieldNumber == numFields) {return fields;}}return null;}private byte[] parseField(ByteBuffer buffer, int pos, int length) {byte[] field = new byte[length];for (int i = 0; i < field.length; i++) {field[i] = buffer.get();}return field;}
}

这就是文件的处理方式。 每个列表包含从单个缓冲区解码的元素,每个元素都是由TextRowDecoder指定的字节数组的数组。

TextRowDecoder decoder = new TextRowDecoder(4, comma);
FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles());
for (List<byte[][]> chunk : reader) {// do something with each chunk
}

我们可以在这里停下来,但还有其他要求。 每行都包含一个时间戳记,并且必须按时间段而不是按天或按小时对缓冲区进行分组。 我仍然想遍历每个批次,因此立即的反应是为FileReader创建一个Iterable包装器,以实现此行为。 另外一个细节是,每个元素必须通过实现PeriodEntries Timestamped接口(此处未显示)为PeriodEntries提供其时间戳。

public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {private final Iterator<List<T extends Timestamped>> entriesIt;private final long interval;private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {this.entriesIt = entriesIt.iterator();this.interval = interval;}public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {return new PeriodEntries<T>(entriesIt, interval);}@Overridepublic Iterator<List<T extends Timestamped>> iterator() {return new Iterator<List<T>>() {private Queue<List<T>> queue = new LinkedList<List<T>>();private long previous;private Iterator<T> entryIt;@Overridepublic boolean hasNext() {if (!advanceEntries()) {return false;}T entry =  entryIt.next();long time = normalizeInterval(entry);if (previous == 0) {previous = time;}if (queue.peek() == null) {List<T> group = new ArrayList<T>();queue.add(group);}while (previous == time) {queue.peek().add(entry);if (!advanceEntries()) {break;}entry = entryIt.next();time = normalizeInterval(entry);}previous = time;List<T> result = queue.peek();if (result == null || result.isEmpty()) {return false;}return true;}private boolean advanceEntries() {// if there are no rows leftif (entryIt == null || !entryIt.hasNext()) {// try get more rows if possibleif (entriesIt.hasNext()) {entryIt = entriesIt.next().iterator();return true;} else {// no more rowsreturn false;}}return true;}private long normalizeInterval(Timestamped entry) {long time = entry.getTime();int utcOffset = TimeZone.getDefault().getOffset(time);long utcTime = time + utcOffset;long elapsed = utcTime % interval;return time - elapsed;}@Overridepublic List<T> next() {return queue.poll();}@Overridepublic void remove() {throw new UnsupportedOperationException();}};}
}

引入此功能后,最终处理代码并没有太大变化,只有一个干净紧凑的for循环,不必关心跨文件,缓冲区和句点对元素进行分组。 PeriodEntries也足够灵活,可以管理间隔上的任何长度。

TrueFxDecoder decoder = new TrueFxDecoder();
FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles());
long periodLength = TimeUnit.DAYS.toMillis(1);
PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);for (List<TrueFxData> entries : periods) {// data for each dayfor (TrueFxData entry : entries) {// process each entry}
}

正如您可能意识到的那样,不可能用集合来解决这个问题。 选择迭代器是一项关键的设计决策,它能够解析TB级的数据而不会占用太多的堆空间。

参考: Deephacks博客上的JCG合作伙伴 Kristoffer Sjogren 使用Java处理大型文件 。

翻译自: https://www.javacodegeeks.com/2013/01/processing-huge-files-with-java.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/370070.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java IO(一):File类

1.File类简介 File类位于java.io包中。它面向文件层次级别操作、查看文件&#xff0c;而字节流、字符流操作数据时显然比之更底层。 学习File类包括以下几个重点&#xff1a;文件路径、文件分隔符、创建文件(目录)、删除文件(目录)、查看文件内容(输出目录内文件)、判断文件(是…

android listview 开发,android开发之ListView实现

今天又初步学习了一下ListView控件&#xff0c;看看效果如下&#xff1a;LisViewActivity.java源码&#xff1a;package com.jinhoward.UI_listview;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import android.os.Bundl…

input ios问题 小程序_微信小程序开发常见问题汇总

原标题&#xff1a;微信小程序开发常见问题汇总1、域名必须是https非https的域名不被微信小程序允许。2、input组件placeholder字体颜色卸载placeholder-class里面的color并不生效&#xff0c;需要写在placeholder-style里面就可以了。3、wx.navigateTo无法跳转到带tabbar的页面…

https://github.com/

https://github.com/ qq邮箱 转载于:https://www.cnblogs.com/chang1/p/7133251.html

Less 的用法

1. node.js node.js是一个前端的框架 自带一个包管理工具npm node.js 的安装 官网&#xff1a;http://nodejs.cn/ 在命令行检验是否安装成功 切换到项目目录&#xff0c;初始化了一个package.json文件 安装与卸载jQuery包&#xff08;例子&#xff09; 安装 卸载 安装淘宝…

浅谈springboot整合ganymed-ssh2远程访问linux

环境介绍 技术栈 springbootmybatis-plusmysqlganymed-ssh2 软件 版本 mysql 8 IDEA IntelliJ IDEA 2022.2.1 JDK 1.8 Spring Boot 2.7.13 mybatis-plus 3.5.3.2 SSH(远程连接工具)连接原理&#xff1a;ssh服务是一个守护进程(demon)&#xff0c;系统后台监听客户…

优化Neo4j Cypher查询

上周&#xff0c;我花了很多时间尝试使用实时系统中的数据来优化大约20个执行失败的Cypher查询&#xff08;36866ms至155575ms&#xff09;。 经过一番尝试和错误&#xff0c;以及来自Michael的大量投入&#xff0c;我能够大致确定对查询进行哪些操作才能使它们性能更好-最后&a…

python 多文件知识

对于一个大型的项目&#xff0c;会存在很多个py文件&#xff0c;本文记录与多文件有关的内容。 1. python 如何在一个.py文件中调用另一个.py文件的类 如果是在同一个 module中(也就是同一个py 文件里),直接用就可以如果在不同的module里,例如a.py里有 class A:b.py 里有 class…

android pick file,LFilePicker---文件选择利器,各种样式有它就够了

LFilePicker在 Android 开发中如果需要选择某个文件&#xff0c;可以直接调取系统的文件管理器进行选择&#xff0c;但是无法保证各个厂商的手机界面一致&#xff0c;而且解析Uri 还比较繁琐&#xff0c;如果还需要多选呢&#xff1f;需要文件类型过滤呢&#xff1f;老板说界面…

老笔记整理二:网页小问题汇总

最近有一些小问题。想在这里写出来。一是方便大家排错&#xff0c;再是自己也整理一下。 1。很傻的小问题。。。参数提交方式有一个应该是form而不是from。&#xff08;英语老师&#xff0c;我对不起你。。。&#xff09; 2。用超链接传参数&#xff0c;在&#xff1f;后面不能…

在JVM之下–类加载器

在许多开发人员中&#xff0c;类加载器是Java语言的底层&#xff0c;并且经常被忽略。 在ZeroTurnaround上 &#xff0c;我们的开发人员必须生活&#xff0c;呼吸&#xff0c;饮食&#xff0c;喝酒&#xff0c;并且几乎与类加载器保持亲密关系&#xff0c;才能生产JRebel技术&a…

matplotlib绘制饼状图

源自http://blog.csdn.net/skyli114/article/details/77508430?ticketST-41707-PzNbUDGt6R5KYl3TkWDg-passport.csdn.net pyplot使用plt.pie()来绘制饼图 1 import matplotlib.pyplot as plt 2 labels frogs, hogs, dogs, logs 3 sizes 15, 20, 45, 10 # [15,20,45,10…

自适应宽度元素单行文本省略用法探究

单行文本省略是现代网页设计中非常常用的技术&#xff0c;几乎每个站点都会用到。单行文本省略适用于显示摘要信息的场景&#xff0c;如列表标题、文章摘要等。在响应式开发中&#xff0c;自适应宽度元素单行文本省略容易失效不起作用&#xff0c;对网页开发这造成困扰。因此&a…

P3390 【模板】矩阵快速幂

题目背景 矩阵快速幂 题目描述 给定n*n的矩阵A&#xff0c;求A^k 输入输出格式 输入格式&#xff1a; 第一行&#xff0c;n,k 第2至n1行&#xff0c;每行n个数&#xff0c;第i1行第j个数表示矩阵第i行第j列的元素 输出格式&#xff1a; 输出A^k 共n行&#xff0c;每行n个数&…

c#精彩编程200例百度云_永安市教育局被授予“人工智能编程教育试验区”

11月28日&#xff0c;“第二届人工智能与机器人教育大会青少年人工智能与编程教育主题论坛”在厦门召开。永安市教育局被中国教育发展战略学会人工智能与机器人教育专委会授予“人工智能编程教育试验区”牌匾&#xff0c;巴溪湾小学、西门小学、三中、一中附属学校、实验小学等…

python中+=和=+的区别

本文原创&#xff0c;版权属作者个人所有&#xff0c;如需转载请联系作者本人。Q&微&#xff1a;155122733 -------------------------------------------------------------------------------------------------------- a1 代表在原值上更改 aa1相当于先定义一个变量&…

Spring Data JPA和分页

让我们从支持分页的经典JPA方法开始。 考虑一个简单的域类–一个具有名字&#xff0c;姓氏的“成员”。 为了支持在成员列表上进行分页&#xff0c;JPA方法是支持一种查找器&#xff0c;该查找器将获取第一个结果&#xff08;firstResult&#xff09;的偏移量和要检索的结果&am…

南阳理工 题目63 小猴子下落

小猴子下落 时间限制&#xff1a;3000 ms | 内存限制&#xff1a;65535 KB 难度&#xff1a;3 描述 有一颗二叉树&#xff0c;最大深度为D,且所有叶子的深度都相同。所有结点从左到右从上到下的编号为1,2,3&#xff0c;&#xff0c;2的D次方减1。在结点1处放一个小猴子&#…

vue 方法获取返回值_vue.js - vuex异步提交,怎么获取返回数据

问 题 做登录页面时,在vuex中的action异步提交后获取的数据在mutations中存储在state里面,但是总感觉不对,没有返回数据,我前端页面怎么获取数据,用mapgetter获取不到数据,是不是他不是实时更新的,而且输出的mapgetter输出的数据还在action的前面。下面是我前端部分代码…

Windows环境下安装、卸载Apache

安装Apache 服务 打开 Apcahe的目录 &#xff0c;打开bin目录&#xff0c; 如&#xff1a;E:\wamp\Apache24\bin &#xff0c;打开目录&#xff0c;Shift键 鼠标右键 &#xff0c; 点击 在此处打开命令窗口或者W快捷键直接到此处&#xff0c; 也可以Window键r&#xff0c;输入…