< dependency> < groupId> org.java-websocket< /groupId> < artifactId> Java-WebSocket< /artifactId> < version> 1.5 .7 < /version> < /dependency>
import cn.hutool.json.JSONUtil;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake; import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; /*** WebSocket client* * @author Mr丶s* @date 2024 /9/3 22 :06* @description*/
@Slf4j
public class MyWebSocketClient extends WebSocketClient { // 用于存储接收到的客户端响应的 CompletableFuture 对象private CompletableFuture< JSONObject> responseFuture; // 心跳检测的调度程序private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 ) ; /*** 构造函数,初始化 WebSocket 客户端连接 URI** @param serverUri WebSocket 服务器的 URI* @throws URISyntaxException 如果 URI 格式不正确*/public MyWebSocketClient( URI serverUri) throws URISyntaxException { super( serverUri) ; } /*** 当与服务器建立连接时调用此方法** @param handshakedata 握手数据*/@Overridepublic void onOpen( ServerHandshake handshakedata) { log.info( "Connected to server" ) ; startHeartbeat( ) ; } /*** 当收到服务器的消息时调用此方法** @param message 服务器发送的消息*/@Overridepublic void onMessage( String message) { // 使用 Hutool 将消息解析为 JSON 对象JSONObject jsonResponse = JSONUtil.parseObj( message) ; log.info( "Received JSON response: " + jsonResponse.toStringPretty( )) ; // 如果 responseFuture 存在,则将 JSON 响应设置为其结果if ( responseFuture != null) { responseFuture.complete( jsonResponse) ; } } /*** 当连接关闭时调用此方法** @param code 关闭的状态码* @param reason 关闭的原因* @param remote 是否是远程关闭*/@Overridepublic void onClose( int code, String reason, boolean remote) { log.info( "Connection closed with exit code " + code + " additional info: " + reason) ; stopHeartbeat( ) ; } /*** 当发生错误时调用此方法** @param ex 发生的异常*/@Overridepublic void onError( Exception ex) { System.err.println( "An error occurred: " + ex.getMessage( )) ; // 如果发生错误并且 responseFuture 存在,则将异常设置为 responseFuture 的结果if ( responseFuture != null) { responseFuture.completeExceptionally( ex) ; } } /*** 发送 JSON 格式的数据到服务器** @param jsonData 要发送的 JSON 数据(Hutool 的 JSONObject 对象)*/public void sendJsonMessage( JSONObject jsonData) { // 发送 JSON 数据到 WebSocket 服务器this.send( jsonData.toString( )) ; log.info( "Sent JSON data: " + jsonData.toStringPretty( )) ; } /*** 检查 WebSocket 连接是否已建立** @return 如果连接已建立,则返回 true;否则返回 false */public boolean isConnected ( ) { return this.isOpen( ) ; } /*** 发送 JSON 数据并异步获取服务器响应** @param jsonData 要发送的 JSON 数据(Hutool 的 JSONObject 对象)* @return 包含服务器响应的 CompletableFuture 对象*/public CompletableFuture< JSONObject> sendMessageAndGetResponse( JSONObject jsonData) { // 初始化 CompletableFuture 对象,用于存储未来的响应responseFuture = new CompletableFuture<> ( ) ; // 发送 JSON 消息sendJsonMessage( jsonData) ; // 返回 CompletableFuture 对象,客户端可以等待此 future 的完成return responseFuture; } /*** 启动心跳检测,定时发送心跳消息*/private void startHeartbeat ( ) { scheduler.scheduleAtFixedRate(( ) - > {if ( isConnected( )) { // 发送心跳消息sendJsonMessage( JSONUtil.createObj( ) .set( "type" , "heartbeat" )) ; log.info( "Heartbeat sent" ) ; } else { log.info( "Disconnected, attempting to reconnect" ) ; reconnect( ) ; } } , 0 , 30 , TimeUnit.SECONDS ) ; // 每 30 秒发送一次心跳} /*** 停止心跳检测*/private void stopHeartbeat ( ) { scheduler.shutdownNow( ) ; } /*** 尝试重新连接 WebSocket 服务器*/@Overridepublic void reconnect ( ) { try { this.reconnectBlocking( ) ; log.info( "Reconnected to server" ) ; } catch ( InterruptedException e) { e.printStackTrace( ) ; } }
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import cn.hutool.json.JSONObject;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; /*** WebSocket 服务类** @author Mr丶s* @date 2024 /8/18 下午9:23* @description*/
@Slf4j
@Service
public class WebSocketClientService { private MyWebSocketClient myWebSocketClient; /*** 初始化 WebSocket 客户端并建立连接* 此方法在服务启动时调用,确保 WebSocket 客户端可以与服务器进行通信。*/@PostConstructpublic void init ( ) { try { // WebSocket 服务器 URIURI uri = new URI( "ws://58.57.65.22:2315" ) ; // URI uri = new URI( "ws://192.168.1.106:5523" ) ; // 创建 WebSocket 客户端实例myWebSocketClient = new MyWebSocketClient( uri) ; // 尝试建立连接connectIfNotConnected( ) ; } catch ( URISyntaxException e) { e.printStackTrace( ) ; } } /*** 如果 WebSocket 未连接,则建立连接* 确保 WebSocket 连接是打开的,如果连接未打开,则尝试建立连接。*/private void connectIfNotConnected ( ) { try { if ( ! myWebSocketClient.isConnected( )) { // 阻塞式连接,直到连接成功myWebSocketClient.connectBlocking( ) ; log.info( "WebSocket client connected" ) ; } else { log.info( "WebSocket client already connected" ) ; } } catch ( InterruptedException e) { e.printStackTrace( ) ; } } /*** 发送 JSON 数据到 WebSocket 服务器并获取响应** @param jsonData 要发送的 JSON 数据(Hutool 的 JSONObject 对象)* @return 服务器返回的 JSON 数据(Hutool 的 JSONObject 对象)*/public JSONObject sendJsonData( JSONObject jsonData) { connectIfNotConnected( ) ; // 确保已建立连接// 发送消息并异步等待服务器的响应CompletableFuture< JSONObject> responseFuture = myWebSocketClient.sendMessageAndGetResponse( jsonData) ; try { // 阻塞直到获得响应,并返回响应数据return responseFuture.get( ) ; } catch ( InterruptedException | ExecutionException e) { e.printStackTrace( ) ; return null; } } /*** 关闭 WebSocket 连接*/@PreDestroypublic void close ( ) { try { myWebSocketClient.closeBlocking( ) ; log.info( "WebSocket client connection closed" ) ; } catch ( InterruptedException e) { e.printStackTrace( ) ; } }
}