kafka 3.5 主题分区的Follower创建Fetcher线程从Leader拉取数据源码

Kakfa集群有主题,每一个主题下又有很多分区,为了保证防止丢失数据,在分区下分Leader副本和Follower副本,而kafka的某个分区的Leader和Follower数据如何同步呢?下面就是讲解的这个

首先要知道,Follower的数据是通过Fetch线程异步从Leader拉取的数据,不懂的可以看一下Kafka——副本(Replica)机制

  • 一、Broker接收处理分区的Leader和Follower的API
  • 二、针对分区副本的Leader和Follower的处理逻辑
    • 1、如果是Follower,则准备创建Fetcher线程,好异步执行向Leader拉取数据
    • 2、遍历分区Follower副本,判断是否有向目标broker现成的Fetcher,如果是则复用,否则创建
    • 3、创建Fetcher线程的实现

一、Broker接收处理分区的Leader和Follower的API

kafkaApis.scala

     //处理Leader和follower,ISR的请求case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
 def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))//从请求头中获取关联ID//从请求体中获取LeaderAndIsrRequest对象。val correlationId = request.header.correlationIdval leaderAndIsrRequest = request.body[LeaderAndIsrRequest]//对请求进行集群操作的授权验证。authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)//检查broker的代数是否过时。if (zkSupport.isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch, leaderAndIsrRequest.isKRaftController)) {//省略代码} else {//调用replicaManager.becomeLeaderOrFollower方法处理请求,获取响应val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))requestHelper.sendResponseExemptThrottle(request, response)}}

经过一些检验后,调用becomeLeaderOrFollower获得响应结果,

二、针对分区副本的Leader和Follower的处理逻辑

