问题描述与分析
现状描述与目标
在使用讯飞星火大模型API的过程中,API的返回结果在可以在其他线程中进行分次打印,但是在main方法中直接打印返回结果,显示为空。这种情况下不利于二次封装,希望在main方法中获取完整的API返回结果,便于进一步的封装。
问题分析
讯飞的API采用WebSocket进行通信,这是一个异步通信机制,当main
方法发送 WebSocket 请求,并不能立即得到回应。因此,即使 onMessage
方法最终接收到了回应并处理了它,这个处理结果也不会反映在main方法的totalannswer中。简而言之,这是由于多线程执行顺序的问题导致的。在代码中,WebSocket的响应处理是异步的,而打印语句是同步执行的。
解决方案
方案一:使用同步机制
使用像 CountDownLatch
这样的同步辅助类来等待WebSocket响应处理完成。让当前线程等待,直到 latch
的计数器减到零,或者等待时间达到 30 秒。如果计数器在 30 秒内减到零,则等待结束,线程继续执行;如果 30 秒内计数器没有减到零,则等待时间到后线程同样继续执行。
public static String getFullMessage(){try {latch.await(30, TimeUnit.SECONDS); // 等待最后一条消息或超时} catch (InterruptedException e) {Thread.currentThread().interrupt();}return totalAnswer;}
由于无法确定每个线程的操作数据,所以每次都需要等待超时。在这种情况下,该方案能够解决问题,但是延时较高。
方案二:标志变量协调协调线程间的操作
本项目中,一个线程负责处理WebSocket消息,而另一个线程负责其他任务(例如用户输入或程序逻辑)。wsCloseFlag
可以作为一个同步机制,帮助这些线程协调它们的操作。例如,接收线程可以检查这个标志以决定是否继续等待新消息,或者在发送线程决定关闭连接后停止处理。
private static volatile Boolean wsCloseFlag;while (! wsCloseFlag){}System.out.println("totalAnswer:"+totalAnswer);
注意,必须使用volatile关键字,否则程序将陷入死循环。
在Java多线程环境中,每个线程都可能有自己的本地内存(线程栈),并且线程可能会将共享变量的副本缓存在本地内存中。这意味着一个线程对共享变量的修改可能不会立即反映到其他线程的本地副本中。在代码中,wsCloseFlag
是一个共享变量,它在不同的线程中被读取和修改。如果不使用volatile关键字,线程会继续使用本地缓存中过时的wsCloseFlag
值,导致循环不能正确结束。
相关知识
WebSocket
WebSocket 是一种网络通信协议,提供了在单个 TCP 连接上进行全双工通信的能力。它最初被设计为 Web 浏览器和服务器之间的持久连接,但现在已被多种客户端和服务器应用程序广泛采用。
WebSocket主要特点
-
全双工通信: WebSocket 允许服务器和客户端之间进行双向实时数据传输。一旦建立了 WebSocket 连接,服务器和客户端都可以随时发送数据。
-
持久连接: 与传统的 HTTP 请求不同,WebSocket 在客户端和服务器之间建立一个持久的连接,该连接会保持打开状态,直到由客户端或服务器主动关闭。
-
低延迟: WebSocket 通过减少数据包的头部信息和避免频繁建立连接的开销,能够提供较低的通信延迟。
-
兼容性: WebSocket 使用 HTTP 协议进行初始握手,因此与现有的 Web 基础设施兼容。
WebSocket 工作原理
-
握手: WebSocket 通信首先通过 HTTP 请求进行握手。客户端发送一个特殊的 HTTP 请求,称为 "WebSocket 握手请求"。如果服务器接受这个升级请求,它会返回一个相应的响应头,从而升级连接到 WebSocket。
-
数据帧: 一旦握手成功,数据交换就通过 WebSocket 连接进行。WebSocket 数据通过帧的形式发送,这些帧可以包含文本或二进制数据。
-
关闭连接: 客户端或服务器可以通过发送一个关闭帧来主动关闭 WebSocket 连接。
Spring WebSocket 支持
Spring 框架提供了对 WebSocket 的支持,允许在 Spring 应用中轻松地创建 WebSocket 服务器和客户端。Spring 的 WebSocket 支持集成了更高级的消息传递子协议,例如 STOMP。
-
Spring WebSocket 配置:可以通过实现
WebSocketConfigurer
接口来配置 WebSocket。使用@EnableWebSocket
注解启用 WebSocket 支持,并在配置类中注册 WebSocket 端点。 -
消息处理:Spring 还支持使用消息代理来处理基于 WebSocket 的通信,这对于构建复杂的消息传递应用程序特别有用。
-
STOMP 支持:Spring 通过集成 STOMP 协议,提供了一个更高级别的抽象,使得创建基于订阅/发布模式的实时应用程序变得更简单。
通信类型
在计算机科学和数据通信领域中,根据数据传输的特性,通信可以分为同步通信(Synchronous Communication)和异步通信(Asynchronous Communication)两种类型。这两种通信方式在实现数据交换的机制和适用场景上有所不同。
同步通信
在同步通信中,发送方发送请求后必须等待接收方处理完成并返回响应,才能继续执行后续操作。同步通信的特点是:
- 阻塞:在等待响应的过程中,发送方通常处于阻塞状态,无法执行其他任务。
- 直接性:发送方直接等待接收方的响应,直到收到回应后才继续执行。
- 顺序性:通信过程中的操作顺序严格遵循请求-响应的模式。
同步通信的例子包括传统的 HTTP 请求、数据库查询等。
异步通信
异步通信允许发送方在发送请求后不必等待响应就可以继续执行其他任务。异步通信的特点是:
- 非阻塞:发送方发送请求后可以立即执行其他操作,不需等待响应。
- 间接性:发送方不直接等待响应,而是在响应准备好后通过回调、事件、消息队列等机制接收。
- 灵活性:异步通信允许更加灵活地处理多任务并发,提高系统的响应能力和效率。
异步通信的例子包括 WebSocket 通信、消息队列处理、事件驱动的编程等。
多线程环境中标志变量的线程安全性
-
使用
volatile
关键字:您可以将Flag
声明为volatile
。这将确保每次访问Flag
时都是从主内存中进行的,而不是从线程的本地缓存。这样可以确保一个线程对这个变量的修改对其他线程是可见的。private static volatile Boolean Flag;
-
使用同步块:使用
synchronized
关键字同步访问Flag
。这不仅可以保证可见性,还可以保证操作的原子性。 -
使用
AtomicBoolean
:可以使用java.util.concurrent.atomic.AtomicBoolean
代替Boolean
。这个类提供了一种无锁的方式来操作一个boolean
值,同时确保跨线程的可见性。
附录
原始代码
package com.day;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import okhttp3.*;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;public class XunfeiModelV1 extends WebSocketListener {// 地址与鉴权信息 https://spark-api.xf-yun.com/v1.1/chat 1.5地址 domain参数为general// 地址与鉴权信息 https://spark-api.xf-yun.com/v2.1/chat 2.0地址 domain参数为generalv2public static final String hostUrl = "https://spark-api.xf-yun.com/v2.1/chat";public static final String appid = "81*******9";public static final String apiSecret = "ZTd************************mVk";public static final String apiKey = "99d8350*******************e6cea";public static List<RoleContent> historyList=new ArrayList<>(); // 对话历史存储集合public static String totalAnswer=""; // 大模型的答案汇总// 环境治理的重要性 环保 人口老龄化 我爱我的祖国public static String NewQuestion = "";public static final Gson gson = new Gson();// 个性化参数private String userId;private Boolean wsCloseFlag;private static Boolean totalFlag=true; // 控制提示用户是否输入// 构造函数public XunfeiModelV1(String userId, Boolean wsCloseFlag) {this.userId = userId;this.wsCloseFlag = wsCloseFlag;}// 主函数public static void main(String[] args) throws Exception {// 个性化参数入口,如果是并发使用,可以在这里模拟while (true){if(totalFlag){Scanner scanner=new Scanner(System.in);System.out.print("我:");totalFlag=false;NewQuestion=scanner.nextLine();// 构建鉴权urlString authUrl = getAuthUrl(hostUrl, apiKey, apiSecret);OkHttpClient client = new OkHttpClient.Builder().build();String url = authUrl.toString().replace("http://", "ws://").replace("https://", "wss://");Request request = new Request.Builder().url(url).build();for (int i = 0; i < 1; i++) {totalAnswer="";WebSocket webSocket = client.newWebSocket(request, new XunfeiModelV1(i + "",false));}}else{Thread.sleep(200);}}}public static boolean canAddHistory(){ // 由于历史记录最大上线1.2W左右,需要判断是能能加入历史int history_length=0;for(RoleContent temp:historyList){history_length=history_length+temp.content.length();}if(history_length>12000){historyList.remove(0);historyList.remove(1);historyList.remove(2);historyList.remove(3);historyList.remove(4);return false;}else{return true;}}// 线程来发送音频与参数class MyThread extends Thread {private WebSocket webSocket;public MyThread(WebSocket webSocket) {this.webSocket = webSocket;}public void run() {try {JSONObject requestJson=new JSONObject();JSONObject header=new JSONObject(); // header参数header.put("app_id",appid);header.put("uid",UUID.randomUUID().toString().substring(0, 10));JSONObject parameter=new JSONObject(); // parameter参数JSONObject chat=new JSONObject();chat.put("domain","generalv2");chat.put("temperature",0.5);chat.put("max_tokens",4096);parameter.put("chat",chat);JSONObject payload=new JSONObject(); // payload参数JSONObject message=new JSONObject();JSONArray text=new JSONArray();// 历史问题获取if(historyList.size()>0){for(RoleContent tempRoleContent:historyList){text.add(JSON.toJSON(tempRoleContent));}}// 最新问题RoleContent roleContent=new RoleContent();roleContent.role="user";roleContent.content=NewQuestion;text.add(JSON.toJSON(roleContent));historyList.add(roleContent);message.put("text",text);payload.put("message",message);requestJson.put("header",header);requestJson.put("parameter",parameter);requestJson.put("payload",payload);// System.err.println(requestJson); // 可以打印看每次的传参明细webSocket.send(requestJson.toString());// 等待服务端返回完毕后关闭while (true) {// System.err.println(wsCloseFlag + "---");Thread.sleep(200);if (wsCloseFlag) {break;}}webSocket.close(1000, "");} catch (Exception e) {e.printStackTrace();}}}@Overridepublic void onOpen(WebSocket webSocket, Response response) {super.onOpen(webSocket, response);System.out.print("大模型:");MyThread myThread = new MyThread(webSocket);myThread.start();}@Overridepublic void onMessage(WebSocket webSocket, String text) {// System.out.println(userId + "用来区分那个用户的结果" + text);JsonParse myJsonParse = gson.fromJson(text, JsonParse.class);if (myJsonParse.header.code != 0) {System.out.println("发生错误,错误码为:" + myJsonParse.header.code);System.out.println("本次请求的sid为:" + myJsonParse.header.sid);webSocket.close(1000, "");}List<Text> textList = myJsonParse.payload.choices.text;for (Text temp : textList) {System.out.print(temp.content);totalAnswer=totalAnswer+temp.content;}if (myJsonParse.header.status == 2) {// 可以关闭连接,释放资源System.out.println();System.out.println("*************************************************************************************");if(canAddHistory()){RoleContent roleContent=new RoleContent();roleContent.setRole("assistant");roleContent.setContent(totalAnswer);historyList.add(roleContent);}else{historyList.remove(0);RoleContent roleContent=new RoleContent();roleContent.setRole("assistant");roleContent.setContent(totalAnswer);historyList.add(roleContent);}wsCloseFlag = true;totalFlag=true;}}@Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {super.onFailure(webSocket, t, response);try {if (null != response) {int code = response.code();System.out.println("onFailure code:" + code);System.out.println("onFailure body:" + response.body().string());if (101 != code) {System.out.println("connection failed");System.exit(0);}}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}// 鉴权方法public static String getAuthUrl(String hostUrl, String apiKey, String apiSecret) throws Exception {URL url = new URL(hostUrl);// 时间SimpleDateFormat format = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z", Locale.US);format.setTimeZone(TimeZone.getTimeZone("GMT"));String date = format.format(new Date());// 拼接String preStr = "host: " + url.getHost() + "\n" +"date: " + date + "\n" +"GET " + url.getPath() + " HTTP/1.1";// System.err.println(preStr);// SHA256加密Mac mac = Mac.getInstance("hmacsha256");SecretKeySpec spec = new SecretKeySpec(apiSecret.getBytes(StandardCharsets.UTF_8), "hmacsha256");mac.init(spec);byte[] hexDigits = mac.doFinal(preStr.getBytes(StandardCharsets.UTF_8));// Base64加密String sha = Base64.getEncoder().encodeToString(hexDigits);// System.err.println(sha);// 拼接String authorization = String.format("api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"", apiKey, "hmac-sha256", "host date request-line", sha);// 拼接地址HttpUrl httpUrl = Objects.requireNonNull(HttpUrl.parse("https://" + url.getHost() + url.getPath())).newBuilder().//addQueryParameter("authorization", Base64.getEncoder().encodeToString(authorization.getBytes(StandardCharsets.UTF_8))).//addQueryParameter("date", date).//addQueryParameter("host", url.getHost()).//build();// System.err.println(httpUrl.toString());return httpUrl.toString();}//返回的json结果拆解class JsonParse {Header header;Payload payload;}class Header {int code;int status;String sid;}class Payload {Choices choices;}class Choices {List<Text> text;}class Text {String role;String content;}class RoleContent{String role;String content;public String getRole() {return role;}public void setRole(String role) {this.role = role;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}}
}