Hadoop 3.2.2
libhdfs/hdfs.c
struct hdfsFile_internal {
    void* file;
    enum hdfsStreamType type;
    int flags;
};
The flags field in the structure above is a bitmask (its bits include the HDFS_FILE_SUPPORTS_DIRECT_READ constant used below) and is assigned by the following interface:
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags,
                      int bufferSize, short replication, tSize blockSize)
{
    struct hdfsStreamBuilder *bld = hdfsStreamBuilderAlloc(fs, path, flags);
    if (bufferSize != 0) {
        hdfsStreamBuilderSetBufferSize(bld, bufferSize);
    }
    if (replication != 0) {
        hdfsStreamBuilderSetReplication(bld, replication);
    }
    if (blockSize != 0) {
        hdfsStreamBuilderSetDefaultBlockSize(bld, blockSize);
    }
    return hdfsStreamBuilderBuild(bld);
}
int hdfsFileUsesDirectRead(hdfsFile file)
{
    return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_READ);
}

void hdfsFileDisableDirectRead(hdfsFile file)
{
    file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}
static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
                                 int32_t bufferSize, int16_t replication,
                                 int64_t blockSize)
{
    ...
    if ((flags & O_WRONLY) == 0) {
        // Try a test read to see if we can do direct reads
        char buf;
        if (readDirect(fs, file, &buf, 0) == 0) {
            // Success - 0-byte read should return 0
            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
        } else if (errno != ENOTSUP) {
            // Unexpected error. Clear it, don't set the direct flag.
            fprintf(stderr,
                  "hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
                  "for direct read compatibility\n", path, errno);
        }
    }
    ...
}
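On the Java side, the 0-byte probe read ends up in FSDataInputStream#read(ByteBuffer). Paraphrased from the Hadoop source (the exact wording may differ slightly between versions), the method only works when the wrapped stream implements ByteBufferReadable; otherwise it throws the UnsupportedOperationException that shows up later in the demo output:

// Paraphrased sketch of org.apache.hadoop.fs.FSDataInputStream#read(ByteBuffer);
// "in" is the wrapped InputStream field of FSDataInputStream.
@Override
public int read(ByteBuffer buf) throws IOException {
    if (in instanceof ByteBufferReadable) {
        // Delegate the byte-buffer read to the wrapped stream.
        return ((ByteBufferReadable) in).read(buf);
    }
    // The wrapped stream (e.g. a connector's plain InputStream) cannot
    // read into a ByteBuffer.
    throw new UnsupportedOperationException(
        "Byte-buffer read unsupported by input stream");
}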
Hadoop 3.3.1
In Hadoop 3.3.1 the implementation of this interface has changed.
Related commit: https://github.com/apache/hadoop/pull/597/files#diff-c1385f6f8f4422f3f22bd28edd3123209d551e513b73429e58dd7c3d3350f59d
if ((flags & O_WRONLY) == 0) {
    // Check the StreamCapabilities of jFile to see if we can do direct
    // reads
    if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
        file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
    }
    // Check the StreamCapabilities of jFile to see if we can do direct
    // preads
    if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
        file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
    }
}
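So instead of an exception-driven probe, 3.3.1 simply asks the stream whether it declares the capability. For a connector's input stream to pass this check, it would have to implement both ByteBufferReadable and StreamCapabilities, along the lines of the hypothetical sketch below (the class name and body are illustrative, not from the Hadoop or CosN source; a real connector stream would extend FSInputStream):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.StreamCapabilities;

// Hypothetical: a minimal stream that would pass the "in:readbytebuffer" check.
class ByteBufferCapableStream extends InputStream
    implements ByteBufferReadable, StreamCapabilities {

    private final ByteArrayInputStream data =
        new ByteArrayInputStream(new byte[]{1, 2, 3});

    @Override
    public int read() {
        return data.read();
    }

    @Override
    public int read(ByteBuffer buf) throws IOException {
        // Read into the caller-supplied ByteBuffer.
        byte[] tmp = new byte[buf.remaining()];
        int n = data.read(tmp, 0, tmp.length);
        if (n > 0) {
            buf.put(tmp, 0, n);
        }
        return n;
    }

    @Override
    public boolean hasCapability(String capability) {
        // Advertise byte-buffer reads; StreamCapabilities.READBYTEBUFFER
        // is the constant for "in:readbytebuffer".
        return StreamCapabilities.READBYTEBUFFER.equalsIgnoreCase(capability);
    }
}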
How it is determined whether a stream has a given capability:
org.apache.hadoop.fs.FSDataInputStream.java
@Override
public boolean hasCapability(String capability) {
    return StoreImplementationUtils.hasCapability(in, capability);
}
org.apache.hadoop.fs.impl.StoreImplementationUtils.java
/**
 * Probe for an input stream having a capability; returns true
 * if the stream implements {@link StreamCapabilities} and its
 * {@code hasCapabilities()} method returns true for the capability.
 * @param in input stream
 * @param capability capability to probe for
 * @return true if the stream declares that it supports the capability.
 */
public static boolean hasCapability(InputStream in, String capability) {
    return objectHasCapability(in, capability);
}
In our case the wrapped (inner) stream does not implement StreamCapabilities, so this returns false directly:
/**
 * Probe for an object having a capability; returns true
 * if the stream implements {@link StreamCapabilities} and its
 * {@code hasCapabilities()} method returns true for the capability.
 * This is a package private method intended to provided a common
 * implementation for input and output streams.
 * {@link StreamCapabilities#hasCapability(String)} call is for public use.
 * @param object object to probe.
 * @param capability capability to probe for
 * @return true if the object implements stream capabilities and
 * declares that it supports the capability.
 */
static boolean objectHasCapability(Object object, String capability) {
    if (object instanceof StreamCapabilities) {
        return ((StreamCapabilities) object).hasCapability(capability);
    }
    return false;
}
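A quick way to see this short-circuit in isolation, as a minimal sketch (assuming hadoop-common is on the classpath; note StoreImplementationUtils is a Hadoop-internal class):

import java.io.ByteArrayInputStream;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;

public class CapabilityProbeDemo {
    public static void main(String[] args) {
        // ByteArrayInputStream does not implement StreamCapabilities,
        // so the probe returns false without ever asking the stream.
        boolean direct = StoreImplementationUtils.hasCapability(
            new ByteArrayInputStream(new byte[0]),
            StreamCapabilities.READBYTEBUFFER);
        System.out.println("in:readbytebuffer supported? " + direct); // false
    }
}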
Demo test
$ cat test_libhdfs_read.c
#include "hdfs.h"

#include <stdio.h>
#include <stdlib.h>
#include <iostream>

int main(int argc, char **argv) {
    hdfsFS fs;
    const char *rfile;
    tSize bufferSize;
    hdfsFile readFile;
    char *buffer;
    tSize curSize;

    // Validate arguments before touching argv.
    if (argc != 4) {
        fprintf(stderr, "Usage: hdfs_read <filename> <filesize> <buffersize>\n");
        exit(-1);
    }
    rfile = argv[1];
    bufferSize = strtoul(argv[3], NULL, 10);

    //fs = hdfsConnect("default", 0);
    fs = hdfsConnect("cosn://xiangx-guigu-1258469122", 0);
    if (!fs) {
        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
        exit(-1);
    }

    readFile = hdfsOpenFile(fs, rfile, O_RDONLY, bufferSize, 0, 0);
    std::cout << "readFile: " << readFile << std::endl;
    if (!readFile) {
        fprintf(stderr, "Failed to open %s for reading!\n", rfile);
        exit(-2);
    }

    buffer = (char*)malloc(sizeof(char) * bufferSize);
    if (buffer == NULL) {
        return -2;
    }

    // Keep reading until a short read signals EOF (or an error).
    curSize = bufferSize;
    for (; curSize == bufferSize;) {
        curSize = hdfsRead(fs, readFile, (void*)buffer, curSize);
    }

    free(buffer);
    hdfsCloseFile(fs, readFile);
    hdfsDisconnect(fs);
    std::cout << "curSize: " << curSize << std::endl;
    return 0;
}
- Compile command
g++ test_libhdfs_read.c -I$HADOOP_HOME/include -L$HADOOP_HOME/lib/native -L/usr/local/jdk/jre/lib/amd64/server/ -lhdfs -ljvm -o test_libhdfs_read -Wl,-rpath,/usr/local/jdk/jre/lib/amd64/server/
- Run command
export CLASSPATH=`hadoop classpath --glob`
./test_libhdfs_read cosn://xxx/testdata/testparquet/part-00000-4a31a445-8104-402e-ad60-486fba4ae5f6-c000-f8f4776d-5003-4888-872f-c9567471b5f5.snappy.parquet 100 100
readDirect: FSDataInputStream#read error:
UnsupportedOperationException: Byte-buffer read unsupported by input stream
java.lang.UnsupportedOperationException: Byte-buffer read unsupported by input stream
        at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:150)
readFile: 0x33cd8b0
curSize: 99

The exception here is only the warning printed by the 0-byte probe read in the 3.2.2 hdfsOpenFileImpl shown above: the CosN input stream does not support byte-buffer reads, so HDFS_FILE_SUPPORTS_DIRECT_READ is never set and hdfsRead falls back to the ordinary JNI read path. The open itself still succeeds (readFile is non-NULL), and the file is read to the end, with the final short read returning 99 bytes.