HDFS WebHDFS 读写文件分析及HTTP Chunk Transfer Coding相关问题探究

文章目录

  • 前言
  • 需要回答的问题
    • DataNode端基于Netty的WebHDFS Service的实现
  • 基于重定向的文件写入流程
    • 写入一个大文件时WebHDFS和Hadoop Native的块分布差异
  • 基于重定向的数据读取流程
    • 尝试读取一个小文件
    • 尝试读取一个大文件
  • 读写过程中的Chunk Transfer-Encoding支持
    • 写文件使用Chunk Transfer-Encoding
    • 读文件使用Chunk Transfer-Encoding
      • Response Header中为什么没有Transfer-Encoding: chunked
      • 测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断
  • WebHDFS的特性和不足总结

前言

最近在调研使用WebHDFS进行HDFS文件读写的相关调查,因此需要对WebHDFS的整个读写流程进行探究,其中涉及到的必要的http重定向的整个流程必须搞清楚。
同时,由于HDFS涉及到大量的流式写和大文件读,因为我们比较关心WebHDFS对Chunked Transfer Coding的支持,我们对WebHDFS的这个特性进行了测试和代码层面的研究。

需要回答的问题

我们基于常规的hadoop-native 方式连接去读写文件的时候,会首先与NameNode通信,然后再与DataNode通信读取具体数据,这其中的处理逻辑都放在了hadoop-native中的具体客户端代码中。
但是一旦使用WebHDFS,我们所有的通信方式仅仅是HTTP。hadoop-native中的这种多次通信逻辑放在一个http请求中该怎么处理呢?所以,我们在测试WebHDFS以前,就首先有了疑问: 基于HTTP的WebHDFS该怎么处理先到NameNode然后到DataNode的整个过程?总不会让数据从DataNode、经过NameNode再到客户端吧?

DataNode端基于Netty的WebHDFS Service的实现