def becomeLeaderOrFollower(correlationId: Int,leaderAndIsrRequest: LeaderAndIsrRequest,onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {//首先记录了方法开始的时间戳val startMs = time.milliseconds()//副本状态改变锁replicaStateChangeLock synchronized {//从leaderAndIsrRequest中获取一些请求信息,包括controller的ID、分区状态等val controllerId = leaderAndIsrRequest.controllerIdval requestPartitionStates = leaderAndIsrRequest.partitionStates.asScalaval response = {//处理过程中,会检查请求的controller epoch是否过时if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {//省略} else {val responseMap = new mutable.HashMap[TopicPartition, Errors]controllerEpoch = leaderAndIsrRequest.controllerEpochval partitions = new mutable.HashSet[Partition]()//要成功Leader分区的集合val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()//要成为Follower分区的集合val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()//遍历requestPartitionStates,其中包含了来自控制器(controller)的分区状态请求requestPartitionStates.foreach { partitionState =>val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)//对于每个分区状态请求,首先检查分区是否存在,如果不存在,则创建一个新的分区。val partitionOpt = getPartition(topicPartition) match {case HostedPartition.Offline =>stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +s"controller $controllerId with correlation id $correlationId " +s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +"partition is in an offline log directory")responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)Nonecase HostedPartition.Online(partition) =>Some(partition)case HostedPartition.None =>val partition = Partition(topicPartition, time, this)allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))Some(partition)}//接下来,检查分区的主题ID和Leader的epoch(版本号)等信息。partitionOpt.foreach { partition =>val currentLeaderEpoch = partition.getLeaderEpochval requestLeaderEpoch = partitionState.leaderEpochval requestTopicId = topicIdFromRequest(topicPartition.topic)val logTopicId = partition.topicIdif (!hasConsistentTopicId(requestTopicId, logTopicId)) {//如果主题ID不一致,则记录错误并将其添加到响应映射(responseMap)中。stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +s" match the topic ID for partition $topicPartition received: " +s"${requestTopicId.get}.")responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)} //如果Leader的epoch大于当前的epoch,则记录控制器(controller)的epoch,并将分区添加到要成为Leader或Follower的集合中。 else if (requestLeaderEpoch > currentLeaderEpoch) {              //分区副本确定是当前broker的,则添加到partitionsToBeLeader或者partitionsToBeFollower//如果分区副本是leader并且broker是当前broker,则加入partitionsToBeLeader//其他的放入到partitionsToBeFollower//这样保证后续操作partitionsToBeLeader或者partitionsToBeFollower只操作当前broker的if (partitionState.replicas.contains(localBrokerId)) {partitions += partitionif (partitionState.leader == localBrokerId) {partitionsToBeLeader.put(partition, partitionState)} else {partitionsToBeFollower.put(partition, partitionState)}} //省略代码.....}//创建高水位线检查点val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)//如果partitionsToBeLeader非空,则调用makeLeaders方法将指定的分区设置为Leader,并返回这些分区的集合,否则返回空集合val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)//这个是处理Leader的逻辑makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,highWatermarkCheckpoints, topicIdFromRequest)elseSet.empty[Partition]//如果partitionsToBeFollower非空,则调用makeFollowers方法将指定的分区副本设置为Follower,并返回这些分区的集合,否则返回空集合。val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)//这个是处理Follower的逻辑makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,highWatermarkCheckpoints, topicIdFromRequest)elseSet.empty[Partition]//根据partitionsBecomeFollower集合获取Follower分区的主题集合,并更新相关指标。val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSetupdateLeaderAndFollowerMetrics(followerTopicSet)//如果topicIdUpdateFollowerPartitions非空,则调用updateTopicIdForFollowers方法更新Follower分区的主题ID。if (topicIdUpdateFollowerPartitions.nonEmpty)updateTopicIdForFollowers(controllerId, controllerEpoch, topicIdUpdateFollowerPartitions, correlationId, topicIdFromRequest)    //启动高水位检查点线程。startHighWatermarkCheckPointThread()//根据参数初始化日志目录获取器maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)//关闭空闲的副本获取器线程//todo FetcherThreadsreplicaFetcherManager.shutdownIdleFetcherThreads()replicaAlterLogDirsManager.shutdownIdleFetcherThreads()//省略代码....		}}//省略代码....		}}

因为这篇文章主要写Follower如何拉取数据,所以只需要关注上面代码中的makeFollowers就可以了

1、如果是Follower,则准备创建Fetcher线程,好异步执行向Leader拉取数据

/*1. 从领导者分区集中删除这些分区。2. 将这些 partition 标记为 follower,之后这些 partition 就不会再接收 produce 的请求了3. 停止对这些 partition 的副本同步,这样这些副本就不会再有(来自副本请求线程)的数据进行追加了4.对这些 partition 的 offset 进行 checkpoint,如果日志需要截断就进行截断操作;5.  清空 purgatory 中的 produce 和 fetch 请求6.如果代理未关闭,向这些 partition 的新 leader 启动副本同步线程* 执行这些步骤的顺序可确保转换中的副本在检查点偏移之前不会再接收任何消息,以便保证检查点之前的所有消息都刷新到磁盘*如果此函数中抛出意外错误,它将被传播到 KafkaAPIS,其中将在每个分区上设置错误消息,因为我们不知道是哪个分区导致了它。否则,返回由于此方法而成为追随者的分区集*/private def makeFollowers(controllerId: Int,controllerEpoch: Int,partitionStates: Map[Partition, LeaderAndIsrPartitionState],correlationId: Int,responseMap: mutable.Map[TopicPartition, Errors],highWatermarkCheckpoints: OffsetCheckpoints,topicIds: String => Option[Uuid]) : Set[Partition] = {val traceLoggingEnabled = stateChangeLogger.isTraceEnabled//省略代码。。。。//创建一个可变的Set[Partition]对象partitionsToMakeFollower,用于统计follower的集合val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()try {partitionStates.forKeyValue { (partition, partitionState) =>//遍历partitionStates中的每个分区,根据分区的leader是否可用来改变分区的状态。val newLeaderBrokerId = partitionState.leadertry {if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {//如果分区的leader可用,将分区设置为follower,并将其添加到partitionsToMakeFollower中。// Only change partition state when the leader is availableif (partition.makeFollower(partitionState, highWatermarkCheckpoints, topicIds(partitionState.topicName))) {partitionsToMakeFollower += partition}} else {//省略代码。。} catch {//省略代码。。。}}//删除针对partitionsToMakeFollower中 partition 的副本同步线程replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")//对于每个分区,完成延迟的抓取或生产请求。partitionsToMakeFollower.foreach { partition =>completeDelayedFetchOrProduceRequests(partition.topicPartition)}//如果正在关闭服务器,跳过添加抓取器的步骤。if (isShuttingDown.get()) {//省略代码} else {//对于每个分区,获取分区的leader和抓取偏移量,并构建partitionsToMakeFollowerWithLeaderAndOffset映射。val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => metadataCache.getAliveBrokerNode(leaderId, config.interBrokerListenerName)).getOrElse(Node.noNode())val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), leaderNode.port())val log = partition.localLogOrExceptionval fetchOffset = initialFetchOffset(log)partition.topicPartition -> InitialFetchState(topicIds(partition.topic), leader, partition.getLeaderEpoch, fetchOffset)}.toMap//添加抓取器以获取partitionsToMakeFollowerWithLeaderAndOffset中的分区。replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)}} catch {//省略代码}//省略代码。。。}

2、遍历分区Follower副本,判断是否有向目标broker现成的Fetcher,如果是则复用,否则创建

之后执行replicaFetcherManager.addFetcherForPartitions把信息添加到指定的Fetcher线程中

 // to be defined in subclass to create a specific fetcherdef createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): T
//主要目的是将分区和偏移量添加到相应的Fetcher线程中def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]): Unit = {lock synchronized {//首先对partitionAndOffsets进行分组,按照BrokerAndFetcherId来分组val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicPartition, brokerAndInitialFetchOffset) =>BrokerAndFetcherId(brokerAndInitialFetchOffset.leader, getFetcherId(topicPartition))}def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId,brokerIdAndFetcherId: BrokerIdAndFetcherId): T = {//创建Fetcher线程                           val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)//把线程放入到fetcherThreadMapfetcherThreadMap.put(brokerIdAndFetcherId, fetcherThread)//线程启动fetcherThread.start()fetcherThread}for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)//将启动的FetcherThread线程添加到fetcherThreadMap中val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {// //检查是否已经存在一个与当前broker和fetcher id相匹配的Fetcher线程。case Some(currentFetcherThread) if currentFetcherThread.leader.brokerEndPoint() == brokerAndFetcherId.broker =>// reuse the fetcher thread//如果存在,则重用该线程currentFetcherThreadcase Some(f) =>//如果之前有,fetcher线程,则先关闭在创建一个新的Fetcher线程并启动f.shutdown()addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)case None =>//创建一个新的Fetcher线程,并启动addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)}//将分区添加到相应的Fetcher线程中// failed partitions are removed when added partitions to threadaddPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)}}}

上面可能有疑问,为什么有重用fetcher线程?
答案:是broker并不一定会为每一个主题分区的Follower都启动一个 fetcher 线程,对于一个目的 broker,只会启动 num.replica.fetchers 个线程,具体这个 topic-partition 会分配到哪个 fetcher 线程上,是根据 topic 名和 partition id 进行计算得到,实现所示:

  // Visibility for testingprivate[server] def getFetcherId(topicPartition: TopicPartition): Int = {lock synchronized {Utils.abs(31 * topicPartition.topic.hashCode() + topicPartition.partition) % numFetchersPerBroker}}

继续往下,其中createFetcherThread的实现是下面

3、创建Fetcher线程的实现

class ReplicaFetcherManager(brokerConfig: KafkaConfig,protected val replicaManager: ReplicaManager,metrics: Metrics,time: Time,threadNamePrefix: Option[String] = None,quotaManager: ReplicationQuotaManager,metadataVersionSupplier: () => MetadataVersion,brokerEpochSupplier: () => Long)extends AbstractFetcherManager[ReplicaFetcherThread](name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,clientId = "Replica",numFetchers = brokerConfig.numReplicaFetchers) {override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${sourceBroker.id}, " +s"fetcherId=$fetcherId] ")val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)// 创建了一个ReplicaFetcherThread对象,它的构造函数接受多个参数,用于副本的获取和管理。new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,quotaManager, logContext.logPrefix, metadataVersionSupplier)}def shutdown(): Unit = {info("shutting down")closeAllFetchers()info("shutdown completed")}
}

其中new ReplicaFetcherThread返回一个创建的线程

class ReplicaFetcherThread(name: String,leader: LeaderEndPoint,brokerConfig: KafkaConfig,failedPartitions: FailedPartitions,replicaMgr: ReplicaManager,quota: ReplicaQuota,logPrefix: String,metadataVersionSupplier: () => MetadataVersion)extends AbstractFetcherThread(name = name,clientId = name,leader = leader,failedPartitions,fetchTierStateMachine = new ReplicaFetcherTierStateMachine(leader, replicaMgr),fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,isInterruptible = false,replicaMgr.brokerTopicStats) {override def doWork(): Unit = {super.doWork()completeDelayedFetchRequests()}}

ReplicaFetcherThread继承的AbstractFetcherThread类,AbstractFetcherThread又继承自ShutdownableThread类,其中ShutdownableThread中的run方法是线程的执行函数

abstract class AbstractFetcherThread(name: String,clientId: String,val leader: LeaderEndPoint,failedPartitions: FailedPartitions,val fetchTierStateMachine: TierStateMachine,fetchBackOffMs: Int = 0,isInterruptible: Boolean = true,val brokerTopicStats: BrokerTopicStats) //BrokerTopicStats's lifecycle managed by ReplicaManagerextends ShutdownableThread(name, isInterruptible) with Logging {override def doWork(): Unit = {maybeTruncate()maybeFetch()}
}
public abstract class ShutdownableThread extends Thread {//省略代码public abstract void doWork();public void run() {isStarted = true;log.info("Starting");try {while (isRunning())doWork();} catch (FatalExitError e) {shutdownInitiated.countDown();shutdownComplete.countDown();log.info("Stopped");Exit.exit(e.statusCode());} catch (Throwable e) {if (isRunning())log.error("Error due to", e);} finally {shutdownComplete.countDown();}log.info("Stopped");}
}    

ShutdownableThread中的run函数调用子类的doWork()
而doWork中的执行顺序如下

//是否截断maybeTruncate()//抓取maybeFetch()//处理延时抓取请求completeDelayedFetchRequests()

其中 maybeFetch(),就是正常拼接fetch请求,并向目标broker发送请求,调用brokercase ApiKeys.FETCH => handleFetchRequest(request)

private def maybeFetch(): Unit = {//分区映射锁val fetchRequestOpt = inLock(partitionMapLock) {val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)handlePartitionsWithErrors(partitionsWithError, "maybeFetch")if (fetchRequestOpt.isEmpty) {trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)}fetchRequestOpt}fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>processFetchRequest(sessionPartitions, fetchRequest)}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/74137.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二叉树(上)

“路虽远&#xff0c;行则将至” ❤️主页&#xff1a;小赛毛 目录 1.树概念及结构 1.1树的概念 1.2 树的相关概念 1.3 树的表示&#xff08;树的存储&#xff09; 2.二叉树概念及结构 2.1概念 2.2现实中的二叉树 2.3 特殊的二叉树&#xff1a; 2.4 二叉树的性质 3.二叉树的顺…

java 整合 swagger-ui 步骤

1.在xml 中添加Swagger 相关依赖 <!-- springfox-swagger2 --><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><!-- springfox-swa…

【Python从入门到进阶】35、selenium基本语法学习

接上篇《34、selenium基本概念及安装流程》 上一篇我们介绍了selenium技术的基础概念以及安装和调用的流程&#xff0c;本篇我们来学习selenium的基本语法&#xff0c;包括元素定位以及访问元素信息的操作。 一、元素定位 Selenium元素定位是指通过特定的方法在网页中准确定位…

Canape使用中遇到问题的解决办法

问题一、canape绘制目标时&#xff0c;二维结构体变量只能录制16个的解决办法 打开ASAP2 Studio 2.6 -Expert 软件。 把该软件中的设置项如下图进行修改。 然后用ASAP2 Studio 2.6 -Expert 软件 打开elf文件导出成A2L文件。 最后关闭该软件。 再在canape工程中重新加载刚才…

振弦采集仪应用地铁隧道安全监测详细解决方案

振弦采集仪应用地铁隧道安全监测详细解决方案 随着城市化进程的不断加快&#xff0c;地铁作为一种高效、便捷、环保的交通方式已经成为现代城市不可或缺的一部分。因此&#xff0c;对地铁的安全性也越来越重视&#xff0c;一般二三线以上的城市在不断发展中&#xff0c;地铁做…

c语言 2.0

1.数据类型 数据类型介绍 数据类型&#xff1a;c语言中数据类型有3种&#xff0c;分别是基本数据类型、构造数据类型、指针数据类型。 数据类型的作用&#xff1a;编译器预算数据分配的内存空间大小。 ps&#xff1a;可以通俗理解为&#xff1a;数据类型是用来规范内存的开销…

猜数字游戏(2)

Rust创建猜数字游戏 1.构建项目1.测试项目运行 2.处理一次猜想 1.构建项目 cargo new guessing_game cd guessing_game1.测试项目运行 cargo run2.处理一次猜想 // io输入/输出库&#xff0c;io库来自于标准库 use rand::Rng; use std::cmp::Ordering; use std::io;fn main(…

初识docker

目录 docker解决的问题1. 开发、测试和运维人员之间的矛盾2. 更轻量的虚拟化&#xff0c;节省了虚拟机的性能损耗 虚拟机与容器的区别1. 虚拟机2. 容器 Docker 系统架构 docker解决的问题 1. 开发、测试和运维人员之间的矛盾 “程序在我这跑得好好的&#xff0c;在你那怎么就…

C++之红黑树

红黑树 红黑树的概念红黑树的性质红黑树结点的定义红黑树的插入红黑树的验证红黑树与AVL树的比较 红黑树的概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个存储位表示结点的颜色&#xff0c;可以是Red或Black。 通过对任何一条从根到叶子的路径上…

uniapp 在 onLoad 事件中 this.$refs 娶不到的问题

现象 本人想在主页面加载的时候调用子组件的方法。示例代码如下&#xff1a; 运行&#xff0c;发现 this.$refs 取不到。如下图所示&#xff1a; 解决方法&#xff0c;把onLoad 换为 onReady 就可以了。

【Linux】管道

管道命令 #include <unistd.h> int pipe(int pipefd[2]); 在Linux中&#xff0c;管道&#xff08;pipe&#xff09;的返回值是一个整数数组&#xff0c;包含两个文件描述符。这两个文件描述符分别代表管道的读端和写端。 当成功创建一个管道时&#xff0c;pipe() 系统调用…

K8S:kubectl陈述式及声明式资源管理

文章目录 一.陈述式资源管理方法1.陈述式资源管理概念2.基本信息查看&#xff08;1&#xff09;查看版本信息&#xff08;2&#xff09;查看资源对象简写&#xff08;3&#xff09;查看集群信息&#xff08;4&#xff09;配置kubectl自动补全&#xff08;5&#xff09;node节点…

基于Linux并结合socket网络编程的ftp服务器的实现

项目需求 客户端能够通过调用“get”指令&#xff0c;来获取服务器的文件客户端能通过“server_ls”指令&#xff0c;来获取服务器路径下的文件列表客户端能通过“server_cd”指令&#xff0c;进入服务器路径下的某文件夹客户端可以通过“upload”指令&#xff0c;上传自己的本…

解决跨域的几种方式

解决跨域的几种方式 JSONPCORS&#xff08;跨域资源共享&#xff09;代理 JSONP 利用script标签可以跨域加载资源的特性&#xff0c;通过动态创建一个script标签&#xff0c;然后将响应数据作为回调函数的参数返回&#xff0c;从而实现跨域请求资源。该方式只支持 GET 请求方式…

Redis发布订阅机制学习

【IT老齐151】Redis发布订阅机制是如何实现的&#xff1f;_哔哩哔哩_bilibili go-redis的发布与订阅 - 知乎 (zhihu.com) 前置&#xff1a; 先输入 redis-server.exe 启动redis&#xff0c;否则对应接口不开放 再输入 redis-cli 命令启动客户端服务 1.机制示意图 当一…

AR产业变革中的“关键先生”和“关键力量”

今年6月的WWDC大会上&#xff0c;苹果发布了头显产品Vision Pro&#xff0c;苹果CEO库克形容它&#xff1a; 开启了空间计算时代。 AR产业曾红极一时&#xff0c;但因为一些技术硬伤又减弱了声量&#xff0c;整个产业在起伏中前行。必须承认&#xff0c;这次苹果发布Vision P…

七大排序算法

目录 直接插入排序 希尔排序 直接选择排序 堆排序 冒泡排序 快速排序 快速排序优化 非递归实现快速排序 归并排序 非递归的归并排序 排序:所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作. 常见的排序算法有插入排序(直接插入…

ElementUI浅尝辄止30:PageHeader 页头

如果页面的路径比较简单&#xff0c;推荐使用页头组件而非面包屑组件。 1.如何使用&#xff1f; <el-page-header back"goBack" content"详情页面"> </el-page-header><script>export default {methods: {goBack() {console.log(go bac…

队列(Queue)的顶级理解

目录 1.队列(Queue) 的概念 2.单链表模拟实现队列 2.1创建队列 2.2入队列 2.3判断是否为空 2.4出队列 2.5获取队头元素 2.6完整代码&#xff1a; 2.7双向链表模拟实现队列代码 3.数组模拟实现队列代码 3.1创建队列 3.2判断是否为满 3.3检查是否为空 3.4插入元素 3…

ctfshow 反序列化

PHP反序列化前置知识 序列化和反序列化 对象是不能在字节流中传输的&#xff0c;序列化就是把对象转化为字符串以便存储和传输&#xff0c;反序列化就是将字符串转化为对象 魔术方法 __construct() //构造&#xff0c;当对象new时调用 __wakeup() //执行unserialize()时&am…