HDFS WebHDFS 读写文件分析及HTTP Chunk Transfer Encoding相关问题探究

文章目录

  • 前言
  • 需要回答的首要问题
  • DataNode端基于Netty的WebHDFS Service的实现
  • 基于重定向的文件写入流程
    • 写入一个大文件时WebHDFS和Hadoop Native的块分布差异
  • 基于重定向的数据读取流程
    • 尝试读取一个小文件
    • 尝试读取一个大文件
  • 读写过程中的Chunk Transfer-Encoding支持
    • 写文件使用Chunk Transfer-Encoding
    • 读文件使用Chunk Transfer-Encoding
      • Response Header中为什么没有Transfer-Encoding: chunked
      • 测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断
  • WebHDFS的特性和不足总结

前言

最近在调研使用WebHDFS进行HDFS文件读写的相关调查,因此需要对WebHDFS的整个读写流程进行探究,其中涉及到的必要的http重定向的整个流程必须搞清楚。
同时,由于HDFS涉及到大量的流式写和大文件读,因为我们比较关心WebHDFS对Chunked Transfer Coding的支持,我们对WebHDFS的这个特性进行了测试和代码层面的研究。

需要回答的首要问题

我们基于常规的hadoop-native 方式连接去读写文件的时候,会首先与NameNode通信,然后再与DataNode通信读取具体数据,这其中的处理逻辑都放在了hadoop-native中的具体客户端代码中。
但是一旦使用WebHDFS,我们所有的通信方式仅仅是HTTP。hadoop-native中的这种多次通信逻辑放在一个http请求中该怎么处理呢?所以,我们在测试WebHDFS以前,就首先有了疑问: 基于HTTP的WebHDFS该怎么处理先到NameNode然后到DataNode的整个过程?总不会让数据从DataNode、经过NameNode再到客户端吧?

DataNode端基于Netty的WebHDFS Service的实现

