Reading the Hadoop Source Code (3): HDFS Upload

Notes:
1. Hadoop version: 3.1.3
2. Reading tool: IDEA 2023.1.2
3. Source download: Index of /dist/hadoop/core/hadoop-3.1.3 (apache.org)
4. Project import: after downloading you get the hadoop-3.1.3-src.tar.gz archive; open PowerShell in that directory and unpack it with tar -zxvf, then open the hadoop-3.1.3-src folder in IDEA. Make sure the Maven or Gradle repository is configured properly, otherwise importing the jars will be slow.
5. Reference course: www.bilibili.com/video/BV1Qp…

HDFS Upload

A simple upload example:

public void test() throws IOException {
    FSDataOutputStream fos = fs.create(new Path("/input"));
    fos.write("hello world".getBytes());
}
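The snippet assumes an existing FileSystem instance named fs. A minimal, self-contained sketch of how such an instance might be obtained (the NameNode URI is a placeholder, not taken from the original):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsUploadDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "hdfs://localhost:9000" is a placeholder NameNode address; adjust for your cluster.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
        FSDataOutputStream fos = fs.create(new Path("/input"));
        fos.write("hello world".getBytes());
        fos.close(); // flushes the remaining packets and completes the file on the NameNode
        fs.close();
    }
}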

As you can see, the code first creates an FSDataOutputStream and then writes data to it. The walkthrough below is therefore split into two parts: the create process and the write (upload) process.

The create process

1. The client sends a create request to the NN

First, step into the create method, which brings us to FileSystem.java:

Find the create method and keep stepping in until you reach the static create method.

Then return to the call site of that static method:

Press Ctrl+Alt+B to look up its implementing classes:

Go into DistributedFileSystem:

Keep tracing downward:

You can see that an output stream object is created inside the doCall method.

Step into the create method again, arriving at DFSClient.java:

Keep following the calls until you reach the newStreamForCreate method:

Step into newStreamForCreate, which is in DFSOutputStream.java.

Here the client sends the create request to the NN over RPC for processing.

It then starts a thread (the DataStreamer, covered in section 3 below).

2. The NN handles the client's create request

From newStreamForCreate, step into the create method, arriving at ClientProtocol.java:

Look up its implementing classes:

Go into NameNodeRpcServer; its create method is as follows:

  @Override // ClientProtocol
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
      throws IOException {
    checkNNStartup(); // check whether the NN has started
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
          + src + " for " + clientName + " at " + clientMachine);
    }
    if (!checkPathLength(src)) { // check the path length
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) { // retry-cache check
      return (HdfsFileStatus) cacheEntry.getPayload();
    }

    HdfsFileStatus status = null;
    try {
      PermissionStatus perm = new PermissionStatus(
          getRemoteUser().getShortUserName(), null, masked);
      // start the file (the important step)
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          ecPolicyName, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
  }

Next, step into the startFile method, arriving at FSNamesystem.java:

Step into startFileInt.

The src (the file path) is wrapped into an INodesInPath object.

The class comment for INodesInPath reads: "Contains INodes information resolved from a given path."

First we need to be clear about the INode class:

INode is an abstract class; the official description is as follows:

In short, a basic INode is the in-memory representation of the file/block hierarchy and holds the fields common to file and directory inodes.

So INode is the lowest-level class, holding attributes shared by files and directories, while INodesInPath holds the INode information resolved from a given path.

Next, locate startFile.

Step into startFile.

  • First, the path is checked to see whether the file already exists:

Step into getLastINode.

Step into getINode.

You can see that when i = -1, it returns inodes[inodes.length - 1];

in other words, it fetches the INode at the last position: if one exists, the file path already exists.

Next it checks whether overwriting is allowed:

If overwriting is not allowed, an exception is thrown telling the caller that the path already exists and the file cannot be uploaded again (a client-side sketch of this behavior follows at the end of this step).

  • Then it checks whether the parent directory exists:

If the parent directory exists, the file's metadata is added to it (the addFile method).

Step into the addFile method:

Step into addINode.

The data is written into the INode directory tree; at this point the file entry has been created.
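From the client side, the overwrite check above corresponds to the boolean overwrite parameter of FileSystem.create(Path, boolean). A hedged sketch of both outcomes, reusing the fs instance from the first example (the path is only illustrative):

// Assumes the fs instance from the first example and that "/input" already exists in HDFS.
Path existing = new Path("/input");

// overwrite = true: startFile replaces the existing file and create succeeds.
FSDataOutputStream replaced = fs.create(existing, true);
replaced.close();

// overwrite = false: startFile finds an existing last INode and the call fails.
try {
    fs.create(existing, false);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException e) {
    System.out.println("create refused: " + e.getMessage());
}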

3. The DataStreamer startup flow

After the NN finishes processing, control returns to the client, which starts the corresponding thread.

Open DFSOutputStream.java and find the newStreamForCreate method; once the NN has completed the create request, the output stream is created:

Locate the DFSOutputStream constructor:

Here the chunk and packet sizes are computed (Directory => File => Block (128 MB) => packet (64 KB) => chunk (512-byte chunk + 4-byte checksum)).
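A quick back-of-the-envelope check of that hierarchy, using the default sizes above (illustrative arithmetic only; the real computePacketChunkSize also reserves room for the packet header, so the actual numbers are slightly smaller):

public class HdfsSizingSketch {
    public static void main(String[] args) {
        final int chunkData = 512;                        // bytes of user data per chunk
        final int chunkChecksum = 4;                      // checksum bytes per chunk
        final int chunkSize = chunkData + chunkChecksum;  // 516 bytes on the wire
        final int packetSize = 64 * 1024;                 // 64 KB packet
        final long blockSize = 128L * 1024 * 1024;        // 128 MB block

        int chunksPerPacket = packetSize / chunkSize;     // ~127 chunks per packet
        long packetsPerBlock = blockSize / packetSize;    // 2048 packets per block
        System.out.println("chunks per packet ~= " + chunksPerPacket);
        System.out.println("packets per block = " + packetsPerBlock);
    }
}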

Return to the newStreamForCreate method and step into out.start().

Keep stepping in:

Continue into DataStreamer.

Step into Daemon.

You can see that out.start() starts a thread, so go back to DataStreamer and search for its run method:

If there is no data in dataQueue, the code blocks;

if dataQueue is not empty, a packet is taken from it.
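This blocking behavior is the standard wait/notify pattern on the dataQueue monitor: writeChunk acts as the producer and the DataStreamer thread as the consumer. A stripped-down sketch of the pattern (illustrative only, not the actual Hadoop classes):

import java.util.LinkedList;

class TinyStreamer extends Thread {
    private final LinkedList<String> dataQueue = new LinkedList<>();

    // producer side: called by the writer, mirrors queuePacket
    void queuePacket(String packet) {
        synchronized (dataQueue) {
            dataQueue.addLast(packet);
            dataQueue.notifyAll();             // wake the streamer thread
        }
    }

    // consumer side: mirrors the top of DataStreamer.run
    @Override
    public void run() {
        while (!isInterrupted()) {
            String one;
            synchronized (dataQueue) {
                while (dataQueue.isEmpty()) {
                    try {
                        dataQueue.wait(1000);  // blocks while the queue is empty
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                one = dataQueue.removeFirst(); // take the next packet to send
            }
            System.out.println("sending " + one);
        }
    }
}

In the real code the producer side is queuePacket (reached via waitAndQueuePacket below), which notifies on the same dataQueue monitor.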

The write (upload) process

1. Writing data into the DataStreamer's queue

The create phase started the DataStreamer; the write phase writes data into it.

Step into the write method, which leads to FilterOutputStream.java:

Keep going until you reach the abstract write method.

Press Ctrl+Alt+B to look up its implementing classes:

Go into FSOutputSummer.java and locate the write method:

Step into the flushBuffer method, which, as the name suggests, flushes the buffer:

Step into the writeChecksumChunks method:

Step into the writeChunk method (which writes a chunk toward the data queue):

It is an abstract method, so look up its implementing classes:

Go into DFSOutputStream.java and look at the concrete implementation of writeChunk, shown below:

  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    writeChunkPrepare(len, ckoff, cklen);
    // write the chunk's checksum (4 bytes) into the packet
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // write one chunk (512 bytes) into the packet
    currentPacket.writeData(b, offset, len);
    // count the chunks written into the packet; at 127 chunks the packet is full
    currentPacket.incNumChunks();
    getStreamer().incBytesCurBlock(len);
    // if the packet is full, put it on the queue to wait for transmission
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
      enqueueCurrentPacketFull();
    }
  }

Step into the enqueueCurrentPacketFull method:

Step into the enqueueCurrentPacket method:

Step into the waitAndQueuePacket method:

void waitAndQueuePacket(DFSPacket packet) throws IOException {
  synchronized (dataQueue) {
    try {
      // if the queue is full, wait
      boolean firstWait = true;
      try {
        while (!streamerClosed && dataQueue.size() + ackQueue.size() >
            dfsClient.getConf().getWriteMaxPackets()) {
          if (firstWait) {
            Span span = Tracer.getCurrentSpan();
            if (span != null) {
              span.addTimelineAnnotation("dataQueue.wait");
            }
            firstWait = false;
          }
          try {
            dataQueue.wait(); // wait until the queue has enough room
          } catch (InterruptedException e) {
            // If we get interrupted while waiting to queue data, we still need to get rid
            // of the current packet. This is because we have an invariant that if
            // currentPacket gets full, it will get queued before the next writeChunk.
            //
            // Rather than wait around for space in the queue, we should instead try to
            // return to the caller as soon as possible, even though we slightly overrun
            // the MAX_PACKETS length.
            Thread.currentThread().interrupt();
            break;
          }
        }
      } finally {
        Span span = Tracer.getCurrentSpan();
        if ((span != null) && (!firstWait)) {
          span.addTimelineAnnotation("end.wait");
        }
      }
      checkClosed();
      // once there is room, add the packet to the queue
      queuePacket(packet);
    } catch (ClosedChannelException ignored) {
    }
  }
}

Step into the queuePacket method (the logic that actually adds data to the queue), which is in DataStreamer.java:

2. Setting up the pipeline

2.1 Rack awareness (deciding where the block replicas are stored)

Use Ctrl+N to find DataStreamer globally, then search for its run method:

  @Override
  public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder
      if (errorState.hasError()) {
        closeResponder();
      }

      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();

        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
        // Step 1: wait for a packet to send
        synchronized (dataQueue) {
          // wait for a packet to be sent.
          long now = Time.monotonicNow();
          while ((!shouldStop() && dataQueue.size() == 0 &&
              (stage != BlockConstructionStage.DATA_STREAMING ||
                  now - lastPacket < halfSocketTimeout)) || doSleep) {
            long timeout = halfSocketTimeout - (now-lastPacket);
            timeout = timeout <= 0 ? 1000 : timeout;
            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                timeout : 1000;
            try {
              // if dataQueue is empty, the code blocks here
              dataQueue.wait(timeout);
            } catch (InterruptedException  e) {
              LOG.warn("Caught exception", e);
            }
            doSleep = false;
            now = Time.monotonicNow();
          }
          if (shouldStop()) {
            continue;
          }
          // get the packet to send
          if (dataQueue.isEmpty()) {
            one = createHeartbeatPacket();
          } else {
            try {
              backOffIfNecessary();
            } catch (InterruptedException e) {
              LOG.warn("Caught exception", e);
            }
            // if the data queue is not empty, take a packet from it
            one = dataQueue.getFirst();
            SpanId[] parents = one.getTraceParents();
            if (parents.length > 0) {
              scope = dfsClient.getTracer().newScope("dataStreamer", parents[0]);
              scope.getSpan().setParents(parents);
            }
          }
        }

        // Step 2: obtain a new block from the NN
        if (LOG.isDebugEnabled()) {
          LOG.debug("stage=" + stage + ", " + this);
        }
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          LOG.debug("Allocating new block: {}", this);
          // ask the NN for a block and set up the data pipeline
          setPipeline(nextBlockOutputStream());
          // start the ResponseProcessor to watch whether packets are sent successfully
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          LOG.debug("Append to block {}", block);
          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          initDataStreaming();
        }

        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > stat.getBlockSize()) {
          throw new IOException("BlockSize " + stat.getBlockSize() +
              " < lastByteOffsetInBlock, " + this + ", " + one);
        }

        if (one.isLastPacketInBlock()) {
          // wait for all data packets have been successfully acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              try {
                // wait for acks to arrive from datanodes
                dataQueue.wait(1000);
              } catch (InterruptedException  e) {
                LOG.warn("Caught exception", e);
              }
            }
          }
          if (shouldStop()) {
            continue;
          }
          stage = BlockConstructionStage.PIPELINE_CLOSE;
        }

        // Step 3: send the packet
        SpanId spanId = SpanId.INVALID;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              spanId = scope.getSpanId();
              scope.detach();
              one.setTraceScope(scope);
            }
            scope = null;
            dataQueue.removeFirst(); // remove the packet being sent from dataQueue
            ackQueue.addLast(one); // add that packet to ackQueue
            packetSendTime.put(one.getSeqno(), Time.monotonicNow());
            dataQueue.notifyAll();
          }
        }

        LOG.debug("{} sending {}", this, one);

        // Step 4: write the data to the DN
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanId)) {
          one.writeTo(blockStream); // write out the data
          blockStream.flush();
        } catch (IOException e) {
          // HDFS-3398 treat primary DN is down since client is unable to
          // write to primary DN. If a failed or restarting node has already
          // been recorded by the responder, the following call will have no
          // effect. Pipeline recovery can handle only one node error at a
          // time. If the primary node fails again during the recovery, it
          // will be taken out then.
          errorState.markFirstNodeIfNotMarked();
          throw e;
        }
        lastPacket = Time.monotonicNow();

        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();
        if (bytesSent < tmpBytesSent) {
          bytesSent = tmpBytesSent;
        }

        if (shouldStop()) {
          continue;
        }

        // Is this block full?
        if (one.isLastPacketInBlock()) {
          // wait for the close packet has been acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              dataQueue.wait(1000); // wait for acks to arrive from datanodes
            }
          }
          if (shouldStop()) {
            continue;
          }

          endBlock();
        }
        if (progress != null) { progress.progress(); }

        // This is used by unit test to trigger race conditions.
        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
          Thread.sleep(artificialSlowdown);
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
        if (!errorState.isRestartingNode()) {
          // Since their messages are descriptive enough, do not always
          // log a verbose stack-trace WARN for quota exceptions.
          if (e instanceof QuotaExceededException) {
            LOG.debug("DataStreamer Quota Exception", e);
          } else {
            LOG.warn("DataStreamer Exception", e);
          }
        }
        lastException.set(e);
        assert !(e instanceof NullPointerException);
        errorState.setInternalError();
        if (!errorState.isNodeMarked()) {
          // Not a datanode issue
          streamerClosed = true;
        }
      } finally {
        if (scope != null) {
          scope.close();
          scope = null;
        }
      }
    }
    closeInternal();
  }

Step into nextBlockOutputStream (the call made in step two of run() above):

Step into locateFollowingBlock.

Step into addBlock.

Step into addBlock again, arriving at the ClientProtocol class:

So this method is carried out through the client-side proxy of the NN.

Look up its implementing classes:

Go into NameNodeRpcServer and locate addBlock:

Step into getAdditionalBlock.

This is where the storage locations for the block are chosen.

Step into chooseTargetForNewBlock.

Step into chooseTarget4NewBlock.

Step into chooseTarget.

Continue into chooseTarget.

You can see it is declared on an abstract class, so look up its implementations:

Go into BlockPlacementPolicyDefault.java:

Step into chooseTarget.

Step into chooseTarget again.

Step into chooseTargetInOrder, which is the rack-awareness logic:

  protected Node chooseTargetInOrder(int numOfReplicas, Node writer,
      final Set<Node> excludedNodes,
      final long blocksize,
      final int maxNodesPerRack,
      final List<DatanodeStorageInfo> results,
      final boolean avoidStaleNodes,
      final boolean newBlock,
      EnumMap<StorageType, Integer> storageTypes)
      throws NotEnoughReplicasException {
    final int numOfResults = results.size();
    if (numOfResults == 0) {
      // the first replica is stored on the current (writer's) node
      DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
          storageTypes, true);
      writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
          : null;
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
    if (numOfResults <= 1) {
      // the second replica is stored on a different rack
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    if (numOfResults <= 2) {
      final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
      if (clusterMap.isOnSameRack(dn0, dn1)) {
        // if the first and second replicas are on the same rack, put the third on another rack
        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else if (newBlock){
        // for a new block, store the third replica on the same rack as the second
        chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      } else {
        // otherwise (not a new block), put it on the writer's local rack
        chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
      }
      if (--numOfReplicas == 0) {
        return writer;
      }
    }
    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    return writer;
  }
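To summarize the branches above for the common case of a brand-new block with three replicas, here is a simplified paraphrase (illustration only; it ignores excluded and stale nodes, storage types, and every fallback path):

import java.util.ArrayList;
import java.util.List;

class ReplicaPlacementSketch {
    /** Simplified paraphrase of chooseTargetInOrder for a new block with three replicas. */
    static List<String> chooseRacksForNewBlock(String writerRack, String otherRack) {
        List<String> racks = new ArrayList<>();
        racks.add(writerRack); // replica 1: the writer's own node (its local rack)
        racks.add(otherRack);  // replica 2: a node on a different rack
        racks.add(otherRack);  // replica 3: new block -> same rack as replica 2, different node
        return racks;          // e.g. writer on /r1 -> [/r1, /r2, /r2]
    }
}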

2.2 Socket send

Return to nextBlockOutputStream.

Step into createBlockOutputStream.

As its comment indicates, the main job of this method is to establish a connection with the first DN in the pipeline.

  boolean createBlockOutputStream(DatanodeInfo[] nodes,
      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
      long newGS, boolean recoveryFlag) {
    if (nodes.length == 0) {
      LOG.info("nodes are empty for write pipeline of " + block);
      return false;
    }
    String firstBadLink = "";
    boolean checkRestart = false;
    if (LOG.isDebugEnabled()) {
      LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
    }

    // persist blocks on namenode on next flush
    persistBlocks.set(true);

    int refetchEncryptionKey = 1;
    while (true) {
      boolean result = false;
      DataOutputStream out = null;
      try {
        assert null == s : "Previous socket unclosed";
        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
        // create a socket connection to the DN
        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
        long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);

        // output stream, used to write data to the DN
        OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
        // input stream, used to read the result of writing data to the DN
        InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
        blockReplyStream = new DataInputStream(unbufIn);

        //
        // Xmit header info to datanode
        //

        BlockConstructionStage bcs = recoveryFlag ?
            stage.getRecoveryStage() : stage;

        // We cannot change the block length in 'block' as it counts the number
        // of bytes ack'ed.
        ExtendedBlock blockCopy = block.getCurrentBlock();
        blockCopy.setNumBytes(stat.getBlockSize());

        boolean[] targetPinnings = getPinnings(nodes);
        // send the data
        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings != null && targetPinnings[0]), targetPinnings,
            nodeStorageIDs[0], nodeStorageIDs);

        // receive ack for connect
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
            PBHelperClient.vintPrefixed(blockReplyStream));
        Status pipelineStatus = resp.getStatus();
        firstBadLink = resp.getFirstBadLink();

        // Got an restart OOB ack.
        // If a node is already restarting, this status is not likely from
        // the same node. If it is from a different node, it is not
        // from the local datanode. Thus it is safe to treat this as a
        // regular node error.
        if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
            !errorState.isRestartingNode()) {
          checkRestart = true;
          throw new IOException("A datanode is restarting.");
        }

        String logInfo = "ack with firstBadLink as " + firstBadLink;
        DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);

        assert null == blockStream : "Previous blockStream unclosed";
        blockStream = out;
        result =  true; // success
        errorState.resetInternalError();
        lastException.clear();
        // remove all restarting nodes from failed nodes list
        failed.removeAll(restartingNodes);
        restartingNodes.clear();
      } catch (IOException ie) {
        if (!errorState.isRestartingNode()) {
          LOG.info("Exception in createBlockOutputStream " + this, ie);
        }
        if (ie instanceof InvalidEncryptionKeyException &&
            refetchEncryptionKey > 0) {
          LOG.info("Will fetch a new encryption key and retry, "
              + "encryption key was invalid when connecting to "
              + nodes[0] + " : " + ie);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
          // Don't close the socket/exclude this node just yet. Try again with
          // a new encryption key.
          continue;
        }

        // find the datanode that matches
        if (firstBadLink.length() != 0) {
          for (int i = 0; i < nodes.length; i++) {
            // NB: Unconditionally using the xfer addr w/o hostname
            if (firstBadLink.equals(nodes[i].getXferAddr())) {
              errorState.setBadNodeIndex(i);
              break;
            }
          }
        } else {
          assert !checkRestart;
          errorState.setBadNodeIndex(0);
        }

        final int i = errorState.getBadNodeIndex();
        // Check whether there is a restart worth waiting for.
        if (checkRestart) {
          errorState.initRestartingNode(i,
              "Datanode " + i + " is restarting: " + nodes[i],
              shouldWaitForRestart(i));
        }
        errorState.setInternalError();
        lastException.set(ie);
        result =  false;  // error
      } finally {
        if (!result) {
          IOUtils.closeSocket(s);
          s = null;
          IOUtils.closeStream(out);
          IOUtils.closeStream(blockReplyStream);
          blockReplyStream = null;
        }
      }
      return result;
    }
  }

Step into writeBlock.

Step into send:

The data is flushed out with flush().

2.3 Socket receive

Receiving the data is the DN's job, so go into DataXceiverServer.java and locate its run method:

It accepts socket requests;

for every block the client sends, a DataXceiver is started to handle that block.
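Structurally this is the classic accept loop that hands each incoming transfer to its own worker thread. A minimal sketch of that pattern using plain Java sockets (not the actual DataXceiverServer code, which goes through Hadoop's peer and Daemon abstractions):

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class TinyXceiverServer implements Runnable {
    private final ServerSocket serverSocket;

    TinyXceiverServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
    }

    @Override
    public void run() {
        while (true) {
            try {
                Socket peer = serverSocket.accept();       // block until a transfer arrives
                new Thread(() -> handle(peer)).start();    // one handler ("DataXceiver") per request
            } catch (IOException e) {
                break;
            }
        }
    }

    private void handle(Socket peer) {
        // read the op code, then dispatch (opWriteBlock, opReadBlock, ...) -- omitted in this sketch
    }
}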

Go into DataXceiver and locate its run method:

It reads the operation type of the request,

then processes the data according to that operation type.

Step into processOp.

You can see the different operation types.

Step into opWriteBlock (writing data):

Use Ctrl+Alt+B to find the implementations of writeBlock and go into DataXceiver.java:

A BlockReceiver is created,

and the data is forwarded to the downstream socket.

Next, step into getBlockReceiver.

Step into BlockReceiver.

The pipeline is created;

step into createRbw.

Go into FsDatasetImpl.java:

Step into createRbw.

The file is created via createRbwFile.

3. The client receives the DN's acks

Go back to DataStreamer.java and locate run:

The initDataStreaming method starts a ResponseProcessor, which monitors whether each packet was sent successfully.

It creates the ResponseProcessor and starts the thread.

Step into ResponseProcessor and locate its run method:

    @Override
    public void run() {
      setName("ResponseProcessor for block " + block);
      PipelineAck ack = new PipelineAck();

      TraceScope scope = null;
      while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
        // process the acks coming back from the DNs
        try {
          // read an ack from the pipeline
          ack.readFields(blockReplyStream);
          if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
            Long begin = packetSendTime.get(ack.getSeqno());
            if (begin != null) {
              long duration = Time.monotonicNow() - begin;
              if (duration > dfsclientSlowLogThresholdMs) {
                LOG.info("Slow ReadProcessor read fields for block " + block
                    + " took " + duration + "ms (threshold="
                    + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
                    + ", targets: " + Arrays.asList(targets));
              }
            }
          }

          if (LOG.isDebugEnabled()) {
            LOG.debug("DFSClient {}", ack);
          }

          long seqno = ack.getSeqno();
          // processes response status from datanodes.
          ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
          for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
            final Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i));
            if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
                PipelineAck.ECN.CONGESTED) {
              congestedNodesFromAck.add(targets[i]);
            }
            // Restart will not be treated differently unless it is
            // the local node or the only one in the pipeline.
            if (PipelineAck.isRestartOOBStatus(reply)) {
              final String message = "Datanode " + i + " is restarting: "
                  + targets[i];
              errorState.initRestartingNode(i, message,
                  shouldWaitForRestart(i));
              throw new IOException(message);
            }
            // node error
            if (reply != SUCCESS) {
              errorState.setBadNodeIndex(i); // mark bad datanode
              throw new IOException("Bad response " + reply +
                  " for " + block + " from datanode " + targets[i]);
            }
          }

          if (!congestedNodesFromAck.isEmpty()) {
            synchronized (congestedNodes) {
              congestedNodes.clear();
              congestedNodes.addAll(congestedNodesFromAck);
            }
          } else {
            synchronized (congestedNodes) {
              congestedNodes.clear();
              lastCongestionBackoffTime = 0;
            }
          }

          assert seqno != PipelineAck.UNKOWN_SEQNO :
              "Ack for unknown seqno should be a failed ack: " + ack;
          if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
            continue;
          }

          // this ack marks a successfully transmitted packet
          DFSPacket one;
          synchronized (dataQueue) {
            one = ackQueue.getFirst();
          }
          if (one.getSeqno() != seqno) {
            throw new IOException("ResponseProcessor: Expecting seqno " +
                " for block " + block +
                one.getSeqno() + " but received " + seqno);
          }
          isLastPacketInBlock = one.isLastPacketInBlock();

          // Fail the packet write for testing in order to force a
          // pipeline recovery.
          if (DFSClientFaultInjector.get().failPacket() &&
              isLastPacketInBlock) {
            failPacket = true;
            throw new IOException("Failing the last packet for testing.");
          }

          // update bytesAcked
          block.setNumBytes(one.getLastByteOffsetBlock());

          synchronized (dataQueue) {
            scope = one.getTraceScope();
            if (scope != null) {
              scope.reattach();
              one.setTraceScope(null);
            }
            lastAckedSeqno = seqno;
            pipelineRecoveryCount = 0;
            ackQueue.removeFirst(); // remove the packet from the ack queue
            packetSendTime.remove(seqno);
            dataQueue.notifyAll(); // notify dataQueue that the ack has been processed
            one.releaseBuffer(byteArrayManager);
          }
        } catch (Exception e) {
          if (!responderClosed) {
            lastException.set(e);
            errorState.setInternalError();
            errorState.markFirstNodeIfNotMarked();
            synchronized (dataQueue) {
              dataQueue.notifyAll();
            }
            if (!errorState.isRestartingNode()) {
              LOG.warn("Exception for " + block, e);
            }
            responderClosed = true;
          }
        } finally {
          if (scope != null) {
            scope.close();
          }
          scope = null;
        }
      }
    }

At this point, once the client has successfully received the acks from the DNs, the upload process is complete.