在DataNode启动的时候,会启动一个DatanodeHttpServer,用来启动对应的WebHDFS Service:

  private void startInfoServer()throws IOException {// SecureDataNodeStarter will bind the privileged port to the channel if// the DN is started by JSVC, pass it along.ServerSocketChannel httpServerChannel = secureResources != null ?secureResources.getHttpServerChannel() : null;httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);httpServer.start();

DataNodeHttpServer 构造的时候,实际上构造了基于Netty的DataNode Http Service。我们特别注意到,在initChannel()方法中,添加了对Chunked Transfer Encoding的支持handler ChunkedWriteHandler。后面我们会具体讲这个类在我们通过WebHDFS读取文件时的作用。

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),new HttpResponseEncoder());if (restCsrfPreventionFilter != null) {p.addLast(new RestCsrfPreventionFilterHandler(restCsrfPreventionFilter));}p.addLast(new ChunkedWriteHandler(), //支持chunk的数据写入new URLDispatcher(jettyAddr, conf, confForCreate));}});

上面的代码其实是创建一个Netty服务器端的基本流程(以下基本流程来自ChatGPT的回答,个人认为非常准确,因此直接粘贴):

  • 创建 EventLoopGroupEventLoopGroup 是一个处理 I/O 操作的线程池。在服务器端,我们通常需要创建两个 EventLoopGroup,一个用于接受客户端的连接(Boss Group),另一个用于处理连接的数据流(Worker Group)。

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
  • 创建 ServerBootstrapServerBootstrap 是用于启动 Netty 服务器的引导类。它配置了服务器的各种参数,如线程模型、通道类型、处理器等。

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChannelInitializer()); // 设置处理连接的 ChannelInitializer
    
  • 设置 ChannelInitializerChannelInitializer 是一个特殊的处理器,用于配置新连接的 ChannelPipeline。在这里,你可以添加自定义的处理器来处理连接的数据流。在上面的HDFS代码中,创建了一个匿名的ChannelInitializer实现,重写了initChannel()方法:

    this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),new HttpResponseEncoder());.....}
    
  • 实现自定义的处理器: 在 ChannelInitializerinitChannel()方法中,添加Netty集成的或者自定义的处理器ChannelHandler,用于处理连接的数据流。这些处理器会成为一个处理器链,挂载在ChannelPipeline中。很显然,这个Pipeline下面的Handler时按照addLast()添加的顺序有序的,比如WebHDFS在下面的代码中,意味着读数据的时候,数据先经过ChunkedWriteHandler,然后经过URLDispatcher,而向客户端写数据的时候,数据流先经过URLDispatcher,然后经过ChunkedWriteHandler

      p.addLast(new ChunkedWriteHandler(), //支持chunk的数据写入new URLDispatcher(jettyAddr, conf, confForCreate));
    
  • 绑定端口并启动服务器: 最后,将服务器绑定到指定的端口,并启动服务器。

    ChannelFuture f = httpServer.bind(infoAddr);try {f.syncUninterruptibly();......
    

在DataNode端, 用户可以通过dfs.datanode.http.address配置了对应的WebHDFS地址,默认是0.0.0.0:9864

  public static final String  DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";public static final int     DFS_DATANODE_HTTP_DEFAULT_PORT = 9864;public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;

基于重定向的文件写入流程

写入一个大文件时WebHDFS和Hadoop Native的块分布差异

我们通过基础的java.net提供的HttpURLConnection创建Http客户端向HDFS写入文件。

通过命令, 可以看到其实这个文件的第一个Block都是在同一个机器上。
hdfs -copyFromLocal这种基于hadoop native的api拷贝文件,这个文件的block是逐个申请的,NameNode会为每一个Block根据当前的BlockPlacementPolicy分配对应的replica位置,但是,基于WebHDFS的NameNode只能在请求写文件的时候和NameNode进行一次通信,返回一个Location信息供客户端进行Redirect的写操作,因此,这导致这个文件的所有Block的第一个Replica都在这台机器上。至于每一个Block剩下的Replica的复制,按照HDFS的块写入策略,都是由第一个Replica复制到剩余两个Replica上去的。

我们做测试的时候,通过WebHDFS上传一个300MB的文件到HDFS。之所以选择300MB,是因为HDFS的块大小是128MB,因此300MB的文件需要创建3个Block,从而观察3个Block的第一个 Replica 是否是同一台机器。
下面是使用Java的HTTP客户端基于WebHDFS向HDFS写入一个大概300MB的文件的代码:

public class WebHDFSWriteFileExample
{public static void write_file(){try {// WebHDFS endpointString webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";// HDFS path where the file will be writtenString hdfsPath = "/user/hadoop/hello-world/330MB.txt";// Local file path to be uploadedString localFilePath = "/Users/cwu/330MB.txt";// Hadoop usernameString username = "hadoop";// WebHDFS REST API URL for file creation (PUT method)String webHdfsUrl = webHdfsEndpointProd + "/webhdfs/v1" + hdfsPath + "?op=CREATE&user.name=" + username + "&overwrite=true";// Open the local file for readingFileInputStream fileInputStream = new FileInputStream(localFilePath);BufferedReader reader = new BufferedReader(new InputStreamReader(fileInputStream));// Create HTTP connectionHttpURLConnection connection = createConnectionWithURL(webHdfsUrl);// Write file content to HDFSOutputStream outputStream = connection.getOutputStream();String inputLine;while ((inputLine = reader.readLine()) != null) {outputStream.write(inputLine.getBytes());outputStream.write("\r\n".getBytes());}outputStream.write("0\r\n\r\n".getBytes());// Close the connectionconnection.disconnect();}}// 基于WebHDFS的url创建一个HttpURLConnectionprivate static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {URL url = new URL(webHdfsUrl);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setDoOutput(true);connection.setRequestMethod("PUT");return connection;}}

文件上传以后,我们通过fsck命令查看上传的文件的块分布情况,结果验证了我们的猜想:文件的所有Block的第一个replica都是机器81.2.20.332,显然,这个机器就是我们向NameNode发送请求的时候NameNode返回给我们的307重定向的DataNode地址。重定向地址只有一个,因此,所有的块的写入就只能写到一个机器上去。

root@dddddd-101:~# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://drdencrypted/user/hadoop/hello-world/330MB.txt -files -locations -blocks
Connecting to namenode via http://dddddd-101.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2F330MB.txt
FSCK started by hdfs (auth:SIMPLE) from /81.2.2.108 for path /user/hadoop/hello-world/330MB.txt at Wed Jan 10 14:49:14 UTC 2024
/user/hadoop/hello-world/330MB.txt 348093376 bytes, 3 block(s):  OK
0. BP-332385978-81.2.1.108-1654488140099:blk_5650223226_4580715954 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-fa64ea42-f4e8-4ebb-999e-1727fcfb879a,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-88228db5-1c5b-4715-8b36-2bb653201367,DISK], DatanodeInfoWithStorage[81.2.11.125:50010,DS-e6936497-52f8-495e-9d80-0d7d5684369a,DISK]]
1. BP-332385978-81.2.1.108-1654488140099:blk_5650225022_4580717750 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-c0cb79b2-7679-4961-8bb4-917224b248ed,DISK], DatanodeInfoWithStorage[81.2.8.131:50010,DS-368a6917-1a36-419f-a6bc-7129978d0926,DISK], DatanodeInfoWithStorage[81.2.8.132:50010,DS-56b945ff-004f-4af5-879e-2941db14693e,DISK]]
2. BP-332385978-81.2.1.108-1654488140099:blk_5650228459_4580721187 len=79657920 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-0bf35d03-c89c-4e9a-86ce-af9c1a149b52,DISK], DatanodeInfoWithStorage[81.2.11.126:50010,DS-74da8386-1369-4562-bfec-ae6350b48fbd,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-9bcd388e-a129-4f5c-99b3-6611e59f5a12,DISK]]

因此,通过WebHDFS写文件其实是有一个隐含的重定向过程,但是读者看上面的代码,并没有看到任何重定向的处理逻辑,这是因为,在默认情况下,HttpURLConnection帮我们自动处理了重定向,即,如果服务器端返回307状态码,那么客户端自动重新建立到header中的Location中到连接,然后一样的请求发送给redirect地址,这一切对用户隐藏。
如果我们的确想观察重定向的详细过程,完全可以通过connection.setInstanceFollowRedirects(false);关闭客户端的自动重定向,手动处理重定向。同时,后面会讲,假如我们enable了Chunked Transfer Coding,自动重定向处理也将被关闭。
没有了自动重定向,就需要手动创建两次http请求,第一次请求发往NameNode,NameNode返回307,并附带了指向某台DataNode的Location,然后我们再依据Location第二次建立连接,将文件写入到对应的DataNode。因此基于上面的WebHDFSWriteFileExample,我们修改代码进行手动重定向以观察重定向细节:

