在上一篇文章中,我们学习了如何解析过大的XML文件并将其转换为RxJava流。 这次让我们看一个大的JSON文件。 我们的示例将基于微小的colors.json,其中包含将近150种这种格式的记录:
{"aliceblue": [240, 248, 255, 1],"antiquewhite": [250, 235, 215, 1],"aqua": [0, 255, 255, 1],"aquamarine": [127, 255, 212, 1],"azure": [240, 255, 255, 1],//...
鲜为人知的事实: 天蓝色也是一种颜色,而Python是蛇。 但是回到RxJava。 这个文件很小,但是我们将用它来学习一些原理。 如果遵循它们,您将能够加载和连续处理任意大,甚至无限长的JSON文件。 首先,标准的“ Jackson ”方式类似于JAXB:将整个文件加载到内存中并将其映射到Java bean。 但是,如果文件的大小为兆字节或千兆字节(由于某种原因,您发现JSON是存储千兆字节数据的最佳格式……),则此技术将无法使用。 幸运的是,杰克逊提供了类似于StAX的流模式。
使用Jackson逐个令牌加载JSON文件
使用JSON并将其转换为对象集合的标准ObjectMapper
没错。 但是为了避免将所有内容加载到内存中,我们必须使用下面的ObjectMapper
使用的较低级API。 让我们再次看一下JSON示例:
{"aliceblue": [240, 248, 255, 1],"antiquewhite": [250, 235, 215, 1],//...
从磁盘和内存的角度来看,这是一个单维字节流,我们可以在逻辑上将其聚合为JSON令牌:
START_OBJECT '{'
FIELD_NAME 'aliceblue'
START_ARRAY '['
VALUE_NUMBER_INT '240'
VALUE_NUMBER_INT '248'
VALUE_NUMBER_INT '255'
VALUE_NUMBER_INT '1'
END_ARRAY ']'
FIELD_NAME 'antiquewhite'
START_ARRAY '['
VALUE_NUMBER_INT '250'
VALUE_NUMBER_INT '235'
VALUE_NUMBER_INT '215'
VALUE_NUMBER_INT '1'
END_ARRAY ']'
...
你明白了。 如果您熟悉编译器理论,这是编译期间的第一步。 编译器将源代码从字符转换为令牌。
但是,如果您了解编译器理论,则可能不是为了生存而解析JSON。 无论如何! Jackson库以这种方式工作,我们可以在没有透明对象映射的情况下使用它:
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;JsonParser parser = new JsonFactory().createParser(new File("colors.json"));
parser.nextToken(); // JsonToken.START_OBJECT;
while (parser.nextToken() != JsonToken.END_OBJECT) {final String name = parser.getCurrentName();parser.nextToken(); // JsonToken.START_ARRAY;parser.nextValue();final int red = parser.getIntValue();parser.nextValue();final int green = parser.getIntValue();parser.nextValue();final int blue = parser.getIntValue();parser.nextValue();parser.getIntValue();System.out.println(name + ": " + red + ", " + green + ", " + blue);parser.nextToken(); // JsonToken.END_ARRAY;
}
parser.close();
…或者如果您摆脱了一些重复,并使代码更易于阅读:
import lombok.Value;JsonParser parser = new JsonFactory().createParser(new File("colors.json"));
parser.nextToken(); // JsonToken.START_OBJECT;
while (parser.nextToken() != JsonToken.END_OBJECT) {System.out.println(readColour(parser));
}
parser.close();//...private Colour readColour(JsonParser parser) throws IOException {final String name = parser.getCurrentName();parser.nextToken(); // JsonToken.START_ARRAY;final Colour colour = new Colour(name,readInt(parser),readInt(parser),readInt(parser),readInt(parser));parser.nextToken(); // JsonToken.END_ARRAY;return colour;
}private int readInt(JsonParser parser) throws IOException {parser.nextValue();return parser.getIntValue();
}@Value
class Colour {private final String name;private final int red;private final int green;private final int blue;private final int alpha;
}
它与RxJava有什么关系? 您可能会猜测–我们可以按需逐块读取此JSON文件。 这使背压机制可以无缝工作:
final Flowable colours = Flowable.generate(() -> parser(new File("colors.json")),this::pullOrComplete,JsonParser::close);
让我解释一下这三个lambda表达式在做什么。 第一个设置JsonParser
我们的可变状态,将用于产生( 拉动 )更多项目:
private JsonParser parser(File file) throws IOException {final JsonParser parser = new JsonFactory().createParser(file);parser.nextToken(); // JsonToken.START_OBJECT;return parser;
}
没有什么花哨。 第二个lambda表达式至关重要。 每当订户希望接收更多项目时,都会调用它。 如果它要求100个项目,则此lambda表达式将被调用100次:
private void pullOrComplete(JsonParser parser, Emitter<Colour> emitter) throws IOException {if (parser.nextToken() != JsonToken.END_OBJECT) {final Colour colour = readColour(parser);emitter.onNext(colour);} else {emitter.onComplete();}
}
当然,如果到达END_OBJECT
(关闭整个JSON文件),则表明流已结束。 最后一个lambda表达式仅允许清除状态,例如通过关闭JsonParser
和基础File
。 现在想象一下这个JSON文件的大小为数百GB。 有了Flowable<Colour>
我们可以以任意速度安全地使用它,而不会冒内存过载的风险。
翻译自: https://www.javacodegeeks.com/2017/09/streaming-large-json-file-jackson-rxjava-faq.html