认识并使用OkHttp3
- 一、前言:发送Http请求并处理响应
- 1、背景
- 2、传统技术:使用java.net.HttpURLConnection
- 3、学习OkHttp3(可以实现真正的流式处理)
- 二、OkHttp3
- 1、OkHttp3是什么?
- 2、如何使用OkHttp3呢?
- 2.1 为OkHttpClient创建单例
- 2.1.1 默认设置
- 2.1.2 自定义设置(更常用)
- 2.1.3 不用主动关闭Http client
- 2.2 同步方式
- 2.3 异步方式
- 2.3.1 这并不是真正的流式处理
- 2.4 真正的流式处理技术
- 2.4.1 SSE
- 2.4.2 WebSocket
一、前言:发送Http请求并处理响应
1、背景
- 实际开发中,难免有“发送Http请求并处理响应”的技术诉求。
- 那么,在Java的世界中,如何发送Http请求并处理响应呢?
2、传统技术:使用java.net.HttpURLConnection
- 代码
public class HttpUtils {/*** 使用java.net.HttpURLConnection发送HTTP请求并处理响应*/public static void useHttpURLConnection(String urlStr) {URL url = null;HttpURLConnection httpURLConnection = null;try {// 1、构建httpURLConnection(我理解就是建立Http连接)url = new URL(urlStr);httpURLConnection = (HttpURLConnection) url.openConnection();httpURLConnection.setRequestProperty("Content-Type", "application/json");httpURLConnection.setRequestMethod("POST");// 单位:毫秒httpURLConnection.setConnectTimeout(5000);httpURLConnection.setReadTimeout(5000);// 当想通过POST,PUT等方式发送请求体时,需要调用setDoOutput方法并将其设置为 true。// 调用此方法后,就能获取 HttpURLConnection 的输出流,并将请求体写入这个流中。httpURLConnection.setDoOutput(true);// 2、发送Http请求String requestBody = "";try (PrintWriter printWriter = new PrintWriter(httpURLConnection.getOutputStream())) {printWriter.write(requestBody);printWriter.flush();}// 3.1 失败if (HttpURLConnection.HTTP_OK != httpURLConnection.getResponseCode()) {try (InputStream errorStream = httpURLConnection.getErrorStream();InputStreamReader errorStreamReader = new InputStreamReader(errorStream, StandardCharsets.UTF_8);BufferedReader errorBufferedReader = new BufferedReader(errorStreamReader)) {String errorMsg = errorBufferedReader.lines().collect(Collectors.joining("\n"));throw new RuntimeException(String.format("The HTTP connection fails and the BufferReader cannot be obtained\nCause by : %s", errorMsg));}}// 3.2 成功,获取bufferedReaderInputStream inputStream = httpURLConnection.getInputStream();InputStreamReader inputStreamReader = new InputStreamReader(Objects.requireNonNull(inputStream));BufferedReader bufferedReader = new BufferedReader(inputStreamReader);// 4、处理响应String line;while ((line = bufferedReader.readLine()) != null) {System.out.println(line);}// 5、资源关闭(用try-with-resources更好)bufferedReader.close();inputStreamReader.close();inputStream.close();httpURLConnection.disconnect();} catch (Exception e) {e.printStackTrace();}}
}
仅做编码思路的展示,故未测试:)
- 结论:
- (1)这种写法太麻烦了:)
- (2)
java.net.HttpURLConnection
是 Java 的标准类,主要用于处理同步的 HTTP 请求和响应。它不支持流式传输的 HTTP 功能,如 Server-Sent Events (SSE) 或 WebSocket。HttpURLConnection 主要用于传统的请求-响应模型,其中客户端发送一个请求到服务器,并等待服务器返回一个完整的响应。
3、学习OkHttp3(可以实现真正的流式处理)
二、OkHttp3
1、OkHttp3是什么?
- OkHttp3是一个效率非常高的
HTTP客户端
,它支持HTTP/2,允许所有同一个主机地址的请求共享同一个socket连接,减少了请求延迟。此外,它还有自动处理网络缓存的功能,以及对GZIP的支持来减少数据的传输量。- Http协议是应用层协议,建立在TCP连接的基础上。我们不希望每次发送HTTP请求时,都重新进行TCP的3次握手,那样会导致请求延迟较高。
2、如何使用OkHttp3呢?
2.1 为OkHttpClient创建单例
- 当创建单个 OkHttpClient 实例并将其复用于所有 HTTP 调用时,OkHttp 的性能最佳。这是因为每个client都有自己的连接池和线程池。复用连接和线程可以减少延迟并节省内存。相反,为每个请求创建一个client会浪费空闲池上的资源。【官方文档】
2.1.1 默认设置
// HTTP client单例
public final OkHttpClient client = new OkHttpClient();
2.1.2 自定义设置(更常用)
// HTTP client单例
public final OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(httpLoggingInterceptor).addInterceptor(...).connectTimeout(450, TimeUnit.SECONDS).writeTimeout(450, TimeUnit.SECONDS).readTimeout(450, TimeUnit.SECONDS).cache(...).build();
- OkHttp的拦截器也要学习下~
2.1.3 不用主动关闭Http client
- 保持空闲的线程和连接将自动释放
2.2 同步方式
- 代码:
public class HttpUtils {private static final OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(450,TimeUnit.SECONDS).writeTimeout(450, TimeUnit.SECONDS).readTimeout(450, TimeUnit.SECONDS).build();public static void useBlockInvokeOfOkHttp() {// 构建请求体(openai api 请求体是一个json格式的数据,对应于一个Java类)ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder().stream(false).messages(Collections.singletonList(Message.builder().role(Constants.Role.USER).content("1+1").build())).model(ChatCompletionRequest.Model.GPT_3_5_TURBO.getCode()).maxTokens(1024).build();// 构建请求Request postRequest;try {postRequest = new Request.Builder().url("https://api.openai.com/v1/chat/completions").header(Header.AUTHORIZATION.getValue(), "Bearer {your apiKey}").post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), new ObjectMapper().writeValueAsString(chatCompletionRequest))).build();} catch (JsonProcessingException e) {throw new RuntimeException("new ObjectMapper().writeValueAsString(chatCompletionRequest)) exception", e);}try {Response response = okHttpClient.newCall(postRequest).execute();System.out.println(response.body().string());} catch (Exception e) {throw new RuntimeException("okHttpClient.newCall(postRequest).execute() exception", e);}}
}
- 执行
Response response = okHttpClient.newCall(postRequest).execute();
的线程会等待服务器的响应:
{"id": "chatcmpl-8lyvDqO0k7FW9Ff582C8HFDux7oyb","object": "chat.completion","created": 1706446291,"model": "gpt-3.5-turbo-0613","choices": [{"index": 0,"message": {"role": "assistant","content": "1+1 equals 2."},"logprobs": null,"finish_reason": "stop"}],"usage": {"prompt_tokens": 10,"completion_tokens": 7,"total_tokens": 17},"system_fingerprint": null
}
2.3 异步方式
- 代码:
public static void useStreamInvokeOfOkHttp() {// 构建请求体ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder().stream(true) // 流式调用.messages(Collections.singletonList(Message.builder().role(Constants.Role.USER).content("1+1").build())).model(ChatCompletionRequest.Model.GPT_3_5_TURBO.getCode()).maxTokens(1024).build();// 构建请求 (和“2.2 同步方式”一致)...// 创建CountDownLatchCountDownLatch latch = new CountDownLatch(1);try {System.out.println(Thread.currentThread().getName() + ": okHttpClient.newCall(postRequest).enqueue");okHttpClient.newCall(postRequest).enqueue(new Callback() {@Overridepublic void onFailure(Call call, IOException e) {e.printStackTrace();// 在这里处理请求失败的情况latch.countDown(); // 调用countDown}@Overridepublic void onResponse(Call call, Response response) throws IOException {if (!response.isSuccessful()) {throw new IOException("Unexpected code " + response);}// 在这里处理响应// 注意:这不是主线程String responseBody = response.body().string();System.out.println(Thread.currentThread().getName() + ": " + responseBody);latch.countDown(); // 调用countDown}});} catch (Exception e) {throw new RuntimeException("okHttpClient.newCall(postRequest).execute() exception", e);}// 等待子线程执行结束 (单元测试,主线程执行结束了,程序就结束了)try {latch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}}
- 结果:
main: okHttpClient.newCall(postRequest).enqueue
OkHttp https://api.openai.com/...: data: {"id":"chatcmpl-8lz1oAgvwuWsxxQQTbxIz2Q65WAyK","object":"chat.completion.chunk","created":1706446700,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}data: {"id":"chatcmpl-8lz1oAgvwuWsxxQQTbxIz2Q65WAyK","object":"chat.completion.chunk","created":1706446700,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"1"},"logprobs":null,"finish_reason":null}]}...data: [DONE]
2.3.1 这并不是真正的流式处理
- 如果使用 OkHttp 的同步请求(response = okHttpClient.newCall(request).execute())或者异步请求(client.newCall(request).enqueue(new Callback() {…}))来处理服务器端发送的批量数据,这并不是真正的流式处理(SSE - Server-Sent Events)。
- 在这种情况下,OkHttp 客户端会等待服务器端发送完所有数据,并关闭响应体,然后一次性返回整个响应体。这意味着不会以实时、流式的方式接收数据。服务器端可能是分批次发送数据,但 OkHttp 客户端只会在所有数据都接收完毕后才处理这些数据。这就导致了两个主要问题:
实时性缺失
:由于数据是在全部接收完之后才开始处理,将无法实时地接收和处理服务器端发送的每一批数据。内存问题
:如果服务器发送的数据量非常大,那么整个响应体将被加载到内存中,这可能会导致内存溢出或性能问题。
2.4 真正的流式处理技术
- 为了实现真正的流式处理,需要使用特定于此目的的技术,如 SSE 或 WebSocket。
- SSE 适用于服务器到客户端的单向通信
- WebSocket 适用于双向通信。
- 这些技术允许实时接收数据,每当有新数据到达时就立即处理,而不是等待所有数据都发送完毕。
2.4.1 SSE
- 依赖
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp-sse</artifactId><version>3.14.9</version>
</dependency>
- 代码:
public class HttpUtils {private static final OkHttpClient okHttpClient = ...private static final EventSource.Factory factory = EventSources.createFactory(okHttpClient);public static void useStreamInvokeOfOkHttpBySse() {// 构建请求体(和“2.3 异步方式”一致)...// 构建请求(和“2.3 异步方式”一致)...EventSourceListener listener = new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {// 连接开启时的处理}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {// 接收到事件时的处理System.out.println("Event: " + data);}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {// 连接失败时的处理}};factory.newEventSource(postRequest, listener);}
}public class HttpUtilsTest {public static void main(String[] args) {HttpUtils.useStreamInvokeOfOkHttpBySse();}
}
- 结果:
Event: {"id":"chatcmpl-8lzJCq71wZuWQmXu9HfvLM2QUN8OE","object":"chat.completion.chunk","created":1706447778,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}
......
Event: {"id":"chatcmpl-8lzJCq71wZuWQmXu9HfvLM2QUN8OE","object":"chat.completion.chunk","created":1706447778,"model":"gpt-3.5-turbo-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}
Event: [DONE]
- 特点:
EventSourceListener
提供了几个方法来处理不同的事件。当服务器发送事件时,onEvent方法会被调用。
2.4.2 WebSocket
- 代码:
public static void useStreamInvokeOfOkHttpByWebSocket() {// WebSocket 的规范要求必须使用 GET 请求。// WebSocket 握手是基于 HTTP GET 请求的,这是 WebSocket 协议的一个标准要求。Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions").header(Header.AUTHORIZATION.getValue(), "Bearer {your apiKey}").build();okHttpClient.newWebSocket(request, new WebSocketListener() {@SneakyThrows@Overridepublic void onOpen(WebSocket webSocket, Response response) {// WebSocket 连接打开后,发送数据// 构建请求体ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder().stream(true).messages(Collections.singletonList(Message.builder().role(Constants.Role.USER).content("1+1").build())).model(ChatCompletionRequest.Model.GPT_3_5_TURBO.getCode()).maxTokens(1024).build();webSocket.send(new ObjectMapper().writeValueAsString(chatCompletionRequest));}@Overridepublic void onMessage(WebSocket webSocket, String text) {// 接收到文本消息时调用System.out.println("Received message: " + text);}@Overridepublic void onMessage(WebSocket webSocket, ByteString bytes) {// 接收到二进制消息时调用}@Overridepublic void onClosing(WebSocket webSocket, int code, String reason) {// 当连接即将关闭时调用webSocket.close(1000, null);}@Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {// 当连接失败时调用t.printStackTrace();}});
}
- 报错:
Expected HTTP 101 response but was '405 Method Not Allowed'
openai api 不支持 WebSocket 连接。这个 API 是基于 HTTP REST 架构设计的,通常是通过发起 HTTP POST 请求来使用。