OkHttp源码分析:分发器任务调配,拦截器责任链设计,连接池socket复用

目录

一,分发器和拦截器

二,分发器处理异步请求

1.分发器处理入口

2.分发器工作流程

3.分发器中的线程池设计

三,分发器处理同步请求

四,拦截器处理请求

1.责任链设计模式

 2.拦截器工作原理

3.OkHttp五大拦截器


一,分发器和拦截器

        OkHttp在内部维护了这几个重要对象:分发器dispatcher,连接池connectionPool,拦截器Interceptor;

//拦截器
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher//连接池
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool//拦截器
@get:JvmName("interceptors") val interceptors: List<Interceptor> =builder.interceptors.toImmutableList()@get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =builder.networkInterceptors.toImmutableList()

他们的作用分别为:

  • 分发器Dispatcher:调配请求任务,内部维护队列线程池  
  • 拦截器:处理请求与响应,完成请求过程
  • 连接池:管理socket连接与连接复用

        从OkHttp的请求处理流程来看: 拦截器负责完成网络请求过程,同步和异步请求必须经过分发器调配后才会发给拦截器进行网络请求;

二,分发器处理异步请求

1.分发器处理入口

private void visitInternet() {//1.创建HttpClient对象OkHttpClient okHttpClient = new OkHttpClient();//2.获取request对象Request.Builder builder = new Request.Builder().url("https://www.bilibili.com/");Request request = builder.build();//3.获取call对象Call call = okHttpClient.newCall(request);//4.执行网络操作try {Response response = call.execute();String result = response.body().string();showResultOnUiThread(result);} catch (IOException e) {throw new RuntimeException(e);}
}

        从OkHttp处理流程来看,每次发送请求前我们需要调用 newCall() 方法获取call对象,这里的Call是一个接口,newCall返回的是Call接口的实现类RealCall;

  /** Prepares the [request] to be executed at some point in the future. */override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

call对象只能使用一次 

        发起异步请求需要调用call对象的 enqueue() 方法,enqueue方法首先会将call对象中的executed字段置为true,代表这个call对象已经使用过,第二次就无法使用,想要再次使用的话需要调用call对象的 clone() 方法;

        callStart方法执行后表示请求开始,之后便会执行分发器的enqueue方法处理异步请求,这里传入的对象AsyncCall是Runnable接口的实现类,可以理解为是我们要处理的异步任务;

  override fun enqueue(responseCallback: Callback) {//call对象只能使用一次check(executed.compareAndSet(false, true)) { "Already Executed" }callStart() //请求开始//分发器处理异步请求client.dispatcher.enqueue(AsyncCall(responseCallback))}

2.分发器工作流程

分发器中维护了三个队列:

  • readyAsyncCalls:等待中异步请求队列
  • runningAsyncCalls:执行中异步请求队列
  • runningSyncCalls:执行中同步请求队列

        分发器dispatcher的enqueue方法执行后,异步请求AsyncCall默认先放到readAsyncCalls中,如果是非websocket连接,则检查一下runningAsyncCalls和readAsyncCalls中是否有相同域名host的请求,如果有则复用之前的域名的计数器existingCall

        计数器之后用于判断同一主机(域名)请求连接数

  internal fun enqueue(call: AsyncCall) {synchronized(this) {readyAsyncCalls.add(call)if (!call.call.forWebSocket) {val existingCall = findExistingCallWithHost(call.host)if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)}}promoteAndExecute()}

检查完之后调用promoteAndExecute()方法,在这个方法中会检查两件事:

  • 进行中异步请求数是否 ≥ 64(runningAsyncCalls队列的size是否 ≥ 64),
  • 对同一域名(主机)的请求callsPerHost是否大于5;

若条件符合,将异步任务加入到runningAsyncCalls中

检查完可执行请求并更新状态后,将请求提交到线程池中执行

