引入依赖
Spring Boot 中的 WebSocket 依赖于 Spring WebFlux 模块,使用了 Reactor Netty 库来实现底层的 WebSocket 通信。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
服务端配置
/*** WebSocket配置类*/
@Configuration
public class WebSocketConfig {/*** 注入ServerEndpointExporter的bean对象,自动注册使用了@ServerEndpoint注解的bean* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}}
ServerEndpointExporter
在Spring框架中,ServerEndpointExporter
的注入是WebSocket配置的重要部分。
为什么要注入ServerEndpointExporter
对象?
-
自动注册WebSocket端点:
ServerEndpointExporter
会扫描Spring应用上下文中使用了@ServerEndpoint
注解的类,并自动注册这些类为WebSocket端点。这样,就不需要手动注册每个端点,简化了WebSocket端点的配置过程。 -
集成Spring框架: 通过
ServerEndpointExporter
,Spring框架能够更好地管理和配置WebSocket端点。例如,Spring的依赖注入功能可以用于WebSocket端点,使得端点可以依赖Spring管理的bean。
如果实现自动注册端点(函数registerEndpoints)?
请看下述注释:
- 创建一个linkedhashset集合来存储端点类.
- 从spring上下文中查找所有带有serverEndPoint的bean并添加到endPointClasses集合中
- 遍历集合,注册每一个端点类
- 从spring上下文中查找所有带有serverEndPointConfig的bean,放置在集合中并注册
protected void registerEndpoints() {// 创建一个LinkedHashSet来存储端点类,确保顺序和唯一性Set<Class<?>> endpointClasses = new LinkedHashSet<>();// 如果annotatedEndpointClasses不为空,将其全部添加到endpointClasses中if (this.annotatedEndpointClasses != null) {endpointClasses.addAll(this.annotatedEndpointClasses);}// 获取应用上下文ApplicationContext context = this.getApplicationContext();// 如果上下文不为空,从上下文中获取所有带有ServerEndpoint注解的bean的名字if (context != null) {String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class);// 遍历所有bean名字for (String beanName : endpointBeanNames) {// 将bean对应的类型添加到endpointClasses中endpointClasses.add(context.getType(beanName));}}// 遍历所有收集到的端点类for (Class<?> endpointClass : endpointClasses) {// 注册每个端点类this.registerEndpoint(endpointClass);}// 如果上下文不为空,从上下文中获取所有ServerEndpointConfig类型的beanif (context != null) {Map<String, ServerEndpointConfig> endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class);// 遍历所有ServerEndpointConfig类型的beanfor (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) {// 注册每个ServerEndpointConfigthis.registerEndpoint(endpointConfig);}}
}
创建websocket对象
/*** 服务端WebSocket对象*/
@ServerEndpoint(value = "/chat/{param}")
@Component
public class ChatEndpoint {//用来存储每一个客户端对象对应的ChatEndpoint对象 ConcurrentHashMap效率高,线程安全,但是key和value都不能为null
// public static Map<String, ChatEndpoint> onlineUsers = new ConcurrentHashMap<>();public static CopyOnWriteArraySet<ChatEndpoint> webSocketCopyOnWriteArraySet = new CopyOnWriteArraySet<>();private String loginId;//声明Session对象,通过该对象可以发送消息给指定的用户private Session session;/*** 连接建立时被调用* @param session* @param param*/@OnOpenpublic void onOpen(Session session, @PathParam("param")String param){//将局部的session对象赋值给成员sessionthis.session = session;this.loginId = param; //1_1 0是企业,1是求职者//将当前登录用户存储到容器中webSocketCopyOnWriteArraySet.add(this);}/*** 接收到客户端发来的消息时被调用* @param message*/@OnMessagepublic void onMessage(String message){try{JSONObject messageObject = JSON.parseObject(message);Integer userId = messageObject.getInteger("userId");Integer enterpriseId = messageObject.getInteger("enterpriseId");Integer status = messageObject.getInteger("status");Integer chatId = messageObject.getInteger("chatId");String receiver ="";if(status==0){receiver ="USER_"+userId;}else{receiver ="ENTERPRISE_"+enterpriseId;}// 单聊for(ChatEndpoint chatEndpoint : webSocketCopyOnWriteArraySet){if(chatEndpoint.loginId.equals(receiver)){chatEndpoint.session.getBasicRemote().sendText(JSONObject.toJSONString(messageObject));}}}catch(Exception e){e.printStackTrace();}}/*** 发送系统消息* @param receiverId*/public void sendSystemMessage(String receiverId) {try{JSONObject messageObject = new JSONObject();for(ChatEndpoint chatEndpoint : webSocketCopyOnWriteArraySet){if(chatEndpoint.loginId.equals(receiverId)){chatEndpoint.session.getBasicRemote().sendText(JSONObject.toJSONString(messageObject));}}}catch(Exception e){e.printStackTrace();}}/*** 连接关闭时被调用*/@OnClosepublic void onClose(){webSocketCopyOnWriteArraySet.remove(this);}}
ServerEndpoint注解
@ServerEndpoint
是一个类层次的注解,它的功能是标识当前类是一个WebSocket服务端,注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端。
生命周期回调方法
一个用 @ServerEndpoint
注解标记的类,可以定义以下几种方法来处理不同的 WebSocket 事件:
@OnOpen
:在客户端打开连接时调用。@OnMessage
:在接收到客户端消息时调用。@OnClose
:在连接关闭时调用。@OnError
:在通信过程中发生错误时调用。
session
对象
session
对象是 Java WebSocket API 中 javax.websocket.Session
类的一个实例。它表示服务器和特定客户端之间的 WebSocket 连接。通过 session
对象,可以执行各种操作,比如发送消息(getBasicRemote().sendText())给客户端、关闭连接以及访问 WebSocket 连接的属性。
CopyOnWriteArraySet & ConcurrentHashMap
CopyOnWriteArraySet
是 Java 中一个线程安全的 Set 实现,它基于 CopyOnWriteArrayList
,实现了 Set
接口。
线程安全的原因
-
写时复制机制:
当执行修改操作(如添加或删除元素)时,CopyOnWriteArraySet
会复制底层的数组,创建一个新的数组并在其上进行修改操作。修改完成后,将新的数组引用替换旧的数组引用。这种机制确保了在修改操作进行期间,原有的数组不会被修改,所有正在读取操作的线程仍然可以安全地访问旧的数组。 -
读写分离:
读操作直接读取底层数组,不需要加锁,因此读操作非常高效。写操作因为会复制数组,代价较高,但因为写操作相对读操作较少,这种设计在大多数应用场景中是可以接受的。
使用场景
CopyOnWriteArraySet
适用于以下场景:
- 多线程环境中读操作远多于写操作。
- 不需要强一致性的快速读取操作。
- 需要线程安全但不希望在读操作上有任何性能损耗。