1、依赖引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、启动类添加bean
public class Application {/*** 会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint* 要注意,如果使用独立的servlet容器,* 而不是直接使用springboot的内置容器,* 就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。*/@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}
}
3、websocket服务创建
1、注解@ServerEndpoint("/client/websocket/{deviceId}")
2、地址参数与restful 风格一致
3、方法上通过获取地址参数 @PathParam( value = "deviceId")
4、方法getRemoteAddress() 可以获取客户端IP,如果是本机请求 则返回0.0.0.0.0.1
5、只能通过本地缓存对象sessionMap 存储session信息。
6、如果需要集群、分布式,则使用Nginx 做负载均衡(IP hash)
7、如果需要bean注入其他对象,必须使用构造函数手动申明SpringUtils.getBean(RedisCache.class);
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** 与客户端进行socket通信服务* @author xuancg*/
@ServerEndpoint("/client/websocket/{deviceId}")
@Component
@Slf4j
public class ClientSocketService {private RedisCache redisCache;private ClientProperites clientProperites; /** 用于存储当前服务器连接的socket deviceId-session */private static Map<Long, Session> sessionMap = new ConcurrentHashMap<>(32);/*** 必须通过构造函数引入bean*/public ClientSocketService(){this.redisCache = SpringUtils.getBean(RedisCache.class);this.clientProperites = SpringUtils.getBean(ClientProperites.class); log.info("websocket准备完成");} /*** 连接事件,加入注解* @param deviceId* @param session*/@OnOpenpublic void onOpen(@PathParam( value = "deviceId") Long deviceId, Session session ) {// 设置消息体最大大小及session空闲时间int MAX_MESSAGE_SIZE 2 * 1024 * 1024;session.setMaxTextMessageBufferSize(MAX_MESSAGE_SIZE);session.setMaxBinaryMessageBufferSize(MAX_MESSAGE_SIZE);session.setMaxIdleTimeout(1 * 1000 * 60);log.info("客户端发起连接deviceId={}", deviceId);}/*** 连接事件,加入注解* 用户断开链接* 此处不允许执行删除sessionMap操作。由于deviceId 可能是恶意构造,需要做其他参数,或者请求token 验证,或者通过接收消息关闭onMessage* @param deviceId* @param session*/@OnClosepublic void onClose(@PathParam ( value = "deviceId") Long deviceId, Session session ) {log.info("客户端关闭连接deviceId={}", deviceId);close(session);}/*** 当接收到用户上传的消息* @param deviceId* @param session*/@OnMessagepublic void onMessage(@PathParam ( value = "deviceId") Long deviceId, Session session ,String message) {log.info("接收客户端请求 deviceId=,message={}", deviceId, message);}/*** 给单个用户推送消息* @param session* @param message*/private void sendMessage(Session session, ClientNotifyResp message){if(session == null){return;}// 同步RemoteEndpoint.Async async = session.getAsyncRemote();async.sendText(JSONUtil.toJsonStr(message));}/*** 处理用户活连接异常* @param session* @param throwable*/@OnErrorpublic void onError(Session session, Throwable throwable) {try {session.close();} catch (IOException e) {e.printStackTrace();}throwable.printStackTrace();}private void close(Session session){try {session.close();} catch (IOException e) {e.printStackTrace();}}private static String getRemoteAddress(Session session) {if (session == null) {return null;}RemoteEndpoint.Async async = session.getAsyncRemote();//在Tomcat 8.0.x版本有效//InetSocketAddress addr0 = (InetSocketAddress) getFieldInstance(async,"base#sos#socketWrapper#socket#sc#remoteAddress");//System.out.println("clientIP0" + addr0);//在Tomcat 8.5以上版本有效Object obj = getFieldInstance(async, "base#socketWrapper#socket#sc#remoteAddress");if(null == obj){return "127.0.0.1";}InetSocketAddress addr = (InetSocketAddress) obj;String ip = addr.toString().replace("/", "");int idx = ip.lastIndexOf(":");if(idx > 0){return ip.substring(0, idx);}return ip;}private static Object getFieldInstance(Object obj, String fieldPath) {String fields[] = fieldPath.split("#");for (String field : fields) {obj = getField(obj, obj.getClass(), field);if (obj == null) {return null;}}return obj;}private static Object getField(Object obj, Class<?> clazz, String fieldName) {for (; clazz != Object.class; clazz = clazz.getSuperclass()) {try {Field field;field = clazz.getDeclaredField(fieldName);field.setAccessible(true);return field.get(obj);} catch (Exception e) {}}return null;}}
4、拦截器放行
或者添加自定义拦截器
httpSecurity
// CSRF禁用,因为不使用session TODO
.csrf().disable()
// 认证失败处理类
.exceptionHandling().authenticationEntryPoint(unauthorizedHandler).and()
// 基于token,所以不需要session
.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS).and()
// 过滤请求
.authorizeRequests()
.antMatchers(HttpMethod.GET, "/client/websocket/**").permitAll()
5、客户端调用demo
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Java后端WebSocket的Tomcat实现</title>
</head>
<body>
Welcome<br/><input id="text" type="text"/>
<button onclick="send()">发送消息</button>
<hr/>
<button onclick="closeWebSocket()">关闭WebSocket连接</button>
<hr/>
<div id="message"></div>
</body><script type="text/javascript">var websocket = null;//判断当前浏览器是否支持WebSocketif ('WebSocket' in window) {websocket = new WebSocket('ws://localhost:8080/mood-service/client/websocket/200013');}else {alert('当前浏览器 Not support websocket')}//连接发生错误的回调方法websocket.onerror = function () {setMessageInnerHTML("WebSocket连接发生错误");};//连接成功建立的回调方法websocket.onopen = function () {setMessageInnerHTML("WebSocket连接成功");}//接收到消息的回调方法websocket.onmessage = function (event) {setMessageInnerHTML(event.data);}//连接关闭的回调方法websocket.onclose = function () {setMessageInnerHTML("WebSocket连接关闭");}//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。window.onbeforeunload = function () {closeWebSocket();}//将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}//关闭WebSocket连接function closeWebSocket() {websocket.close();}//发送消息function send() {var message = document.getElementById('text').value;websocket.send(message);}
</script>
</html>