(Kafka源码五)Kafka服务端处理消息

Kafka 服务端(Broker)采用 Reactor 的架构思想,通过1 个 Acceptor,N 个 Processor(N默认为3),M 个 KafkaRequestHandler(M默认为8),来处理客户端请求,这种模式结合了多线程和事件驱动的设计,优点是能够有效地利用系统资源,可以实现高效地处理请求,无需为每个连接或请求创建新的线程,减少了线程上下文切换的开销,以实现高并发和高吞吐量。

文章目录

    • 服务端整体架构
      • 执行流程
    • 源码剖析
      • Acceptor 线程处理
      • Processor 线程处理
      • KafkaRequestHandlerPool 处理请求

服务端整体架构

在这里插入图片描述

Kafka 服务端的网络结构主要包含以下三层:

  • 网络连接层:Acceptor 线程接收客户端的连接请求并创建网络连接。
  • 请求转发层:Acceptor 线程以轮询的方式分发给Processor 线程,从而实现负载均衡的效果,Processor
    线程将请求放到请求队列中。
  • 请求处理层:KafkaRequestHandler线程不断地从请求队列中获取请求,解析请求,调用KafkaAPIs获取对应的操作结果,并将结果返回给客户端。

执行流程

  • Acceptor 线程在初始化的时候会往selector注册 OP_ACCEPT事件,表示可以接受客户端的连接请求,当客户端有请求连接过来时,根据selectionkey可以得到socketChannel,再将socketChannel以轮询的方式交给Processor线程(默认有3个Processor线程)处理。
  • Processor线程收到Acceptor线程分发的连接后,会先将连接放入自己的队列newConnections中,然后在selector注册OP_READ事件,表示可以读取客户端的请求,当客户端发送消息过来时,Processor线程就会处理OP_READ事件,然后Processor线程会将客户端的请求连接放入requestChannel的RequestQueue(请求队列被所有Processor线程共享)里并取消OP_READ事件的监听
  • KafkaRequestHandler线程(默认会创建8个线程)会从RequestQueue取出请求进行处理,通过KafkaApis调用得到响应结果,将处理后的响应结果放入responseQueues中(每个Processor线程对应一个responseQueues)。
  • Processor线程往selector注册OP_WRITE事件,表示可以将响应结果发送给客户端,当Processor线程检测到有OP_WRITE事件时,Processor线程就会从对应的responseQueues中取出响应结果,并通过selector.poll()方法将响应结果发送给对应的客户端且取消OP_WRITE事件的监听,最后Processor线程就会重新注册OP_READ事件,准备下一个请求的处理。

源码剖析

