文章目录
- 1 WebSocket
- 1.1 简介
- 1.2 WebSocket作用和调用
- 1.2.1 作用
- 1.2.2 js端调用
- 1.3 Javax
- 1.3.1 服务端
- 1.3.1.1 服务端接收
- 1.3.1.2 服务端集成
- 1.3.1.3 ping和pong消息
- 1.3.2 客户端
- 1.3.2.1 客户端接收
- 1.3.2.2 客户端发送
- 1.4 WebMVC
- 1.4.1 服务端
- 1.1.4.1 服务端接收
- 1.1.4.2 服务端集成
- 1.1.4.3 服务器握手拦截
- 1.1.4.4 服务器地址问题
- 1.4.2 客户端
- 1.4.2.1 客户端接收
- 1.4.2.2 客服端发送
- 1.5 WebFlux
- 1.5.1 服务端
- 1.5.1.1 服务端发送接收
- 1.5.1.2 服务端集成
- 1.5.2 客户端
- 1.5.2.1 客户端发送接收
- 1.5.2.2 客户端发送
1 WebSocket
1.1 简介
WebSocket
协议是基于TCP
的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex
)通信——允许服务器主动发送信息给客户端,建立客户端和服务器之间的通信渠道。浏览器和服务器仅需一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
1.2 WebSocket作用和调用
1.2.1 作用
HTTP
是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接:
无状态
:每次连接只处理一个请求,请求结束后断开连接。无连接
:对于事务处理没有记忆能力,服务器不知道客户端是什么状态。
通过HTTP
实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP
连接始终打开。
WebSocket
的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息
,是真正的双向平等对话。
WebSocket
特点:
- 建立在
TCP
协议之上,服务器端的实现比较容易。 - 与
HTTP
协议有着良好的兼容性。默认端口也是80和443
,并且握手阶段采用HTTP
协议,因此握手时不容易屏蔽,能通过各种HTTP
代理服务器。 - 数据格式比较轻量,性能开销小,通信高效。
- 可以发送文本,也可以发送二进制数据。
- 没有同源限制,客户端可以与任意服务器通信。
- 协议标识符是
ws
(如果加密,则为wss
),服务器网址就是URL
1.2.2 js端调用
<script>var ws = new WebSocket('ws://localhost:8080/webSocket/10086');// 获取连接状态console.log('ws连接状态:' + ws.readyState);//监听是否连接成功ws.onopen = function () {console.log('ws连接状态:' + ws.readyState);//连接成功则发送一个数据ws.send('test1');}// 接听服务器发回的信息并处理展示ws.onmessage = function (data) {console.log('接收到来自服务器的消息:');console.log(data);//完成通信后关闭WebSocket连接ws.close();}// 监听连接关闭事件ws.onclose = function () {// 监听整个过程中websocket的状态console.log('ws连接状态:' + ws.readyState);}// 监听并处理error事件ws.onerror = function (error) {console.log(error);}function sendMessage() {var content = $("#message").val();$.ajax({url: '/socket/publish?userId=10086&message=' + content,type: 'GET',data: { "id": "7777", "content": content },success: function (data) {console.log(data)}})}
</script>
下面主要介绍三种方式:Javax,WebMVC,WebFlux
,在Spring Boot
中的服务端和客户端配置
1.3 Javax
在java
的扩展包javax.websocket
中就定义了一套WebSocket
的接口规范
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
1.3.1 服务端
1.3.1.1 服务端接收
一般使用注解的方式来进行配置
/*** html页面与之关联的接口* var reqUrl = "http://localhost:8081/websocket/" + cid;* socket = new WebSocket(reqUrl.replace("http", "ws"));*/
@Component
@ServerEndpoint("/websocket/{type}")
public class JavaxWebSocketServerEndpoint {@OnOpenpublic void onOpen(Session session, EndpointConfig config,@PathParam(value = "type") String type) {//连接建立}@OnClosepublic void onClose(Session session, CloseReason reason) {//连接关闭}@OnMessagepublic void onMessage(Session session, String message) {//接收文本信息}@OnMessagepublic void onMessage(Session session, PongMessage message) {//接收pong信息}@OnMessagepublic void onMessage(Session session, ByteBuffer message) {//接收二进制信息,也可以用byte[]接收}@OnErrorpublic void onError(Session session, Throwable e) {//异常处理}
}
我们在类上添加 @ServerEndpoint
注解来表示这是一个服务端点,同时可以在注解中配置路径,这个路径可以配置成动态的,使用{}
包起来就可以了
@OnOpen
:用来标记对应的方法作为客户端连接上来之后的回调,Session
就相当于和客户端的连接了,我们可以把它缓存起来用于发送消息;通过@PathParam
注解就可以获得动态路径中对应值了@OnClose
:用来标记对应的方法作为客户端断开连接之后的回调,我们可以在这个方法中移除对应Session
的缓存,同时可以接受一个CloseReason
的参数用于获取关闭原因@OnMessage
:用来标记对应的方法作为接收到消息之后的回调,我们可以接受文本消息,二进制消息和pong消息@OnError
:用来标记对应的方法作为抛出异常之后的回调,可以获得对应的Session
和异常对象
1.3.1.2 服务端集成
@Configuration(proxyBeanMethods = false)
public class JavaxWebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
依赖Spring
的WebSocket
模块,手动注入ServerEndpointExporter
就可以了
需要注意ServerEndpointExporter
是Spring
中的类,算是Spring
为了支持javax.websocket
的原生用法所提供的支持类
javax.websocket
库中定义了PongMessage
而没有PingMessage
通过测试发现基本上所有的WebSocket
包括前端js
自带的,都实现了自动回复;也就是说当接收到一个ping
消息之后,是会自动回应一个pong
消息,所以没有必要再自己接受ping
消息来处理了,即我们不会接受到ping
消息;
当然我上面讲的ping和pong
都是需要使用框架提供的api
,如果是我们自己通过Message
来自定义心跳数据的话是没有任何的处理的,下面是对应的api
//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
1.3.1.3 ping和pong消息
ping 消息
和 pong 消息
都是 WebSocket
协议中的特殊消息类型,用于进行心跳保活和检测 WebSocket
连接的健康状态。
ping 消息
:由服务器端(或客户端)发送给对端的消息。它用于发起一个心跳检测请求,要求对端回复一个pong
消息作为响应。ping
消息通常用于检测对端的连接是否仍然处于活动状态,以及测量网络延迟。pong 消息
:由对端(即客户端或服务器端)作为对ping
消息的响应发送回来。它用于确认接收到ping
消息,并表明连接仍然活跃。
当一方发送一个 ping
消息时,对端应该立即发送一个 pong
消息作为响应。通过交换 ping 和 pong 消息
,可以检测连接是否仍然有效,以及测量网络的延迟时间。
ping 和 pong
消息通常由 WebSocket
底层协议处理,开发人员可以通过设置相应的参数来启用或禁用这些消息的交换。一般情况下,WebSocket
客户端和服务器都会自动处理 ping 和 pong
消息,无需开发人员显式地处理。ping 和 pong
消息是属于底层协议层
1.3.2 客户端
1.3.2.1 客户端接收
客户端也是使用注解配置
@ClientEndpoint
public class JavaxWebSocketClientEndpoint {@OnOpenpublic void onOpen(Session session) {//连接建立}@OnClosepublic void onClose(Session session, CloseReason reason) {//连接关闭}@OnMessagepublic void onMessage(Session session, String message) {//接收文本消息}@OnMessagepublic void onMessage(Session session, PongMessage message) {//接收pong消息}@OnMessagepublic void onMessage(Session session, ByteBuffer message) {//接收二进制消息}@OnErrorpublic void onError(Session session, Throwable e) {//异常处理}
}
客户端使用@ClientEndpoint
来标记,其他的@OnOpen,@OnClose,@OnMessage,@OnError
和服务端一模一样
1.3.2.2 客户端发送
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);
我们可以通过ContainerProvider
来获得一个WebSocketContainer
,然后调用connectToServer
方法将我们的客户端类和连接的uri传入就行了
通过ContainerProvider#getWebSocketContainer
获得WebSocketContainer
其实是基于SPI
实现的
在Spring
的环境中更推荐大家使用ServletContextAware
来获得,代码如下
@Component
public class JavaxWebSocketContainer implements ServletContextAware {private volatile WebSocketContainer container;public WebSocketContainer getContainer() {if (container == null) {synchronized (this) {if (container == null) {container = ContainerProvider.getWebSocketContainer();}}}return container;}@Overridepublic void setServletContext(@NonNull ServletContext servletContext) {if (container == null) {container = (WebSocketContainer) servletContext.getAttribute("javax.websocket.server.ServerContainer");}}
}
发消息
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);//发送文本消息
session.getAsyncRemote().sendText(String message);//发送二进制消息
session.getAsyncRemote().sendBinary(ByteBuffer message);//发送对象消息,会尝试使用Encoder编码
session.getAsyncRemote().sendObject(Object message);//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
1.4 WebMVC
pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
1.4.1 服务端
1.1.4.1 服务端接收
我们实现一个WebSocketHandler
来处理WebSocket
的连接,关闭,消息和异常
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;public class ServletWebSocketServerHandler implements WebSocketHandler {@Overridepublic void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {//连接建立}@Overridepublic void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {//接收消息}@Overridepublic void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {//异常处理}@Overridepublic void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {//连接关闭}@Overridepublic boolean supportsPartialMessages() {//是否支持接收不完整的消息return false;}
}
1.1.4.2 服务端集成
首先需要添加@EnableWebSocket
来启用WebSocket
然后实现WebSocketConfigurer
来注册WebSocket
路径以及对应的WebSocketHandler
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {registry//添加处理器到对应的路径.addHandler(new ServletWebSocketServerHandler(), "/websocket")//注册Handler.setAllowedOrigins("*");}
}
1.1.4.3 服务器握手拦截
提供了HandshakeInterceptor
来拦截握手
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {registry//添加处理器到对应的路径.addHandler(new ServletWebSocketServerHandler(), "/websocket")//添加握手拦截器.addInterceptors(new ServletWebSocketHandshakeInterceptor()).setAllowedOrigins("*");}public static class ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {//握手之前if (request instanceof ServletServerHttpRequest) {String path = request.getURI().getPath();if(requestIsValid(path)){String[] params = getParams(path);attributes.put("WEBSOCKET_AUTH", params[0]);attributes.put("WEBSOCKET_PID", params[1]);attributes.put("WEBSOCKET_SN", params[2]);attributes.put("WEBSOCKET_OPENID", params[3]);attributes.put("WEBSOCKET_FIRSTONE","yes");}}System.out.println("================Before Handshake================");return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {//握手之后System.out.println("================After Handshake================");if(e!=null) e.printStackTrace();System.out.println("================After Handshake================");}private boolean requestIsValid(String url){//在这里可以写上具体的鉴权逻辑boolean isvalid = false;if(StringUtils.isNotEmpty(url)&& url.startsWith("/netgate/")&& url.split("/").length==6){isvalid = true;}return isvalid;}private String[] getParams(String url){url = url.replace("/netgate/","");return url.split("/");}}
}
1.1.4.4 服务器地址问题
当在集成的时候发现这种方式没办法动态匹配路径,它的路径就是固定的,没办法使用如/websocket/**
这样的通配符
在研究了一下之后发现可以在UrlPathHelper
上解决
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {if (registry instanceof ServletWebSocketHandlerRegistry) {//替换UrlPathHelper((ServletWebSocketHandlerRegistry) registry).setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));}registry//添加处理器到对应的路径.addHandler(new ServletWebSocketServerHandler(), "/websocket/**").setAllowedOrigins("*");}public class PrefixUrlPathHelper extends UrlPathHelper {private String prefix;public PrefixUrlPathHelper(String prefix){this.prefix=prefix;}@Overridepublic @NonNull String resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {//获得原本的PathString path = super.resolveAndCacheLookupPath(request);//如果是指定前缀就返回对应的通配路径if (path.startsWith(prefix)) {return prefix + "/**";}return path;}}
}
因为它内部实际上就是用一个Map<String, WebSocketHandler>
来存的,所以没有办法用通配符
1.4.2 客户端
1.4.2.1 客户端接收
和服务端一样我们需要先实现一个WebSocketHandler
来处理WebSocket
的连接,关闭,消息和异常
public class ServletWebSocketClientHandler implements WebSocketHandler {@Overridepublic void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {//连接建立}@Overridepublic void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {//接收消息}@Overridepublic void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {//异常处理}@Overridepublic void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {//连接关闭}@Overridepublic boolean supportsPartialMessages() {//是否支持接收不完整的消息return false;}
}
1.4.2.2 客服端发送
WebSocketClient client = new StandardWebSocketClient();
WebSocketHandler handler = new ServletWebSocketClientHandler();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
manager.start();
首先我们需要先new一个StandardWebSocketClient
,可以传入一个WebSocketContainer
参数,获得该对象的方式上面已经介绍过了,这边就先略过
然后new一个WebSocketConnectionManager
传入WebSocketClient
,WebSocketHandler
还有路径uri
最后调用一下WebSocketConnectionManager
的start
方法就可以了
这里如果大家去看WebSocketClient
的实现类就会发现有StandardWebSocketClient
还有JettyWebSocketClient
等等,所以大家可以根据自身项目所使用的容器来选择不同的WebSocketClient
实现类
这里给大家贴一小段Spring适配不同容器WebSocket的代码
public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {private static final boolean tomcatWsPresent;private static final boolean jettyWsPresent;private static final boolean jetty10WsPresent;private static final boolean undertowWsPresent;private static final boolean glassfishWsPresent;private static final boolean weblogicWsPresent;private static final boolean websphereWsPresent;static {ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();tomcatWsPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);jetty10WsPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);jettyWsPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);undertowWsPresent = ClassUtils.isPresent("io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);glassfishWsPresent = ClassUtils.isPresent("org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);weblogicWsPresent = ClassUtils.isPresent("weblogic.websocket.tyrus.TyrusServletWriter", classLoader);websphereWsPresent = ClassUtils.isPresent("com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);}
}
发消息
import org.springframework.web.socket.*;WebSocketSession session = ...//发送文本消息
session.sendMessage(new TextMessage(CharSequence message);//发送二进制消息
session.sendMessage(new BinaryMessage(ByteBuffer message));//发送ping
session.sendMessage(new PingMessage(ByteBuffer message));//发送pong
session.sendMessage(new PongMessage(ByteBuffer message));
1.5 WebFlux
WebFlux
的WebSocket
不需要额外的依赖包
1.5.1 服务端
1.5.1.1 服务端发送接收
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;public class ReactiveWebSocketServerHandler implements WebSocketHandler {@NonNull@Overridepublic Mono<Void> handle(WebSocketSession session) {Mono<Void> send = session.send(Flux.create(sink -> {//可以持有sink对象在任意时候调用next发送消息sink.next(WebSocketMessage message);})).doOnError(it -> {//异常处理});Mono<Void> receive = session.receive().doOnNext(it -> {//接收消息}).doOnError(it -> {//异常处理}).then();@SuppressWarnings("all")Disposable disposable = session.closeStatus().doOnError(it -> {//异常处理}).subscribe(it -> {//连接关闭});return Mono.zip(send, receive).then();}
}
首先需要注意这里的WebSocketHandler
和WebSocketSession
是reactive
包下的:
- 通过
WebSocketSession#send
方法来持有一个FluxSink<WebSocketMessage>
来用于发送消息 - 通过
WebSocketSession#receive
来订阅消息 - 通过
WebSocketSession#closeStatus
来订阅连接关闭事件
1.5.1.2 服务端集成
注入WebSocketHandlerAdapter
@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {@Beanpublic WebSocketHandlerAdapter webSocketHandlerAdapter() {return new WebSocketHandlerAdapter();}
}
注册一个HandlerMapping
同时配置路径和对应的WebSocketHandler
@Order(Ordered.HIGHEST_PRECEDENCE)
@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {public ReactiveWebSocketServerHandlerMapping() {Map<String, WebSocketHandler> map = new HashMap<>();map.put("/websocket/**", new ReactiveWebSocketServerHandler());setUrlMap(map);setOrder(100);}
}
注意
:我们自定义的HandlerMapping
需要设置order
,如果不设置,默认为Ordered.LOWEST_PRECEDENCE
,会导致这个HandlerMapping
被放在最后,当有客户端连接上来时会被其他的HandlerMapping
优先匹配上而连接失败
1.5.2 客户端
1.5.2.1 客户端发送接收
客户端WebSocketHandler
的写法和服务端的一样
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;public class ReactiveWebSocketClientHandler implements WebSocketHandler {@NonNull@Overridepublic Mono<Void> handle(WebSocketSession session) {Mono<Void> send = session.send(Flux.create(sink -> {//可以持有sink对象在任意时候调用next发送消息sink.next(WebSocketMessage message);})).doOnError(it -> {//处理异常});Mono<Void> receive = session.receive().doOnNext(it -> {//接收消息}).doOnError(it -> {//异常处理}).then();@SuppressWarnings("all")Disposable disposable = session.closeStatus().doOnError(it -> {//异常处理}).subscribe(it -> {//连接关闭});return Mono.zip(send, receive).then();}
}
1.5.2.2 客户端发送
import org.springframework.web.reactive.socket.client.WebSocketClient;WebSocketClient client = ReactorNettyWebSocketClient();
WebSocketHandler handler = new ReactiveWebSocketClientHandler();
client.execute(uri, handler).subscribe();
首先我们需要先new一个ReactorNettyWebSocketClient
然后调用一下WebSocketClient
的execute
方法传入路径uri
和WebSocketHandler
并继续调用subscribe
方法就可以了
注意
:WebFlux
和 WebMVC
中的 WebSocketClient
一样,Reactive
包中的WebSocketClient
也有很多实现类,比如ReactorNettyWebSocketClient
,JettyWebSocketClient
,UndertowWebSocketClient
,TomcatWebSocketClient
等等,也是需要大家基于自身项目的容器使用不同的实现类
这里也给大家贴一小段Reactive适配不同容器WebSocket的代码
public class HandshakeWebSocketService implements WebSocketService, Lifecycle {private static final boolean tomcatPresent;private static final boolean jettyPresent;private static final boolean jetty10Present;private static final boolean undertowPresent;private static final boolean reactorNettyPresent;static {ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);}
}
发消息
我们需要使用在WebSocketHandler
中获得的FluxSink<WebSocketMessage>
来发送消息
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;public class ReactiveWebSocket {private final WebSocketSession session;private final FluxSink<WebSocketMessage> sender;public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {this.session = session;this.sender = sender;}public String getId() {return session.getId();}public URI getUri() {return session.getHandshakeInfo().getUri();}public void send(Object message) {if (message instanceof WebSocketMessage) {sender.next((WebSocketMessage) message);} else if (message instanceof String) {//发送文本消息sender.next(session.textMessage((String) message));} else if (message instanceof DataBuffer) {//发送二进制消息sender.next(session.binaryMessage(factory -> (DataBuffer) message));} else if (message instanceof ByteBuffer) {//发送二进制消息sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));} else if (message instanceof byte[]) {//发送二进制消息sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));} else {throw new IllegalArgumentException("Message type not match");}}public void ping() {//发送pingsender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));}public void pong() {//发送pongsender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));}public void close(CloseStatus reason) {sender.complete();session.close(reason).subscribe();}
}