基于WebFlux的Websocket
一、导入XML依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency><!-- 或者引入jackson -->
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.26</version>
</dependency>
二、定义配置类,设置WebSocket拦截器
@Configuration
@EnableWebFlux
public class WebSocketConfig implements WebFluxConfigurer {@Beanpublic HandlerMapping handlerMapping() {Map<String, WebSocketHandler> map = new HashMap<>();map.put("/ws/chat", new MyWebSocketChatHandler());map.put("/ws/echo", new MyWebSocketEchoHandler());SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();mapping.setUrlMap(map);mapping.setOrder(-1); // 需要设置较高的优先级,以避免与其他处理程序冲突return mapping;}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
三、设置处理器,WebSocket的处理器
// 1. Echo的处理器
public class MyWebSocketEchoHandler implements WebSocketHandler {@NotNull@Overridepublic Mono<Void> handle(WebSocketSession session) {return session.send(session.receive().map(msg -> "Echo: " + msg.getPayloadAsText()).map(session::textMessage));}
}
设置自定义的处理器(高级处理)
public class MyWebSocketChatHandler implements WebSocketHandler {private static final Map<String, WebSocketSession> userMap = new ConcurrentHashMap<>();private static final ObjectMapper objectMapper = new ObjectMapper();@NotNull@Overridepublic Mono<Void> handle(WebSocketSession session) {
// String query = session.getHandshakeInfo().getUri().getQuery();URI uri = session.getHandshakeInfo().getUri();
// Map<String, String> queryMap = getQueryMap(query);Map<String, String> queryMap = parseQueryParams(uri);String userId = queryMap.getOrDefault("userId", "");userMap.put(userId, session);System.out.println("当前用户:" + userId);System.out.println("当前在线人数:" + userMap.size());return session.receive().flatMap(webSocketMessage -> {String payload = webSocketMessage.getPayloadAsText();Message message;try {message = objectMapper.readValue(payload, Message.class);if(Integer.parseInt(message.getCode())==CodeEnum.SUCCESS.getCode()){// 执行成功模式Mono<Void> targetSession = SuccessMode(message);if (targetSession != null) return targetSession;}else if(Integer.parseInt(message.getCode())==CodeEnum.ERROR.getCode()){// 执行出错模式return session.send(Mono.just(session.textMessage("消发送出错了")));}else{// 其他code的功能实现return session.send(Mono.just(session.textMessage("消息格式错误")));}} catch (JsonProcessingException e) {e.printStackTrace();// 这里一定要return不然会导致线程卡死直接断开连接return session.send(Mono.just(session.textMessage(e.getMessage())));}return session.send(Mono.just(session.textMessage("目标用户不在线")));}).then().doFinally(signal -> userMap.remove(userId)); // 用户关闭连接后删除对应连接}@Nullableprivate static Mono<Void> SuccessMode(Message message) {String targetId = message.getTargetId();if (userMap.containsKey(targetId)) {WebSocketSession targetSession = userMap.get(targetId);if (null != targetSession) {WebSocketMessage textMessage = targetSession.textMessage(CombineMessage(targetId, message.getMessageText()));return targetSession.send(Mono.just(textMessage));}}return null;}private static String CombineMessage(String targetId, String message) {// 创建一个JSONObject对象JSONObject json = new JSONObject();// 将参数添加到JSONObject中json.put("targetId", targetId);json.put("message", message);// 将JSONObject转换为字符串return json.toString();}// 其他的实现private Map<String, String> getQueryMap(String queryStr) {Map<String, String> queryMap = new HashMap<>();if (StringUtils.hasText(queryStr)) {String[] queryParam = queryStr.split("&");Arrays.stream(queryParam).forEach(s -> {String[] kv = s.split("=", 2);String value = kv.length == 2 ? kv[1] : "";queryMap.put(kv[0], value);});}return queryMap;}private static Map<String, String> parseQueryParams(URI uri) {return Arrays.stream(uri.getQuery().split("&")).map(param -> param.split("=")).collect(Collectors.toMap(array -> array[0],array -> array.length > 1 ? array[1] : ""));}}
注意要先配置一个实体类映射—(注意客户端的信息一定也是要json格式不然会报错哟)
@Data
public class Message {@JsonProperty("code")private String code;@JsonProperty("targetId")private String targetId;@JsonProperty("messageText")private String messageText;@JsonProperty("userId")private String userId;
}
枚举类设置code对应的信息
public enum CodeEnum {SUCCESS(1),ERROR(2);// 其他枚举值...private final Integer code;CodeEnum(int code) {this.code = code;}public Integer getCode() {return code;}
}
最后运行即可。注意访问ws://localhost:8081/ws/chat?userId=123
实现私聊的功能,访问ws://localhost:8081/ws/echo
即可实现简单的服务器和客户端的回应。群聊功能可以根据自己的需求进行实现,只需要添加对应的code
以及获取所有session
并发送message
即可。