服务端的核心代码都在kafka.scala这个类,首先是main入口方法,该方法主要设置参数,然后调用启动方法

  def main(args: Array[String]): Unit = {try {//启动服务端的时候,在这里解析参数val serverProps = getPropsFromArgs(args)val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)//启动的核心代码方法kafkaServerStartable.startup//...}

kafka的服务端核心方法都在startup()里面

  def startup() {//启动服务server.startup()//...}

创建SocketServer,startup启动后,会创建Acceptor线程和三个Processor线程并启动

//Kafka 服务端的功能 都是在这里面实现
def startup() {//创建NIO的服务端socketServer = new SocketServer(config, metrics, kafkaMetricsTime)socketServer.startup()
}
 def startup() {this.synchronized {// 设置发送和接收的缓冲区大小val sendBufferSize = config.socketSendBufferBytesval recvBufferSize = config.socketReceiveBufferBytes//获取当前broker主机idval brokerId = config.brokerIdvar processorBeginIndex = 0//endpoints表示Kafka配置文件config/server.properties中的信息//正常情况下,只有一个服务实例endpoints.values.foreach { endpoint =>val protocol = endpoint.protocolType//processorEndIndex = 0 + 3val processorEndIndex = processorBeginIndex + numProcessorThreads//创建了三个Processor的线程for (i <- processorBeginIndex until processorEndIndex)//默认新建3个Processor线程processors(i) = newProcessor(i, connectionQuotas, protocol)//Acceptor类的主构造函数会启动3个Processor线程val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)acceptors.put(endpoint, acceptor)// Utils是线程工具类,启动acceptor线程,Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()acceptor.awaitStartup()processorBeginIndex = processorEndIndex}}}

Acceptor 线程处理

Utils.newThread启动acceptor线程的start()方法后,就会执行该线程的run方法

  • 首先 serverChannelnioSelector 注册 OP_ACCEPT 事件,nioSelector就会监听serverChannel是否有新的连接请求

  • 若有新的连接请求到来,根据该连接的key创建 SocketChannel,然后通过轮询的方式分发给不同的 processors线程处理,从而保证processor线程的负载均衡。

  def run() {//ServerChannel往Selector注册OP_ACCEPT事件,表示可以接收客户端的请求,//Selector就会检查ServerChannel是否有新的请求到达serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor = 0//死循环,不断轮询while (isRunning) {try {//selecotr 查看是否有新的注册事件val ready = nioSelector.select(500)//大于0,说明有新事件到来if (ready > 0) {//获取事件的keyval keys = nioSelector.selectedKeys()//遍历注册的所有keyval iter = keys.iterator()while (iter.hasNext && isRunning) {try {val key = iter.next//遍历完就删除iter.remove()//如果事件是OP_ACCEPT,就会调用accept()方法接收请求if (key.isAcceptable)// 创建SocketChannel,将其分发给Processor线程处理accept(key, processors(currentProcessor))elsethrow new IllegalStateException("Unrecognized key state for acceptor thread.")// 轮询遍历下一个Processor线程currentProcessor = (currentProcessor + 1) % processors.length} catch {case e: Throwable => error("Error while accepting connection", e)}}}}}

根据key封装socketChannel,分发给processor线程处理,processor线程将socketChannel放入自己的队列newConnections中,该队列是由ConcurrentLinkedQueue实现的队列,然后唤醒processorselector处理

def accept(key: SelectionKey, processor: Processor) {//根据SelectionKey获取到serverSocketChannelval serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//获取到一个socketChannelval socketChannel = serverSocketChannel.accept()try {connectionQuotas.inc(socketChannel.socket().getInetAddress)socketChannel.configureBlocking(false)socketChannel.socket().setTcpNoDelay(true)socketChannel.socket().setKeepAlive(true)// processor调用accept方法对socketChannel进行处理processor.accept(socketChannel)}}
 def accept(socketChannel: SocketChannel) {//将接收的 SocketChannel放入到自己的队列newConnections.add(socketChannel)// 唤醒 Processor 的 selector 进行处理wakeup()}

Processor 线程处理

在上面的startup()中已经创建了3个Processor线程,然后在Acceptor的主构造函数中进行启动

//主构造函数,new出来的时候会被运行
private[kafka] class Acceptor(val endPoint: EndPoint,val sendBufferSize: Int,val recvBufferSize: Int,brokerId: Int,processors: Array[Processor],connectionQuotas: ConnectionQuotas)extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {this.synchronized {//启动在startup()创建的3个Processor线程processors.foreach { processor =>Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()}}

Processor启动之后就会执行run方法

  override def run() {startupComplete()while (isRunning) {try {//读取队列中的每个SocketChannel,都往Selector上面注册OP_READ事件configureNewConnections()//处理响应,并注册OP_WRITE事件processNewResponses()//读取和发送请求的代码应该都是在这个方法完成,用于处理OP_READ事件与OP_WRITE事件poll()//处理接收到的新请求,将这些请求放入requestChannel请求队列中并取消OP_READ事件processCompletedReceives()//处理已经发送出去的响应并重新监听OP_READ事件processCompletedSends()processDisconnected()} swallowError(closeAll())shutdownComplete()}

不断获取连接队列里所有的SocketChannel,解析参数得到ConnectionId,再往selector注册OP_READ事件,注册之后就可以读取客户端的请求。

private def configureNewConnections() {
//当连接队列不为空while (!newConnections.isEmpty) {//不断获取连接队列里面的SocketChannelval channel = newConnections.poll()try {//解析SocketChannel,获取对应的参数val localHost = channel.socket().getLocalAddress.getHostAddressval localPort = channel.socket().getLocalPortval remoteHost = channel.socket().getInetAddress.getHostAddressval remotePort = channel.socket().getPortval connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString//往selector注册OP_READ事件selector.register(connectionId, channel)} }}

从这段代码可以知道kafka对SocketChannel进行了封装,封装成KakaChannel,并将SelectionKey和KakaChannel进行二者的绑定,除此之外,Kafka还实现了channel复用,将connectionId和KakaChannel放入map中,避免每次发起请求都新建channel,减少了资源的消耗

  public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {//往自己的Selector上面注册OP_READ事件SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);//kafka对SocketChannel进行了封装,封装成KakaChannelKafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);//将key和channel进行绑定key.attach(channel);//channels护了多个网络连接,实现channel复用this.channels.put(id, channel);}

将客户端的请求放入请求队列中,并取消OP_READ事件

  private def processCompletedReceives() {//遍历每一个请求selector.completedReceives.asScala.foreach { receive =>try {val channel = selector.channel(receive.source)val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),channel.socketAddress)//对于获取到的请求进行解析,得到requestval req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)//将request放入请求队列中requestChannel.sendRequest(req)//取消OP_READ事件selector.mute(receive.source)} }}

KafkaRequestHandlerPool 处理请求

接下来就会通过KafkaRequestHandler线程去处理请求队列中的请求。回到最开始的 startup(),

  def startup() {//主要用于处理请求队列里面的请求requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)//...
}

新建的KafkaRequestHandlerPool,会在主构造函数创建8个KafkaRequestHandler

class KafkaRequestHandlerPool(val brokerId: Int,val requestChannel: RequestChannel,val apis: KafkaApis,numThreads: Int) extends Logging with KafkaMetricsGroup {val threads = new Array[Thread](numThreads)val runnables = new Array[KafkaRequestHandler](numThreads)//默认启动8个线程,一般情况下可以根据消息的吞吐量去设置这个参数for(i <- 0 until numThreads) {//创建线程runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))//线程启动,就会执行run方法threads(i).start()}}

KafkaRequestHandler启动之后就会执行run方法,将客户端的请求交由KafkaAPIs进行最终的处理。

def run() {while(true) {try {var req : RequestChannel.Request = nullwhile (req == null) {val startSelectTime = SystemTime.nanoseconds//获取request对象req = requestChannel.receiveRequest(300)val idleTime = SystemTime.nanoseconds - startSelectTimeaggregateIdleMeter.mark(idleTime / totalHandlerThreads)}//将请求交给KafkaApis进行处理apis.handle(req)} }}
  def handle(request: RequestChannel.Request) {//处理生产者发送过来的请求case ApiKeys.PRODUCE => handleProducerRequest(request)}
def handleProducerRequest(request: RequestChannel.Request) {//获取到生产发送过来的请求信息val produceRequest = request.body.asInstanceOf[ProduceRequest]val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf//按照分区的方式去遍历数据val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {//对发送过来的数据进行权限等判断。case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)}//判断是否有写权限。val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))}//把接收的数据追加到磁盘上replicaManager.appendMessages(produceRequest.timeout.toLong,produceRequest.acks,internalTopicsAllowed,authorizedMessagesPerPartition,sendResponseCallback)}

数据存储到磁盘后,调用sendResponseCallback()回调函数处理响应。

  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {//...quotas.produce.recordAndMaybeThrottle(request.session.sanitizedUser,request.header.clientId,numBytesAppended,produceResponseCallback)}}

继续调用回调函数produceResponseCallback(),根据ack的值进行处理

  1. acks=0:生产者不会等待任何来自broker的确认。
  2. acks=1(默认):生产者会等待leader broker接收到消息并确认(但不保证所有副本都已同步)。
  3. acks=all 或 acks=-1:生产者会等待所有同步副本都确认接收到消息。
 def produceResponseCallback(delayTimeMs: Int) {//acks = 0,表示生产者不关心数据的处理结果,所以不需要返回响应信息if (produceRequest.acks == 0) {//...} else {//acks不为0,表明生产者需要响应消息//封装请求头val respHeader = new ResponseHeader(request.header.correlationId)//封装请求体,也就是响应消息val respBody = request.header.apiVersion match {case 0 => new ProduceResponse(mergedResponseStatus.asJava)case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)}//将响应消息发送给客户端requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))}}

将响应放入processor对应的responseQueue中,默认情况下有3个responseQueue。

  def sendResponse(response: RequestChannel.Response) {//先从数组获取Processor对应的队列,再将响应放到这个队列responseQueues(response.processor).put(response)for(onResponse <- responseListeners)onResponse(response.processor)}

接着服务端需要响应客户端,回到processor的run方法

 override def run() {//处理响应,并注册OP_WRITE事件processNewResponses()//处理已经发送出去的响应并重新监听OP_READ事件processCompletedSends()}

处理responseQueues中的响应可以分为三种类型:

  • NoOpAction:对于不需要返回响应的请求,重新注册OP_READ监听事件。
  • SendAction:需要发送响应的情况,接下来注册OP_WRITE监听事件,并最终通过selector.poll()方法将响应结果发送给客户端。
  • CloseConnectionAction:需要关闭的响应。
private def processNewResponses() {//通过Process线程的id获取Response对象var curr = requestChannel.receiveResponse(id)while (curr != null) {try {curr.responseAction match {//对于不需要返回响应的请求case RequestChannel.NoOpAction =>curr.request.updateRequestMetrics//重新监听OP_READ事件selector.unmute(curr.request.connectionId)//需要发送响应的情况case RequestChannel.SendAction =>//注册OP_WRITE事件,发送响应sendResponse(curr)// 需要关闭的响应,关闭连接case RequestChannel.CloseConnectionAction =>curr.request.updateRequestMetricsclose(selector, curr.request.connectionId)}} finally {curr = requestChannel.receiveResponse(id)}}}

正常情况下处理已经发送出去的响应,将响应从响应队列中移除,并重新监听OP_READ事件,准备处理客户端的下一个请求。

  private def processCompletedSends() {selector.completedSends.asScala.foreach { send =>//移除响应队列的响应val resp = inflightResponses.remove(send.destination).getOrElse {//...}resp.request.updateRequestMetrics()selector.unmute(send.destination)}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/55345.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kubeadm部署k8s集群,版本1.23.6;并设置calico网络BGP模式通信,版本v3.25--未完待续

1.集群环境创建 三台虚拟机&#xff0c;一台master节点&#xff0c;两台node节点 (根据官网我们知道k8s 1.24版本之后就需要额外地安装cri-dockerd作为桥接才能使用Docker Egine。经过尝试1.24后的版本麻烦事很多&#xff0c;所以此处我们选择1.23.6版本) 虚拟机环境创建参考…

YOLOv11改进策略【损失函数篇】| Shape-IoU:考虑边界框形状和尺度的更精确度量

一、本文介绍 本文记录的是改进YOLOv11的损失函数&#xff0c;将其替换成Shape-IoU。现有边界框回归方法通常考虑真实GT&#xff08;Ground Truth&#xff09;框与预测框之间的几何关系&#xff0c;通过边界框的相对位置和形状计算损失&#xff0c;但忽略了边界框本身的形状和…

关于malloc,calloc,realloc

1.引用的头文件介绍&#xff1a; 这三个函数需要调用<stdlib.h>这个头文件 2.malloc 2.1 函数简单介绍&#xff1a; 首先这个函数是用于动态开辟一个空间&#xff0c;例如数组在c99标准之前是无法arr[N]的&#xff0c;这个时候就需要使用malloc去进行处理&#xff0c…

互斥量mutex、锁、条件变量和信号量相关原语(函数)----很全

线程相关知识可以看这里: 线程控制原语(函数)的介绍-CSDN博客 进程组、会话、守护进程和线程的概念-CSDN博客 1.同步概念 所谓同步&#xff0c;即同时起步&#xff0c;协调一致。不同的对象&#xff0c;对“同步”的理解方式略有不同。如&#xff0c;设备同步&#xff0c;是…

【C语言指南】数据类型详解(上)——内置类型

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a;《C语言指南》 期待您的关注 目录 引言 1. 整型&#xff08;Integer Types&#xff09; 2. 浮点型&#xff08;Floating-Point …

计算机毕业设计 基于Python高校岗位招聘和分析平台的设计与实现 Python+Django+Vue 前后端分离 附源码 讲解 文档

&#x1f34a;作者&#xff1a;计算机编程-吉哥 &#x1f34a;简介&#xff1a;专业从事JavaWeb程序开发&#xff0c;微信小程序开发&#xff0c;定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事&#xff0c;生活就是快乐的。 &#x1f34a;心愿&#xff1a;点…

YOLOv8改进 ,YOLOv8改进主干网络为华为的轻量化架构GhostNetV1

摘要 摘要:将卷积神经网络(CNN)部署在嵌入式设备上是困难的,因为嵌入式设备的内存和计算资源有限。特征图的冗余是成功的 CNN 的一个重要特征,但在神经网络架构设计中很少被研究。作者提出了一种新颖的 Ghost 模块,用于通过廉价操作生成更多的特征图。基于一组内在特征图…

力扣(leetcode)每日一题 983 最低票价 |动态规划

983. 最低票价 题干 在一个火车旅行很受欢迎的国度&#xff0c;你提前一年计划了一些火车旅行。在接下来的一年里&#xff0c;你要旅行的日子将以一个名为 days 的数组给出。每一项是一个从 1 到 365 的整数。 火车票有 三种不同的销售方式 &#xff1a; 一张 为期一天 的通…

Android 安卓内存安全漏洞数量大幅下降的原因

谷歌决定使用内存安全的编程语言 Rust 向 Android 代码库中写入新代码&#xff0c;尽管旧代码&#xff08;用 C/C 编写&#xff09;没有被重写&#xff0c;但内存安全漏洞却大幅减少。 Android 代码库中每年发现的内存安全漏洞数量&#xff08;来源&#xff1a;谷歌&#xff09…

Spring Boot实现足球青训俱乐部管理自动化

4 系统设计 4.1 系统架构设计 B/S系统架构是本系统开发采用的结构模式&#xff0c;使用B/S模式开发程序以及程序后期维护层面需要的经济成本是很低的&#xff0c;用户能够承担得起。使用这样的模式开发&#xff0c;用户使用起来舒心愉悦&#xff0c;不会觉得别扭&#xff0c;操…

WebSocket消息防丢ACK和心跳机制对信息安全性的作用及实现方法

WebSocket消息防丢ACK和心跳机制对信息安全性的作用及实现方法 在现代即时通讯&#xff08;IM&#xff09;系统和实时通信应用中&#xff0c;WebSocket作为一种高效的双向通信协议&#xff0c;得到了广泛应用。然而&#xff0c;在实际使用中&#xff0c;如何确保消息的可靠传输…

Docker笔记-Docker磁盘空间清理

无用的容器指的是已经停止运行且处于非活跃状态的容器。无用的镜像包括没有被任何容器使用的镜像&#xff0c;或者是被标记为"<none>"的镜像&#xff0c;通常是构建过程中产生的无标签镜像。 通过执行 docker container ls -a 和 docker image ls -a 命令&…

LiveNVR监控流媒体Onvif/RTSP功能-支持电子放大拉框放大直播视频拉框放大录像视频流拉框放大电子放大

LiveNVR监控流媒体Onvif/RTSP功能-支持电子放大拉框放大直播视频拉框放大录像视频流拉框放大电子放大 1、视频广场2、录像回看3、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、视频广场 视频广场 -》播放 &#xff0c;左键单击可以拉取矩形框&#xff0c;放大选中的范围&#xff…

2024年9月中国干旱监测报告(FYDI-2.0指数)

目录 引言 旱情监测与分析 资料来源 引言 2024年9月&#xff0c;北方的降水逐渐增多&#xff0c;进入华西秋雨集中期&#xff0c;从青藏高原北部一直延伸到东北多地&#xff0c;常出现大范围的云带&#xff0c;西北地区的降雨强度较大。南方地区降水分布不均&#xff0c;受…

【Python报错已解决】error: subprocess-exited-with-error

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

025.Oracle_DBMS_job定时任务

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…

Windows开发工具使用技巧

在 Windows 上进行开发时&#xff0c;有许多工具和技巧可以提升开发效率和用户体验。以下是一些常用的开发工具和技巧&#xff1a; 常用开发工具 1. Visual Studio Code (VS Code) - 插件管理&#xff1a;利用扩展市场&#xff08;Extension Marketplace&#xff09;安装各种…

RabbitMQ基本原理

一、基本结构 所有中间件技术都是基于 TCP/IP 协议基础之上进行构建新的协议规范&#xff0c;RabbitMQ遵循的是AMQP协议&#xff08;Advanced Message Queuing Protocol - 高级消息队列协议&#xff09;。 生产者发送消息流程&#xff1a; 1、生产者和Broker建立TCP连接&#…

如何实现Mybatis自定义插件

背景 MyBatis的插件机制&#xff0c;也可称为拦截器&#xff0c;是一种强大的扩展工具。它允许开发者在不修改MyBatis框架源代码的情况下&#xff0c;通过拦截和修改MyBatis执行过程中的行为来定制和增强功能。 MyBatis插件可以拦截四大核心组件的方法调用&#xff1a;Executor…

Certbot自动申请并续期https证书

Certbot自动申请并续期https证书 一、 安装 Certbot&#xff1a;使用命令安装 Certbot&#xff1a; dnf install certbot python3-certbot-nginx获取 SSL 证书&#xff1a;运行 Certbot 命令来获取并安装 SSL 证书。 示例命令&#xff0c;替换其中的域名和路径信息&#xff1a…