需求:
最近在对接一个物联网里设备,他的通信方式是 websocket 。所以我需要在 springboot框架中集成websocket 依赖,从而实现与设备实时通信!
框架:springboot2.7
java版本:java8
好了,还是直接上代码
第一步:引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
第二步写配置:
package com.agentai.base.config;import com.agentai.base.yumou.webSocket.YuMouDeviceWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;/*** WebSocket配置类* 负责配置WebSocket服务器和注册WebSocket处理器*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {// 注册WebSocket处理器,// 允许所有来源的跨域请求registry.addHandler(deviceWebSocketHandler(), "/linker-dev").setAllowedOrigins("*");}@Beanpublic YuMouDeviceWebSocketHandler deviceWebSocketHandler() {return new YuMouDeviceWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();// 设置消息缓冲区大小container.setMaxTextMessageBufferSize(8192);container.setMaxBinaryMessageBufferSize(8192);// 设置会话超时时间(毫秒)container.setMaxSessionIdleTimeout(60000L);return container;}
}
第三方:WebSocket会话管理器
package com.agentai.base.yumou.webSocket;import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** WebSocket会话管理器* 负责管理所有WebSocket会话,包括会话状态跟踪、心跳检测和清理过期会话*/
@Slf4j
public class WebSocketSessionManager {// 心跳超时限制(毫秒)private static final long HEARTBEAT_TIMEOUT = 30000;// 心跳检查间隔(毫秒)private static final long HEARTBEAT_CHECK_INTERVAL = 10000;// 心跳消息内容private static final String HEARTBEAT_MESSAGE = "{\"type\":\"ping\"}";// 会话信息,包含WebSocket会话和最后活动时间private static class SessionInfo {WebSocketSession session;long lastActiveTime;SessionInfo(WebSocketSession session) {this.session = session;this.lastActiveTime = Instant.now().toEpochMilli();}void updateLastActiveTime() {this.lastActiveTime = Instant.now().toEpochMilli();}}// 保存所有会话信息private final Map<String, SessionInfo> sessions = new ConcurrentHashMap<>();private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();public WebSocketSessionManager() {// 启动心跳检查任务scheduler.scheduleAtFixedRate(this::checkHeartbeats,HEARTBEAT_CHECK_INTERVAL, HEARTBEAT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}/*** 添加新的会话* @param session 新的WebSocket会话*/public void addSession(WebSocketSession session) {sessions.put(session.getId(), new SessionInfo(session));log.info("新会话已添加: {}", session.getId());}/*** 移除会话* @param sessionId 会话ID*/public void removeSession(String sessionId) {sessions.remove(sessionId);log.info("会话已移除: {}", sessionId);}/*** 更新会话最后活动时间* @param sessionId 会话ID*/public void updateSessionActivity(String sessionId) {SessionInfo info = sessions.get(sessionId);if (info != null) {info.updateLastActiveTime();}}/*** 发送消息到指定会话* @param sessionId 会话ID* @param message 消息内容* @return 是否发送成功*/public boolean sendMessage(String sessionId, String message) {SessionInfo info = sessions.get(sessionId);if (info != null && info.session.isOpen()) {try {info.session.sendMessage(new TextMessage(message));return true;} catch (IOException e) {log.error("发送消息到会话[{}]失败: {}", sessionId, e.getMessage());}}return false;}/*** 广播消息到所有会话* @param message 消息内容*/public void broadcastMessage(String message) {sessions.forEach((sessionId, info) -> {if (info.session.isOpen()) {try {info.session.sendMessage(new TextMessage(message));} catch (IOException e) {log.error("广播消息到会话[{}]失败: {}", sessionId, e.getMessage());}}});}/*** 检查心跳并清理过期会话*/private void checkHeartbeats() {long now = Instant.now().toEpochMilli();sessions.forEach((sessionId, info) -> {if (now - info.lastActiveTime > HEARTBEAT_TIMEOUT) {try {// 发送心跳消息info.session.sendMessage(new TextMessage(HEARTBEAT_MESSAGE));log.debug("发送心跳到会话: {}", sessionId);} catch (IOException e) {// 如果发送失败,关闭并移除会话log.warn("会话[{}]心跳检测失败,关闭会话: {}", sessionId, e.getMessage());try {info.session.close();} catch (IOException ex) {log.error("关闭会话[{}]失败: {}", sessionId, ex.getMessage());}removeSession(sessionId);}}});}/*** 关闭会话管理器*/public void shutdown() {scheduler.shutdown();sessions.forEach((sessionId, info) -> {try {info.session.close();} catch (IOException e) {log.error("关闭会话[{}]失败: {}", sessionId, e.getMessage());}});sessions.clear();}
}
第四步:设备WebSocket处理器
package com.agentai.base.yumou.webSocket;import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
/*** 设备WebSocket处理器* 负责处理设备的WebSocket连接、消息接收和断开连接*/
@Slf4j
public class YuMouDeviceWebSocketHandler extends TextWebSocketHandler {private final WebSocketSessionManager sessionManager;// 构造函数,初始化会话管理器public YuMouDeviceWebSocketHandler() {this.sessionManager = new WebSocketSessionManager();}/*** WebSocket连接建立后的处理* @param session WebSocket会话*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 将新会话添加到会话管理器String sessionId = session.getId();sessionManager.addSession(session);log.info("WebSocket连接已建立: {}", sessionId);}@AutowiredYuMouService yuMouService;/*** 处理接收到的文本消息* @param session 当前会话* @param message 接收到的文本消息*/@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {String payload = message.getPayload();String sessionId = session.getId();try {// 更新会话的活动时间sessionManager.updateSessionActivity(sessionId);log.info("接收到设备[{}]的文本消息: {}", sessionId, payload);JSONObject jsonObject = JSONObject.parseObject(payload);log.info("数据:", jsonObject );// 处理其他业务消息// TODO: 添加具体的业务消息处理逻辑} catch (Exception e) {log.error("处理设备[{}]消息时发生错误: {}", sessionId, e.getMessage());}}/*** 处理接收到的二进制消息* @param session 当前会话* @param message 接收到的二进制消息*/@Overrideprotected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) {byte[] payload = message.getPayload().array();String sessionId = session.getId();log.info("接收到设备[{}]的二进制消息,长度: {} 字节", sessionId, payload.length);// 目前只打印消息长度,可以根据需求处理二进制数据// TODO: 添加二进制消息处理逻辑}/*** 处理传输错误* @param session 当前会话* @param exception 错误异常*/@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) {String sessionId = session.getId();log.error("设备[{}]连接传输错误: {}", sessionId, exception.getMessage());}/*** WebSocket连接关闭后的处理* @param session 当前会话* @param status 关闭状态*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) {String sessionId = session.getId();sessionManager.removeSession(sessionId);log.info("设备[{}]WebSocket连接已关闭,状态码: {}", sessionId, status.getCode());}/*** 发送消息到指定会话* @param sessionId 会话ID* @param message 消息内容* @return 是否发送成功*/public boolean sendMessage(String sessionId, String message) {return sessionManager.sendMessage(sessionId, message);}/*** 广播消息到所有连接的会话* @param message 消息内容*/public void broadcastMessage(String message) {sessionManager.broadcastMessage(message);}/*** 关闭WebSocket处理器,清理资源*/public void shutdown() {sessionManager.shutdown();}
}