全双工通信协议:WebSockets
- 前言
- 何时使用WebSockets
- WebSocket API
- TextWebSocketHandler
- WebSocketConfigurer
- WebSocket握手
- 配置服务器
- 允许的来源
- 心跳包
- Java WebSocket API
- 案例一:前端发送消息并接收后端响应
- 案例二:模拟后端向前端推送消息
- 案例三:发送指定用户消息
- SockJS
- Spring SockJS和前端SockJS区别
- 启用SockJS
- IE 8 and 9
- 心跳
- SockJS and CORS
- SockJsClient
- WebSocketMessageBrokerConfigurer
- 使用SockJS
- 关联文章
前言
WebSocket协议,RFC 6455提供了一种标准化的方法,通过单个TCP连接在客户端和服务器之间建立全双工双向通信通道。它是一种不同于HTTP的TCP协议,但设计为在HTTP上工作,使用端口80和443,并允许重用现有的防火墙规则。
WebSocket交互以使用HTTP请求开始Upgrade头进行升级,或者在这种情况下,切换到WebSocket协议。以下示例显示了这种交互:
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket # Upgrade标头
Connection: Upgrade # 使用Upgrade链接
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
支持WebSocket的服务器返回类似于以下内容的输出,而不是通常的200状态代码:
HTTP/1.1 101 Switching Protocols #协议开关
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
成功握手后,HTTP升级请求下的TCP套接字对客户端和服务器保持开放,以继续发送和接收消息。
请注意,如果WebSocket服务器运行在web服务器(例如nginx)的后面,您可能需要对其进行配置,以便将WebSocket升级请求传递到WebSocket服务器。
尽管WebSocket被设计为HTTP兼容的,并且以HTTP请求开始,但是理解这两种协议导致非常不同的架构和应用程序编程模型是很重要的。
在HTTP和REST中,应用程序被构建为许多URL。为了与应用程序进行交互,客户端以请求-响应的方式访问这些URL。服务器根据HTTP URL、方法和头将请求路由到适当的处理程序。
相比之下,在WebSockets中,初始连接通常只有一个URL。随后,所有应用程序消息都在同一个TCP连接上流动。这指向了一个完全不同的异步、事件驱动的消息传递架构。
WebSocket也是一个低级传输协议,与HTTP不同,它没有为消息内容指定任何语义。这意味着除非客户端和服务器在消息语义上达成一致,否则无法路由或处理消息。
WebSocket客户端和服务器可以通过Sec-WebSocket-Protocol HTTP握手请求上的标头。在这种情况下,他们需要拿出自己的公约。
以Springboot为例,pom依赖如下:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><version>2.4.0</version></dependency>
何时使用WebSockets
WebSockets可以使网页具有动态性和交互性。然而,在许多情况下,AJAX和HTTP流或长轮询的组合可以提供简单有效的解决方案。
例如,新闻、邮件和社交feed需要动态更新,但是每隔几分钟更新一次也完全没问题。另一方面,协作、游戏和金融应用需要更接近实时。
延迟本身并不是决定性因素。如果消息量相对较低(例如,监控网络故障),HTTP流或轮询可以提供有效的解决方案。低延迟、高频率和高容量的结合是使用WebSocket的最佳案例。
WebSocket API
Spring框架提供了一个WebSocket API,您可以使用它来编写处理WebSocket消息的客户端和服务器端应用程序。
TextWebSocketHandler
TextWebSocketHandler 是 Spring 框架提供的一个用于处理 WebSocket 文本消息的抽象类。它是 WebSocketHandler 的子类,可以通过实现它来处理 WebSocket 中的文本消息。
public class MyHandler extends TextWebSocketHandler {@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 处理接收到的文本消息String payload = message.getPayload();System.out.println("接收到消息:" + payload);// 发送响应消息session.sendMessage(new TextMessage("Hello, client!"));}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立时执行的操作System.out.println("WebSocket 连接已建立");}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {// WebSocket 连接关闭时执行的操作System.out.println("WebSocket 连接已关闭");}
}
WebSocketConfigurer
WebSocketConfigurer 是 Spring 框架提供的一个接口,用于配置 WebSocket 相关的参数和处理器。比如:可以将前面的WebSocket处理程序映射到特定的URL,如下例所示:
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myHandler(),"/websocket");//注册了一个名为 myHandler() 的处理器,并将它映射到路径 /websocket 上。}@Beanpublic WebSocketHandler myHandler() {return new MyHandler();}
}
在上面的示例中,通过 @Configuration
注解将该类标记为配置类,并通过 @EnableWebSocket
注解启用 WebSocket 功能。我们注册了一个名为 myHandler()
的处理器,并将它映射到路径 /websocket
上。
WebSocket握手
定制初始HTTP WebSocket握手请求的最简单方法是通过HandshakeInterceptor,它公开了握手“之前”和“之后”的方法。
public interface HandshakeInterceptor {/*** request:当前的 HTTP 请求对象,可以获取请求的头信息等。* response:当前的 HTTP 响应对象,可以设置响应的头信息等。* wsHandler:用于处理 WebSocket 连接和消息的处理器。* attributes:用于保存一些自定义的属性,可以在后续的处理中使用。*/boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception;void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception);
}
您可以使用这样的拦截器来阻止握手或使WebSocketSession的任何属性可用。下面的例子使用内置拦截器将HTTP会话属性传递给WebSocket会话:
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myHandler(),"/websocket").addInterceptors(new HttpSessionHandshakeInterceptor());//添加拦截器}@Beanpublic WebSocketHandler myHandler() {return new MyHandler();}
}
当然,你也可以自定义实现拦截器,例如验证用户身份、设置 WebSocketSession 的属性等。
public class MyHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {// 在握手前进行一些预处理操作System.out.println("Before handshake");// 验证用户身份等其他逻辑String username = serverHttpRequest.getHeaders().getFirst("username");if (username == null || username.isEmpty()) {System.out.println("Unauthorized access");return false; // 不允许握手继续进行}// 设置自定义属性,可以在后续处理中使用map.put("username", username);return true; // 允许握手继续进行}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {// 在握手后进行一些后续处理操作System.out.println("After handshake");if (e != null) {System.out.println("Handshake failed: " + e.getMessage());}}
}
配置服务器
您可以配置底层WebSocket服务器,例如输入消息缓冲区大小、空闲超时等。
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxBinaryMessageBufferSize(8192); // 设置二进制消息缓冲区大小container.setMaxTextMessageBufferSize(8192); // 设置文本消息缓冲区大小container.setAsyncSendTimeout(5000l); // 设置异步发送超时时间(毫秒)// 其他配置项...return container;}
}
允许的来源
从Spring Framework 4.1.5开始,WebSocket和SockJS的默认行为是只接受同源请求。也可以允许所有或指定的源列表。该检查主要是为浏览器客户端设计的。
您可以配置WebSocket和SockJS允许的源,如下例所示:
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myHandler(),"/websocket").setAllowedOrigins("*");}
}
setAllowedOrigins("*")
表示允许来自任何源的跨域请求连接到 WebSocket 服务器,当然你也可以设置指定域名或服务器。
心跳包
心跳机制是一种保持长连接有效性的机制,它通过定期发送小型的探测消息来检测连接是否仍然活动。这样可以避免代理服务器、网络设备或其他中间层错误地关闭空闲连接。
前端代码如下:
<!DOCTYPE html>
<html>
<head><title>WebSocket Example</title>
</head>
<body>
<h1>这是一个前台页面</h1>
<input type="text" id="inputMessage"/>
<button onclick="sendMessage()">Send</button>
<div id="messages"></div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.0/sockjs.min.js"></script>
<script>var sock = null;var heartbeatInterval = 5000; // 心跳间隔时间,单位为毫秒var heartbeatTimer = null;function connect() {sock = new WebSocket('ws://localhost:8088/ws');sock.onopen = function () {console.log('Connected');startHeartbeat(); // 连接成功后开始发送心跳};sock.onmessage = function (message) {// 判断是否为心跳消息if (message.data === 'heartbeat') {console.log('收到心跳消息');} else {console.log('收到其他消息:' + message);}};sock.onclose = function () {console.log('Disconnected');stopHeartbeat(); // 连接关闭时停止发送心跳};}function startHeartbeat() {heartbeatTimer = setInterval(function () {sock.send('heartbeat'); // 发送心跳消息}, heartbeatInterval);}function stopHeartbeat() {clearInterval(heartbeatTimer);}function showMessage(message) {// 处理收到的消息console.log(message);}function sendMessage() {var inputMessage = document.getElementById('inputMessage').value;sock.send(JSON.stringify({'content': inputMessage}));}connect();
</script>
</body>
</html>
上述代码,连接成功后,通过调用startHeartbeat()
方法轮询每5秒,发送心跳包heartbeat
字符串;后端收到心跳消息响应字符串,通过sock.onmessage
监听消息,若连接关闭后停止发送心跳。
后端示例代码如下:
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myWebSocketHandler(),"/ws");}@Beanpublic MyWebSocketHandler myWebSocketHandler() {return new MyWebSocketHandler();}
}
public class MyWebSocketHandler extends TextWebSocketHandler {private static final String HEARTBEAT_MESSAGE = "heartbeat";@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();System.out.println("接收到消息:" + payload);if (HEARTBEAT_MESSAGE.equals(payload)) {// 响应心跳消息session.sendMessage(new TextMessage("heartbeat"));return;}// 处理接收到的文本消息}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立时执行的操作System.out.println("WebSocket 连接已建立");}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {// WebSocket 连接关闭时执行的操作System.out.println("WebSocket 连接已关闭");}}
上述代码,在handleTextMessage()
方法中,接收前端发送的消息,判断为心跳包,并返回响应字符串heartbeat
,表示已经收到。若有其他消息另行处理。
效果展示:
Java WebSocket API
javax.websocket
包是Java WebSocket API的一部分,提供了一组接口和注解,用于开发WebSocket客户端和服务器。
其中一些主要的类和接口包括:
javax.websocket.Session
:代表WebSocket会话,提供发送和接收消息的方法。javax.websocket.Endpoint
:WebSocket端点,可继承该类来创建自定义的WebSocket服务器端点。javax.websocket.ClientEndpoint
:用于创建WebSocket客户端端点。javax.websocket.server.ServerEndpoint
:用于创建WebSocket服务器端点。javax.websocket.OnMessage
、javax.websocket.OnOpen
、javax.websocket.OnClose
等注解:用于标注在WebSocket端点中的方法,以处理消息、开启连接和关闭连接等事件。
示例代码:
@ClientEndpoint
public class MyEndpoint {@OnOpenpublic void onOpen(Session session) {System.out.println("WebSocket opened: " + session.getId());}@OnMessagepublic void onMessage(String message, Session session) {System.out.println("Received message: " + message + " from " + session.getId());try {session.getBasicRemote().sendText("Hello, " + message + "!");} catch (IOException ex) {ex.printStackTrace();}}@OnClosepublic void onClose(Session session) {System.out.println("WebSocket closed: " + session.getId());}public static void main(String[] args) throws Exception {String serverUri = "ws://localhost:8088/ws";ClientEndpointConfig config = ClientEndpointConfig.Builder.create().build();WebSocketContainer container = ContainerProvider.getWebSocketContainer();MyEndpoint client = new MyEndpoint();Session session = container.connectToServer(client, URI.create(serverUri));session.close(); // 关闭会话}}
@ClientEndpoint
和@ServerEndpoint("/websocket")
是Java WebSocket API中的注解,它们分别表示WebSocket客户端端点和WebSocket服务器端点。
案例一:前端发送消息并接收后端响应
示例代码如下:
@Controller
public class MyController {@GetMapping(value = "/websocket")public ModelAndView websocket(HttpServletResponse response) throws Exception {ModelAndView modelAndView = new ModelAndView("websocket");return modelAndView;}
}
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myWebSocketHandler(),"/ws").addInterceptors(new MyHandshakeInterceptor());}@Beanpublic WebSocketHandler myWebSocketHandler() {return new MyWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxBinaryMessageBufferSize(8192); // 设置二进制消息缓冲区大小container.setMaxTextMessageBufferSize(8192); // 设置文本消息缓冲区大小container.setAsyncSendTimeout(5000l); // 设置异步发送超时时间(毫秒)// 其他配置项...return container;}
}
public class MyHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {// 在握手前进行一些预处理操作System.out.println("Before handshake");// 验证用户身份等其他逻辑return true; // 允许握手继续进行}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {// 在握手后进行一些后续处理操作System.out.println("After handshake");}
}
public class MyWebSocketHandler extends TextWebSocketHandler {private static final String HEARTBEAT_MESSAGE = "heartbeat";@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();System.out.println("接收到消息:" + payload);if (HEARTBEAT_MESSAGE.equals(payload)) {// 响应心跳消息session.sendMessage(new TextMessage("heartbeat"));return;}// 发送响应消息session.sendMessage(new TextMessage("Hello, client!"));}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立时执行的操作System.out.println("WebSocket 连接已建立");}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {// WebSocket 连接关闭时执行的操作System.out.println("WebSocket 连接已关闭");}
}
前端html,socket.onmessage
获取消息,代码如下:
<!DOCTYPE html>
<html>
<head><title>WebSocket Example</title>
</head>
<body>
<input type="text" id="messageInput" placeholder="Type a message...">
<button onclick="sendMessage()">Send</button>
<div id="messages"></div><script>var socket = null;var heartbeatInterval = 5000; // 心跳间隔时间,单位为毫秒var heartbeatTimer = null;function connect() {socket = new WebSocket('ws://localhost:8088/ws');socket.onopen = function () {console.log('WebSocket连接已建立');startHeartbeat(); // 连接成功后开始发送心跳};socket.onmessage = function (message) {// 判断是否为心跳消息if (message.data === 'heartbeat') {console.log('收到心跳消息');} else {console.log('收到其他消息:' + message);var message = message.data;document.getElementById("messages").innerHTML += "<p>" + message + "</p>";}};socket.onclose = function () {console.log('WebSocket连接已关闭');stopHeartbeat(); // 连接关闭时停止发送心跳};}function startHeartbeat() {heartbeatTimer = setInterval(function () {socket.send('heartbeat'); // 发送心跳消息}, heartbeatInterval);}function stopHeartbeat() {clearInterval(heartbeatTimer);}function sendMessage() {var message = document.getElementById("messageInput").value;socket.send(message);}connect();
</script>
</body>
</html>
效果展示:
案例二:模拟后端向前端推送消息
创建List<WebSocketSession>
,将每个连接的对象存入集合,调用pushMessageToAllClients()
方法,遍历所有WebSocketSession对象,发送消息,示例代码如下:
public class MyWebSocketHandler extends TextWebSocketHandler {private List<WebSocketSession> sessions = new ArrayList<>();private static final String HEARTBEAT_MESSAGE = "heartbeat";@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();System.out.println("接收到消息:" + payload);if (HEARTBEAT_MESSAGE.equals(payload)) {// 响应心跳消息session.sendMessage(new TextMessage("heartbeat"));return;}// 发送响应消息session.sendMessage(new TextMessage("Hello, client!"));}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立时执行的操作System.out.println("WebSocket 连接已建立");sessions.add(session);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {// WebSocket 连接关闭时执行的操作System.out.println("WebSocket 连接已关闭");sessions.remove(session);}public void pushMessageToAllClients(String message) {for (WebSocketSession session : sessions) {System.out.println(session);try {session.sendMessage(new TextMessage(message));} catch (IOException e) {e.printStackTrace();}}}
}
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myWebSocketHandler(),"/ws").addInterceptors(new MyHandshakeInterceptor());}@Beanpublic MyWebSocketHandler myWebSocketHandler() {return new MyWebSocketHandler();}
}
public class MyHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {// 在握手前进行一些预处理操作System.out.println("Before handshake");// 验证用户身份等其他逻辑return true; // 允许握手继续进行}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {// 在握手后进行一些后续处理操作System.out.println("After handshake");}
}
后台请求接口,调用sendMessage()
方法,代码如下:
@Controller
public class MyController {@Autowiredprivate MyWebSocketHandler myWebSocketHandler;@GetMapping(value = "/back")public ModelAndView back(HttpServletResponse response) throws Exception {ModelAndView modelAndView = new ModelAndView("back");return modelAndView;}@GetMapping(value = "/websocket")public ModelAndView websocket(HttpServletResponse response) throws Exception {ModelAndView modelAndView = new ModelAndView("websocket");return modelAndView;}@PostMapping(value = "/sendMessage")@ResponseBodypublic void sendMessage(String message) {try {myWebSocketHandler.pushMessageToAllClients(message);}catch (Exception e){e.printStackTrace();}}
}
前台html,socket.onmessage
获取消息,代码如下:
<!DOCTYPE html>
<html>
<head><title>WebSocket Example</title>
</head>
<body>
<h1>这是一个前台页面</h1>
<div id="messages"></div><script>var socket = null;var heartbeatInterval = 5000; // 心跳间隔时间,单位为毫秒var heartbeatTimer = null;function connect() {socket = new WebSocket('ws://localhost:8088/ws');socket.onopen = function () {console.log('WebSocket连接已建立');startHeartbeat(); // 连接成功后开始发送心跳};socket.onmessage = function (message) {// 判断是否为心跳消息if (message.data === 'heartbeat') {console.log('收到心跳消息');} else {console.log('收到其他消息:' + message);var message = message.data;document.getElementById("messages").innerHTML += "<p>" + message + "</p>";}};socket.onclose = function () {console.log('WebSocket连接已关闭');stopHeartbeat(); // 连接关闭时停止发送心跳};}function startHeartbeat() {heartbeatTimer = setInterval(function () {socket.send('heartbeat'); // 发送心跳消息}, heartbeatInterval);}function stopHeartbeat() {clearInterval(heartbeatTimer);}connect();
</script>
</body>
</html>
后台html,发送请求,代码如下:
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org" lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<h1>这是一个后台页面</h1>
<input type="text" id="textInput" placeholder="Enter text...">
<button onclick="submitText()">Submit</button><script>function submitText() {var text = document.getElementById("textInput").value;console.log(text)// 创建一个新的XMLHttpRequest对象var xhr = new XMLHttpRequest();// 指定请求的方法、URL和是否异步处理xhr.open("POST", "/sendMessage");xhr.setRequestHeader("Content-Type", "application/x-www-form-urlencoded");// 注册回调函数来处理响应xhr.onreadystatechange = function() {if (xhr.readyState === 4 && xhr.status === 200) {// 处理成功响应console.log("Text submitted successfully!");}};// 发送请求xhr.send("message="+text);}
</script>
</body>
</html>
效果展示:
案例三:发送指定用户消息
模拟多个用户,选择聊天对象。创建Map对象(key=sessionId,value=WebSocketSession),用户发送消息时通过sessionId获取WebSocketSession对象,然后调用sendMessage()
方法,完成发送。示例代码如下:
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myWebSocketHandler(),"/ws").addInterceptors(new MyHandshakeInterceptor());}@Beanpublic MyWebSocketHandler myWebSocketHandler() {return new MyWebSocketHandler();}
}
public class MyWebSocketHandler extends TextWebSocketHandler {private Map<String,WebSocketSession> sessions = new HashMap<>();private static final String HEARTBEAT_MESSAGE = "heartbeat";public List<String> getAllUserId() {return sessions.keySet().stream().collect(Collectors.toList());}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 处理接收到的文本消息String payload = message.getPayload();System.out.println("接收到消息:" + payload);if (HEARTBEAT_MESSAGE.equals(payload)) {// 响应心跳消息session.sendMessage(new TextMessage("heartbeat"));return;}//转换json对象JSONObject parse = JSONObject.parse(payload);String userId = parse.getString("userId");String content = parse.getString("message");// 获取发送的对象WebSocketSession webSocketSession = sessions.get(userId);try {//发送消息webSocketSession.sendMessage(new TextMessage(content));} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立时执行的操作System.out.println("WebSocket 连接已建立");sessions.put(session.getId(),session);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {// WebSocket 连接关闭时执行的操作System.out.println("WebSocket 连接已关闭");sessions.remove(session.getId());}
}
后台请求接口,getAllUserId()
方法获取用户列表,sendMessage()
方法发送消息,示例代码如下:
@Controller
public class MyController {@Autowiredprivate MyWebSocketHandler myWebSocketHandler;@GetMapping(value = "/getAllUserId")@ResponseBodypublic ResponseEntity<List> getAllUserId() {List<String> allUserId = myWebSocketHandler.getAllUserId();return ResponseEntity.ok(allUserId);}@GetMapping(value = "/chat")@ResponseBodypublic ModelAndView chat(HttpServletResponse response) throws Exception {ModelAndView modelAndView = new ModelAndView("chat");return modelAndView;}
}
前端html,分为四个部分,一部分发送心跳包,一部分提交消息,一部分通过轮询查询用户列表,一部分通过socket.onmessage
获取消息,代码如下:
<!DOCTYPE html>
<html>
<head><title>WebSocket Example</title>
</head>
<body>
<h1>聊天页面</h1>
<input type="text" id="userId" placeholder="用户id">
<input type="text" id="message" placeholder="消息内容">
<button onclick="sendMessage()">Send</button>
<p>用户列表:
<div id="userIds"></div>
</p>
<p>接受消息:
<div id="content"></div>
</p>
<script>var socket = null;var heartbeatInterval = 5000; // 心跳间隔时间,单位为毫秒var heartbeatTimer = null;function connect() {socket = new WebSocket('ws://localhost:8088/ws');socket.onopen = function () {console.log('WebSocket连接已建立');startHeartbeat(); // 连接成功后开始发送心跳};socket.onmessage = function (message) {// 判断是否为心跳消息if (message.data === 'heartbeat') {console.log('收到心跳消息');} else {console.log('收到其他消息:' + message);var message = message.data;document.getElementById("content").innerHTML += "<p>" + message + "</p>";}};socket.onclose = function () {console.log('WebSocket连接已关闭');stopHeartbeat(); // 连接关闭时停止发送心跳};}function startHeartbeat() {heartbeatTimer = setInterval(function () {socket.send('heartbeat'); // 发送心跳消息}, heartbeatInterval);}function stopHeartbeat() {clearInterval(heartbeatTimer);}//发送消息function sendMessage() {var userId = document.getElementById("userId").value;var message = document.getElementById("message").value;//转换json字符串socket.send(JSON.stringify({"userId":userId,"message":message}));}//查询用户列表function pollForNewMessages() {setInterval(function() {// 发送 Ajax 请求var xhr = new XMLHttpRequest();xhr.open("GET", "getAllUserId", true);xhr.onreadystatechange = function() {if (xhr.readyState === 4 && xhr.status === 200) {// 处理服务器返回的新消息var response = JSON.parse(xhr.responseText);document.getElementById("userIds").innerHTML = "";for (var i = 0; i < response.length; i++) {var message = response[i];document.getElementById("userIds").innerHTML += "<p>" + message + "</p>";}}};xhr.send();}, 5000); // 每隔5秒发送一次请求}//连接connect();// 启动轮询pollForNewMessages();
</script>
</body>
</html>
效果展示:
SockJS
SockJS是一个浏览器JavaScript库,它提供了一个类似WebSocket的对象。SockJS的目标是用于实现 WebSocket 的兼容性解决方案。
SockJS是为浏览器设计的。它使用各种技术来支持各种浏览器版本。传输分为三大类:WebSocket、HTTP流和HTTP长轮询。
SockJS客户端首先发送GET /info
从服务器获取基本信息。之后,它必须决定使用哪种传输方式。如果可能,使用WebSocket。如果没有,在大多数浏览器中,至少有一个HTTP流选项。如果不是,则使用HTTP(长)轮询。
Spring SockJS和前端SockJS区别
Spring SockJS 和前端 SockJS 是两个不同的概念。
- Spring SockJS
Spring SockJS 是 Spring 框架提供的一个 WebSocket 子协议,用于在服务器端实现 WebSocket 功能。它通过 WebSocket 或 HTTP 长轮询等技术来实现双向通信,支持与各种类型的客户端进行通信,包括 Web 浏览器、移动应用等。在服务器端,你可以使用 Spring SockJS 提供的 API 来创建和管理 WebSocket 连接,并通过编写 WebSocket 处理器来处理客户端发送来的消息。
- 前端 SockJS
前端 SockJS是一个 JavaScript 库,用于在 Web 浏览器中实现 WebSocket 功能。它提供了一个与 WebSocket API 类似的接口,但使用了一种称为“轮询”的技术来模拟 WebSocket 的行为,以达到在不支持 WebSocket 的浏览器中也能实现实时通信的目的。在前端代码中,你可以通过引入 sockjs 库来创建 WebSocket 连接,并通过监听 onmessage 事件来接收服务器发送来的消息。
总之,Spring SockJS 和前端 SockJS 都是用于实现 WebSocket 功能的技术,但分别运行在服务器端和客户端,提供了不同的 API 和接口来实现双向通信。Spring SockJS 在服务器端提供了更多的功能和灵活性,而前端 sockjs 则提供了跨浏览器的兼容性和易用性。
启用SockJS
您可以通过Java配置来启用SockJS,如下例所示:
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myHandler(),"/sockjs").withSockJS();}
}
如果不调用withSockJS()
方法,客户端使用SockJS访问报404,示例代码如下:
<!DOCTYPE html>
<html>
<head><title>WebSocket Example</title>
</head>
<body>
<h1>这是一个前台页面</h1>
<input type="text" id="inputMessage" />
<button onclick="sendMessage()">Send</button>
<div id="messages"></div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.0/sockjs.min.js"></script>
<script>var sock = null;var heartbeatInterval = 5000; // 心跳间隔时间,单位为毫秒var heartbeatTimer = null;function connect() {sock = new SockJS('http://localhost:8088/sockjs');sock.onopen = function () {console.log('Connected');};sock.onmessage = function (message) {showMessage(message);};sock.onclose = function () {console.log('Disconnected');};}function showMessage(message) {// 处理收到的消息console.log(message);}function sendMessage() {var inputMessage = document.getElementById('inputMessage').value;sock.send(JSON.stringify({ 'content': inputMessage }));}connect();
</script>
</body>
</html>
如图所示:
IE 8 and 9
SockJS客户端通过使用微软的XDomainRequest在ie8和ie9中支持Ajax/XHR流。它可以跨域工作,但不支持发送cookie。cookie对于Java应用程序通常是必不可少的。但是,由于SockJS客户机可以与许多服务器类型(不仅仅是Java服务器)一起使用,因此它需要知道cookie是否重要。如果是这样,SockJS客户端更倾向于使用Ajax/XHR进行流处理。否则,它依赖于基于框架的技术。
默认情况下,Spring SockJS 会从 CDN 地址加载 SockJS 客户端。为了避免在跨域情况下出现问题,你可以将 SockJS 客户端库配置为从与应用程序相同的源加载。例如,如果你的应用程序位于 http://localhost:8080/
,那么你可以将 SockJS 客户端库的 URL 配置为 http://localhost:8080/myapp/js/sockjs-client.js
。
下面的例子展示了如何在Java配置中这样做:
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myWebSocketHandler(), "/sockjs").withSockJS().setClientLibraryUrl("http://localhost:8080/myapp/js/sockjs-client.js");}
}
调用setClientLibraryUrl()
方法设置 SockJS 客户端库的 URL。
心跳
SockJS协议要求服务器发送心跳消息,以防止代理断定连接被挂起(在 SockJS 中,通常是服务器负责发送心跳消息来保持连接的活跃状态,而客户端不需要显式地发送心跳。)。Spring SockJS配置有一个名为heartbeatTime可以用来定制频率。默认情况下,心跳在25秒后发送,假设该连接上没有发送其他消息。
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myHandler(),"/sockjs").withSockJS().setHeartbeatTime(5000);}
}
上述代码中,调用setHeartbeatTime()
方法设置心跳时间间隔为 5 秒,如果超过了指定的时间间隔而没有收到心跳信号,SockJS 通常会假定连接已断开或丢失(本地测试不会有这种情况)。具体行为可能因使用的库或框架而异,但通常会触发一些事件或采取适当的措施,如重新连接。
效果展示
当使用STOMP over WebSocket和SockJS时,如果STOMP客户端和服务器协商要交换的心跳,则SockJS心跳被禁用。
HTTP流和HTTP长轮询SockJS传输要求连接保持比通常更长的开放时间。一个特定的问题是Servlet API不为已经离开的客户端提供通知。
Spring尽最大努力识别这种表示客户端断开(特定于每个服务器)的网络故障,并通过使用专用日志类别DISCONNECTED_CLIENT_LOG_CATEGORY(在AbstractSockJsSession中定义)记录最小的消息。如果需要查看堆栈跟踪,可以将该日志类别设置为TRACE。
SockJS and CORS
如果允许跨域请求,SockJS协议将在XHR流和轮询传输中使用CORS进行跨域支持。因此,除非检测到响应中存在CORS标头,否则将自动添加CORS标头。因此,如果应用程序已经配置为提供CORS支持(例如,通过Servlet过滤器),Spring的SockJsService将跳过这一部分。
SockJS期望以下头和值:
- Access-Control-Allow-Origin:从Origin请求头的值初始化,比如:
http://example.com
。 - Access-Control-Allow-Credentials:始终设置为true。
- Access-Control-Request-Headers:根据等价请求头的值初始化。
- Access-Control-Allow-Methods:传输支持的HTTP方法(参见TransportType enum)。
- Access-Control-Max-Age:设置为31536000(1年)。
如果CORS配置允许,考虑排除带有SockJS端点前缀的URL,从而让Spring的SockJsService处理它。
SockJsClient
Spring提供了一个SockJS Java客户端,可以在不使用浏览器的情况下连接到远程SockJS端点。当需要在公共网络上的两个服务器之间进行双向通信时(也就是说,在网络代理可以排除使用WebSocket协议的情况下),这尤其有用。SockJS Java客户端对于测试目的也非常有用(例如,模拟大量并发用户)。
以下示例显示了如何创建SockJS客户端并连接到SockJS端点:
//底层 WebSocket 客户端
WebSocketClient webSocketClient = new StandardWebSocketClient();
List<Transport> transports = new ArrayList<>(1);
transports.add(new WebSocketTransport(webSocketClient));
//实例
SockJsClient sockJsClient = new SockJsClient(transports);
// STOMP 协议的 WebSocket 客户端
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);String url = "http://localhost:8080/portfolio";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
//建立与服务器的连接,并发送一条消息。
StompSession stompSession = stompClient.connect(url, sessionHandler).get();
stompSession.send("/message", "Hello, server!");
SockJS对消息使用JSON格式的数组。默认情况下,使用Jackson 2,并且需要在类路径中。或者,您可以配置的自定义实现SockJsMessageCodec并在上配置它SockJsClient.
以下示例显示了您也应该考虑自定义的服务器端SockJS相关属性:
@Configuration
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/sockjs").withSockJS()//设置 SockJS 流模式下的缓存大小限制,默认是 128 KB。.setStreamBytesLimit(512 * 1024) //设置 HTTP 传输模式下消息缓存的大小限制,默认是 100 条消息。.setHttpMessageCacheSize(1000) //设置 SockJS 断开连接的延迟时间,默认是 5 秒。.setDisconnectDelay(30 * 1000); }
}
WebSocketMessageBrokerConfigurer
WebSocketMessageBrokerConfigurer也是Spring框架的接口,用于配置基于消息代理的 WebSocket。通过实现该接口,你可以配置消息代理、消息端点、消息转发等功能,以实现复杂的 WebSocket 消息传递和处理逻辑,同样也支持Websocket代码中的配置信息。
重载registerStompEndpoints()
方法,建立 WebSocket 连接,等设置。示例如下:
- 注册端点
示例代码如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfigurer implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/sockjs");//注册端点}
}
- 设置允许来源
示例代码如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfigurer implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/sockjs")//注册端点.setAllowedOrigins("*");//设置来源}
}
- 启用SockJS
示例代码如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfigurer implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/sockjs")//注册端点.withSockJS();//启用SockJS}
}
- 设置Spring SockJS资源地址
示例代码如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfigurer implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/sockjs")//注册端点.withSockJS() .setClientLibraryUrl("http://localhost:8080/myapp/js/sockjs-client.js")//Spring SockJS资源地址;}
}
- 设置心跳时长
示例代码如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfigurer implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/sockjs")//注册端点.withSockJS() .setHeartbeatTime(5000)//设置心跳时长;}
}
重载configureMessageBroker()
方法配置消息代理,在这里你可以设置消息代理的前缀、订阅地址等。示例如下:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfigurer implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 配置消息代理registry.enableSimpleBroker("/topic"); //指定了消息代理的目标前缀,即服务器会将以“/topic”和开头的消息发送到所有订阅了对应前缀的客户端。registry.setApplicationDestinationPrefixes("/app");//指定了消息发送的前缀,即发送的消息会以“/app”开头}
}
这一章内容主要以WebSocketConfigurer 配置为主,后续更多的用法在下一章节内容讲解。
使用SockJS
前面介绍了WebSocket的案例,我们以案例一为例,将代码替换为SockJS,分两步走:
- 第一步
服务端启动SockJS,调用withSockJS()
方法。
- 第二步
引入SockJS库,然后把 new WebSocket(url);
替换成 new SockJS(url);
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.0/sockjs.min.js"></script>
完整示例代码如下:
@Controller
public class MyController {@GetMapping(value = "/websocket")@ResponseBodypublic ModelAndView websocket(HttpServletResponse response) throws Exception {ModelAndView modelAndView = new ModelAndView("websocket");return modelAndView;}
}
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {webSocketHandlerRegistry.addHandler(myWebSocketHandler(), "/sockjs").withSockJS()//启用SockJS.setHeartbeatTime(5000);}@Beanpublic MyWebSocketHandler myWebSocketHandler() {return new MyWebSocketHandler();}
}
public class MyWebSocketHandler extends TextWebSocketHandler {@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();System.out.println("接收到消息:" + payload);// 发送响应消息session.sendMessage(new TextMessage("Hello, client!"));}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立时执行的操作System.out.println("WebSocket 连接已建立");}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {// WebSocket 连接关闭时执行的操作System.out.println("WebSocket 连接已关闭");}
}
前端代码如下:
<!DOCTYPE html>
<html>
<head><title>WebSocket Example</title>
</head>
<body>
<input type="text" id="messageInput" placeholder="Type a message...">
<button onclick="sendMessage()">Send</button>
<div id="messages"></div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.5.0/sockjs.min.js"></script>
<script>var socket = null;function connect() {socket = new SockJS('http://localhost:8088/sockjs');socket.onopen = function () {console.log('WebSocket连接已建立');};socket.onmessage = function (message) {// 判断是否为心跳消息console.log('收到其他消息:' + message);var message = message.data;document.getElementById("messages").innerHTML += "<p>" + message + "</p>";};socket.onclose = function () {console.log('WebSocket连接已关闭');};}function sendMessage() {var message = document.getElementById("messageInput").value;socket.send(message);}connect();
</script>
</body>
</html>
效果展示:
握手成功后,服务器每5秒负责发送心跳消息来保持连接的活跃状态,当遇到不支持WebSocket的情况时,SockJS会尝试使用其他的方案来连接。
关联文章
全双工通信协议:WebSockets+STOMP