创建Websocket处理器继承AbstractWebSocketHandler
覆写public void afterConnectionEstablished(WebSocketSession session)方法,建立WebSocket连接;覆写protected void handleTextMessage(WebSocketSession session, TextMessage message)方法,处理接收的消息;发送消息的方法为webSocketSession.getWebSocketSession().sendMessage(new TextMessage(msg));覆写异常处理方法public void handleTransportError(WebSocketSession session, Throwable exception);覆写WebSocket关闭方法public void afterConnectionClosed(WebSocketSession session, CloseStatus status)。
@Slf4j
@Component
public class MyWsHandler extends AbstractWebSocketHandler { private static final Map < String , Session > sessionMap ; static { sessionMap = new ConcurrentHashMap < > ( ) ; } @Override public void afterConnectionEstablished ( WebSocketSession session) throws Exception { super . afterConnectionEstablished ( session) ; sessionMap. put ( session. getId ( ) , session) ; log. info ( session. getClientId ( ) + "建立了连接" ) ; } @Override protected void handleTextMessage ( WebSocketSession session, TextMessage message) throws Exception { super . handleTextMessage ( session, message) ; log. info ( sessionBeanMap. get ( session. getId ( ) ) . getClientId ( ) + ":" + message. getPayload ( ) ) ; String param = message. getPayload ( ) ; sendMessageToOne ( session. getId ( ) , param) ; } @Override public void handleTransportError ( WebSocketSession session, Throwable exception) throws Exception { super . handleTransportError ( session, exception) ; if ( session. isOpen ( ) ) { session. close ( ) ; } sessionMap. remove ( session. getId ( ) ) ; } @Override public void afterConnectionClosed ( WebSocketSession session, CloseStatus status) throws Exception { int clientId = sessionMap. get ( session. getId ( ) ) . getClientId ( ) ; sessionMap. remove ( session) ; log. info ( clientId+ "关闭了连接" ) ; super . afterConnectionClosed ( session, status) ; } public void sendMessageToOne ( String userId, String param) { try { sessionMap. get ( userId) . sendMessage ( new TextMessage ( "Hello World!" ) ) ; } catch ( IOException | ExecutionException | InterruptedException e) { e. printStackTrace ( ) ; } }
}
创建Websocket拦截器继承HttpSessionHandshakeInterceptor(非必须)
建立连接前做处理,覆写public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes);建立连接后做处理,覆写public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex)。
/**
 * Handshake interceptor that logs the remote address before and after the
 * WebSocket handshake and otherwise defers to {@link HttpSessionHandshakeInterceptor}.
 */
@Component
@Slf4j
public class MyInterceptor extends HttpSessionHandshakeInterceptor {

    /**
     * Logs the remote address, then delegates to the superclass.
     *
     * @param request    the handshake request
     * @param response   the handshake response
     * @param wsHandler  the target WebSocket handler
     * @param attributes attributes passed on to the superclass
     * @return whatever the superclass returns
     * @throws Exception if the superclass throws
     */
    @Override
    public boolean beforeHandshake(
            ServerHttpRequest request,
            ServerHttpResponse response,
            WebSocketHandler wsHandler,
            Map<String, Object> attributes) throws Exception {
        final String remote = request.getRemoteAddress().toString();
        log.info(remote + "开始握手");
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    /**
     * Logs the remote address, then delegates to the superclass.
     *
     * @param request   the handshake request
     * @param response  the handshake response
     * @param wsHandler the target WebSocket handler
     * @param ex        exception passed through to the superclass
     */
    @Override
    public void afterHandshake(
            ServerHttpRequest request,
            ServerHttpResponse response,
            WebSocketHandler wsHandler,
            Exception ex) {
        final String remote = request.getRemoteAddress().toString();
        log.info(remote + "完成握手");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}
注册Websocket处理器和Websocket拦截器
/**
 * WebSocket configuration: registers {@link MyWsHandler} at {@code /myWebsocket}
 * with {@link MyInterceptor} attached to the handshake.
 */
@Configuration
@EnableWebSocket
public class MyWsConfig implements WebSocketConfigurer {

    /** Interceptor added to the endpoint registered below. */
    @Resource
    private MyInterceptor myWsInterceptor;

    /** Handler registered for the {@code /myWebsocket} path. */
    @Resource
    private MyWsHandler myWsHandler;

    /**
     * Maps the handler to {@code /myWebsocket}, attaches the interceptor and
     * sets the allowed origins to {@code "*"}.
     *
     * @param registry registry the handler is added to
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                .addHandler(myWsHandler, "/myWebsocket")
                .addInterceptors(myWsInterceptor)
                .setAllowedOrigins("*");
    }
}