private fun promoteAndExecute(): Boolean {this.assertThreadDoesntHoldLock()val executableCalls = mutableListOf<AsyncCall>()val isRunning: Booleansynchronized(this) {val i = readyAsyncCalls.iterator()while (i.hasNext()) {val asyncCall = i.next()//检查可执行请求if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.i.remove()asyncCall.callsPerHost.incrementAndGet()executableCalls.add(asyncCall)runningAsyncCalls.add(asyncCall)}isRunning = runningCallsCount() > 0}//提交到线程池中执行for (i in 0 until executableCalls.size) {val asyncCall = executableCalls[i]asyncCall.executeOn(executorService //线程池)}return isRunning}

将AsyncCall提交到线程池后,AsyncCall对象的run方法便会被执行;

在run方法中,从拦截器中获取了服务器的响应,完成请求后调用dispatcher的finish方法,结束本次异步请求;

override fun run() {threadName("OkHttp ${redactedUrl()}") {var signalledCallback = falsetimeout.enter()try {//拦截器完成请求,返回响应val response = getResponseWithInterceptorChain()signalledCallback = trueresponseCallback.onResponse(this@RealCall, response)} catch (e: IOException) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)} else {responseCallback.onFailure(this@RealCall, e)}} catch (t: Throwable) {cancel()if (!signalledCallback) {val canceledException = IOException("canceled due to $t")canceledException.addSuppressed(t)responseCallback.onFailure(this@RealCall, canceledException)}throw t} finally {//调用finish方法,结束本次异步请求client.dispatcher.finished(this)}}}

在完成一次请求后,runningAsyncCalls队列会空出位置

所以在finish方法中,会重新调用检查异步任务方法promoteAndExecute(),也就是在结束一次请求后,会去检查readyAsyncCalls队列中符合条件的异步任务,并去执行他们

idleCallback.run() 用于在所有请求完成后执行特定操作,操作内容自定义

internal fun finished(call: AsyncCall) {call.callsPerHost.decrementAndGet()finished(runningAsyncCalls, call)}/** Used by [Call.execute] to signal completion. */internal fun finished(call: RealCall) {finished(runningSyncCalls, call)}private fun <T> finished(calls: Deque<T>, call: T) {val idleCallback: Runnable?synchronized(this) {if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")idleCallback = this.idleCallback}//重新调用promoteAndExecute,检查可执行异步请求val isRunning = promoteAndExecute()if (!isRunning && idleCallback != null) {//用于在所有请求完成后执行特定操作,操作内容自定义idleCallback.run()}}

3.分发器中的线程池设计

分发器中的线程池:

  • 核心线程数:0
  • 最大线程数:Int.MAX_VALUE
  • 空闲时间:60s
  • 工作队列:SynchronousQueue()
@get:Synchronized@get:JvmName("executorService") val executorService: ExecutorServiceget() {if (executorServiceOrNull == null) {executorServiceOrNull = ThreadPoolExecutor(0, //核心线程数Int.MAX_VALUE, //最大线程数60, //空闲时间TimeUnit.SECONDS, //空闲时间单位(秒)SynchronousQueue(), //工作队列threadFactory("$okHttpName Dispatcher", false))}return executorServiceOrNull!!}

线程池工作原理:

  1. 工作中线程 < 核心线程数 创建新线程
  2. 工作中线程 > 核心线程数且工作队列未满,加入工作队列
  3. 工作队列已满,工作中线程数若 < 最大线程数, 创建新线程
  4. 工作队列已满,工作中线程数 > 最大线程数, 执行拒绝策略(默认为抛出异常,可自定义)

在okhttp的分发器中,线程池使用SynchronousQueue()作为工作队列,这种容器没有容量,也就无法添加任务,所以当工作中线程 > 核心线程数,会直接创建新线程

三,分发器处理同步请求

对于同步请求,分发器只记录请求(放入RunningSyncCalls中)

  override fun execute(): Response {check(executed.compareAndSet(false, true)) { "Already Executed" }timeout.enter()callStart()try {client.dispatcher.executed(this)return getResponseWithInterceptorChain()} finally {client.dispatcher.finished(this)}}//dispatcher.executed()@Synchronized internal fun executed(call: RealCall) {//分发器只记录同步请求runningSyncCalls.add(call)}

四,拦截器处理请求

1.责任链设计模式

OkHttp中的拦截器采用责任链设计模式:

        为避免请求发送者与多个请求处理者耦合在一起,于是将所有请求处理者通过前一对象记住下一对象的引用而形成一条链,当有请求发生时,请求只需沿着链传递,直到有对象处理它

模拟责任链设计模式:

我们定义一个Handler抽象类,并让他持有下一Handler对象的引用next,并创建Handler三个子类

abstract class Handler {protected var next : Handler? = null;fun setNext(next : Handler){this.next = next;}fun getNext() : Handler?{return next;}abstract fun handle(request : String);
}class Handler1 : Handler() {override fun handle(request: String) {if("1".equals(request)){Log.i("TAG", "handle1处理")}else{if(getNext() != null){next?.handle(request);}else{Log.i("TAG", "没有下一个handler")}}}
}class Handler2 : Handler() {override fun handle(request: String) {if("2".equals(request)){Log.i("TAG", "handle1处理")}else{if(getNext() != null){next?.handle(request);}else{Log.i("TAG", "没有下一个handler")}}}
}class Handler3 : Handler() {override fun handle(request: String) {if("3".equals(request)){Log.i("TAG", "handle1处理")}else{if(getNext() != null){next?.handle(request);}else{Log.i("TAG", "没有下一个handler")}}}
}

        我们让handler1拥有2的引用,2拥有3的引用,这样当我们调用1的handle("3")时,request对象就会一直沿着责任链执行,直到遇到能处理他的对象(handler3)

val handler1: Handler = Handler1()
val handler2: Handler = Handler2()
val handler3: Handler = Handler3()handler1.setNext(handler2)
handler2.setNext(handler3)handler1.handle("3")

 2.拦截器工作原理

拦截器的工作基本分为三步:

  1. 处理请求request
  2. 将请求传往下一拦截器,获取返回的请求response
  3. 处理响应response并返回

例如,我们自定义一个日志打印拦截器:

class LogInterceptor : Interceptor {override fun intercept(chain: Interceptor.Chain): Response {//1.处理请求val request = chain.request();val requestLog = StringBuilder().apply {append("Request:\n")append("URL: ${request.url}\n")append("Method: ${request.method}\n")append("Headers: ${request.headers}\n")request.body?.let {append("Body: ${it.toString()}\n")}}Log.d("OkHttp", requestLog.toString())//将请求传往下一拦截器,获取响应val response = chain.proceed(request)//处理响应并返回val responseLog = StringBuilder().apply {append("Response:\n")append("Code: ${response.code}\n")append("Headers: ${response.headers}\n")response.body?.let {append("Body: ${it.string()}\n")}}Log.d("OkHttp", responseLog.toString())return response;}
}

在chain的proceed方法中,程序会找到拦截器链中的下一拦截器并将请求传给他,获取返回的请求

  @Throws(IOException::class)override fun proceed(request: Request): Response {check(index < interceptors.size)calls++if (exchange != null) {check(exchange.finder.sameHostAndPort(request.url)) {"network interceptor ${interceptors[index - 1]} must retain the same host and port"}check(calls == 1) {"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"}}// 找到拦截器链中的下一拦截器val next = copy(index = index + 1, request = request)val interceptor = interceptors[index]//传递请求,获取响应@Suppress("USELESS_ELVIS")val response = interceptor.intercept(next) ?: throw NullPointerException("interceptor $interceptor returned null")if (exchange != null) {check(index + 1 >= interceptors.size || next.calls == 1) {"network interceptor $interceptor must call proceed() exactly once"}}check(response.body != null) { "interceptor $interceptor returned a response with no body" }return response}

3.OkHttp五大拦截器

OkHttp中默认配置五个拦截器,分别为:

val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {interceptors += client.networkInterceptors
}
nterceptors += CallServerInterceptor(forWebSocket)
  • 重试和重定向拦截器 RetryAndFollowUpInterceptor:重试拦截器在交出前(交给下一个拦截器),负责判断用户是否取消了请求。在获得了响应之后,会根据响应码判断是否需要重定向,如果满足所有条件就会重启执行所有拦截器
  • 桥接拦截器(处理请求头和响应头)BridgeInterceptor:在交出之前,负责将Http协议必备的请求头加入请求之中(如Host,Connection),并添加一些默认的行为(如RZIP压缩);获得响应后调用保存cookie接口并解析GZIP数据
  • 缓存拦截器 CacheInterceptor:交出之前读取并判断是否使用缓存;获取响应后判断是否缓存
  • 连接拦截器 ConnectInterceptor:交出之前,负责创建或找到一个连接,并获取socket流;获取响应后不进行额外处理
  • 网络请求拦截器(执行实际的网络请求)CallServerInterceptor:进行真正的与服务器通信,向服务器发送数据,解析读取的响应数据

OkHttp中添加拦截器有两种方式:addInterceptor()和 addNetworkInterceptor(),他们的主要区别如下:

  • 调用时机:Application拦截器在请求开始时调用,Network在网络连接建立后调用
  • 调用次数:Application只调用一次,Network可能调用多次(重定向)
  • 可见信息:Application只能看到最终请求/响应,Network能看到所有中间请求/响应
  • 缓存感知:Application无法感知缓存,Network可以感知缓存
  • 使用场景:Application一般用于业务处理(如:身份验证,日志记录,错误处理),Network一般用于网络层操作(如:网络监控,缓存处理,压缩处理)

OkHttp完整拦截器链如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/890054.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(2024年最新)Linux(Ubuntu) 中配置静态IP(包含解决每次重启后配置文件失效问题)

Hello! 亲爱的小伙伴们&#xff0c;大家好呀&#xff08;Smile~&#xff09;&#xff01;我是Huazzi&#xff0c;欢迎观看本篇博客&#xff0c;接下来让我们一起来学习一下Ubuntu 中如何配置静态IP吧&#xff01;祝你有所收获&#xff01; 提前对Linux有所了解的小伙伴应该知道…

vue3渲染el-tree组件,给默认选中的节点,禁用所有子节点

实现逻辑&#xff1a;给默认选中节点的所有子节点添加一个disabled属性&#xff0c;以此禁用子节点。 编写代码 <template><el-tree :props"{ children: children, label: name }" :data"treeListData" show-checkbox node-key"id" r…

uniapp中vuex(全局共享)的应用

一、Vuex概述 1.1 官方解释 Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式。 它采用集中式存储管理 应用的所有组件的状态&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化 - Vuex 也集成到 Vue 的官方调试工具 devtools extension&#xff0c;提供了诸…

Unity中触发器Trigger无法被射线检测到的问题

今天在做项目的时候发现,同一个物体,当他是碰撞器的时候,可以被射线检测到. 但是当他变成触发器的时候,射线就检测不到了??? 本来以为就是这样的,但是查了资料发现并没有这样的限制,触发器也是可以正常被射线检测的 到处查资料都没有发现问题,后来发现是下面这个设置不知道…

搭建springmvc项目

什么是springmvc MVC它是一种设计理念。把程序按照指定的结构来划分: Model模型 View视图 Controller控制层 springmvc框架是spring框架的一个分支。它是按照mvc架构思想设计的一款框架。 springmvc的主要作用: 接收浏览器的请求数据&#xff0c;对数据进行处理&#xff0c;…

超越 RAG 基础:AI 应用的高级策略

作者&#xff1a;来自 Elastic Elastic Platform Team 我们最近与 Cohere 举办的虚拟活动深入探讨了检索增强生成 (retrieval augmented generation - RAG) 的世界&#xff0c;重点讨论了在概念验证阶段之后构建 RAG 应用程序的关键注意事项。我们的演讲者是 Elastic 的首席解…

音频开发中常见的知识体系

在 Linux 系统中&#xff0c;/dev/snd 目录包含与声音设备相关的文件。每个文件代表系统中的一部分音频硬件或音频控制接口。以下是你列出的文件及其含义&#xff1a; 一.基本术语 样本长度(sample)&#xff1a;样本是记录音频数据最基本的单位&#xff0c;计算机对每个通道采…

贪心算法 part01

class Solution { public:int maxSubArray(vector<int>& nums) {int result INT32_MIN;int count 0;for (int i 0; i < nums.size(); i) {count nums[i];if (count > result) { // 取区间累计的最大值&#xff08;相当于不断确定最大子序终止位置&#xff…

Ubuntu 安装texstudio sty与texlive

手动安装需要的包 访问CTAN网站&#xff08;Comprehensive TeX Archive Network&#xff09;并下载enumitem宏包&#xff1a; enumitem CTAN页面下载后&#xff0c;将宏包解压到/usr/share/texmf/tex/latex/下。 可打开texstudio/帮助/宏包帮助下载。 如果不想手动安装一个个…

Moretl安全日志采集工具

永久免费: 至Gitee下载 使用教程: Moretl使用说明 使用咨询: 用途 定时全量或增量采集工控机,电脑文件或日志. 优势 开箱即用: 解压直接运行.不需额外下载.管理设备: 后台统一管理客户端.无人值守: 客户端自启动,自更新.稳定安全: 架构简单,兼容性好,通过授权控制访问. 架…

CAN配置---波特率中断引脚等---autochips-AC7811-ARM-M3内核

1、配置工具 虽然不怎么好用&#xff0c;但比没有强多了。具体看图&#xff1a; 时钟选着 NVIC配置 GPIO配置 2、生成的具体配置信息 NXP的配置工具里面&#xff0c;具体的波特率可以直接显示&#xff0c;这个工具没有&#xff0c;怎么办&#xff1f; 它放到了生成的代码里面…

【ETCD】ETCD 架构揭秘:内部各组件概览

ETCD 的主要组件及它们之间的关联关系如下&#xff1a; 目录 1. Client&#xff08;客户端&#xff09;2. gRPC 接口3. Etcd Server Main Loop&#xff08;ETCD 主循环&#xff09;4. Raft&#xff08;共识模块&#xff09;5. Peer Etcd Nodes&#xff08;ETCD 集群节点&#x…

乐凡信息智能安全管控方案:助力油气田行业安全管控多方位升级

我国油田地域广阔&#xff0c;分布着大量各种油井&#xff0c;油井开采设备的连续稳定运行是保证石油开采的首要条件。然而&#xff0c;由于油田多位于特殊地理环境中&#xff0c;因而实现油井之间的通信首要问题就是要克服地理环境所带来的限制&#xff0c;传统通信系统的建设…

windows 使用python共享网络给另外一个网卡

# -*- coding: utf-8 -*- import subprocessdef open_share(to_shared_adapter, from_shared_adapter):"""打开以太网的网络共享:return: None"""powershell_script f"""# Register the HNetCfg library (once)# regsvr32 hnetc…

深度学习实战智能交通计数

本文采用YOLOv8作为核心算法框架&#xff0c;结合PyQt5构建用户界面&#xff0c;使用Python3进行开发。YOLOv8以其高效的实时检测能力&#xff0c;在多个目标检测任务中展现出卓越性能。本研究针对车辆目标数据集进行训练和优化&#xff0c;该数据集包含丰富的车辆目标图像样本…

rebase ‘A‘ onto ‘master‘ 和 merge ‘master‘ into ‘A‘有什么区别

在Git版本控制系统中&#xff0c;rebase 和 merge 是两种不同的操作&#xff0c;用于合并分支。rebase A onto master 和 merge master into A 虽然最终目的都是将两个分支的更改合并在一起&#xff0c;但它们在处理方式和结果上有所不同。 rebase ‘A’ onto ‘master’ 含义…

MySQL Explain 分析SQL语句性能

一、EXPLAIN简介 使用EXPLAIN关键字可以模拟优化器执行SQL查询语句&#xff0c;从而知道MySQL是如何处理你的SQL语句的。分析你的查询语句或是表结构的性能瓶颈。 &#xff08;1&#xff09; 通过EXPLAIN&#xff0c;我们可以分析出以下结果&#xff1a; 表的读取顺序数据读取…

关于SAP Router连接不稳定的改良

这个也是网上看来的&#xff0c;之前在用的时候也在想是不是建立一个长连接&#xff0c;就不至于断线。今天正好看到。 关于SAP Router连接不稳定的改良 我们在使用SAPRouter时经常会碰到断线&#xff0c;其发生原因有很多&#xff0c;如&#xff1a;网络不稳定、操作间隔时间…

游泳溺水识别数据集,对9984张原始图片进行YOLO,COCO JSON, VOC XML 格式的标注,平均识别率在91.7%以上

游泳溺水识别数据集&#xff1a; 对9984张原始图片进行YOLO&#xff0c;COCO JSON, VOC XML 格式的标注&#xff0c;平均识别率在91.7&#xff05;以上 &#xff0c;可识别泳池或者水库中是否有人溺水。 数据集分割 训练组98&#xff05; 9818图片 有效集&#xff05;…

Docker的容器编排

目录 1. 什么是容器编排&#xff08;Docker Compose&#xff09;2. 容器编排的功能3. 容器编排文件&#xff08;docker-compose.yml&#xff09;的介绍3.1 文件语法版本3.2 文件基本结构及常见指令 4. Docker Compose命令详解4.1 Docker Compose命令清单4.2 命令格式和常见选项…