public class WebHDFSWriteFileExample
{public static void write_file(){try {....... 发送请求到NameNodeif (responseCode == HttpURLConnection.HTTP_CREATED) {System.out.println("File created successfully on HDFS.");} else { // 发现了重定向System.out.println("Error creating file on HDFS. Response Code: " + responseCode);String newUrl = connection.getHeaderField("Location"); //获取header中的Location,即重定向的目标地址connection.disconnect(); // 关闭原先的对NameNode的连接,开始创建针对DataNode的连接HttpURLConnection connection2 = createConnectionWithURL(newUrl);// Write file content to HDFSoutputStream = connection2.getOutputStream();fileInputStream = new FileInputStream(localFilePath);reader = new BufferedReader(new InputStreamReader(fileInputStream));while ((inputLine = reader.readLine()) != null) {outputStream.write(inputLine.getBytes());}}}private static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {.....connection.setInstanceFollowRedirects(false); //disable掉自动重定向......
}

基于重定向的数据读取流程

我们将分别测试和研究读取小文件(不足一个Block)和大文件(两个或者更多Block)的读取流程,二者理论上会产生区别,因为假如文件来自两个或者更多的Block,基于WebHDFS的重定向似乎就没那么简单了。
下文来讲解具体的读取过程。

尝试读取一个小文件

对于只占用一个Block的小文件,我们发送读取请求给NameNode,NameNode会返回一个307的重定向,这个重定向一定是指向这个唯一的Block的其中一个Replica所在的DataNode的。
比如,读取一个1MB的小文件的代码如下:

public class WebHDFSReadFileExample {private static void read(String[] args){try {// WebHDFS endpointString webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";// HDFS path where the file will be writtenString hdfsPath = "/user/hadoop/hello-world/1MB.txt";// Local file path to be uploadedString localFilePath = "/Users/cwu/1MB.txt";// HDFS path to the file// Hadoop usernameString username = "hadoop";// WebHDFS REST API URL for file reading (GET method)String webHdfsUrl = webHdfsEndpoint + "/webhdfs/v1" + hdfsPath + "?op=OPEN&user.name=" + username;// Open the connectionHttpURLConnection connection = createConnectionWithURL(webHdfsUrl, false, "GET", false);if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {System.out.println("File created successfully on HDFS.");} else if (connection.getHeaderField("Location") != null){ // 307 redirect// 创建http连接,基于get请求读文件,关闭自动重定向,不enable chunkHttpURLConnection connection2 = createConnectionWithURL(newUrl, false, "GET", false);// Write file content to HDFSBufferedReader reader = new BufferedReader(new InputStreamReader(connection2.getInputStream()));// read the file from BufferReader.........}....}// 创建链接,用户可以选择是否打开自动重定向,是否使用chunkprivate static HttpURLConnection createConnectionWithURL(String webHdfsUrl, Boolean autoRedirect, String method, Boolean enable_chunk) throws IOException {URL url = new URL(webHdfsUrl);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setInstanceFollowRedirects(autoRedirect); // 关闭自动重定向connection.setRequestMethod(method);connection.setDoOutput(true);if(enable_chunk)connection.setChunkedStreamingMode(-1); //选择是否打开chunked streaming out。后面会说,该选项决定的是客户端的写,基于GET读取文件的时候,该选项不可以打开return connection;}

读取一个小文件的过程在预期之内,HttpClient首先与NameNode 建立Http连接, NameNode响应了307状态码同时在Response Header中包含了目标location的地址信息。在我们关闭了自动重定向的情况下,建立了与Location中指示的URL建立连接,然后读取到对应的文件。

尝试读取一个大文件

我们已经知道读取文件的时候,NameNode会返回一个重定向地址给我们,告诉我们去联系对应的DataNode读取对应数据。但是,假如这个文件不止一个block,那么,是不是在读取下一个block的时候DataNode也会返回下一个DataNode的地址给我们接着进行下一个Block的读取呢?

我们创建了一个200MB的文件,显然,这个文件会有两个Block,并且,我们从fsck可以看到,这个文件的两个块的总共6个副本,分布在6台不同的机器上。

root@rccp103-2d:/var/log/hadoop-hdfs# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://rccp103-2d.iad5.prod.corp.com:8020/user/hadoop/hello-world/200MB.log -files -locations -blocks
Connecting to namenode via http://rccp103-2d.iad5.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2Fhadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1
FSCK started by hdfs (auth:SIMPLE) from /71.9.2.108 for path /user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 at Thu Jan 11 08:38:25 UTC 2024
/user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 209715453 bytes, 2 block(s):  OK
0. BP-332385978-71.9.1.108-1654488140099:blk_5655503893_4585996621 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[71.9.9.131:50010,DS-a8904c39-cb1c-4fe0-b9e8-625f3bad85be,DISK], DatanodeInfoWithStorage[71.9.7.130:50010,DS-8e0d756e-bab4-4b44-b131-6bb9af01362b,DISK], DatanodeInfoWithStorage[71.9.8.128:50010,DS-622eb32d-e292-4824-818e-a72a12144fe3,DISK]]
1. BP-332385978-71.9.1.108-1654488140099:blk_5655503948_4585996676 len=75497725 Live_repl=3 [DatanodeInfoWithStorage[71.9.10.128:50010,DS-3e0bbada-d77c-46af-a1fa-573e54443706,DISK], DatanodeInfoWithStorage[71.9.12.129:50010,DS-7942484b-2b8e-438a-9d4b-8bf8d892c0cf,DISK], DatanodeInfoWithStorage[71.9.12.131:50010,DS-d2e9c177-4ea0-4232-9f28-f8a52644be8a,DISK]]

我们还是使用上面的WebHDFSReadFileExample代码,发现,读取文件的时候,NameNode返回了307重定向,并且指向了第一个块的某一个副本机器,文件的第一个和第二个块都是从这个机器上读取的,尽管,第二个块并不是分布在这台机器上

我们检查代码,发现了其实现的原理,简而言之,第一个Block的Replica的机器,充当了整个文件读取的中转,而不是仅仅负责自己存储的那个Replica的读取。
DataNode通过向Netty注册的WebHdfsHandler这个自定义的ChannelHandler来处理发往DataNode请求(上文讲过Netty的一些基本实体的定义)。WebHdfsHandler.onOpen()方法用来处理用户的读文件请求。我们从下面的onOpen()代码的实现细节可以看到,实际上,DataNode并不是将用户请求的文件在自己上面的Replica返回给Http客户端,而是将自己作为中转,创建一个Hadoop Client,读取文件,然后将读取到的文件的InputStream封装程一个ChunkedStream返回给客户端。所以,当客户端调用connection2.getInputStream()就能够拿到对应DataNode创建的读取文件的InputStream

 private void onOpen(ChannelHandlerContext ctx) throws IOException {// DataNode自己创建一个读取文件的Client,这是一个普通的Hadoop Native客户端,与普通的HDFS Java 客户端没有任何去呗final DFSClient dfsclient = newDfsClient(nnId, conf);HdfsDataInputStream in = dfsclient.createWrappedInputStream(dfsclient.open(path, bufferSize, true));in.seek(offset);data = in;ctx.write(resp);ctx.writeAndFlush(new ChunkedStream(data) { // 封装好InputStream,返回给客户端,可以看到,是以Chunk的方式。后面会讲,ChunkedStream会被当前的Handler中的下一个Handler即ChunkedWriteHandler进一步处理@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);}

读写过程中的Chunk Transfer-Encoding支持

Chunk Transfer-Encoding的支持会极大影响流式数据读写的性能。我们看看什么是Chunk Transfer-Encoding以及它能给我们带来什么:

分块传输编码(Chunk Transfer-Encoding)是一种 HTTP 传输机制,它将消息主体分割成一系列块,并使用长度前缀来标识每个块的大小。这种编码方式带来了一些好处:

  • 实时传输: 分块传输编码允许服务器在生成响应的同时将数据发送给客户端,而不必等到整个响应主体准备就绪。这对于实时生成的内容或流式传输非常有用。

  • 无需预先知道内容大小: 使用分块传输编码时,不需要预先知道整个消息主体的大小。这对于动态生成的内容或流式传输,其中内容的大小可能在生成过程中动态变化,非常有用。

  • 提高响应速度: 分块传输编码可以在数据准备的同时发送,减少了等待整个响应准备就绪的时间。这有助于提高响应速度,特别是对于大型或动态生成的内容。

  • 降低延迟: 分块传输编码允许客户端在接收到一部分数据时就开始处理,而不必等待整个响应完成。这有助于降低延迟,特别是对于实时应用或交互性应用。

  • 更好的适应网络波动: 在网络条件不稳定的情况下,分块传输编码可以更好地适应不同的带宽和延迟条件。由于可以在生成数据的同时发送,它能够更灵活地处理网络波动。

所以,总的说来,Chunk Transfer-Encoding主要用来文件大小未知的流式文件的传输,或者虽然确定大小但是文件大小很大的传输场景。

正常使用场景下,流式数据或者大文件的发送方会在http header中设置Transfer-Encoding: chunked,同时以chunk的方式发送数据,数据的接受方在接收到header并发现是Chunk Transfer-Encoding以后,会进行对应的编解码操作。当然,这个解码操作一般是对应的Library底层的自动操作,上层用户直接看到的是一个解码以后的InputStream比如BufferedInputStream等,而不需要关心本身的解码细节。

注意,Chunked Transfer Coding的每一个chunk的数据大小是自描述的,因此,并不要求在一个stream连接中所有chunk的大小都相同。只要遵循Chunked Transfer Coding的数据格式,比如,如果发送的文件是一个日志文件,数据的发送方甚至可以按照一行一个chunk进行数据的发送。

我们在下文详细讲解通过WebHDFS读写文件时对Chunk的支持。

写文件使用Chunk Transfer-Encoding

我们通过connection.setChunkedStreamingMode(chunklen),就会enable Chunked Transfer Coding写文件,即为我们添加Transfer-Encoding: chunked的http header, 同时设置了一个chunkLength,我们传入的参数如果是不大于0的参数,那么chunkLength就设置为默认值4096,否则就按照我们设置的大小设置chunkLength

与之相对,我们可以调用connection.setFixedLengthStreamingMode (int contentLength) 来选择使用Fixed-Length Streaming
Chunked StreamingFixed-Length Streaming是相互排斥的,二者只能选其一,这是因为二者的使用场景完全不同:

  • setChunkedStreamingMode: 用于指定请求体使用分块传输编码 (Chunked Transfer Encoding)。分块传输编码允许将请求体分割成多个块,每个块都带有长度信息。这样可以在数据生成的同时发送,而无需等到整个请求体准备就绪。使用这种模式可以提高效率,特别是对于较大的请求体。

    connection.setChunkedStreamingMode(0); // 使用分块传输编码,0 表示使用默认块大小
    
  • setFixedLengthStreamingMode: 用于指定请求体的固定长度。这意味着在实际写入请求体数据之前,需要知道请求体的总长度。这样的模式适用于较小的请求体,且长度可预知的情况。

    int contentLength = calculateContentLength(); // 计算请求体的长度
    connection.setFixedLengthStreamingMode(contentLength);
    

    选择使用哪个方法取决于请求体的大小和生成的时间。对于动态生成的或大小未知的请求体,通常更适合使用 setChunkedStreamingMode。对于已知大小的请求体,使用 setFixedLengthStreamingMode 可能更为合适。

下面的代码就是HttpURLConnection中两个方法的实现:

------------------------------------------HttpURLConnection------------------------------------------public void setChunkedStreamingMode (int chunklen) {.......if (fixedContentLength != -1 || fixedContentLengthLong != -1) {throw new IllegalStateException ("Fixed length streaming mode set");}chunkLength = chunklen <=0? DEFAULT_CHUNK_SIZE : chunklen;}public void setFixedLengthStreamingMode (int contentLength) {......fixedContentLength = contentLength;}

我们从HttpURLConnection.getOutputStream0()方法可以看到,

  • 由于我们之前设置了chunkLength,因此,这里会构建对应的ChunkedOutputStream,这意味着,我们通过HttpURLConnection.getOutputStream()返回的OutputStream进行数据的写入的时候,会经过ChunkedOutputStream然后才到最底层的比如SocketOutputStream,即ChunkedOutputStream会帮我们进行数据的chunk编码并发送给远程。
  • 如果是Http Method是Get,会强制变成Post,即我们不可能在get请求中使用Chunk Transfer Encoding,如果调用了setChunkedStreamingMode(chunklen),那么Http Method会被换成POST。这是为什么我们在读文件的时候假如调用setChunkedStreamingMode(chunklen),WebHDFS会返回一个400,让我们误以为WebHDFS不支持Chunk Transfer Coding。后文会详细讲解。
  • 我们从writeRequests可以看到,假如我们通过setChunkedStreamingMode(0)设置了chunkLength,那么写文件的请求中会被隐含加上Transfer-Encoding: chunked 的 Http header。而如果我们通过setFixedLengthStreamingMode()设置了使用fixed-length streaming,那么就会隐含加上Content-Length 的 header。
private synchronized OutputStream getOutputStream0() throws IOException {.........if (method.equals("GET")) { //在获取OutputStream,GET会被强制替换成POSTmethod = "POST"; // Backward compatibility}....if (streaming()) {if (strOutputStream == null) {if (chunkLength != -1) { /* chunked */strOutputStream = new StreamingOutputStream(new ChunkedOutputStream(ps, chunkLength), -1L); //封装一层ChunkedOutputStream} else { /* must be fixed content length */long length = 0L;if (fixedContentLengthLong != -1) {length = fixedContentLengthLong;} else if (fixedContentLength != -1) {length = fixedContentLength;}strOutputStream = new StreamingOutputStream(ps, length);}.....}private void writeRequests() throws IOException {.....if (this.streaming()) {if (this.chunkLength != -1) { // 设置了chunk, 会帮我们添加Transfer-Encoding:chunked的headerthis.requests.set("Transfer-Encoding", "chunked");var14 = true;} else if (this.fixedContentLengthLong != -1L) { //如果是fixed-length code,会帮我们添加Content-Length headerthis.requests.set("Content-Length", String.valueOf(this.fixedContentLengthLong));} else if (this.fixedContentLength != -1) {this.requests.set("Content-Length", String.valueOf(this.fixedContentLength));}}

下图显示了我们使用Chunked Transfer Coding写文件的时候的客户端观察到的HttpURLConnection的运行时的outputStream,从最上层 HttpURLConnection.StreamingOutStream,经过ChunkedOutputStream一直到最下层SocketOutputStream的层层封装过程。
在这里插入图片描述

读文件使用Chunk Transfer-Encoding

刚刚说了,是否使用Chunk Transfer-Encoding,其实是数据的发送方的职责,因此,客户端从WebHDFS读文件,数据的发送方,WebHDFS Service,是否使用Chunk Transfer-Encoding,是它自己的选择。我们从WebHDFS服务器端代码可以看到,基于Netty的WebHDFS服务器端的确是使用Chunked Transfer Encoding来发送文件到客户端的。
DataNode启动的时候,会构造WebHDFS背后的Netty服务器端。构造时加入了ChunkedWriteHandler:

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),.......p.addLast(new ChunkedWriteHandler(),new URLDispatcher(jettyAddr, conf, confForCreate));}});

从上面的代码可以看到,最后网ChannelPipeline中添加了两个ChannelHandlerChunkedWriteHandlerURLDispatcher,那么,这两个Handler在pipeline中的调用顺序是怎样的呢?

我们来看一下ChatGPT的准确回答:


在 Netty 中,ChannelPipeline 是一个处理事件的流水线。ChannelPipeline 中的每个节点是一个 ChannelHandler,负责处理特定类型的事件。当数据通过 Channel 时,它会在 ChannelPipeline 中的各个处理节点中依次经过,每个节点负责特定的处理。

对于 ChannelPipeline.addLast 添加的 ChannelHandler,它们的执行顺序是按照它们被添加的顺序来决定的。新添加的 ChannelHandler 会被添加到管道的尾部,因此会成为最后执行的处理器。

在读写的时候,事件的传递顺序如下:

  • 写操作: 当有数据要被写入 Channel 时,数据会从 ChannelPipeline 的尾部开始,依次经过每个 ChannelHandler write() 方法,最终传递给底层的通道。

  • 读操作: 当有数据从 Channel 读取时,数据会从 ChannelPipeline 的头部开始,依次经过每个 ChannelHandlerchannelRead() 方法,直到达到 ChannelPipeline 的尾部。


所以,在Netty Service向客户端写入数据的时候,从我们自定义的URLDispatcher中写入(其实内部会调用WebHdfsHandler),然后会被ChunkedWriteHandler进行处理,即chunk by chunk的进行数据的编码,最后发送给客户端。
根据ChunkedWriteHandler的官方文档,如果我们使用ChunkedWriteHandler来进行Chunk Transfer Coding的数据传输,它的输入,即它的前一个Handler必须写入的是ChunkedInput格式,这就是WebHdfsHandler.onOpen()的处理方式,ChunkedStreamChunkedInput的实现类:

  private void onOpen(ChannelHandlerContext ctx) throws IOException {........resp = new DefaultHttpResponse(HTTP_1_1, OK);HttpHeaders headers = resp.headers();.....final DFSClient dfsclient = newDfsClient(nnId, conf);HdfsDataInputStream in = dfsclient.createWrappedInputStream(dfsclient.open(path, bufferSize, true));in.seek(offset);long contentLength = in.getVisibleLength() - offset;.......final InputStream data;........headers.set(CONTENT_LENGTH, contentLength);data = new LimitInputStream(in, contentLength);ctx.write(resp); //先写元数据ctx.writeAndFlush(new ChunkedStream(data) { // 再写content,即读取的文件流。这个文件流被封装为ChunkedStream,进而交给pipeline中的ChunkedWriteHanlder,最后发送给客户端@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);}

Response Header中为什么没有Transfer-Encoding: chunked

但是很奇怪,我们从HttpUrlConnection收到的服务器端的Response Header中并没有看到Transfer-Encoding:chunked,相反,我们看到了Content-Length: 192832,难道文件并不是以Chunked Transfer-Encoding的编码方式发送的?
其实不是这样的。HttpUrlConnectionChunked Transfer-Encoding的数据处理并完全依赖Transfer-Encoding:chunked的header,它可以自动识别数据本身的格式,然后在底层自动识别并解码Chunked Transfer-Encoding,然后直接将解码后的结以InputStream的方式返回给客户端。Netty的ChunkedWriteHandler帮我们发送数据到远程客户端时,尽管数据是按照chunked格式发送给的,但是其并不会帮我们加上Transfer-Encoding:chunked到Response header中去,但是客户端依然是可以正确解析到的。ChunkedWriteHandler的好处是:

  • 提供更灵活的数据处理:ChunkedWriteHandler可以接收 FileRegionHttpContentByteBuf 对象,并负责将它们转换成分块的形式发送到下一个 ChannelOutboundHandler
  • 自动处理分块传输:ChunkedWriteHandler会自动将数据分块,确保数据按照 Chunked Transfer Encoding 规范传输。
  • 对于大文件或大数据流,使用 ChunkedWriteHandler 可以在传输过程中分块发送,降低内存占用。
    我们看到, WebHDFS服务器端使用了ChunkedWriteHandler,因此在ChannelPipeline中,需要将读取块数据的InputStream封装成ChunkedStream,因为ChunkedStreamChunkedInput 接口的实现类:
    final InputStream data;if (contentLength >= 0) {headers.set(CONTENT_LENGTH, contentLength);data = new LimitInputStream(in, contentLength);} else {data = in;}ctx.write(resp);ctx.writeAndFlush(new ChunkedStream(data) { //将读取块数据的InputStream封装为ChunkedStream,供ChunkedWriteHandler进行进一步的分块处理@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);

所以,其实ChunkedWriteHandler只是分块写入数据的一种更好的封装方式,方便了我们进行chunk数据的写入,但其实我们完全可以不用ChunkedWriteHandler而手动进行chunk数据写入,比如,我们自己在header中添加Transfer-Encoding:chunked,然后按照 Chunked Transfer Encoding 去写数据,服务端示例代码如下:

public class NettyChunkedWriteHandlerExample {public static void main(String[] args) {.....try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new SimpleChannelInboundHandler<HttpMessage>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {FileInputStream fis = new FileInputStream("/Users/cwu/chunk-test");ChunkedStream chunkedStream = new ChunkedStream(fis);// 设置响应头DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK);HttpHeaders headers = response.headers();headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");headers.set(CONTENT_TYPE, APPLICATION_OCTET_STREAM);headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); // 设置`Transfer-Encoding:chunked`ctx.write(response);chunk_write_stream(fis, ctx);....serverBootstrap.bind(8087).sync().channel().closeFuture().sync();.....}public static void chunk_write_stream(InputStream inputStream,  ChannelHandlerContext ctx) throws IOException {// 读取文件并以 chunked 形式写入响应byte[] buffer = new byte[1024];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {if(bytesRead != 1024){System.out.println("The final bytesRead is not 1024, its size is " + bytesRead);byte[] buffer_copy = Arrays.copyOfRange(buffer, 0, bytesRead);ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer_copy)));}else{ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer)));}}//写入最后一个空字符ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);}

这时候我们看基于HttpURLConnection的客户端,可以看到,客户端识别了Transfer-Encoding:chunked,下层Stream的封装已经自动有了ChunkedInputStream,同样完成了基于chunk的数据传输。

测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断

我在最初测试WebHDFS是否支持 Chunked Transfer-Encoding,错误的代码差点儿导致我做出了错误的结论,认为从WebHDFS读取数据无法使用Chunked Transfer Encoding
由于对Chunked Transfer-Encoding理解不正确,我以为要想让服务器端Chunk by Chunk的方式将文件传输过来,客户端需要在GET请求(由于是读文件,因此是GET请求)中添加Transfer-Encoding:chunked header,对于HttpURLConnection客户端,不用手动添加这个header,只需要调用connection.setChunkedStreamingMode(0);就可以了,我这样去做,但是服务器端抛出了一个400

java.io.IOException: Server returned HTTP response code: 400 for URL: http://rccd101-7a.sjc2.dev.conviva.com:9864/webhdfs/v1/user/hadoop/hello-world/session_summaries.PT_HOURLY_2023-09-07_2023-09-07.sql-2?op=OPEN&user.name=hadoop&namenoderpcaddress=olap-hdfs-test&offset=0at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1914)at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1512)at WebHDFSReadFileExample.chunk_read(WebHDFSReadFileExample.java:45)at WebHDFSReadFileExample.main(WebHDFSReadFileExample.java:86)

代码很简单,就是在创建HttpURLConnection的时候,增加了connection.setChunkedStreamingMode(0);。后来通过debug,发现,当我们调用connection.setChunkedStreamingMode(0);的时候,会带来了两个主要变化:

  1. 自动重定向会被disable:这个变化倒不是导致我们上面的异常的原因,但是需要引起注意。这意味着,即使我们没有显式地通过connection.setInstanceFollowRedirects(false);去关闭自动重定向,如果调用了connection.setChunkedStreamingMode(0);,自动重定向也会被关闭。加入式读文件,那么我们需要手动识别并处理307
  2. GET请求会被自动变成POST请求:这是导致400问题的原因。WebHDFS客户端是将Http Method和用户的operation联合在一起来识别操作的,代码如下。可以看到,如果是OPEN操作,那么Http Method必须是GET 才能被识别,因此,一个POST类型的OPEN操作无法被识别:
      public void handle(ChannelHandlerContext ctx, HttpRequest req)throws IOException, URISyntaxException {String op = params.op();HttpMethod method = req.getMethod();if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)&& method == PUT) {onCreate(ctx);} else if (PostOpParam.Op.APPEND.name().equalsIgnoreCase(op)&& method == POST) {onAppend(ctx);} else if (GetOpParam.Op.OPEN.name().equalsIgnoreCase(op)&& method == GET) {onOpen(ctx); //读取文件的HTTP Method必须是Get} else if(GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op)&& method == GET) {onGetFileChecksum(ctx);} else if(PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)&& method == OPTIONS) {allowCORSOnCreate(ctx);} else {throw new IllegalArgumentException("Invalid operation " + op);}
    
  • 那么,为什么我们调用connection.setChunkedStreamingMode(0);,请求类型会被强制改成POST呢?因为connection.setChunkedStreamingMode(0);本来就应该是数据发送方的事情,数据的写入方通过设置Transfer-Encoding:chunked来告知数据接受方自己的数据是Chunked Transfer Coding,请按照Chunked Transfer Coding格式进行解析。所以,我们在读取WebHDFS文件的时候设置connection.setChunkedStreamingMode(0);是错误的操作。

WebHDFS的特性和不足总结

从上面的代码分析可以看到,先抛开性能差异不谈,在具体实现上,基于Hadoop Native的客户端的很多实现细节,肯定是无法在基于Http的WebHDFS中实现的。因此,WebHDFS表面上能成为Hadoop Native的替代,但是其实整个流程都发生了变化。总而言之,我们通过实验和HDFS的代码发现:

  1. WebHDFS不支持HDFS HA。但是这还好,我们可以在WebHDFS客户端上层手动做判断,实现HA。
  2. 写大文件的时候所有Block的第一个Replica会集中在某一台机器上。
  3. 读取文件的时候,文件的所有Block的读取操作都来自于第一个Block的某一个Replica机器
  4. 基于Http本身的限制,hedged read肯定完全不再可能。
  5. 数据的写入和读取都支持Chunked Transfer Coding
  6. 性能问题。我没有进行benchmark测试,有兴趣的读者可以自行寻找测试结果或者自行测试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617170.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xcode安装及运行源码

抖音教学视频 目录 1、xcode 介绍 2、xcode 下载 3、xocde 运行ios源码 4、快捷键 1、xcode 介绍 Xcode 是运行在操作系统Mac OS X上的集成开发工具&#xff08;IDE&#xff09;&#xff0c;由Apple Inc开发。Xcode是开发 macOS 和 iOS 应用程序的最快捷的方式。Xcode 具有…

2024.1.11 关于 Jedis 库操作 Redis 基本演示

目录 引言 通用命令 SET & GET EXISTS & DEL KEYS EXPIRE & TTL TYPE String 类型命令 MGET & MSET GETRANGE & SETRANGE APPEND INCR & DECR List 类型命令 LPUSH & LRANG LPOP & LPOP BLPOP & BRPOP LLEN Set 类型命…

SwiftUI 为任意视图加上徽章(Badge)而想到的(下)

概览 在 SwiftUI 为任意视图加上徽章(Badge)而想到的(上) 这篇文章中,我们讨论了如何使用 Preference 技术打造 SwiftUI 中任意视图上徽章的实现。 虽然,我们完成了一系列挑战最后基本得偿所愿,不过在上篇的实现中仍有些许不尽如人意之处。 在本篇博文中,您将学到如下…

Jmeter 性能压测 —— TPS与QPS

1、TPS和QPS的区别 TPS&#xff1a;意思是每秒事务数&#xff0c;具体事务的定义都是人为的&#xff0c;可以一个接口、多个接口、一个业务流程等等。 一个事务是指事务内第一个请求发送到接收到最后一个请求的响应的过程&#xff0c;以此来计算使用的时间和完成的事务个数。…

WPF 布局

了解 WPF中所有布局如下&#xff0c;我们一一尝试实现&#xff0c;本文档主要以图形化的形式展示每个布局的功能。 布局&#xff1a; Border、 BulletDecorator、 Canvas、 DockPanel、 Expander、 Grid、 GridView、 GridSplitter、 GroupBox、 Panel、 ResizeGrip、 Separat…

蓝屏代码0x000007E解决办法

概述 出现该问题&#xff1a; 1、硬件冲突造成的蓝屏 驱动冲突&#xff1a;与其他设备或应用程序的驱动冲突可能会引起系统崩溃。 2、内存虚拟不足造成的蓝屏 错误配置&#xff1a;不正确的配置或设置可能会导致蓝屏错误。 3、超频后也可能出现蓝屏 CUP超频或者显卡超频后出现蓝…

NAS使用的一些常见命令 ssh sftp 上传 下载 ALL in one

目录 登陆上传/下载内网穿透 登陆 ssh 登陆 ssh usernameserverIP -p portNumsftp 登陆 sftp -P portNum usernameserverIP上传/下载 如ls等&#xff0c;远程服务器操作 如lls等&#xff0c;本机操作&#xff0c;前缀为l 文件 put **** 将本机上文件上传到远程服务器上当…

分析一个项目(微信小程序篇)三

目录 接下来分析接口方面&#xff1a; home接口&#xff1a; categories接口&#xff1a; details接口&#xff1a; login接口&#xff1a; 分析一个项目讲究的是如何进行对项目的解析分解&#xff0c;进一步了解项目的整体结构&#xff0c;熟悉项目的结构&#xff0c;能够…

kylin集群反向代理(健康检查)

前面一篇文章提到了使用nginx来对kylin集群进行反向代理&#xff0c; kylin集群使用nginx反向代理-CSDN博客文章浏览阅读349次&#xff0c;点赞8次&#xff0c;收藏9次。由于是同一个集群的&#xff0c;元数据没有变化&#xff0c;所以&#xff0c;直接将原本的kylin使用scp的…

C++学习笔记——类继承

目录 一、一个简单的基类 1.1封装性 1.2继承性 1.3虚函数 1.4多态性 二、基类 2.1一个简单的C基类的示例 2.2 Animal是一个基类。 三、继承 3.1概念 3.2is-a关系 3.3多态公有继承 3.4静态联编和动态联编 3.5访问控制 3.6ABC理念 一、一个简单的基类 C中的基类是一…

02. 坦克大战项目-准备工作和绘制坦克

02. 坦克大战项目-准备工作和绘制坦克 01. 准备工作 1. 首先我们要创建四个类 1. Tank类 介绍&#xff1a;Tank 类主要用来表示坦克的基本属性和行为 public class Tank {private int x;//坦克的横坐标private int y;//坦克的纵坐标public int getX() {return x;}public v…

【ACL 2023】 The Art of Prompting Event Detection based on Type Specific Prompts

【ACL 2023】 The Art of Prompting: Event Detection based on Type Specific Prompts 论文&#xff1a;https://aclanthology.org/2023.acl-short.111/ 代码&#xff1a;https://github.com/VT-NLP/Event_APEX Abstract 我们比较了各种形式的提示来表示事件类型&#xff0…

生成式AI,发展可持续吗?

最近有消息透露&#xff0c;OpenAI预计在2024年实现16亿美元的年化收入。相较于去年10月预测的13亿美元&#xff0c;这一数字增长了3亿美元&#xff0c;增长部分主要来源于ChatGPT订阅、API接入以及其他业务。 与此同时&#xff0c;其竞争对手Anthropic预计年化收入至少为8.5亿…

Goby 漏洞发布|用友 NC registerServlet 反序列化远程代码执行漏洞

漏洞名称&#xff1a;用友 NC registerServlet 反序列化远程代码执行漏洞 English Name&#xff1a;Yonyou NC registerServlet Deserialize Remote Code Execute Vulnerability CVSS core: 9.8 影响资产数&#xff1a; 21320 漏洞描述&#xff1a; 用友 NC Cloud 是一种商…

UVa1308/LA2572 Viva Confetti

题目链接 本题是2002年ICPC亚洲区域赛金沢(日本)赛区的H题 题意 我已经把n个圆盘依次放到了桌面上。现按照放置顺序依次给出各个圆盘的圆心位置和半径&#xff0c;问最后有多少圆盘可见&#xff1f;如下图所示。 分析 《训练指南》的题解&#xff1a; 题目说“保证在对输入数据…

Kafka配置Kerberos安全认证及与Java程序集成

Background 本文主要介绍在 Kafka 中如何配置 Kerberos 认证&#xff0c;以及 java 使用 JAAS 来进行 Kerberos 认证连接。本文演示为单机版。 所用软件版本 查看 Kerberos 版本命令&#xff1a;klist -V 软件名称版本jdk1.8.0_202kafka2.12-2.2.1kerberos1.15.1 1、Kerberos …

vivado 使用项目摘要、配置项目设置、仿真设置

使用项目摘要 Vivado IDE包括一个交互式项目摘要&#xff0c;可根据设计动态更新命令被运行&#xff0c;并且随着设计在设计流程中的进展。项目摘要包括概览选项卡和用户可配置的仪表板&#xff0c;如下图所示。有关信息&#xff0c;请参阅《Vivado Design Suite用户指南&…

Python基础知识:整理11 模块的导入、自定义模块和安装第三方包

1 模块的导入 1.1 使用import 导入time模块&#xff0c;使用sleep功能&#xff08;函数&#xff09; import time print("start") time.sleep(3) print("end")1.2 使用from 导入time的sleep功能 from time import sleep print("start") slee…

高级分布式系统-第6讲 分布式系统的容错性--可靠的组通信

可靠的组通信 组内通信最好是每个进程之间都建立点到点的通信&#xff0c; 但实际中这样的组织结构不是有效的&#xff0c; 因为会浪费很大的通信带宽。 在平等组中&#xff0c; 多播是主要的组织结构。 但多播是具有同步性质的容错结构&#xff0c; 并不适用拜占庭模型。 多…

LeetCode刷题笔记

面试经典150题 1. 数组/字符串 1.1 合并两个有序数组 题目 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺…