★ 基于Web Flux开发WebSocket
两步:
(1)实现WebSocketHandler开发WebSocket处理类。
实现该接口时只需要实现Mono handle(WebSocketSession webSocketSession)方法即可。
(2)使用HandlerMapping和WebSocketHandlerAdapter注册WebSocket处理类。
★ 反应式的WebSocket处理类
反应式API模型下,WebSocketSession的receive()方法返回的只是Flux(消息发布者),
它并不会同步获取消息,也不会阻塞。
类似的,WebSocketSession的send()方法发送的也只是Flux(消息发布者)
因此WebSocket处理类receive()消息之后,程序依然使用map()等方法对Flux中的数据项进行处理。
★ 配置基于WebFlux的WebSocket
要配置两个Bean:
-
HandlerMapping(通常使用SimpleUrlHandlerMapping实现类即可)Bean,它定义URL与WebSocketHandler Bean的映射关系。
-
WebSocketHandlerAdapter:它负责管理对WebSocketHandler Bean进行适配。
它会自动对容器中所有的WebSocketHandler Bean进行适配,
因此,意味着无论容器中有多少个WebSocketHandler ,该WebSocketHandlerAdapter只要配置一个即可。
可直接使用www.websocket.org/echo.html页面来测试WebSocket
代码演示
1、创建项目
MyWebSocketHandler
实现 WebSocKet 处理类
package cn.ljh.webflux_websocket.websockethandler;import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;//实现 WebSocKet 处理类@Component
public class MyWebSocketHandler implements WebSocketHandler
{//实现这个接口,只需要实现一个方法,这个方法可通过 WebSocketSession 获取 Flux//这个方法并不需要处理具体的数据,它面向的是Flux编程@Overridepublic Mono<Void> handle(WebSocketSession webSocketSession){//接收消息时,得到的并不是具体的消息,而是 Flux , 这个 Flux 负责消息通信,就是个消息通道Flux<WebSocketMessage> sourceFlux = webSocketSession.receive();//map() 方法里的参数,就是 function的apply()方法,通常写成 Lambda 表达式Flux<WebSocketMessage> resultFlux = sourceFlux.map(message ->{//textMessage() 方法负责将 String 转换成 WebSocketMessage//这个 message 是 WebSocketMessage 类型,WebSocketMessage.getPayloadAsText()负责将消息数据转成StringWebSocketMessage webSocketMessage = webSocketSession.textMessage("回复:" + message.getPayloadAsText());return webSocketMessage;});//发送消息Mono<Void> sendMessage = webSocketSession.send(resultFlux);return sendMessage;}
}
WebSocketConfig
配置基于WebFlux的WebSocket
package cn.ljh.webflux_websocket.config;import cn.ljh.webflux_websocket.websockethandler.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;import java.util.HashMap;
import java.util.Map;//配置基于WebFlux的WebSocket@Configuration
public class WebSocketConfig
{//这个bean负责对容器中所有的 WebSocketHandler Bean 进行适配@Beanpublic WebSocketHandlerAdapter webSocketHandlerAdapter(){return new WebSocketHandlerAdapter();}//MyWebSocketHandler 会接受容器中的依赖注入@Beanpublic HandlerMapping handlerMapping(MyWebSocketHandler myWebSocketHandler){//定义 URL 与 WebSocketHandler Bean 之间的映射关系//就是向这个 /myWebSocket 地址发送请求的时候,就将这个请求交给这个 myWebSocketHandler 处理类进行处理Map map = Map.of("/myWebSocket",myWebSocketHandler);//参数1:指定 URL 和 Handler 之间的映射关系 , 参数2:就是优先级SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping(map,-1);return simpleUrlHandlerMapping;}}
前端没有写,www.websocket.org/echo.html页面已经没法测试WebSocket了。
现在简单的websocket 就完成了
通过 webFlux 弄一个webSocket 的聊天室。
完整代码:
client.html
这个客户端页面,先放在static 静态路径下面,就可以直接访问
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title> 基于WebSocket的多人聊天 </title><script type="text/javascript">// 定义Web Socket对象var webSocket = null;let sendMsg = function(){if (webSocket == null || webSocket.readyState != 1){document.getElementById('show').innerHTML+= "还未连接服务器,请先连接WebSocket服务器<br>";return;}let inputElement = document.getElementById('msg');// 发送消息webSocket.send(inputElement.value);// 清空单行文本框inputElement.value = "";}let connect = function(){let name = document.getElementById('name').value.trim();if (name == null || name == ""){document.getElementById('show').innerHTML+= "用户名不能为空<br>";return;}if (webSocket && webSocket.readyState == 1){webSocket.close();}webSocket = new WebSocket("ws://127.0.0.1:8080/myWebSocket/" + name);webSocket.onopen = function(){document.getElementById('show').innerHTML+= "恭喜您,连接服务器成功!<br>";document.getElementById('name').value = "";// 为onmessage事件绑定监听器,接收消息webSocket.onmessage= function(event){// 接收、并显示消息document.getElementById('show').innerHTML+= event.data + "<br>";}};}</script>
</head>
<body>
<input type="text" size="20" id="name" name="name"/>
<input type="button" value="连接" onclick="connect();"/>
<div style="width:600px;height:240px;overflow-y:auto;border:1px solid #333;" id="show"></div>
<input type="text" size="80" id="msg" name="msg"/>
<input type="button" value="发送" onclick="sendMsg();"/>
</body>
</html>
MyWebSocketHandler
实现 WebSocKet 处理类
package cn.ljh.webflux_websocket.websockethandler;import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;//实现 WebSocKet 处理类@Component
public class MyWebSocketHandler implements WebSocketHandler
{//创建一个线程安全的map来存聊天信息, FluxSink 代表了发送消息的通道public static final Map<WebSocketSession, FluxSink<WebSocketMessage>> myClients = new ConcurrentHashMap<>();//实现这个接口,只需要实现一个方法,这个方法可通过 WebSocketSession 获取 Flux//这个方法并不需要处理具体的数据,它面向的是Flux编程@Overridepublic Mono<Void> handle(WebSocketSession webSocketSession){//1、获取连接路径: 得到 WenSocket 的连接路径String path = webSocketSession.getHandshakeInfo().getUri().getPath();//2、获取用户名: 获取path路径字符串最后一个斜杠之后的内容,也就是获取聊天的用户名String name = path.substring(path.lastIndexOf("/") + 1);//3、接收消息: 接收消息时,得到的并不是具体的消息,而是 Flux , 这个 Flux 负责消息通信,就是个消息通道Flux<WebSocketMessage> sourceFlux = webSocketSession.receive();//map() 方法里的参数,就是 function的apply()方法,通常写成 Lambda 表达式Mono<Void> mono1 = sourceFlux.map(message ->{//这个 message 是 WebSocketMessage 类型,WebSocketMessage.getPayloadAsText()负责将消息数据转成String//获取用户发送的消息String payloadAsText = message.getPayloadAsText();//返回 用户名+消息String nameAndMessage = name + " : " + payloadAsText;return nameAndMessage;})//4、实现消息广播: 把消息发给每一个用户//此时的 message 已经是转换之后的 message 了,这时候是 String 类型.doOnNext(message ->{//此处做消息广播, keySet()用于遍历map中的所有key,存在一个set集合中for (WebSocketSession session : myClients.keySet()){//通过session这个key , 获取消息通道FluxSinkFluxSink<WebSocketMessage> fluxSink = myClients.get(session);//调用 fluxSink 的 next() 方法向 Flux 发送消息//textMessage() 方法负责将 String 转换成 WebSocketMessage,把string类型的消息转回 WebSocketMessagefluxSink.next(session.textMessage(message));}//.then() 方法 讲解的时候说是合并上面的消息操作,百度说是异步执行}).then();//创建要发送消息的 outFluxFlux<WebSocketMessage> outFlux = Flux.create(fluxSink ->{//Flux 真正发布消息用的是Flux 底层的 fluxSinkmyClients.put(webSocketSession, fluxSink);});//发送消息Mono<Void> mono2 = webSocketSession.send(outFlux);//把两个mono 的消息汇总起来 再返回Mono<Void> allMono = Mono.zip(mono1, mono2).then();return allMono;}
}
WebSocketConfig
配置基于WebFlux的WebSocket
package cn.ljh.webflux_websocket.config;import cn.ljh.webflux_websocket.websockethandler.MyWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;import java.util.HashMap;
import java.util.Map;//配置基于WebFlux的WebSocket@Configuration
public class WebSocketConfig
{//这个bean负责对容器中所有的 WebSocketHandler Bean 进行适配@Beanpublic WebSocketHandlerAdapter webSocketHandlerAdapter(){return new WebSocketHandlerAdapter();}//MyWebSocketHandler 会接受容器中的依赖注入@Beanpublic HandlerMapping handlerMapping(MyWebSocketHandler myWebSocketHandler){//定义 URL 与 WebSocketHandler Bean 之间的映射关系//就是向这个 /myWebSocket 地址发送请求的时候,就将这个请求交给这个 myWebSocketHandler 处理类进行处理Map map = Map.of("/myWebSocket/{name}",myWebSocketHandler);//参数1:指定 URL 和 Handler 之间的映射关系 , 参数2:就是优先级SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping(map,-1);return simpleUrlHandlerMapping;}}
测试结果
成功