在DataNode启动的时候,会启动一个DatanodeHttpServer,用来启动对应的WebHDFS Service:

  private void startInfoServer()throws IOException {// SecureDataNodeStarter will bind the privileged port to the channel if// the DN is started by JSVC, pass it along.ServerSocketChannel httpServerChannel = secureResources != null ?secureResources.getHttpServerChannel() : null;httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);httpServer.start();

DataNodeHttpServer 构造的时候,实际上构造了基于Netty的DataNode Http Service。我们特别注意到,在initChannel()方法中,添加了对Chunked Transfer Encoding的支持handler ChunkedWriteHandler。后面我们会具体讲这个类在我们通过WebHDFS读取文件时的作用。

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),new HttpResponseEncoder());if (restCsrfPreventionFilter != null) {p.addLast(new RestCsrfPreventionFilterHandler(restCsrfPreventionFilter));}p.addLast(new ChunkedWriteHandler(), //支持chunk的数据写入new URLDispatcher(jettyAddr, conf, confForCreate));}});

上面的代码其实是创建一个Netty服务器端的基本流程(以下基本流程来自ChatGPT的回答,个人认为非常准确,因此直接粘贴):

  • 创建 EventLoopGroupEventLoopGroup 是一个处理 I/O 操作的线程池。在服务器端,我们通常需要创建两个 EventLoopGroup,一个用于接受客户端的连接(Boss Group),另一个用于处理连接的数据流(Worker Group)。

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
  • 创建 ServerBootstrapServerBootstrap 是用于启动 Netty 服务器的引导类。它配置了服务器的各种参数,如线程模型、通道类型、处理器等。

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChannelInitializer()); // 设置处理连接的 ChannelInitializer
    
  • 设置 ChannelInitializerChannelInitializer 是一个特殊的处理器,用于配置新连接的 ChannelPipeline。在这里,你可以添加自定义的处理器来处理连接的数据流。在上面的HDFS代码中,创建了一个匿名的ChannelInitializer实现,重写了initChannel()方法:

    this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),new HttpResponseEncoder());.....}
    
  • 实现自定义的处理器: 在 ChannelInitializerinitChannel()方法中,添加Netty集成的或者自定义的处理器ChannelHandler,用于处理连接的数据流。这些处理器会成为一个处理器链,挂载在ChannelPipeline中。很显然,这个Pipeline下面的Handler时按照addLast()添加的顺序有序的,比如WebHDFS在下面的代码中,意味着读数据的时候,数据先经过ChunkedWriteHandler,然后经过URLDispatcher,而向客户端写数据的时候,数据流先经过URLDispatcher,然后经过ChunkedWriteHandler

      p.addLast(new ChunkedWriteHandler(), //支持chunk的数据写入new URLDispatcher(jettyAddr, conf, confForCreate));
    
  • 绑定端口并启动服务器: 最后,将服务器绑定到指定的端口,并启动服务器。

    ChannelFuture f = httpServer.bind(infoAddr);try {f.syncUninterruptibly();......
    

在DataNode端, 用户可以通过dfs.datanode.http.address配置了对应的WebHDFS地址,默认是0.0.0.0:9864

  public static final String  DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";public static final int     DFS_DATANODE_HTTP_DEFAULT_PORT = 9864;public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;

基于重定向的文件写入流程

写入一个大文件时WebHDFS和Hadoop Native的块分布差异

我们通过基础的java.net提供的HttpURLConnection创建Http客户端向HDFS写入文件。

通过命令, 可以看到其实这个文件的第一个Block都是在同一个机器上。
hdfs -copyFromLocal这种基于hadoop native的api拷贝文件,这个文件的block是逐个申请的,NameNode会为每一个Block根据当前的BlockPlacementPolicy分配对应的replica位置,但是,基于WebHDFS的NameNode只能在请求写文件的时候和NameNode进行一次通信,返回一个Location信息供客户端进行Redirect的写操作,因此,这导致这个文件的所有Block的第一个Replica都在这台机器上。至于每一个Block剩下的Replica的复制,按照HDFS的块写入策略,都是由第一个Replica复制到剩余两个Replica上去的。

我们做测试的时候,通过WebHDFS上传一个300MB的文件到HDFS。之所以选择300MB,是因为HDFS的块大小是128MB,因此300MB的文件需要创建3个Block,从而观察3个Block的第一个 Replica 是否是同一台机器。
下面是使用Java的HTTP客户端基于WebHDFS向HDFS写入一个大概300MB的文件的代码:

public class WebHDFSWriteFileExample
{public static void write_file(){try {// WebHDFS endpointString webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";// HDFS path where the file will be writtenString hdfsPath = "/user/hadoop/hello-world/330MB.txt";// Local file path to be uploadedString localFilePath = "/Users/cwu/330MB.txt";// Hadoop usernameString username = "hadoop";// WebHDFS REST API URL for file creation (PUT method)String webHdfsUrl = webHdfsEndpointProd + "/webhdfs/v1" + hdfsPath + "?op=CREATE&user.name=" + username + "&overwrite=true";// Open the local file for readingFileInputStream fileInputStream = new FileInputStream(localFilePath);BufferedReader reader = new BufferedReader(new InputStreamReader(fileInputStream));// Create HTTP connectionHttpURLConnection connection = createConnectionWithURL(webHdfsUrl);// Write file content to HDFSOutputStream outputStream = connection.getOutputStream();String inputLine;while ((inputLine = reader.readLine()) != null) {outputStream.write(inputLine.getBytes());outputStream.write("\r\n".getBytes());}outputStream.write("0\r\n\r\n".getBytes());// Close the connectionconnection.disconnect();}}// 基于WebHDFS的url创建一个HttpURLConnectionprivate static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {URL url = new URL(webHdfsUrl);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setDoOutput(true);connection.setRequestMethod("PUT");return connection;}}

文件上传以后,我们通过fsck命令查看上传的文件的块分布情况,结果验证了我们的猜想:文件的所有Block的第一个replica都是机器81.2.20.332,显然,这个机器就是我们向NameNode发送请求的时候NameNode返回给我们的307重定向的DataNode地址。重定向地址只有一个,因此,所有的块的写入就只能写到一个机器上去。

root@dddddd-101:~# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://drdencrypted/user/hadoop/hello-world/330MB.txt -files -locations -blocks
Connecting to namenode via http://dddddd-101.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2F330MB.txt
FSCK started by hdfs (auth:SIMPLE) from /81.2.2.108 for path /user/hadoop/hello-world/330MB.txt at Wed Jan 10 14:49:14 UTC 2024
/user/hadoop/hello-world/330MB.txt 348093376 bytes, 3 block(s):  OK
0. BP-332385978-81.2.1.108-1654488140099:blk_5650223226_4580715954 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-fa64ea42-f4e8-4ebb-999e-1727fcfb879a,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-88228db5-1c5b-4715-8b36-2bb653201367,DISK], DatanodeInfoWithStorage[81.2.11.125:50010,DS-e6936497-52f8-495e-9d80-0d7d5684369a,DISK]]
1. BP-332385978-81.2.1.108-1654488140099:blk_5650225022_4580717750 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-c0cb79b2-7679-4961-8bb4-917224b248ed,DISK], DatanodeInfoWithStorage[81.2.8.131:50010,DS-368a6917-1a36-419f-a6bc-7129978d0926,DISK], DatanodeInfoWithStorage[81.2.8.132:50010,DS-56b945ff-004f-4af5-879e-2941db14693e,DISK]]
2. BP-332385978-81.2.1.108-1654488140099:blk_5650228459_4580721187 len=79657920 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-0bf35d03-c89c-4e9a-86ce-af9c1a149b52,DISK], DatanodeInfoWithStorage[81.2.11.126:50010,DS-74da8386-1369-4562-bfec-ae6350b48fbd,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-9bcd388e-a129-4f5c-99b3-6611e59f5a12,DISK]]

因此,通过WebHDFS写文件其实是有一个隐含的重定向过程,但是读者看上面的代码,并没有看到任何重定向的处理逻辑,这是因为,在默认情况下,HttpURLConnection帮我们自动处理了重定向,即,如果服务器端返回307状态码,那么客户端自动重新建立到header中的Location中到连接,然后一样的请求发送给redirect地址,这一切对用户隐藏。
如果我们的确想观察重定向的详细过程,完全可以通过connection.setInstanceFollowRedirects(false);关闭客户端的自动重定向,手动处理重定向。同时,后面会讲,假如我们enable了Chunked Transfer Coding,自动重定向处理也将被关闭。
没有了自动重定向,就需要手动创建两次http请求,第一次请求发往NameNode,NameNode返回307,并附带了指向某台DataNode的Location,然后我们再依据Location第二次建立连接,将文件写入到对应的DataNode。因此基于上面的WebHDFSWriteFileExample,我们修改代码进行手动重定向以观察重定向细节:

