引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocket配置处理器
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.servlet.ServletContext;/*** WebScoket配置处理器*/
@Configuration
public class WebSocketConfig implements ServletContextInitializer {/*** ServerEndpointExporter 作用** 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}//设置websocket发送内容长度@Overridepublic void onStartup(ServletContext servletContext) {servletContext.setInitParameter("org.apache.tomcat.websocket.textBufferSize","22428800");}
}
WebSocket消息对象
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.Date;/**
* @author: ws
* @date: 20223/10/26 15:59
* @Description: WebSocketMessage
*/
// Message payload exchanged over the WebSocket endpoint: sender, recipient, text and send time.
// Accessors are generated by Lombok's @Data.
@Data
public class WebSocketMessage {

    /** ID of the user who sends the message. */
    private String fromId;

    /** ID of the other party, i.e. the user the message is addressed to. */
    private String toOtherId;

    /** Message content. */
    private String message;

    /**
     * Send time, formatted by fastjson with the pattern {@code yyyy-MM-dd HH:mm:ss}.
     * NOTE(review): this is the only public field in the class; it is kept public here so
     * that any existing direct field access keeps working.
     */
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    public Date date;
}
WebSocket操作类
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.ws.wxyinghang.entity.WebSocketMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;/*** @author: ws* @date: 20223/10/26 15:59* @Description: WebSocket操作类*/
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketSever {// 与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;private String userId;// session集合,存放对应的sessionprivate static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();// concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();// 用于存放离线消息private static ConcurrentHashMap<String, List<WebSocketMessage>> offlineMessageMap = new ConcurrentHashMap();/*** 建立WebSocket连接** @param session* @param userId 用户ID*/@OnOpenpublic void onOpen(Session session, @PathParam(value = "userId") String userId) {log.info("WebSocket建立连接中,连接用户ID:{}", userId);try {Session historySession = sessionPool.get(userId);// historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象if (historySession != null) {webSocketSet.remove(historySession);historySession.close();}} catch (IOException e) {log.error("重复登录异常,错误信息:" + e.getMessage(), e);}// 建立连接this.session = session;this.userId = userId;webSocketSet.add(this);sessionPool.put(userId, session);//从离线消息队列里面获取消息if (offlineMessageMap.containsKey(userId)) {List<WebSocketMessage> list = offlineMessageMap.get(userId);Iterator it = list.iterator();while (it.hasNext()) {Object x = it.next();//离线消息接收成功后删除消息Boolean bb = sendOfflineMessageByUser(JSON.toJSONString(x));if (bb) {System.out.println("从队列中删除离线消息" + x);it.remove();}}offlineMessageMap.remove(userId);}log.info("建立连接完成,当前在线人数为:{}", webSocketSet.size());}/*** 发生错误** @param throwable e*/@OnErrorpublic void onError(Throwable throwable) {throwable.printStackTrace();}/*** 连接关闭*/@OnClosepublic void onClose() {webSocketSet.remove(this);sessionPool.remove(this.userId);log.info("连接断开,当前在线人数为:{}", webSocketSet.size());}/*** 接收客户端消息** @param message 接收的消息*/@OnMessagepublic void onMessage(String message) {log.info("收到客户端发来的消息:{}", message);sendMessageByUser(message);}/*** 推送消息到指定用户** @param message 发送的消息*/public static Boolean sendMessageByUser(String message) {WebSocketMessage msg = 
JSON.parseObject(message, WebSocketMessage.class);log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);Session session = sessionPool.get(msg.getToOtherId());//判断session是否正常if (session == null || !session.isOpen()) {log.info("用户ID:" + msg.getToOtherId() + ",离线,放入离线消息队列中");if (offlineMessageMap.containsKey(msg.getToOtherId())) {List<WebSocketMessage> list = offlineMessageMap.get(msg.getToOtherId());list.add(msg);offlineMessageMap.put(msg.getToOtherId(), list);} else {offlineMessageMap.put(msg.getToOtherId(), ListUtil.toList(msg));}}//发送消息else {try {session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);return false;}}return true;}//发送离线消息public static Boolean sendOfflineMessageByUser(String message) {WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);log.info("用户ID:" + msg.getToOtherId() + ",推送内容:" + message);Session session = sessionPool.get(msg.getToOtherId());try {session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("推送消息到指定用户发生错误:" + e.getMessage(), e);return false;}return true;}/*** 群发消息** @param message 发送的消息*/public static void sendAllMessage(String message) {log.info("发送消息:{}", message);for (WebSocketSever webSocket : webSocketSet) {try {webSocket.session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("群发消息发生错误:" + e.getMessage(), e);}}}}
启动项目,使用 Apifox 测试,新建 WebSocket 接口
新建websocket1,连接后发送消息
新建websocket2,可以看到连接后接收到了消息
如果websocket2断开连接后,websocket1继续发送消息,等websocket2重新连接后就会收到离线的消息。