java中使用sockjs、stomp完成websocket通信

主要配置

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;/*** @author* WebSocket配置类*/
@Slf4j
@Configuration
@AllArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {private final WebSocketInterceptor webSocketInterceptor;private final WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;public static final String USER_DESTINATION_PREFIX = "/salaryother/";public static final String CALL_DEVICE_NOTIFY_PATH = USER_DESTINATION_PREFIX + "CALL_DEVICE_NOTIFY/";@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {log.info("WebSocket服务器注册");registry.addEndpoint("/ws").addInterceptors(webSocketHandshakeInterceptor).setHandshakeHandler(new WebSocketHandshakeHandler()).setAllowedOrigins("*").withSockJS();registry.addEndpoint("/wsapp").addInterceptors(webSocketHandshakeInterceptor).setHandshakeHandler(new WebSocketHandshakeHandler()).setAllowedOrigins("*");}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {log.info("WebSocket服务器启动");//心跳检测ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();threadPoolTaskScheduler.setPoolSize(10);threadPoolTaskScheduler.setThreadNamePrefix("wss-heartbeat-thread-");threadPoolTaskScheduler.initialize();///信息接收头registry.enableSimpleBroker("/topic", USER_DESTINATION_PREFIX).setHeartbeatValue(new long[]{10000, 10000}).setTaskScheduler(threadPoolTaskScheduler);//接收前缀registry.setApplicationDestinationPrefixes("/topic");//请求前缀registry.setUserDestinationPrefix("/user");}/*** 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间** @param registration*/@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registration) {/** 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节* 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节* 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒*/registration.setMessageSizeLimit(10240).setSendBufferSizeLimit(10240).setSendTimeLimit(10000);}@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {/** 配置消息线程池* 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务* 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程* 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒*/registration.taskExecutor().corePoolSize(10).maxPoolSize(60).keepAliveSeconds(60);registration.interceptors(webSocketInterceptor);}//  这个是为了解决和调度任务的冲突重写的bean@Primary@Beanpublic TaskScheduler taskScheduler() {ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(10);taskScheduler.initialize();return taskScheduler;}}

握手拦截(这套方案好像前端无法补充Header,就不在这里做权限校验)这里采用的方法是直接问号拼接token,前端 new SockJS(这里带问号),sockjs使用的是http所以没毛病,本文使用的是OAuth2权限校验

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.joolun.cloud.common.security.entity.BaseUser;
import com.joolun.cloud.common.security.util.SecurityUtils;
import io.micrometer.core.lang.NonNullApi;
import io.micrometer.core.lang.Nullable;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.token.RemoteTokenServices;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import javax.servlet.http.HttpServletRequest;
import java.util.Map;@Slf4j
@Component
@NonNullApi
@AllArgsConstructor
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {private WebSocketManager webSocketManager;private RemoteTokenServices tokenService;
//SecurityUtils.getUser()代码
//    SecurityContextHolder.getContext().getAuthentication().getPrincipal()
//     principal instanceof BaseUser ? (BaseUser)principal : null;@Overridepublic boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) {String token = getToken(serverHttpRequest);if (StrUtil.isBlank(token)) return false;// 验证令牌信息try {OAuth2Authentication auth2Authentication = tokenService.loadAuthentication(token);if (ObjectUtil.isNull(auth2Authentication)) return false;SecurityContextHolder.getContext().setAuthentication(auth2Authentication);} catch (Exception e) {log.error("token验证失败");return false;}BaseUser user = SecurityUtils.getUser();String userId = user.getId();map.put("WebSocket-user", new WebSocketUserAuthentication(userId, user.getUsername(), token));webSocketManager.addUser(userId, token);log.info("userId:" + userId + "用户名:" + user.getUsername() + ":开始建立连接");return true;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, @Nullable Exception e) {BaseUser user = SecurityUtils.getUser();log.info("userId:" + user.getId() + "用户名:" + user.getUsername() + ":建立连接完成");}private String getToken(ServerHttpRequest serverHttpRequest) {ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) serverHttpRequest;HttpServletRequest httpServletRequest = servletRequest.getServletRequest();String token = httpServletRequest.getParameter("Authorization");return StrUtil.isBlank(token) ? "" : token;}

之后可以设置握手之后的身份注入(配置了这个可以在单对单订阅时直接使用)

import io.micrometer.core.lang.NonNullApi;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;import java.security.Principal;
import java.util.Map;@NonNullApi
public class WebSocketHandshakeHandler extends DefaultHandshakeHandler {@Overrideprotected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {return (Principal) attributes.get("WebSocket-user");}}
import lombok.Data;import java.security.Principal;@Data
public class WebSocketUserAuthentication implements Principal {private String token;private String userId;private String userName;public WebSocketUserAuthentication(String userId, String userName,String token ) {this.token = token;this.userId = userId;this.userName = userName;}public WebSocketUserAuthentication() {}@Overridepublic String getName() {return token;}}

储存用户数据

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class WebSocketManager {Cache<String, String> webSocketUser;@PostConstructpublic void init() {webSocketUser = Caffeine.newBuilder().initialCapacity(16).expireAfterWrite(60, TimeUnit.MINUTES).build();}public boolean isOnline(String userId) {return StringUtils.isNotBlank(webSocketUser.getIfPresent(userId));}public void addUser(String userId, String token) {webSocketUser.put(userId, token);}public String getTokenById(String userId) {return webSocketUser.getIfPresent(userId);}public void deleteUser(String userId) {webSocketUser.invalidate(userId);}
}

之后就是通道拦截,如果不使用握手拦截可以在这里鉴权,这里就可以拿到握手后发送的Header,前端就在headers里面添加

this.stompClient.connect(headers,() => {.....
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.joolun.cloud.common.security.entity.BaseUser;
import io.micrometer.core.lang.NonNullApi;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.token.RemoteTokenServices;
import org.springframework.stereotype.Component;import java.util.Optional;@Slf4j
@NonNullApi
@Component
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
@AllArgsConstructor
public class WebSocketInterceptor implements ChannelInterceptor {private WebSocketManager webSocketManager;private RemoteTokenServices tokenService;@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);if (ObjectUtil.isNull(accessor)) throw new MessagingException("获取数据失败");StompCommand stompCommand = accessor.getCommand();// 判断是否是连接或者断开请求if (StompCommand.CONNECT != stompCommand && StompCommand.DISCONNECT != stompCommand) return message;//拿取用户信息Optional<WebSocketUserAuthentication> user = getUser(accessor);if (!user.isPresent()) throw new MessagingException("获取用户失败");WebSocketUserAuthentication baseUser = user.get();String userId = baseUser.getUserId();// 下线请求if (StompCommand.DISCONNECT == stompCommand) {String userName = baseUser.getUserName();webSocketManager.deleteUser(userId);log.info("userId:" + userId + "用户名:" + userName + ":下线了");}return message;}/*** 在消息发送后立刻调用,boolean值参数表示该调用的返回值*/@Overridepublic void postSend(Message<?> message, MessageChannel messageChannel, boolean sent) {StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);// 忽略心跳消息等非STOMP消息StompCommand command = accessor.getCommand();if (command == null) return;switch (command) {case CONNECT:log.info("上线了");break;case CONNECTED:log.info("已连接");break;case SUBSCRIBE:log.info("订阅:" + accessor.getDestination());break;case UNSUBSCRIBE:log.info("取消订阅:" + accessor.getDestination());break;case DISCONNECT:log.info("下线了");break;default:log.info("不匹配以上情况");break;}}private boolean isUserOAuth2Authentication(StompHeaderAccessor accessor) {String token = getToken(accessor);if (StrUtil.isBlank(token)) return false;try {// 验证令牌信息OAuth2Authentication auth2Authentication = tokenService.loadAuthentication(token);if (ObjectUtil.isNull(auth2Authentication)) return false;SecurityContextHolder.getContext().setAuthentication(auth2Authentication);} catch (Exception e) {log.error("token验证失败");return false;}return true;}private Optional<WebSocketUserAuthentication> getUser(StompHeaderAccessor accessor) {return accessor.getUser() instanceof WebSocketUserAuthentication ?Optional.of((WebSocketUserAuthentication) accessor.getUser()) :getSystemUserToWebSocketUserAuthentication(accessor);}private Optional<WebSocketUserAuthentication> getSystemUserToWebSocketUserAuthentication(StompHeaderAccessor accessor) {Authentication authentication = getAuthentication();if (ObjectUtil.isNull(authentication)) {if (isUserOAuth2Authentication(accessor)) {authentication = getAuthentication();} else {return Optional.empty();}}Object principal = authentication.getPrincipal();if (ObjectUtil.isNull(principal)) return Optional.empty();BaseUser user = principal instanceof BaseUser ? (BaseUser) principal : null;if (ObjectUtil.isNotNull(user)) {WebSocketUserAuthentication webSocketUserAuthentication = new WebSocketUserAuthentication(user.getId(), user.getUsername(), getToken(accessor));accessor.setUser(webSocketUserAuthentication);return Optional.of(webSocketUserAuthentication);}return Optional.empty();}private String getToken(StompHeaderAccessor accessor) {String tokens = accessor.getFirstNativeHeader("Authorization");if (StrUtil.isBlank(tokens)) return "";return tokens.split(" ")[1];}private Authentication getAuthentication() {return SecurityContextHolder.getContext().getAuthentication();}}

配置完成之后就是发送消息这里就不举详细的例子了就拿SimpMessageSendingOperations来说

//引入
import org.springframework.messaging.simp.SimpMessageSendingOperations;private final SimpMessageSendingOperations simpMessageSendingOperations;
private final WebSocketManager webSocketManager;

如果配置了握手拦截器返回了Principal 个人消息

订阅地址:/user/salaryother/activistIncoming
发送地址:/salaryother/activistIncoming
发送消息:simpMessageSendingOperations.convertAndSendToUser(webSocketManager.getTokenById(用户id), 发送地址, 消息);

如果没配置那就得多加几个前缀具体参考请点击:
Spring Springboot实现websocket通讯-2 这个详细

微信小程序连接websocket

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/57379.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3+ ts ts语法在script写不知道为啥一直报错

在vue3页面中写ts语法 发现识别不了 一直报错 1.出现这种问题的话,首先查看自己写的有没有问题,没有问题的话 2.再查看 script里边有没有写 lang"ts" <script setup lang"ts">解析 setup&#xff1a;是vue3在单文件组件 (SFC) 中使用 composition …

LLM - Baichuan-13B 多卡加载与推理测试

目录 ​编辑 一.引言 二.模型加载 1.量化加载 ◆ 基础配置 ◆ 8_bit 加载 ◆ 4_bit 加载 2.多卡加载 ◆ API 加载 ◆ accelerate 加载 三.模型推理 1.显存查看 ◆ Nvidia 显卡监控 ◆ Python subprocess 调用 2.双卡推理 ◆ 双卡 divice 分配 ◆ 双卡推理 GPU…

网络直播源码UDP协议搭建:为平台注入一份力量

网络直播源码中的UDP协议的定义&#xff1a; UDP协议又名用户数据报协议&#xff0c;是一种轻量级、无连接的协议。在网络直播源码平台中&#xff0c;UDP协议有着高速传输与实时性的能力&#xff0c;尤其是在网络直播源码实时性要求较高的场景&#xff0c;UDP协议的应用有着重要…

【Flutter】Flutter 使用 flutter_timezone 获取当前操作系统的时区

【Flutter】Flutter 使用 flutter_timezone 获取当前操作系统的时区 文章目录 一、前言二、flutter_timezone 包的背景三、安装和基本使用四、深入理解时区五、实际业务中的用法六、完整示例七、总结 一、前言 大家好&#xff01;我是小雨青年&#xff0c;今天我想和大家分享一…

NeRFMeshing - 精确提取NeRF中的3D网格

准确的 3D 场景和对象重建对于机器人、摄影测量和 AR/VR 等各种应用至关重要。 NeRF 在合成新颖视图方面取得了成功&#xff0c;但在准确表示底层几何方面存在不足。 推荐&#xff1a;用 NSDT编辑器 快速搭建可编程3D场景 我们已经看到了最新的进展&#xff0c;例如 NVIDIA 的 …

前端UI组件库深度解析:构建现代化的用户体验

引言 在当今的前端开发中&#xff0c;UI组件库已经成为了我们工具箱中不可或缺的一部分。这些库可以极大地提高我们的工作效率&#xff0c;同时也使我们能够专注于实现真正的业务逻辑&#xff0c;而不是重复地编写UI代码。本篇博客将详细地探讨UI组件库的核心概念&#xff0c;…

软件工程(二十) 系统运行与软件维护

1、系统转换计划 1.1、遗留系统的演化策略 时至今日,你想去开发一个系统,想完全不涉及到已有的系统,基本是不可能的事情。但是对于已有系统我们有一个策略。 比如我们是淘汰掉已有系统,还是继承已有系统,或者集成已有系统,或者改造遗留的系统呢,都是不同的策略。 技术…

AI大模型的使用-让AI帮你写单元测试

1.体验多步提示语 我们本节就让AI帮我们写一个单元测试&#xff0c;全程用AI给我们答案&#xff0c;首先单元测试前需要有代码&#xff0c;那么我们让AI给我们生成一个代码&#xff0c;要求如下&#xff1a; 用Python写一个函数&#xff0c;进行时间格式化输出&#xff0c;比…

WPF基础入门-Class4-WPF绑定

WPF基础入门 Class4&#xff1a;WPF绑定 一、简单绑定数据 1、cs文件中设置需要绑定的数据&#xff1a; public partial class Class_4 : Window{public Class_4(){InitializeComponent();List<Color> test new List<Color>();test.Add(new Color() { Code &q…

leetcode算法题--使子序列的和等于目标的最少操作次数

原题链接&#xff1a;https://leetcode.cn/problems/minimum-operations-to-form-subsequence-with-target-sum/description/ 视频讲解&#xff1a;https://www.bilibili.com/video/BV1Em4y1T7Bq?t1456.1 这题是真的难。。 func minOperations(nums []int, target int) int…

netty与websockt实现聊天

配置websockt&#xff1a; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;/*** websocket配置*/ Data Configuration ConfigurationProperties(prefix &qu…

PHP获取两个日期之间的所有日期

下面是一个示例代码&#xff0c;用于计算给定开始和结束日期之间的所有日期&#xff1a; <?phpfunction getDatesBetween($start_date, $end_date) {// 初始化结果数组$dates array();// 将开始日期转换为时间戳$current_date strtotime($start_date);$end_date strtot…

Java并发编程第6讲——线程池(万字详解)

Java中的线程池是运用场景最多的并发框架&#xff0c;几乎所有需要异步或并发执行任务的程序都可以使用线程池&#xff0c;本篇文章就详细介绍一下。 一、什么是线程池 定义&#xff1a;线程池是一种用于管理和重用线程的技术&#xff08;池化技术&#xff09;&#xff0c;它主…

微服务中间件--分布式搜索ES

分布式搜索ES 11.分布式搜索 ESa.介绍ESb.IK分词器c.索引库操作 (类似于MYSQL的Table)d.查看、删除、修改 索引库e.文档操作 (类似MYSQL的数据)1) 添加文档2) 查看文档3) 删除文档4) 修改文档 f.RestClient操作索引库1) 创建索引库2) 删除索引库/判断索引库 g.RestClient操作文…