public class WebHDFSWriteFileExample
{public static void write_file(){try {....... 发送请求到NameNodeif (responseCode == HttpURLConnection.HTTP_CREATED) {System.out.println("File created successfully on HDFS.");} else { // 发现了重定向System.out.println("Error creating file on HDFS. Response Code: " + responseCode);String newUrl = connection.getHeaderField("Location"); //获取header中的Location,即重定向的目标地址connection.disconnect(); // 关闭原先的对NameNode的连接,开始创建针对DataNode的连接HttpURLConnection connection2 = createConnectionWithURL(newUrl);// Write file content to HDFSoutputStream = connection2.getOutputStream();fileInputStream = new FileInputStream(localFilePath);reader = new BufferedReader(new InputStreamReader(fileInputStream));while ((inputLine = reader.readLine()) != null) {outputStream.write(inputLine.getBytes());}}}private static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {.....connection.setInstanceFollowRedirects(false); //disable掉自动重定向......
}

基于重定向的数据读取流程

我们将分别测试和研究读取小文件(不足一个Block)和大文件(两个或者更多Block)的读取流程,二者理论上会产生区别,因为假如文件来自两个或者更多的Block,基于WebHDFS的重定向似乎就没那么简单了。
下文来讲解具体的读取过程。

尝试读取一个小文件

对于只占用一个Block的小文件,我们发送读取请求给NameNode,NameNode会返回一个307的重定向,这个重定向一定是指向这个唯一的Block的其中一个Replica所在的DataNode的。
比如,读取一个1MB的小文件的代码如下:

public class WebHDFSReadFileExample {private static void read(String[] args){try {// WebHDFS endpointString webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";// HDFS path where the file will be writtenString hdfsPath = "/user/hadoop/hello-world/1MB.txt";// Local file path to be uploadedString localFilePath = "/Users/cwu/1MB.txt";// HDFS path to the file// Hadoop usernameString username = "hadoop";// WebHDFS REST API URL for file reading (GET method)String webHdfsUrl = webHdfsEndpoint + "/webhdfs/v1" + hdfsPath + "?op=OPEN&user.name=" + username;// Open the connectionHttpURLConnection connection = createConnectionWithURL(webHdfsUrl, false, "GET", false);if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {System.out.println("File created successfully on HDFS.");} else if (connection.getHeaderField("Location") != null){ // 307 redirect// 创建http连接,基于get请求读文件,关闭自动重定向,不enable chunkHttpURLConnection connection2 = createConnectionWithURL(newUrl, false, "GET", false);// Write file content to HDFSBufferedReader reader = new BufferedReader(new InputStreamReader(connection2.getInputStream()));// read the file from BufferReader.........}....}// 创建链接,用户可以选择是否打开自动重定向,是否使用chunkprivate static HttpURLConnection createConnectionWithURL(String webHdfsUrl, Boolean autoRedirect, String method, Boolean enable_chunk) throws IOException {URL url = new URL(webHdfsUrl);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setInstanceFollowRedirects(autoRedirect); // 关闭自动重定向connection.setRequestMethod(method);connection.setDoOutput(true);if(enable_chunk)connection.setChunkedStreamingMode(-1); //选择是否打开chunked streaming out。后面会说,该选项决定的是客户端的写,基于GET读取文件的时候,该选项不可以打开return connection;}

读取一个小文件的过程在预期之内,HttpClient首先与NameNode 建立Http连接, NameNode响应了307状态码同时在Response Header中包含了目标location的地址信息。在我们关闭了自动重定向的情况下,建立了与Location中指示的URL建立连接,然后读取到对应的文件。

尝试读取一个大文件

我们已经知道读取文件的时候,NameNode会返回一个重定向地址给我们,告诉我们去联系对应的DataNode读取对应数据。但是,假如这个文件不止一个block,那么,是不是在读取下一个block的时候DataNode也会返回下一个DataNode的地址给我们接着进行下一个Block的读取呢?

我们创建了一个200MB的文件,显然,这个文件会有两个Block,并且,我们从fsck可以看到,这个文件的两个块的总共6个副本,分布在6台不同的机器上。

root@rccp103-2d:/var/log/hadoop-hdfs# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://rccp103-2d.iad5.prod.corp.com:8020/user/hadoop/hello-world/200MB.log -files -locations -blocks
Connecting to namenode via http://rccp103-2d.iad5.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2Fhadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1
FSCK started by hdfs (auth:SIMPLE) from /71.9.2.108 for path /user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 at Thu Jan 11 08:38:25 UTC 2024
/user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 209715453 bytes, 2 block(s):  OK
0. BP-332385978-71.9.1.108-1654488140099:blk_5655503893_4585996621 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[71.9.9.131:50010,DS-a8904c39-cb1c-4fe0-b9e8-625f3bad85be,DISK], DatanodeInfoWithStorage[71.9.7.130:50010,DS-8e0d756e-bab4-4b44-b131-6bb9af01362b,DISK], DatanodeInfoWithStorage[71.9.8.128:50010,DS-622eb32d-e292-4824-818e-a72a12144fe3,DISK]]
1. BP-332385978-71.9.1.108-1654488140099:blk_5655503948_4585996676 len=75497725 Live_repl=3 [DatanodeInfoWithStorage[71.9.10.128:50010,DS-3e0bbada-d77c-46af-a1fa-573e54443706,DISK], DatanodeInfoWithStorage[71.9.12.129:50010,DS-7942484b-2b8e-438a-9d4b-8bf8d892c0cf,DISK], DatanodeInfoWithStorage[71.9.12.131:50010,DS-d2e9c177-4ea0-4232-9f28-f8a52644be8a,DISK]]

我们还是使用上面的WebHDFSReadFileExample代码,发现,读取文件的时候,NameNode返回了307重定向,并且指向了第一个块的某一个副本机器,文件的第一个和第二个块都是从这个机器上读取的,尽管,第二个块并不是分布在这台机器上

我们检查代码,发现了其实现的原理,简而言之,第一个Block的Replica的机器,充当了整个文件读取的中转,而不是仅仅负责自己存储的那个Replica的读取。
DataNode通过向Netty注册的WebHdfsHandler这个自定义的ChannelHandler来处理发往DataNode请求(上文讲过Netty的一些基本实体的定义)。WebHdfsHandler.onOpen()方法用来处理用户的读文件请求。我们从下面的onOpen()代码的实现细节可以看到,实际上,DataNode并不是将用户请求的文件在自己上面的Replica返回给Http客户端,而是将自己作为中转,创建一个Hadoop Client,读取文件,然后将读取到的文件的InputStream封装程一个ChunkedStream返回给客户端。所以,当客户端调用connection2.getInputStream()就能够拿到对应DataNode创建的读取文件的InputStream

 private void onOpen(ChannelHandlerContext ctx) throws IOException {// DataNode自己创建一个读取文件的Client,这是一个普通的Hadoop Native客户端,与普通的HDFS Java 客户端没有任何去呗final DFSClient dfsclient = newDfsClient(nnId, conf);HdfsDataInputStream in = dfsclient.createWrappedInputStream(dfsclient.open(path, bufferSize, true));in.seek(offset);data = in;ctx.write(resp);ctx.writeAndFlush(new ChunkedStream(data) { // 封装好InputStream,返回给客户端,可以看到,是以Chunk的方式。后面会讲,ChunkedStream会被当前的Handler中的下一个Handler即ChunkedWriteHandler进一步处理@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);}

读写过程中的Chunk Transfer-Encoding支持

Chunk Transfer-Encoding的支持会极大影响流式数据读写的性能。我们看看什么是Chunk Transfer-Encoding以及它能给我们带来什么:

分块传输编码(Chunk Transfer-Encoding)是一种 HTTP 传输机制,它将消息主体分割成一系列块,并使用长度前缀来标识每个块的大小。这种编码方式带来了一些好处:

  • 实时传输: 分块传输编码允许服务器在生成响应的同时将数据发送给客户端,而不必等到整个响应主体准备就绪。这对于实时生成的内容或流式传输非常有用。

  • 无需预先知道内容大小: 使用分块传输编码时,不需要预先知道整个消息主体的大小。这对于动态生成的内容或流式传输,其中内容的大小可能在生成过程中动态变化,非常有用。

  • 提高响应速度: 分块传输编码可以在数据准备的同时发送,减少了等待整个响应准备就绪的时间。这有助于提高响应速度,特别是对于大型或动态生成的内容。

  • 降低延迟: 分块传输编码允许客户端在接收到一部分数据时就开始处理,而不必等待整个响应完成。这有助于降低延迟,特别是对于实时应用或交互性应用。

  • 更好的适应网络波动: 在网络条件不稳定的情况下,分块传输编码可以更好地适应不同的带宽和延迟条件。由于可以在生成数据的同时发送,它能够更灵活地处理网络波动。

所以,总的说来,Chunk Transfer-Encoding主要用来文件大小未知的流式文件的传输,或者虽然确定大小但是文件大小很大的传输场景。

正常使用场景下,流式数据或者大文件的发送方会在http header中设置Transfer-Encoding: chunked,同时以chunk的方式发送数据,数据的接受方在接收到header并发现是Chunk Transfer-Encoding以后,会进行对应的编解码操作。当然,这个解码操作一般是对应的Library底层的自动操作,上层用户直接看到的是一个解码以后的InputStream比如BufferedInputStream等,而不需要关心本身的解码细节。

注意,Chunked Transfer Coding的每一个chunk的数据大小是自描述的,因此,并不要求在一个stream连接中所有chunk的大小都相同。只要遵循Chunked Transfer Coding的数据格式,比如,如果发送的文件是一个日志文件,数据的发送方甚至可以按照一行一个chunk进行数据的发送。

我们在下文详细讲解通过WebHDFS读写文件时对Chunk的支持。

写文件使用Chunk Transfer-Encoding

我们通过connection.setChunkedStreamingMode(chunklen),就会enable Chunked Transfer Coding写文件,即为我们添加Transfer-Encoding: chunked的http header, 同时设置了一个chunkLength,我们传入的参数如果是不大于0的参数,那么chunkLength就设置为默认值4096,否则就按照我们设置的大小设置chunkLength

与之相对,我们可以调用connection.setFixedLengthStreamingMode (int contentLength) 来选择使用Fixed-Length Streaming
Chunked StreamingFixed-Length Streaming是相互排斥的,二者只能选其一,这是因为二者的使用场景完全不同:

  • setChunkedStreamingMode: 用于指定请求体使用分块传输编码 (Chunked Transfer Encoding)。分块传输编码允许将请求体分割成多个块,每个块都带有长度信息。这样可以在数据生成的同时发送,而无需等到整个请求体准备就绪。使用这种模式可以提高效率,特别是对于较大的请求体。
connection.setChunkedStreamingMode(0); // 使用分块传输编码,0 表示使用默认块大小
  • setFixedLengthStreamingMode: 用于指定请求体的固定长度。这意味着在实际写入请求体数据之前,需要知道请求体的总长度。这样的模式适用于较小的请求体,且长度可预知的情况。

    int contentLength = calculateContentLength(); // 计算请求体的长度
    connection.setFixedLengthStreamingMode(contentLength);
    

    选择使用哪个方法取决于请求体的大小和生成的时间。对于动态生成的或大小未知的请求体,通常更适合使用 setChunkedStreamingMode。对于已知大小的请求体,使用 setFixedLengthStreamingMode 可能更为合适。

下面的代码就是HttpURLConnection中两个方法的实现:

------------------------------------------HttpURLConnection------------------------------------------public void setChunkedStreamingMode (int chunklen) {.......if (fixedContentLength != -1 || fixedContentLengthLong != -1) {throw new IllegalStateException ("Fixed length streaming mode set");}chunkLength = chunklen <=0? DEFAULT_CHUNK_SIZE : chunklen;}public void setFixedLengthStreamingMode (int contentLength) {......fixedContentLength = contentLength;}

我们从HttpURLConnection.getOutputStream0()方法可以看到,

  • 由于我们之前设置了chunkLength,因此,这里会构建对应的ChunkedOutputStream,这意味着,我们通过HttpURLConnection.getOutputStream()返回的OutputStream进行数据的写入的时候,会经过ChunkedOutputStream然后才到最底层的比如SocketOutputStream,即ChunkedOutputStream会帮我们进行数据的chunk编码并发送给远程。
  • 如果是Http Method是Get,会强制变成Post,即我们不可能在get请求中使用Chunk Transfer Encoding,如果调用了setChunkedStreamingMode(chunklen),那么Http Method会被换成POST。这是为什么我们在读文件的时候假如调用setChunkedStreamingMode(chunklen),WebHDFS会返回一个400,让我们误以为WebHDFS不支持Chunk Transfer Encoding。后文会详细讲解。
  • 我们从writeRequests可以看到,假如我们通过setChunkedStreamingMode(0)设置了chunkLength,那么写文件的请求中会被隐含加上Transfer-Encoding: chunked 的 Http header。而如果我们通过setFixedLengthStreamingMode()设置了使用fixed-length streaming,那么就会隐含加上Content-Length 的 header。
private synchronized OutputStream getOutputStream0() throws IOException {.........if (method.equals("GET")) { //在获取OutputStream,GET会被强制替换成POSTmethod = "POST"; // Backward compatibility}....if (streaming()) {if (strOutputStream == null) {if (chunkLength != -1) { /* chunked */strOutputStream = new StreamingOutputStream(new ChunkedOutputStream(ps, chunkLength), -1L); //封装一层ChunkedOutputStream} else { /* must be fixed content length */long length = 0L;if (fixedContentLengthLong != -1) {length = fixedContentLengthLong;} else if (fixedContentLength != -1) {length = fixedContentLength;}strOutputStream = new StreamingOutputStream(ps, length);}.....}private void writeRequests() throws IOException {.....if (this.streaming()) {if (this.chunkLength != -1) { // 设置了chunk, 会帮我们添加Transfer-Encoding:chunked的headerthis.requests.set("Transfer-Encoding", "chunked");var14 = true;} else if (this.fixedContentLengthLong != -1L) { //如果是fixed-length code,会帮我们添加Content-Length headerthis.requests.set("Content-Length", String.valueOf(this.fixedContentLengthLong));} else if (this.fixedContentLength != -1) {this.requests.set("Content-Length", String.valueOf(this.fixedContentLength));}}

下图显示了我们使用Chunked Transfer Coding写文件的时候的客户端观察到的HttpURLConnection的运行时的outputStream,从最上层 HttpURLConnection.StreamingOutStream,经过ChunkedOutputStream一直到最下层SocketOutputStream的层层封装过程。
在这里插入图片描述

读文件使用Chunk Transfer-Encoding

刚刚说了,是否使用Chunk Transfer-Encoding,其实是数据的发送方的职责,因此,客户端从WebHDFS读文件,数据的发送方,WebHDFS Service,是否使用Chunk Transfer-Encoding,是它自己的选择。我们从WebHDFS服务器端代码可以看到,基于Netty的WebHDFS服务器端的确是使用Chunked Transfer Encoding来发送文件到客户端的。
DataNode启动的时候,会构造WebHDFS背后的Netty服务器端。构造时加入了ChunkedWriteHandler:

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),.......p.addLast(new ChunkedWriteHandler(),new URLDispatcher(jettyAddr, conf, confForCreate));}});

从上面的代码可以看到,最后网ChannelPipeline中添加了两个ChannelHandlerChunkedWriteHandlerURLDispatcher,那么,这两个Handler在pipeline中的调用顺序是怎样的呢?

我们来看一下ChatGPT的准确回答:


在 Netty 中,ChannelPipeline 是一个处理事件的流水线。ChannelPipeline 中的每个节点是一个 ChannelHandler,负责处理特定类型的事件。当数据通过 Channel 时,它会在 ChannelPipeline 中的各个处理节点中依次经过,每个节点负责特定的处理。

对于 ChannelPipeline.addLast 添加的 ChannelHandler,它们的执行顺序是按照它们被添加的顺序来决定的。新添加的 ChannelHandler 会被添加到管道的尾部,因此会成为最后执行的处理器。

在读写的时候,事件的传递顺序如下:

  • 写操作: 当有数据要被写入 Channel 时,数据会从 ChannelPipeline 的尾部开始,依次经过每个 ChannelHandler write() 方法,最终传递给底层的通道。

  • 读操作: 当有数据从 Channel 读取时,数据会从 ChannelPipeline 的头部开始,依次经过每个 ChannelHandlerchannelRead() 方法,直到达到 ChannelPipeline 的尾部。


所以,在Netty Service向客户端写入数据的时候,从我们自定义的URLDispatcher中写入(其实内部会调用WebHdfsHandler),然后会被ChunkedWriteHandler进行处理,即chunk by chunk的进行数据的编码,最后发送给客户端。
根据ChunkedWriteHandler的官方文档,如果我们使用ChunkedWriteHandler来进行Chunk Transfer Encoding的数据传输,它的输入,即它的前一个Handler必须写入的是ChunkedInput格式,这就是WebHdfsHandler.onOpen()的处理方式,ChunkedStreamChunkedInput的实现类:

  private void onOpen(ChannelHandlerContext ctx) throws IOException {........resp = new DefaultHttpResponse(HTTP_1_1, OK);HttpHeaders headers = resp.headers();.....final DFSClient dfsclient = newDfsClient(nnId, conf);HdfsDataInputStream in = dfsclient.createWrappedInputStream(dfsclient.open(path, bufferSize, true));in.seek(offset);long contentLength = in.getVisibleLength() - offset;.......final InputStream data;........headers.set(CONTENT_LENGTH, contentLength);data = new LimitInputStream(in, contentLength);ctx.write(resp); //先写元数据ctx.writeAndFlush(new ChunkedStream(data) { // 再写content,即读取的文件流。这个文件流被封装为ChunkedStream,进而交给pipeline中的ChunkedWriteHanlder,最后发送给客户端@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);}

Response Header中为什么没有Transfer-Encoding: chunked

但是很奇怪,我们从HttpUrlConnection收到的服务器端的Response Header中并没有看到Transfer-Encoding:chunked,相反,我们看到了Content-Length: 192832,难道文件并不是以Chunked Transfer-Encoding的编码方式发送的?
其实不是这样的。HttpUrlConnectionChunked Transfer-Encoding的数据处理并完全依赖Transfer-Encoding:chunked的header,它可以自动识别数据本身的格式,然后在底层自动识别并解码Chunked Transfer-Encoding,然后直接将解码后的结以InputStream的方式返回给客户端。Netty的ChunkedWriteHandler帮我们发送数据到远程客户端时,尽管数据是按照chunked格式发送给的,但是其并不会帮我们加上Transfer-Encoding:chunked到Response header中去,但是客户端依然是可以正确解析到的。ChunkedWriteHandler的好处是:

  • 提供更灵活的数据处理:ChunkedWriteHandler可以接收 FileRegionHttpContentByteBuf 对象,并负责将它们转换成分块的形式发送到下一个 ChannelOutboundHandler
  • 自动处理分块传输:ChunkedWriteHandler会自动将数据分块,确保数据按照 Chunked Transfer Encoding 规范传输。
  • 对于大文件或大数据流,使用 ChunkedWriteHandler 可以在传输过程中分块发送,降低内存占用。
    我们看到, WebHDFS服务器端使用了ChunkedWriteHandler,因此在ChannelPipeline中,需要将读取块数据的InputStream封装成ChunkedStream,因为ChunkedStreamChunkedInput 接口的实现类:
    final InputStream data;if (contentLength >= 0) {headers.set(CONTENT_LENGTH, contentLength);data = new LimitInputStream(in, contentLength);} else {data = in;}ctx.write(resp);ctx.writeAndFlush(new ChunkedStream(data) { //将读取块数据的InputStream封装为ChunkedStream,供ChunkedWriteHandler进行进一步的分块处理@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);

所以,其实ChunkedWriteHandler只是分块写入数据的一种更好的封装方式,方便了我们进行chunk数据的写入,但其实我们完全可以不用ChunkedWriteHandler而手动进行chunk数据写入,比如,我们自己在header中添加Transfer-Encoding:chunked,然后按照 Chunked Transfer Encoding 去写数据,服务端示例代码如下:

public class NettyChunkedWriteHandlerExample {public static void main(String[] args) {.....try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new SimpleChannelInboundHandler<HttpMessage>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {FileInputStream fis = new FileInputStream("/Users/cwu/chunk-test");ChunkedStream chunkedStream = new ChunkedStream(fis);// 设置响应头DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK);HttpHeaders headers = response.headers();headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");headers.set(CONTENT_TYPE, APPLICATION_OCTET_STREAM);headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); // 设置`Transfer-Encoding:chunked`ctx.write(response);chunk_write_stream(fis, ctx);....serverBootstrap.bind(8087).sync().channel().closeFuture().sync();.....}public static void chunk_write_stream(InputStream inputStream,  ChannelHandlerContext ctx) throws IOException {// 读取文件并以 chunked 形式写入响应byte[] buffer = new byte[1024];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {if(bytesRead != 1024){System.out.println("The final bytesRead is not 1024, its size is " + bytesRead);byte[] buffer_copy = Arrays.copyOfRange(buffer, 0, bytesRead);ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer_copy)));}else{ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer)));}}//写入最后一个空字符ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);}

这时候我们看基于HttpURLConnection的客户端,可以看到,客户端识别了Transfer-Encoding:chunked,下层Stream的封装已经自动有了ChunkedInputStream,同样完成了基于chunk的数据传输。

测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断

我在最初测试WebHDFS是否支持 Chunked Transfer-Encoding,错误的代码差点儿导致我做出了错误的结论,认为从WebHDFS读取数据无法使用Chunked Transfer Encoding
由于对Chunked Transfer-Encoding理解不正确,我以为要想让服务器端Chunk by Chunk的方式将文件传输过来,客户端需要在GET请求(由于是读文件,因此是GET请求)中添加Transfer-Encoding:chunked header,对于HttpURLConnection客户端,不用手动添加这个header,只需要调用connection.setChunkedStreamingMode(0);就可以了,我这样去做,但是服务器端抛出了一个400

java.io.IOException: Server returned HTTP response code: 400 for URL: http://rccd101-7a.sjc2.dev.conviva.com:9864/webhdfs/v1/user/hadoop/hello-world/session_summaries.PT_HOURLY_2023-09-07_2023-09-07.sql-2?op=OPEN&user.name=hadoop&namenoderpcaddress=olap-hdfs-test&offset=0at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1914)at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1512)at WebHDFSReadFileExample.chunk_read(WebHDFSReadFileExample.java:45)at WebHDFSReadFileExample.main(WebHDFSReadFileExample.java:86)

代码很简单,就是在创建HttpURLConnection的时候,增加了connection.setChunkedStreamingMode(0);。后来通过debug,发现,当我们调用connection.setChunkedStreamingMode(0);的时候,会带来了两个主要变化:

  1. 自动重定向会被disable:这个变化倒不是导致我们上面的异常的原因,但是需要引起注意。这意味着,即使我们没有显式地通过connection.setInstanceFollowRedirects(false);去关闭自动重定向,如果调用了connection.setChunkedStreamingMode(0);,自动重定向也会被关闭。加入式读文件,那么我们需要手动识别并处理307
  2. GET请求会被自动变成POST请求:这是导致400问题的原因。WebHDFS客户端是将Http Method和用户的operation联合在一起来识别操作的,代码如下。可以看到,如果是OPEN操作,那么Http Method必须是GET 才能被识别,因此,一个POST类型的OPEN操作无法被识别:
  public void handle(ChannelHandlerContext ctx, HttpRequest req)throws IOException, URISyntaxException {String op = params.op();HttpMethod method = req.getMethod();if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)&& method == PUT) {onCreate(ctx);} else if (PostOpParam.Op.APPEND.name().equalsIgnoreCase(op)&& method == POST) {onAppend(ctx);} else if (GetOpParam.Op.OPEN.name().equalsIgnoreCase(op)&& method == GET) {onOpen(ctx); //读取文件的HTTP Method必须是Get} else if(GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op)&& method == GET) {onGetFileChecksum(ctx);} else if(PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)&& method == OPTIONS) {allowCORSOnCreate(ctx);} else {throw new IllegalArgumentException("Invalid operation " + op);}
  • 那么,为什么我们调用connection.setChunkedStreamingMode(0);,请求类型会被强制改成POST呢?因为connection.setChunkedStreamingMode(0);本来就应该是数据发送方的事情,数据的写入方通过设置Transfer-Encoding:chunked来告知数据接受方自己的数据是Chunked Transfer Coding,请按照Chunked Transfer Coding格式进行解析。所以,我们在读取WebHDFS文件的时候设置connection.setChunkedStreamingMode(0);是错误的操作。

WebHDFS的特性和不足总结

从上面的代码分析可以看到,先抛开性能差异不谈,在具体实现上,基于Hadoop Native的客户端的很多实现细节,肯定是无法在基于Http的WebHDFS中实现的。因此,WebHDFS表面上能成为Hadoop Native的替代,但是其实整个流程都发生了变化。总而言之,我们通过实验和HDFS的代码发现:

  1. WebHDFS不支持HDFS HA。但是这还好,我们可以在WebHDFS客户端上层手动做判断,实现HA。
  2. 写大文件的时候所有Block的第一个Replica会集中在某一台机器上。
  3. 读取文件的时候,文件的所有Block的读取操作都来自于第一个Block的某一个Replica机器
  4. 基于Http本身的限制,hedged read肯定完全不再可能。
  5. 数据的写入和读取都支持Chunked Transfer Coding
  6. 性能问题。我没有进行benchmark测试,有兴趣的读者可以自行寻找测试结果或者自行测试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/627733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

postman 简单测试(二)

接着上一节 https://blog.csdn.net/myy2012/article/details/135616719 1.Tests的简单使用&#xff08;后置处理器&#xff09; 具体的截图是每一步操作后得来的&#xff0c;记录方便自己以后查阅&#xff0c;也希望能帮助到有缘人。 1.1 把返回值存入到环境变量中&#xff…

protobuf学习日记 | 初识protobuf

目录 前言 一、序列化与反序列化 二、protobuf是什么 三、protobuf的使用特点 四、快速上手 1、proto文件编写 2、编译proto文件 3、序列化与反序列化的使用 前言 这是小编新开的一个栏目&#xff0c;为了记录自己在学习ProtoBuf的历程&#xff0c;也希望能帮助大家&am…

亚马逊店飞飞ERP系统,跟卖+铺货+物流发货模式综合一体的ERP系统

跨境电商亚马逊&#xff0c;目前为止电商行业比较靠前的电商平台&#xff01;那么有人做电商&#xff0c;就会有人出单&#xff0c;有人出单就会有中转仓需求&#xff0c;代打包&#xff0c;代贴单&#xff01;那么这一切都是有一套逻辑完善的ERP来完成&#xff01;前端通过授权…

将.NET应用转换成Window服务

写在前面 本文介绍了将.NET8.0应用程序转换成Windows服务。 需要在NuGet中获取并安装&#xff1a;Microsoft.Extensions.Hosting.WindowsServices 包 代码实现 using System.Runtime.InteropServices; using WorkerService1;public class Program {public static void Main…

Kafka 简介

目录 1、概念介绍 Kafka 由来 ZooKeeper Kafka 特性 Kafka 使用场景 Kafka 复制备份 2、Kafka 架构 Broker Topic Producer Partition Consumers Consumer Group Distribution 1、概念介绍 Kafka 由来 Kafka 是最初由 Linkedin 公司开发&#xff0c;是一个分布…

aigc修复美颜学习笔记

目录 GFPGAN进行图像人脸修复 美颜 修复畸形手势 GFPGAN进行图像人脸修复 原文&#xff1a;本地使用GFPGAN进行图像人脸修复_人相修复处理网页 csdn-CSDN博客 人脸修复 1.下载项目和权重文件 2.部署环境 3.下载权重文件 4.运行代码 5.网页端体验 首先来看一下效果图 1.下…

uni-app的项目创建和环境搭建

uni-app 是一个使用 Vue.js 开发所有前端应用的框架&#xff0c;开发者编写一套代码&#xff0c;可发布到iOS、Android、Web&#xff08;响应式&#xff09;、以及各种小程序&#xff08;微信/支付宝/百度/头条/飞书/QQ/快手/钉钉/淘宝&#xff09;、快应用等多个平台。 第一步…

clip安装使用教程

1.配置环境 安装依赖 pip install transformers pip install torch 看缺失什么包自己先安装好 2.安装clip 进入https://github.com/openai/CLIP&#xff0c;先将CLIP文件夹下载到本地&#xff0c;随便什么位置。即点击下图中的Download ZIP&#xff0c;下载到本地后进行解压…

HNU-编译原理-实验2-Bison

编译原理实验2Bison 计科210X 甘晴void 202108010XXX 实验要求 详细的实验项目文档为 https://gitee.com/coderwym/cminus_compiler-2023-fall/tree/master/Documentations/lab2 实验步骤 本次实验需要在 Lab1 已完成的 flex 词法分析器的基础上&#xff0c;进一步使用 b…

某侠网js逆向wasm解析

本次目标地址如下&#xff0c;使用base64解密获得 aHR0cHM6Ly93d3cud2FpbWFveGlhLm5ldC9sb2dpbg 打开网址&#xff0c;本次的目标是登录接口&#xff0c;如下图 本文主要讲解wasm的解析&#xff0c;所以对其他参数不做逆向处理&#xff0c;本次由wasm加密的参数只有sign一个&a…

DApp:去中心化的革命与挑战

DApp&#xff08;去中心化应用&#xff09;是一种基于区块链技术的应用程序&#xff0c;与传统的中心化应用程序不同&#xff0c;DApp具有去中心化、透明、不可篡改等特性。本文将介绍DApp的前世今生&#xff0c;以及它的优势和未来发展。 DApp的前世可以追溯到区块链技术的出现…

运维工具之tmux命令

tmux终端复用器的使用 1.tmux的概念 ​ tmux&#xff0c;“Terminal MultipleXer”,意思是"终端复用器"。是一个可以让人们通过一个窗口操作多个会话的工具&#xff0c;对于经常操作Linux系统的运维人员来说&#xff0c;绝对是一款提高工作效率的利器。 2.tmux能帮…

Kubernetes API 和流量控制:管理请求数量和排队进程

本文描述了我们最近遇到的一个真实案例&#xff1a;Kubernetes API 因其中一个集群中的大量请求而瘫痪。今天&#xff0c;我们将讨论我们如何处理这个问题&#xff0c;并提供一些关于如何预防它的提示。 高并发搞崩 Kubernetes API 一个非常普通的早晨&#xff0c;我们开始了…

SSC | Blue Prism报告:2024年智能自动化(IA)7大趋势预测

近日&#xff0c;RPA行业领导者SS&C | Blue Prism发布《2024智能自动化&#xff08;IA&#xff09;趋势与预测》报告。报告中提到&#xff0c;智能自动化&#xff08;IA&#xff09;与流程管理的有效融合&#xff0c;是实现数字化转型成功的核心。采用业务流程管理&#xf…

免费开源OCR 软件Umi-OCR

Umi-OCR 是一款免费、开源、可批量的离线 OCR 软件&#xff0c;基于 PaddleOCR&#xff0c;适用于 Windows10/11 平台 免费&#xff1a;本项目所有代码开源&#xff0c;完全免费。方便&#xff1a;解压即用&#xff0c;离线运行&#xff0c;无需网络。高效&#xff1a;自带高效…

数组的定义与越界问题

scanf标准读取函数 第一个冷知识&#xff0c;输入到scanf里面的内容都是字符串形式&#xff0c;但是&#xff01; scanf(“%d”,&a),%d决定了如何对输入的字符串进行操作 scanf用来读取标准输入&#xff0c;标准输入的内容需要放入到某个变量空间中去&#xff0c;因此变量…

Python数据分析案例34——IMDB电影评论情感分析(Transformer)

电影评论的情感分析 案例背景 很多同学对电影系列的数据都比较喜欢&#xff0c;那我就补充一下这个最经典的文本分类数据集&#xff0c;电影情感评论分析。用神经网络做。对国外的英文评论文本进行分类&#xff0c;看是正面还是负面情感。 数据集介绍 数据集&#xff1a;IMDb…

Python Tkinter Pack布局管理器

GUI 编程就相当于小孩子搭积木&#xff0c;每个积木块应该放在哪里&#xff0c;每个积木块显示为多大&#xff0c;也就是对大小和位置都需要进行管理&#xff0c;而布局管理器正是负责管理各组件的大小和位置的。此外&#xff0c;当用户调整了窗口的大小之后&#xff0c;布局管…

sphinx,一个神奇的 Python 库!

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个神奇的 Python 库 - sphinx。 Github地址&#xff1a;https://github.com/sphinx-doc/sphinx/ 在软件开发和项目管理中&#xff0c;文档是不可或缺的一部分。好的文档可以…