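Contents
1. Preface
2. Back-end integration
2.1. pom.xml and integration configuration
2.2. Behavior interface encapsulation
2.3. Custom message strategy pattern in Spring
2.3.1. Defining the interface, anchor annotation, and message enum
2.3.2. Strategy interface implementation
2.3.3. Strategy factory class
2.4. Spring's event notification mechanism
2.4.1. Custom event source
2.4.2. Event listener
2.4.3. Event publisher
2.4.4. Testing
3. Front-end integration
3.1. Page integration
3.2. Vite reverse proxy configuration
4. All in all, it feels quite simple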
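1. Preface
WebSocket is a protocol, introduced alongside HTML5, for full-duplex communication over a single TCP connection. After the opening handshake there is no request/response pairing: both ends are peers, the connection stays open, and either side can send data to the other at any time.
Goal:
Resources on the server change frequently, and the client needs to learn about those changes as promptly as possible so that it can show them to the user.
Common solutions:
Front-end polling on a timer: inefficient and wasteful of network bandwidth and computing resources. Updates arrive with some delay, the server is under heavier load, and most requests return nothing new.
SSE (Server-Sent Events): an HTTP-based technique in which the server continuously streams data to the client. It is one-way, from server to client. I have only heard of it being used and may write it up when I get the chance.
DWR: if you are not familiar with it, see my other article, which is devoted to the DWR framework.
WebSocket: for the underlying principles, properties, and methods, see my downloads, which include a slide deck on the topic.
This article focuses on hands-on development. The back end is Java and the front end is Vue 3 + Vite.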
2. Back-end integration
2.1. pom.xml and integration configuration
Spring Boot version 2.5.3.

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

Add the configuration class WebsocketConfig.java:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;

    @EnableWebSocket
    @Configuration
    public class WebsocketConfig {

        // Registers @ServerEndpoint-annotated beans with the WebSocket container
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }

2.2. Behavior interface encapsulation
The IWebsocket interface groups three things: the four basic WebSocket callbacks, the per-client operations (session, tag, sending), and the manager operations for adding, querying, and removing business clients.

    import com.xxx.notification.websocket.dto.CacheClient;

    import javax.websocket.Session;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArraySet;

    public interface IWebsocket {

        interface ILinkPoint {
            // Called when the connection is opened
            void onOpen(Session session);

            // Called when a message is received
            void onMessage(String message);

            // Called when the connection is closed
            void onClose();

            // Called when an error occurs
            void onError(Session session, Throwable throwable);
        }

        interface IClient {
            // Returns the session
            Session getSession();

            // Returns the tag
            String getTag();

            // Sends text
            void sendText(String text);

            // Sends an object
            void send(Object object);
        }

        interface IManager<T extends CacheClient> {
            // Sends text to the given clients
            void sendText(String text, List<T> clients);

            // Sends text to all clients
            void sendTextYoAll(String text);

            // Adds a client
            void addClient(T client);

            // Returns all clients
            CopyOnWriteArraySet<T> all();

            // Removes clients
            void removeClients(List<T> clients);

            // Looks up a client by tag
            T getClientByTag(String tag);

            // Looks up several clients by their tags
            T[] getClientsByTags(List<String> tags);
        }
    }

The ClientService implementation covers the basic operations:

    import com.xxx.exception.I18nServerEndException;
    import com.xxx.notification.websocket.IWebsocket;
    import com.xxx.notification.websocket.dto.CacheClient;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.stereotype.Component;

    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    @Slf4j
    @Component
    @ServerEndpoint("/api/v1/ws/{username}")
    public class ClientService implements IWebsocket.ILinkPoint, IWebsocket.IClient {

        private static ManagerService managerService = ManagerService.getInstance();

        private String tag;
        private Session session;

        @OnOpen
        @Override
        public void onOpen(Session session) {
            // username and types are both read from the request parameter map
            Map<String, List<String>> requestParameterMap = session.getRequestParameterMap();
            String username = requestParameterMap.get("username").get(0);
            String types = requestParameterMap.get("types").get(0);
            List<String> typeList = Arrays.asList(types.split(","));
            // Set the fields first so the client is fully initialized before it is registered
            this.session = session;
            this.tag = username;
            CacheClient cacheClient = CacheClient.getInstance(username, typeList, this);
            managerService.addClient(cacheClient);
        }

        @OnMessage
        @Override
        public void onMessage(String message) {
        }

        @OnClose
        @Override
        public void onClose() {
            // Remove every cached client that belongs to this connection's user
            Set<CacheClient> allCacheClients = managerService.all();
            List<CacheClient> list = allCacheClients.stream()
                    .filter(client -> StringUtils.equals(client.getUsername(), tag))
                    .collect(Collectors.toList());
            if (!list.isEmpty()) {
                managerService.removeClients(list);
            }
        }

        @OnError
        @Override
        public void onError(Session session, Throwable throwable) {
            try {
                session.close();
                onClose();
            } catch (IOException e) {
                log.error("websocket error:", e);
                throw new I18nServerEndException("common.tips_12");
            }
        }

        @Override
        public Session getSession() {
            return this.session;
        }

        @Override
        public String getTag() {
            return this.tag;
        }

        @Override
        public synchronized void sendText(String text) {
            try {
                session.getBasicRemote().sendText(text);
            } catch (IOException e) {
                log.error("failed to push message", e);
            }
        }

        @Override
        public synchronized void send(Object object) {
            try {
                session.getBasicRemote().sendObject(object);
            } catch (IOException | EncodeException e) {
                log.error("failed to push message", e);
            }
        }
    }

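onOpen reads the types parameter straight from the parameter map, so a connection opened without ?types=... or with a non-numeric item would fail during the handshake. A minimal sketch of a more defensive parse follows. TypeParamParser is a hypothetical helper that is not part of the original project; it only assumes a map shaped like the one returned by Session.getRequestParameterMap().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original project.
final class TypeParamParser {

    private TypeParamParser() {
    }

    // Returns the subscribed type codes found under the "types" key.
    // A missing parameter yields an empty list; blank or non-numeric items are skipped.
    static List<Integer> parse(Map<String, List<String>> params) {
        List<String> raw = params.get("types");
        if (raw == null || raw.isEmpty() || raw.get(0) == null) {
            return Collections.emptyList();
        }
        List<Integer> result = new ArrayList<>();
        for (String item : raw.get(0).split(",")) {
            String trimmed = item.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                result.add(Integer.parseInt(trimmed));
            } catch (NumberFormatException e) {
                // Skip items that are not integers instead of failing the handshake
            }
        }
        return result;
    }
}
```

Whether to skip bad items silently or to reject the connection is a design choice; the sketch skips them so that one malformed item does not drop the whole subscription.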
The ManagerService implementation covers the higher-level operations:

    import com.xxx.notification.websocket.IWebsocket;
    import com.xxx.notification.websocket.dto.CacheClient;

    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CopyOnWriteArraySet;
    import java.util.stream.Collectors;

    public class ManagerService implements IWebsocket.IManager<CacheClient> {

        private static final CopyOnWriteArraySet<CacheClient> linkSet = new CopyOnWriteArraySet<>();

        // Singleton
        private static final ManagerService instance = new ManagerService();

        private ManagerService() {
        }

        public static ManagerService getInstance() {
            return instance;
        }

        @Override
        public void sendText(String text, List<CacheClient> clients) {
            for (CacheClient cacheClient : clients) {
                cacheClient.getClientService().sendText(text);
            }
        }

        @Override
        public void sendTextYoAll(String text) {
            for (CacheClient cacheClient : linkSet) {
                cacheClient.getClientService().sendText(text);
            }
        }

        @Override
        public void addClient(CacheClient client) {
            linkSet.add(client);
        }

        @Override
        public CopyOnWriteArraySet<CacheClient> all() {
            return linkSet;
        }

        @Override
        public void removeClients(List<CacheClient> clients) {
            for (CacheClient cacheClient : clients) {
                linkSet.remove(cacheClient);
            }
        }

        @Override
        public CacheClient getClientByTag(String tag) {
            for (CacheClient clientService : linkSet) {
                if (clientService.getClientService().getTag().equals(tag)) {
                    return clientService;
                }
            }
            return null;
        }

        @Override
        public CacheClient[] getClientsByTags(List<String> tags) {
            if (null == tags || tags.size() == 0) {
                return null;
            }
            Set<String> tagSet = tags.stream().collect(Collectors.toSet());
            List<CacheClient> clientList = linkSet.stream()
                    .filter(c -> tagSet.contains(c.getClientService().getTag()))
                    .collect(Collectors.toList());
            return clientList.toArray(new CacheClient[0]);
        }
    }

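ManagerService keeps its clients in a CopyOnWriteArraySet, whose iterators work on a snapshot of the set. That is what lets a broadcast loop keep running while clients connect or disconnect. The sketch below illustrates this with plain strings in place of CacheClient; SnapshotIterationDemo is a hypothetical name used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical demo class, not part of the original project.
final class SnapshotIterationDemo {

    // Iterates over the set while removing each visited element from it,
    // and returns the elements the loop actually visited.
    static List<String> broadcastWhileRemoving(Set<String> clients) {
        List<String> visited = new ArrayList<>();
        for (String tag : clients) {
            visited.add(tag);
            clients.remove(tag); // simulates a client disconnecting mid-broadcast
        }
        return visited;
    }

    public static void main(String[] args) {
        Set<String> clients = new CopyOnWriteArraySet<>(Arrays.asList("alice", "bob", "carol"));
        // The loop sees the snapshot taken when iteration started, so no
        // ConcurrentModificationException is thrown and all three tags are visited.
        System.out.println(broadcastWhileRemoving(clients));
        System.out.println(clients.isEmpty());
    }
}
```

The trade-off is that every add or remove copies the backing array, so this structure suits a client list that is read far more often than it changes.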
The CacheClient class used as the generic type argument:

    import com.xxx.notification.websocket.service.ClientService;
    import com.xxx.system.dao.UserDao;
    import com.xxx.system.model.User;
    import com.xxx.util.SpringContextUtil;
    import lombok.Data;

    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;

    @Data
    public class CacheClient {

        private List<Integer> types;
        private ClientService clientService;
        private String username;
        private String domain;

        public static CacheClient getInstance(String username, List<String> types, ClientService clientService) {
            // Look up the user's domain so that messages can be filtered by domain later
            UserDao userDao = (UserDao) SpringContextUtil.getBean("userDao");
            User user = userDao.getByUsername(username);
            String domain = user.getDomain();
            CacheClient cacheClient = new CacheClient();
            cacheClient.setUsername(username);
            cacheClient.setTypes(types.stream().map(Integer::parseInt).collect(Collectors.toList()));
            cacheClient.setClientService(clientService);
            cacheClient.setDomain(domain);
            return cacheClient;
        }

        // Two clients are considered equal when their usernames match
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            CacheClient that = (CacheClient) o;
            return Objects.equals(username, that.username);
        }

        // Must use the same field as equals so that equal clients share a hash code
        @Override
        public int hashCode() {
            return Objects.hashCode(username);
        }
    }

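Since equals compares only username, hash-based collections (for example the HashSet produced by Collectors.toSet()) need hashCode to be derived from username as well; otherwise two "equal" clients can both end up in the same set. A minimal sketch of the effect, using a hypothetical ClientKey class as a stand-in for CacheClient:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for CacheClient; equality is by username only.
final class ClientKey {
    private final String username;
    private final String domain;
    private final boolean hashIncludesDomain;

    ClientKey(String username, String domain, boolean hashIncludesDomain) {
        this.username = username;
        this.domain = domain;
        this.hashIncludesDomain = hashIncludesDomain;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ClientKey)) {
            return false;
        }
        return Objects.equals(username, ((ClientKey) o).username);
    }

    @Override
    public int hashCode() {
        // The flag switches between a hash that disagrees with equals
        // and one that is consistent with it.
        return hashIncludesDomain ? Objects.hash(username, domain) : Objects.hashCode(username);
    }
}

final class EqualsHashCodeDemo {

    // Adds two keys with the same username but different domains and
    // returns how many entries the HashSet ends up holding.
    static int distinctCount(boolean hashIncludesDomain) {
        Set<ClientKey> set = new HashSet<>();
        set.add(new ClientKey("alice", "1100", hashIncludesDomain));
        set.add(new ClientKey("alice", "1200", hashIncludesDomain));
        return set.size();
    }
}
```

With the consistent hash the set holds one entry; with the domain mixed into the hash it holds two, even though equals reports the keys as equal.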
2.3. Custom message strategy pattern in Spring
2.3.1. Defining the interface, anchor annotation, and message enum

    import com.alibaba.fastjson.JSON;
    import com.xxx.notification.constants.NotificationType;
    import com.xxx.notification.websocket.dto.CacheClient;
    import com.xxx.notification.websocket.service.ManagerService;

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    /**
     * Notification service
     */
    public interface INotificationService<P> {

        ManagerService managerService = ManagerService.getInstance();

        // Sends the message to every client that subscribed to the given notification type
        default void sendMessage(NotificationType notificationType, Object message, Set<CacheClient> allCacheClients) {
            Integer type = notificationType.getType();
            List<CacheClient> list = allCacheClients.stream()
                    .filter(client -> client.getTypes().contains(type))
                    .collect(Collectors.toList());
            if (!list.isEmpty()) {
                managerService.sendText(JSON.toJSONString(message), list);
            }
        }

        /**
         * Triggers a push
         * @param p
         */
        void pushNotification(P p);
    }

    import java.lang.annotation.*;

    /**
     * Strategy anchor annotation
     */
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface StrategyAnchor {
        NotificationType[] type();
    }

    /**
     * Notification type enum
     */
    public enum NotificationType {

        DEVICE_ALARM(1, "deviceAlarm"),
        SYSTEM_ALARM(2, "systemAlarm");

        private Integer type;
        private String name;

        NotificationType(Integer type, String name) {
            this.type = type;
            this.name = name;
        }

        public Integer getType() {
            return type;
        }

        public void setType(Integer type) {
            this.type = type;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

2.3.2. Strategy interface implementation
Only one implementation is shown here.

    import com.xxx.dto.ObjectResponse;
    import com.xxx.notification.constants.NotificationType;
    import com.xxx.notification.constants.StrategyAnchor;
    import com.xxx.notification.dto.NotificationDto;
    import com.xxx.notification.event.AlarmEvent;
    import com.xxx.notification.service.INotificationService;
    import com.xxx.notification.websocket.dto.CacheClient;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.stereotype.Service;

    import java.util.Set;
    import java.util.stream.Collectors;

    @Service
    @StrategyAnchor(type = NotificationType.DEVICE_ALARM)
    public class DeviceAlarmNotificationService implements INotificationService<AlarmEvent> {

        @Override
        public void pushNotification(AlarmEvent alarmEvent) {
            NotificationType notificationType = alarmEvent.getNotificationType();
            String domain = alarmEvent.getDomain();
            if (StringUtils.isBlank(domain)) {
                return;
            }
            // Keep only the clients whose domain, with trailing zeros stripped, is a prefix of the alarm's domain
            Set<CacheClient> allCacheClient = managerService.all();
            Set<CacheClient> cacheClients = allCacheClient.stream().filter(client -> {
                String clientDomain = client.getDomain();
                if (StringUtils.isBlank(clientDomain)) {
                    return false;
                }
                return StringUtils.startsWith(domain, stripEndStr(clientDomain));
            }).collect(Collectors.toSet());
            // Send over WebSocket
            NotificationDto notificationDto = NotificationDto.getInstance(notificationType, alarmEvent);
            sendMessage(notificationType, ObjectResponse.generateNormalResponse(notificationDto), cacheClients);
        }

        // Strips trailing zeros; a value consisting only of zeros becomes "0"
        private String stripEndStr(String data) {
            String temp = StringUtils.stripEnd(data, "0");
            temp = StringUtils.isBlank(temp) ? "0" : temp;
            return temp;
        }
    }

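The filter in pushNotification strips trailing zeros from the client's domain and then checks whether the result is a prefix of the alarm's domain. The sketch below reproduces that rule with JDK classes only. It assumes domains are zero-padded hierarchical codes such as 110000; the article does not describe the format, so the sample values and the DomainMatchDemo class are hypothetical.

```java
// Hypothetical demo class, not part of the original project.
final class DomainMatchDemo {

    private DomainMatchDemo() {
    }

    // JDK-only equivalent of stripEndStr: drop trailing '0' characters,
    // falling back to "0" when nothing is left.
    static String stripTrailingZeros(String data) {
        int end = data.length();
        while (end > 0 && data.charAt(end - 1) == '0') {
            end--;
        }
        String stripped = data.substring(0, end);
        return stripped.trim().isEmpty() ? "0" : stripped;
    }

    // True when a client in clientDomain should receive an alarm raised in alarmDomain.
    static boolean receives(String clientDomain, String alarmDomain) {
        if (clientDomain == null || clientDomain.trim().isEmpty()) {
            return false;
        }
        if (alarmDomain == null || alarmDomain.trim().isEmpty()) {
            return false;
        }
        return alarmDomain.startsWith(stripTrailingZeros(clientDomain));
    }

    public static void main(String[] args) {
        System.out.println(receives("110000", "110105")); // prefix "11" matches
        System.out.println(receives("120000", "110105")); // prefix "12" does not match
        System.out.println(receives("100000", "110105")); // prefix "1" also matches
    }
}
```

One consequence of the rule is visible in the last call: a code whose significant part itself ends in zero (100000 becomes 1) matches more alarm domains than its padded length suggests.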
2.3.3. Strategy factory class

    import com.xxx.notification.constants.NotificationType;
    import com.xxx.notification.constants.StrategyAnchor;
    import com.xxx.notification.service.INotificationService;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    @Component
    public class NotificationFactory implements ApplicationContextAware {

        // Notification type code -> strategy bean
        private static Map<Integer, INotificationService> beans = new ConcurrentHashMap<>();

        public static INotificationService getNotificationService(NotificationType notificationType) {
            return beans.get(notificationType.getType());
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // Collect every INotificationService bean and register it under each type named by its @StrategyAnchor
            Map<String, INotificationService> map = applicationContext.getBeansOfType(INotificationService.class);
            map.forEach((beanName, bean) -> {
                StrategyAnchor anchor = applicationContext.findAnnotationOnBean(beanName, StrategyAnchor.class);
                if (anchor != null) {
                    for (NotificationType type : anchor.type()) {
                        beans.put(type.getType(), bean);
                    }
                }
            });
        }
    }

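The idea behind NotificationFactory, reading an annotation from each strategy and building a type-to-strategy map, can be shown without Spring. The following is a minimal sketch under that assumption: Kind, Anchor, Handler, and HandlerRegistry are hypothetical names, and plain reflection stands in for findAnnotationOnBean.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical counterparts of NotificationType, StrategyAnchor and INotificationService.
enum Kind { DEVICE_ALARM, SYSTEM_ALARM }

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, or reflection cannot see it
@interface Anchor {
    Kind[] value();
}

interface Handler {
    String handle(String payload);
}

@Anchor(Kind.DEVICE_ALARM)
final class DeviceHandler implements Handler {
    @Override
    public String handle(String payload) {
        return "device:" + payload;
    }
}

@Anchor(Kind.SYSTEM_ALARM)
final class SystemHandler implements Handler {
    @Override
    public String handle(String payload) {
        return "system:" + payload;
    }
}

final class HandlerRegistry {

    private final Map<Kind, Handler> handlers = new EnumMap<>(Kind.class);

    // Registers each candidate under every Kind listed in its @Anchor;
    // candidates without the annotation are ignored.
    HandlerRegistry(List<Handler> candidates) {
        for (Handler handler : candidates) {
            Anchor anchor = handler.getClass().getAnnotation(Anchor.class);
            if (anchor == null) {
                continue;
            }
            for (Kind kind : anchor.value()) {
                handlers.put(kind, handler);
            }
        }
    }

    // Returns the handler for the given kind, or null when none is registered.
    Handler get(Kind kind) {
        return handlers.get(kind);
    }
}
```

Adding a new notification type then means writing one annotated class; the lookup code does not change.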
2.4. Spring's event notification mechanism
Spring events decouple the code that raises an alarm from the code that pushes it. Combined with the message strategy pattern and a thread pool (@Async + @EnableAsync), messages are pushed asynchronously.
2.4.1. Custom event source

    import com.xxx.notification.constants.NotificationType;
    import org.springframework.context.ApplicationEvent;

    public class AlarmEvent extends ApplicationEvent {

        private NotificationType notificationType;
        private String domain;

        public AlarmEvent(Object source) {
            super(source);
        }

        public NotificationType getNotificationType() {
            return notificationType;
        }

        public void setNotificationType(NotificationType notificationType) {
            this.notificationType = notificationType;
        }

        public String getDomain() {
            return domain;
        }

        public void setDomain(String domain) {
            this.domain = domain;
        }
    }

2.4.2. Event listener

    import com.xxx.notification.NotificationFactory;
    import com.xxx.notification.constants.NotificationType;
    import com.xxx.notification.service.INotificationService;
    import org.springframework.context.event.EventListener;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.stereotype.Component;

    @Component
    @EnableAsync
    public class AlarmEventListener {

        // Runs on a task-executor thread, so the publisher is not blocked by the push
        @Async
        @EventListener
        public void handleAlarmEvent(AlarmEvent event) {
            NotificationType notificationType = event.getNotificationType();
            if (notificationType == null || notificationType.getType() == null) {
                return;
            }
            // Pick the strategy registered for this notification type
            INotificationService notificationService = NotificationFactory.getNotificationService(notificationType);
            if (notificationService == null) {
                return;
            }
            notificationService.pushNotification(event);
        }
    }

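What @Async plus @EventListener achieves can be pictured with JDK classes alone: the publisher hands the event to a thread pool and returns, and the listener runs on a pool thread. This is a conceptual sketch, not how Spring implements it; AsyncEventBus and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical, simplified event bus; not part of the original project.
final class AsyncEventBus<E> {

    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    private final Executor executor;

    AsyncEventBus(Executor executor) {
        this.executor = executor;
    }

    void subscribe(Consumer<E> listener) {
        listeners.add(listener);
    }

    // Submits one task per listener to the executor and returns immediately.
    // The returned future completes once every listener has finished.
    CompletableFuture<Void> publish(E event) {
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (Consumer<E> listener : listeners) {
            tasks.add(CompletableFuture.runAsync(() -> listener.accept(event), executor));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
    }
}
```

A caller would create the bus with something like Executors.newFixedThreadPool(2), subscribe a listener, and call publish; waiting on the returned future is optional and only needed when the caller wants to know that the push has finished.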
2.4.3. Event publisher

    import com.xxx.alarm.model.Alarm;
    import com.xxx.notification.constants.NotificationType;
    import org.springframework.context.ApplicationEventPublisher;

    public class AlarmEventPublisher {

        /**
         * Publishes a device alarm event
         * @param alarm
         * @param eventPublisher
         */
        public static void publishDeviceAlarmEvent(Alarm alarm, ApplicationEventPublisher eventPublisher) {
            AlarmEvent alarmEvent = new AlarmEvent(alarm);
            alarmEvent.setNotificationType(NotificationType.DEVICE_ALARM);
            alarmEvent.setDomain(alarm.getDomain());
            eventPublisher.publishEvent(alarmEvent);
        }

        /**
         * Publishes a system alarm event
         * @param obj
         * @param eventPublisher
         */
        public static void publishSystemAlarmEvent(Object obj, ApplicationEventPublisher eventPublisher) {
            AlarmEvent alarmEvent = new AlarmEvent(obj);
            alarmEvent.setNotificationType(NotificationType.SYSTEM_ALARM);
            eventPublisher.publishEvent(alarmEvent);
        }
    }

2.4.4. Testing

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.web.bind.annotation.GetMapping;

    // Fragment of a controller class
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private AlarmService alarmService;

    @GetMapping("/test")
    public void testEvent(String id) {
        Alarm alarm = alarmService.findOne(......);
        // Push the message in real time
        AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
    }

3. Front-end integration
Using Vue 3 + Vite.
3.1. Page integration

    <template>
      <h1>Web Socket</h1>
    </template>

    <script lang="ts">
    import { ref, onUnmounted } from "vue";

    export default {
      setup() {
        const ws = ref();

        const initWs = () => {
          console.log("location.host:", location.host);
          // Replace {logged-in username} with the name of the logged-in user
          ws.value = new WebSocket(`ws://${location.host}/wsUrl/{logged-in username}?types=1,2`);
          // Called when a connection error occurs
          ws.value.onerror = function () {
            console.log("ws connection error");
          };
          // Called when the connection is established
          ws.value.onopen = function () {
            console.log("ws connected");
          };
          // Called when a message is received
          ws.value.onmessage = function (event: any) {
            console.log("ws message:", event.data);
          };
        };

        const closeWebSocket = () => {
          ws.value?.close();
        };

        initWs();
        // Close the socket when the component is unmounted
        onUnmounted(() => {
          closeWebSocket();
        });

        return {};
      },
    };
    </script>

3.2. Vite reverse proxy configuration
In vite.config.ts:

    server: {
      host: '0.0.0.0',
      // host: "localhost",
      port: 3001,
      // Whether to open the browser automatically
      // open: true,
      // Whether to enable https
      // https: false,
      // Server-side rendering
      // ssr: false,
      proxy: {
        '/wsUrl': {
          target: 'ws://<backend-ip>:9090/xxx/api/v1/ws/',
          changeOrigin: true,
          ws: true,
          rewrite: (path) => path.replace('/wsUrl', ''),
        },
        '/api': {
          target: 'http://localhost:3333/',
          changeOrigin: true,
          ws: true,
          rewrite: (pathStr) => pathStr.replace('/api', ''),
        },
      },
    },

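The rewrite property replaces the wsUrl segment of the request path with an empty string. wsUrl itself can be any string; it only serves as a front-end marker that selects this proxy rule.
For example:
With rewrite, the front-end address ws://127.0.0.1:3001/wsUrl/xyz is proxied to the back end as
ws://<backend-ip>:9090/xxx/api/v1/ws/xyz
Without the rewrite property,
the front-end address ws://127.0.0.1:3001/wsUrl/xyz is proxied to the back end as
ws://<backend-ip>:9090/xxx/api/v1/ws/wsUrl/xyz, which no longer matches the endpoint path.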
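The two cases can be modelled as a small function over the path portion of the URL. This is only a sketch of the mapping described above, assuming the proxy prepends the target's path to the (optionally rewritten) request path and collapses duplicate slashes; ProxyPathDemo is a hypothetical name and the host part is left out.

```java
// Hypothetical model of the path mapping, not Vite's actual implementation.
final class ProxyPathDemo {

    private ProxyPathDemo() {
    }

    // targetBasePath: path part of the proxy target, e.g. "/xxx/api/v1/ws/"
    // requestPath:    path requested by the front end, e.g. "/wsUrl/xyz"
    // rewrite:        whether the leading "/wsUrl" marker is removed first
    static String forward(String targetBasePath, String requestPath, boolean rewrite) {
        String path = rewrite ? requestPath.replaceFirst("^/wsUrl", "") : requestPath;
        // Join the two parts and collapse any run of slashes into one
        return (targetBasePath + "/" + path).replaceAll("/{2,}", "/");
    }

    public static void main(String[] args) {
        System.out.println(forward("/xxx/api/v1/ws/", "/wsUrl/xyz", true));
        System.out.println(forward("/xxx/api/v1/ws/", "/wsUrl/xyz", false));
    }
}
```

With rewrite the result is /xxx/api/v1/ws/xyz, which matches the endpoint pattern /api/v1/ws/{username}; without it the marker stays in the path as /xxx/api/v1/ws/wsUrl/xyz.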