jclouds
1.目标
在上一篇文章中 ,我们研究了如何使用jclouds中的通用Blob API将内容上传到S3。 在本文中,我们将使用jclouds的S3特定的异步API上传内容并利用S3提供的分段上传功能。
2.准备
2.1。 设置自定义API
上传过程的第一部分是创建jclouds API-这是针对Amazon S3的自定义API:
public AWSS3AsyncClient s3AsyncClient() {String identity = ...String credentials = ...BlobStoreContext context = ContextBuilder.newBuilder('aws-s3').credentials(identity, credentials).buildView(BlobStoreContext.class);RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();return providerContext.getAsyncApi();
}
2.2。 确定内容的零件数
Amazon S3对于每个要上传的部分都有5 MB的限制。 因此,我们需要做的第一件事是确定可以分割内容的适当数量的部分,以使我们没有低于5 MB限制的部分:
public static int getMaximumNumberOfParts(byte[] byteArray) {int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024if (numberOfParts== 0) {return 1;}return numberOfParts;
}
2.3。 将内容分成几部分
将把字节数组分成一定数量的部分:
public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);int fullSize = byteArray.length;long dimensionOfPart = fullSize / maxNumberOfParts;for (int i = 0; i < maxNumberOfParts; i++) {int previousSplitPoint = (int) (dimensionOfPart * i);int splitPoint = (int) (dimensionOfPart * (i + 1));if (i == (maxNumberOfParts - 1)) {splitPoint = fullSize;}byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);parts.add(partBytes);}return parts;
}
我们将测试将字节数组分成多个部分的逻辑–我们将生成一些字节,将字节数组拆分,使用Guava将其重新组合在一起,并验证是否可以恢复原始字节:
@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {byte[] byteArray = randomByteData(16);int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,equalTo(byteArray.length));byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));assertThat(byteArray, equalTo(unmultiplexed));
}
要生成数据,我们只需使用Random的支持:
byte[] randomByteData(int mb) {byte[] randomBytes = new byte[mb * 1024 * 1024];new Random().nextBytes(randomBytes);return randomBytes;
}
2.4。 创建有效载荷
既然我们已经为内容确定了正确的部分数量,并且设法将内容分解为多个部分,那么我们需要为jclouds API 生成Payload对象 :
public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {List<Payload> payloads = Lists.newArrayList();for (byte[] filePart : fileParts) {byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();Payload partPayload = Payloads.newByteArrayPayload(filePart);partPayload.getContentMetadata().setContentLength((long) filePart.length);partPayload.getContentMetadata().setContentMD5(partMd5Bytes);payloads.add(partPayload);}return payloads;
}
3.上载
上传过程是一个灵活的多步骤过程-这意味着:
- 可以在拥有所有数据之前开始上传-数据可以在输入时上传
- 数据分块上传-如果这些操作之一失败,则可以简单地将其检索
- 块可以并行上传–这可以大大提高上传速度,尤其是在大文件的情况下
3.1。 启动上传操作
Upload操作的第一步是启动该过程 。 对S3的请求必须包含标准的HTTP标头–特别是内容 – MD5标头。 我们将在这里使用Guava哈希函数支持:
Hashing.md5().hashBytes(byteArray).asBytes();
这是整个字节数组(而不是各个部分)的md5哈希 。
为了启动上载以及与S3的所有进一步交互,我们将使用AWSS3AsyncClient –我们之前创建的异步API:
ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();
密钥是分配给对象的句柄–它必须是客户端指定的唯一标识符。
还要注意,即使我们使用的是异步版本的API, 我们也阻止了该操作的结果–这是因为我们需要初始化的结果才能继续前进。
操作的结果是S3返回的上载ID –这将在整个生命周期中识别上载,并将出现在所有后续的上载操作中。
3.2。 上载零件
下一步是上传零件 。 我们的目标是并行发送这些请求,因为上载零件操作代表了大部分上载过程:
List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {ListenableFuture<String> future = s3AsyncApi.uploadPart(container, key, partNumber + 1, uploadId, payloads.get(partNumber));ongoingOperations.add(future);
}
零件编号必须是连续的,但发送请求的顺序无关紧要。
提交所有上载零件请求后,我们需要等待它们的响应,以便我们可以收集每个零件的单独ETag值:
Function<ListenableFuture<String>, String> getEtagFromOp =new Function<ListenableFuture<String>, String>() {public String apply(ListenableFuture<String> ongoingOperation) {try {return ongoingOperation.get();} catch (InterruptedException | ExecutionException e) {throw new IllegalStateException(e);}}
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);
如果由于某种原因,上载部分操作之一失败, 则可以重试该操作,直到成功为止。 上面的逻辑不包含重试机制,但是建立它应该足够简单。
3.3。 完成上传操作
上传过程的最后一步是完成分段操作 。 S3 API要求以Map的形式上传来自先前零件的响应,现在我们可以从上面获得的ETag列表中轻松创建这些响应:
Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {parts.put(i + 1, etagsOfParts.get(i));
}
最后,发送完整的请求:
s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();
这将返回完成对象的最终ETag,并完成整个上传过程。
4。结论
在本文中,我们使用自定义S3 jclouds API构建了一个支持多部分的,完全并行的S3上传操作。 此操作可以按原样使用,但是可以通过几种方法进行改进 。 首先,应在上传操作周围添加重试逻辑,以更好地处理失败。 接下来,对于非常大的文件,即使该机制并行发送所有上载的多部分请求, 限制机制仍应限制发送的并行请求的数量。 这既可以避免带宽成为瓶颈,又可以确保Amazon本身不会将上传过程标记为超过每秒允许的请求限制– Guava RateLimiter可能非常适合此操作。
翻译自: https://www.javacodegeeks.com/2013/04/multipart-upload-on-s3-with-jclouds.html
jclouds