项目需要集成实时消息通讯,所以尝试在项目中集成websocket。技术上选择了Socket.io,前/后端统一使用此开源项目来实现需求。
一、版本
spring cloud: 2022.0.4
注册中心: nacos
Netty-Socket.io : 2.0.9
<dependency><groupId>com.corundumstudio.socketio</groupId><artifactId>netty-socketio</artifactId><version>${netty-socketio.version}</version>
</dependency>
前端:vue3、socket.io-client【 4.7.4】
二、关键代码
socket event handler
@Component
@Slf4j
public class NettySocketEventHandler {@Autowiredprivate SocketIOServer socketIoServer;@Autowiredprivate SocketClientService socketClientService;@Value("${socketio.application.name}")private String serverName;@Value("${socketio.reg-server}")private String host;@Autowiredprivate NacosDiscoveryProperties nacosDiscoveryProperties;private void start() throws Exception {//注册到Nacos里registerNamingService(serverName, String.valueOf(socketIoServer.getConfiguration().getPort()));}/*** 注册到 nacos 服务中** @param nettyName netty服务名称* @param nettyPort netty服务端口*/private void registerNamingService(String nettyName, String nettyPort) {try {log.info("-------------- register socket server {} {}", nettyName, nettyPort);NamingService namingService = NamingFactory.createNamingService(nacosDiscoveryProperties.getServerAddr());// 注册到nacosInstance instance = new Instance();instance.setIp(host);instance.setPort(socketIoServer.getConfiguration().getPort());instance.setServiceName(nettyName);instance.setWeight(1.0);Map<String, String> map = new HashMap<>();map.put("preserved.register.source", "SPRING_CLOUD");instance.setMetadata(map);namingService.registerInstance(nettyName, instance);} catch (Exception e) {throw new RuntimeException(e);}}@PostConstructprivate void autoStartup() {try {socketIoServer.start();start();log.info("-------------- start socket server ----------");} catch (Exception ex) {log.error("SocketIOServer启动失败", ex);}}@PreDestroyprivate void autoStop() {socketIoServer.stop();}//socket事件消息接收入口@OnEvent(value = MessageConstant.SOCKET_EVENT_NAME) //value值与前端自行商定public void onEvent(SocketIOClient client, AckRequest ackRequest, SendMessageDTO data) {
// client.sendEvent("message_event", "已成功接收数据"); //向前端发送接收数据成功标识log.info("socket event {}", JSON.toJSONString(data));}//socket添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息@OnDisconnectpublic void onDisconnect(SocketIOClient client) {String userId = client.getHandshakeData().getSingleUrlParam("userId");UUID sessionId = client.getSessionId();log.info("socket Disconnect {} {}", userId, sessionId);socketClientService.deleteSessionClientByUserId(userId, sessionId);log.info("socket Disconnect {} {}", userId, sessionId);client.disconnect();}//socket添加connect事件,当客户端发起连接时调用@OnConnectpublic void onConnect(SocketIOClient client) {
// log.info("socket onConnect {}", JSON.toJSONString(client));if (client != null) {HandshakeData client_mac = client.getHandshakeData();String userId = client_mac.getSingleUrlParam("userId");// 处理业务} else {log.error("客户端为空");}}}
socket client service
@Component
public class SocketClientService {private static ConcurrentHashMap<String, HashMap<UUID, SocketIOClient>> concurrentHashMap = new ConcurrentHashMap<>();/*** 保存客户端实例,发送消息时使用** @param userId 用户ID* @param sessionId 用户连接的session,可能存在多个页面连接* @param socketIOClient 客户的实例*/public void saveClient(String userId, UUID sessionId, SocketIOClient socketIOClient) {HashMap<UUID, SocketIOClient> sessionIdClientCache = concurrentHashMap.get(userId);if (sessionIdClientCache == null) {sessionIdClientCache = new HashMap<>();}sessionIdClientCache.put(sessionId, socketIOClient);concurrentHashMap.put(userId, sessionIdClientCache);}/*** 获取用户的客户端实例** @param userId 用户的ID* @return HashMap<UUID, SocketIOClient>*/public HashMap<UUID, SocketIOClient> getUserClient(String userId) {return concurrentHashMap.get(userId);}/*** 获取所有客户端,不区分用户** @return 集合*/public Collection<HashMap<UUID, SocketIOClient>> getAllClient() {return concurrentHashMap.values();}/*** 删除用户的某个页面的连接** @param userId 用户ID* @param sessionId 页面的sessionID*/public void deleteSessionClientByUserId(String userId, UUID sessionId) {if(concurrentHashMap.get(userId) != null){concurrentHashMap.get(userId).remove(sessionId);}}/*** 删除用户的所有连接的实例** @param userId 用户的ID*/public void deleteUserCacheByUserId(String userId) {concurrentHashMap.remove(userId);}
}
socket config
@Data
@Configuration
@ConfigurationProperties(prefix = "socketio")
public class SocketIOConfig {private String host;private Integer port;private int bossCount;private int workCount;private boolean allowCustomRequests;private int upgradeTimeout;private int pingTimeout;private int pingInterval;@Beanpublic SocketIOServer socketIOServer() {SocketConfig socketConfig = new SocketConfig();socketConfig.setTcpNoDelay(true);socketConfig.setSoLinger(0);com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();config.setSocketConfig(socketConfig);config.setHostname(host);config.setPort(port);config.setBossThreads(bossCount);config.setWorkerThreads(workCount);config.setAllowCustomRequests(allowCustomRequests);config.setUpgradeTimeout(upgradeTimeout);config.setPingTimeout(pingTimeout);config.setPingInterval(pingInterval);return new SocketIOServer(config);}@Beanpublic SpringAnnotationScanner springAnnotationScanner() {return new SpringAnnotationScanner(socketIOServer());}
}
nacos 里的网关的配置【关键:StripPrefix 需要是0,否则长连接,并不能连接上】
# socket - id: socket-serviceuri: lb://socket-servicepredicates:- Path=/socket.io/**filters:- StripPrefix=0
socket.io的配置
socketio:application: name: socket-servicereg-server: 127.0.0.1 host: 127.0.0.1port: 16001
# 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器maxFramePayloadLength: 1048576
# 设置http交互最大内容长度maxHttpContentLength: 1048576
# socket连接数大小(如只监听一个端口boss线程组为1即可)bossCount: 1workCount: 100allowCustomRequests: true
# 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间upgradeTimeout: 100000
# Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件pingTimeout: 6000000
# Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔pingInterval: 25000
三、问题
1、socket.io与其它微服务在同一个web容器里,这时候是2个端口。所以socket.io另注册了一个服务名。
2、解决分布式的问题。我是采用了后端增加消息中间件来分发。
有问题可以私信我。