http协议与apache

http概念&#xff1a; 互联网&#xff1a;是网络的网络&#xff0c;是所有类型网络的母集 因特网&#xff1a;世界上最大的互联网网络。即因特网概念从属于互联网概念 万维网&#xff1a;万维网并非某种特殊的计算机网络&#xff0c;是一个大规模的、联机式的信息贮藏库&…

C++11---std::bind

下面这段代码解析 std::function<decltype(f(args...))()> func std::bind(std::forward<F>(f), std::forward<Args>(args)...); 这行代码的作用是创建一个 std::function 对象 func&#xff0c;将其绑定到一个可调用对象上。 让我们逐步解释这行代码的各…

长胜证券:沪指探底回升涨0.47%,券商、酿酒板块拉升,传媒板块活跃

24日早盘&#xff0c;沪指盘中震动回落&#xff0c;接近午盘快速拉升走高&#xff1b;深成指、创业板指强势上扬&#xff1b;北向资金今天转向&#xff0c;早盘积极出场&#xff0c;半日净买入近30亿元。 到午间收盘&#xff0c;沪指涨0.47%报3092.88点&#xff0c;深成指涨1.1…

最新AI创作系统ChatGPT源码+详细图文部署教程/支持GPT-4/AI绘画/H5端/Prompt知识库/思维导图生成

一、AI系统 如何搭建部署AI创作ChatGPT系统呢&#xff1f;小编这里写一个详细图文教程吧&#xff01;SparkAi使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到AIGC系统&#xff01; 1.1 程序核心功能 程序已支持ChatGPT3.5/GPT-4提问、AI绘画、Midjourney绘画&#xf…

Django(8)-静态资源引用CSS和图片

除了服务端生成的 HTML 以外&#xff0c;网络应用通常需要一些额外的文件——比如图片&#xff0c;脚本和样式表——来帮助渲染网络页面。在 Django 中&#xff0c;我们把这些文件统称为“静态文件”。 我们使用static文件来存放静态资源&#xff0c;django会在每个 INSTALLED…

Vue——axios的二次封装

文章目录 一、请求和传递参数1、get 请求2、post 请求3、axios 请求配置 二、axios 的二次封装1、配置拦截器2、发送请求 三、API 的解耦1、配置文件对应的请求2、获取请求的数据 四、总结 一、请求和传递参数 在 Vue 中&#xff0c;发送请求一般在 created 钩子中&#xff0c…