目录
引言
代码实现
配置类WebSocketMessageBrokerConfig
DTO
工具类
Controller
common.html
stomp-broadcast.html
运行效果
完整代码地址
引言
STOMP(Simple Text Oriented Messaging Protocol)作为一种简单文本导向的消息传递协议,提供了一种轻量级且易于使用的方式来实现实时通信。本篇博客将讲解如何使用Spring Boot创建一个基于STOMP的WebSocket应用程序,并展示相关的配置类。同时,还会介绍如何使用Thymeleaf模板引擎生成动态的HTML页面,以展示实时通信的效果。
代码实现
配置类WebSocketMessageBrokerConfig
package com.wsl.websocket.config;import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {config.enableSimpleBroker("/topic");config.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// it is OK to leave it hereregistry.addEndpoint("/broadcast");//registry.addEndpoint("/broadcast").withSockJS();// custom heartbeat, every 60 sec//registry.addEndpoint("/broadcast").withSockJS().setHeartbeatTime(60_000);}
}
DTO
package com.wsl.websocket.dto;import com.wsl.websocket.util.StringUtils;
import lombok.Getter;
import lombok.Setter;@Getter
@Setter
// Message payload exchanged between the browser and the broadcast controller.
// Getters and setters come from the Lombok annotations placed on this class.
public class ChatMessage {

    /** Sender name as supplied by the client. */
    private String from;

    /** Message body. */
    private String text;

    /** Intended recipient label. */
    private String recipient;

    /** Creation time as produced by {@code StringUtils.getCurrentTimeStamp()}. */
    private String time;

    /** No-argument constructor; leaves every field unset. */
    public ChatMessage() {
    }

    /**
     * Creates a message and stamps it with the current time.
     *
     * @param from      sender name
     * @param text      message body
     * @param recipient recipient label
     */
    public ChatMessage(String from, String text, String recipient) {
        this.time = StringUtils.getCurrentTimeStamp();
        this.recipient = recipient;
        this.text = text;
        this.from = from;
    }
}
工具类
package com.wsl.websocket.util;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Small time-formatting helper used when stamping chat messages.
 */
public class StringUtils {

    /** Pattern for the time-of-day stamp: 24-hour clock, minutes, seconds. */
    private static final String TIME_PATTERN = "HH:mm:ss";

    /**
     * Formatter built once and reused; {@link DateTimeFormatter} is immutable and
     * thread-safe, so there is no need to rebuild it on every call.
     */
    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern(TIME_PATTERN);

    /**
     * Returns the current local time formatted as {@code HH:mm:ss}.
     *
     * @return the formatted time of day, e.g. {@code 14:03:59}
     */
    public static String getCurrentTimeStamp() {
        return LocalDateTime.now().format(TIME_FORMATTER);
    }
}
Controller
package com.wsl.websocket.controller;import com.wsl.websocket.dto.ChatMessage;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;@Controller
// Serves the demo pages and relays chat messages sent by STOMP clients to every
// subscriber of the broadcast topic.
public class WebSocketBroadcastController {

    private static final String STOMP_VIEW = "stomp-broadcast";
    private static final String SOCKJS_VIEW = "sockjs-broadcast";
    private static final String RECIPIENT_ALL = "ALL";

    /**
     * Handles {@code GET /stomp-broadcast}.
     *
     * @return the view name {@code stomp-broadcast}
     */
    @GetMapping("/stomp-broadcast")
    public String getWebSocketBroadcast() {
        return STOMP_VIEW;
    }

    /**
     * Handles {@code GET /sockjs-broadcast}.
     *
     * @return the view name {@code sockjs-broadcast}
     */
    @GetMapping("/sockjs-broadcast")
    public String getWebSocketWithSockJsBroadcast() {
        return SOCKJS_VIEW;
    }

    /**
     * Receives a message mapped to {@code /broadcast} and republishes it on
     * {@code /topic/broadcast}. A new {@link ChatMessage} is built from the
     * incoming sender and text with the recipient set to {@code "ALL"}.
     *
     * @param chatMessage message sent by the client
     * @return the message to publish to subscribers
     */
    @MessageMapping("/broadcast")
    @SendTo("/topic/broadcast")
    public ChatMessage send(ChatMessage chatMessage) {
        final String sender = chatMessage.getFrom();
        final String body = chatMessage.getText();
        return new ChatMessage(sender, body, RECIPIENT_ALL);
    }
}
common.html
src/main/resources/templates/common.html
<!DOCTYPE HTML>
<html xmlns:th="http://www.thymeleaf.org">
<head th:fragment="headerfiles">
    <!--/* Shared head content: charset, viewport, Bootstrap and site CSS, jQuery. */-->
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link rel="stylesheet" type="text/css" th:href="@{/webjars/bootstrap/4.4.1/css/bootstrap.css}"/>
    <link rel="stylesheet" type="text/css" th:href="@{/css/main.css}"/>
    <script th:src="@{/webjars/jquery/3.4.1/jquery.js}"></script>
</head>
<body>
<footer th:fragment="footer" class="my-5 text-muted text-center text-small">
    <!--/* Shared page footer fragment. */-->
    <p class="mb-1">© 2020 Dariawan</p>
    <ul class="list-inline">
        <li class="list-inline-item"><a href="https://www.dariawan.com">Homepage</a></li>
        <li class="list-inline-item"><a href="#">Articles</a></li>
    </ul>
</footer>
</body>
</html>
stomp-broadcast.html
src/main/resources/templates/stomp-broadcast.html
<!DOCTYPE HTML>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <title>WebSocket With STOMP Broadcast Example</title>
    <!--/* Reuses the "headerfiles" fragment declared in common.html. */-->
    <th:block th:include="common.html :: headerfiles"></th:block>
</head>
<body>
<div class="container">
    <div class="py-5 text-center">
        <a href="/"><h2>WebSocket</h2></a>
        <p class="lead">WebSocket Broadcast - with STOMP</p>
    </div>
    <div class="row">
        <!--/* Left column: nickname + connect/disconnect controls, then the message input. */-->
        <div class="col-md-6">
            <div class="mb-3">
                <div class="input-group">
                    <input type="text" id="from" class="form-control" placeholder="Choose a nickname"/>
                    <div class="btn-group">
                        <button type="button" id="connect" class="btn btn-sm btn-outline-secondary"
                                onclick="connect()">Connect</button>
                        <button type="button" id="disconnect" class="btn btn-sm btn-outline-secondary"
                                onclick="disconnect()" disabled>Disconnect</button>
                    </div>
                </div>
            </div>
            <div class="mb-3">
                <!--/* Hidden until connected; toggled by setConnected() in the inline script. */-->
                <div class="input-group" id="sendmessage" style="display: none;">
                    <input type="text" id="message" class="form-control" placeholder="Message">
                    <div class="input-group-append">
                        <button id="send" class="btn btn-primary" onclick="send()">Send</button>
                    </div>
                </div>
            </div>
        </div>
        <!--/* Right column: received broadcasts and the Clear button. */-->
        <div class="col-md-6">
            <div id="content"></div>
            <div>
                <span class="float-right">
                    <button id="clear" class="btn btn-primary" onclick="clearBroadcast()"
                            style="display: none;">Clear</button>
                </span>
            </div>
        </div>
    </div>
</div>

<footer th:insert="common.html :: footer"></footer>

<script th:src="@{/webjars/stomp-websocket/2.3.3-1/stomp.js}" type="text/javascript"></script>
<script type="text/javascript">
    // STOMP client instance; null until connect() succeeds in creating one.
    var stompClient = null;
    // Nickname typed by the user; refreshed on every connect().
    var userName = $("#from").val();

    // Enables/disables the controls to reflect the connection state and
    // shows the message input only while connected.
    function setConnected(connected) {
        $("#from").prop("disabled", connected);
        $("#connect").prop("disabled", connected);
        $("#disconnect").prop("disabled", !connected);
        if (connected) {
            $("#sendmessage").show();
        } else {
            $("#sendmessage").hide();
        }
    }

    // Opens the STOMP connection, subscribes to the broadcast topic and
    // announces the new user to everyone.
    function connect() {
        userName = $("#from").val();
        if (userName == null || userName === "") {
            alert('Please input a nickname!');
            return;
        }
        // NOTE(review): the URL below hard-codes the ws scheme; a page served
        // over HTTPS would need wss instead. Confirm the deployment setup.
        /*<![CDATA[*/
        var url = /*[['ws://'+${#httpServletRequest.serverName}+':'+${#httpServletRequest.serverPort}+@{/broadcast}]]*/ 'ws://localhost:8080/broadcast';
        /*]]>*/
        stompClient = Stomp.client(url);
        stompClient.connect({}, function () {
            stompClient.subscribe('/topic/broadcast', function (output) {
                showBroadcastMessage(createTextNode(JSON.parse(output.body)));
            });

            sendConnection(' connected to server');
            setConnected(true);
        }, function (err) {
            alert('error' + err);
        });
    }

    // Announces the departure, then closes the connection.
    function disconnect() {
        if (stompClient != null) {
            sendConnection(' disconnected from server');
            stompClient.disconnect(function () {
                console.log('disconnected...');
                setConnected(false);
            });
        }
    }

    // Broadcasts a status line ("<nickname> <message>") attributed to 'server'.
    function sendConnection(message) {
        var text = userName + message;
        sendBroadcast({'from': 'server', 'text': text});
    }

    // Sends a JSON payload to the application destination for broadcasts.
    function sendBroadcast(json) {
        stompClient.send("/app/broadcast", {}, JSON.stringify(json));
    }

    // Sends the typed message and clears the input box.
    function send() {
        var text = $("#message").val();
        sendBroadcast({'from': userName, 'text': text});
        $("#message").val("");
    }

    // Escapes a value for safe insertion as HTML element content.
    // null/undefined become an empty string.
    function escapeHtml(value) {
        return $('<div>').text(value == null ? '' : String(value)).html();
    }

    // Builds the HTML for one received message. Every field originates from
    // other clients, so each one is escaped before being placed in markup;
    // otherwise any participant could inject script into every open page.
    function createTextNode(messageObj) {
        return '<div class="row alert alert-info"><div class="col-md-8">' +
            escapeHtml(messageObj.text) +
            '</div><div class="col-md-4 text-right"><small>[<b>' +
            escapeHtml(messageObj.from) +
            '</b> ' +
            escapeHtml(messageObj.time) +
            ']</small>' +
            '</div></div>';
    }

    // Appends one rendered message to the list and reveals the Clear button.
    function showBroadcastMessage(message) {
        $("#content").append(message);
        $("#clear").show();
    }

    // Removes all received messages and hides the Clear button again.
    function clearBroadcast() {
        $("#content").html("");
        $("#clear").hide();
    }
</script>
</body>
</html>
运行效果
http://localhost:8080/stomp-broadcast
完整代码地址
GitHub - wangsilingwsl/springboot-stomp: springboot integrates stomp
指定消息的目标用户
此功能同样基于Spring Boot,但与上文的广播示例相互独立,代码之间没有关联关系。请结合实际情况参考下列代码。
配置类
package com.twqc.config.websocket;import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;/*** WebSocket消息代理配置类* <p>* 注解@EnableWebSocketMessageBroker表示启用WebSocket消息代理功能。不开启不能使用@MessageMapping和@SendTo注解。* 注意:@MessageMapping和@SendTo的使用方法:* <p>* 注解@MessageMapping的方法用于接收客户端发送的消息,方法的参数用于接收消息内容,方法的返回值用于发送消息内容。* 注解@SendTo的方法用于发送消息内容,方法的返回值用于发送消息内容。* <p>* 示例:@MessageMapping("/example")注解的方法用于接收客户端发送的消息,@SendTo("/topic/example")注解的方法用于发送消息内容。* 3.1 对应的客户端连接websocket的路径为:ws://localhost:8080/example* 3.2 对应的客户端发送消息的路径为:/app/example* 3.3 对应的客户端接收消息的路径为:/topic/example* 3.4 app和topic在WebSocketMessageBrokerConfigurer.configureMessageBroker方法中配置* 3.5 具体的路径需要自己定义,上文仅为示例,与本项目中使用的路径无关。* <p>** @author wsl* @date 2024/2/29* @see WebSocketMessageBrokerConfigurer* @see MessageMapping* @see SendTo* @see <a href="https://www.dariawan.com/tutorials/spring/spring-boot-websocket-stomp-tutorial/">Spring Boot WebSocket with STOMP Tutorial</a>*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {/*** 配置消息代理** @param config 消息代理配置注册器*/@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {// 设置消息代理的目的地前缀,所有以"/websocket/topic"开头的消息都会被代理发送给订阅了该目的地的客户端config.enableSimpleBroker("/websocket/topic");// 设置应用的目的地前缀,所有以"/websocket/app"开头的消息都会被路由到带有@MessageMapping注解的方法中进行处理config.setApplicationDestinationPrefixes("/websocket/app");// 设置用户的目的地前缀,所有以"/user"开头的消息都会被代理发送给订阅了该目的地的用户config.setUserDestinationPrefix("/user");}/*** 注册消息端点* 可以注册多个消息端点,每个消息端点对应一个URL路径。** @param registry 消息端点注册器*/@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注册消息端点,参数为消息端点的URL路径(对应@MessageMapping注解的路径,也是客户端连接的路径)registry.addEndpoint("/websocket/bpm/runFlow")// 设置允许的跨域请求来源.setAllowedOrigins("*");}/*** 配置客户端入站通道** @param registration 客户端入站通道注册器*/@Overridepublic void configureClientInboundChannel(ChannelRegistration registration) {// 设置客户端入站通道的自定义拦截器registration.interceptors(new MyWebSocketInterceptor());}
}
拦截器
package com.twqc.config.websocket;import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;import java.util.List;/*** WebSocket拦截器** @author wsl* @date 2024/3/4*/
@Slf4j
public class MyWebSocketInterceptor implements ChannelInterceptor {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (StompCommand.CONNECT.equals(accessor.getCommand())) {List<String> nativeHeaders = accessor.getNativeHeader("username");if (nativeHeaders != null) {String username = nativeHeaders.get(0);// 存入Principalaccessor.setUser(() -> username);log.info("用户:{}发起了stomp连接", username);} else {log.info("未提供用户名的stomp连接");return message;}}return message;}
}
用户的消息推送有两种实现方式
@SendToUser
/**
 * Handles messages mapped to {@code /websocket/bpm/runFlow} and replies to the
 * sending user on {@code /websocket/topic/websocket/bpm/runFlow}.
 *
 * @param principal the user attached to the STOMP session; may be {@code null}
 *                  when the client connected without a user name
 * @return {@code "success"} followed by the user name, or {@code null} when no
 *         user name is available
 */
@MessageMapping("/websocket/bpm/runFlow")
@SendToUser("/websocket/topic/websocket/bpm/runFlow")
public String runFlow2(Principal principal) {
    if (principal == null || principal.getName() == null) {
        log.error("未提供用户名的stomp连接,无法运行流程");
        // Stop here: falling through would dereference the missing principal.
        // NOTE(review): a null return is expected to result in no reply being
        // sent; confirm against the Spring messaging documentation.
        return null;
    }
    String username = principal.getName();
    return "success" + username;
}
SimpMessagingTemplate
/**
 * Handles messages mapped to {@code /websocket/bpm/runFlow} by handing the
 * request and the caller's user name to {@code flowService.runFlow}.
 *
 * @param request   the flow request payload sent by the client
 * @param principal the user attached to the STOMP session; may be {@code null}
 *                  when the client connected without a user name
 */
@MessageMapping("/websocket/bpm/runFlow")
public void runFlow(FlowRequest request, Principal principal) {
    if (principal == null || principal.getName() == null) {
        log.error("未提供用户名的stomp连接,无法运行流程");
        // Stop here: falling through would dereference the missing principal.
        return;
    }
    String username = principal.getName();
    flowService.runFlow(request, username);
}
flowService
/** Messaging template used to push results to individual users. */
@Autowired
private SimpMessagingTemplate messagingTemplate;

/**
 * Sends a flow response to one user on the
 * {@code BpmConstant.Flow.TOPIC_FLOW_RESULTS} destination.
 *
 * @param response the payload to deliver
 * @param username name of the user that should receive the payload
 */
private void sendNodeResult(FlowResponse response, String username) {
    final String destination = BpmConstant.Flow.TOPIC_FLOW_RESULTS;
    messagingTemplate.convertAndSendToUser(username, destination, response);
}
前端(客户端)
<template>
  <div id="app">
    <!-- Message submission form -->
    <van-form @submit="onSubmit">
      <van-field
        v-model="content"
        name="内容"
        label="内容"
        rows="3"
        autosize
        type="textarea"
        placeholder="请输入内容"
      />
      <div style="margin: 16px">
        <van-button round block type="info" native-type="submit">提交</van-button>
      </div>
    </van-form>
    <!-- Received reply messages -->
    <van-cell-group>
      <van-cell
        v-for="(msgItem, msgIndex) in msgList"
        :key="msgIndex"
        :title="'回复消息' + (msgIndex + 1)"
        value=""
        :label="msgItem"
      />
    </van-cell-group>
  </div>
</template>

<script>
import Stomp from "stompjs";

// Handle of the pending reconnect timer (created with setTimeout), or null.
let socketTimer = null;

export default {
  name: "App",
  created() {
    this.username = "admin";
    this.initWebsocket();
  },
  data() {
    return {
      content: "",
      stompClient: null,
      msgList: [],
      username: "admin",
    };
  },
  methods: {
    // Opens the STOMP connection, subscribes to the per-user destination and
    // schedules a reconnect when the connection reports an error.
    initWebsocket() {
      this.stompClient = Stomp.client(
        "ws://192.168.1.109:7010/websocket/bpm/runFlow"
      );
      this.stompClient.debug = null;
      // Custom CONNECT headers; the server reads "username" from them.
      const headers = {
        username: this.username,
      };
      this.stompClient.connect(
        headers,
        () => {
          this.stompClient.subscribe(
            "/user/websocket/topic/websocket/bpm/runFlow",
            (res) => {
              this.msgList.push(JSON.stringify(res));
            }
          );
        },
        (err) => {
          console.log("err", err);
          this.$toast("连接失败:" + JSON.stringify(err));
          // On error, reconnect manually. The handle comes from setTimeout,
          // so it is cancelled with clearTimeout.
          if (socketTimer) {
            clearTimeout(socketTimer);
          }
          // Try again once after 10 s.
          socketTimer = setTimeout(() => {
            this.initWebsocket();
          }, 10000);
        }
      );
      // Heartbeat settings left disabled by the original author:
      //   this.stompClient.heartbeat.outgoing = 10000;
      //     (author's note: with STOMP 1.1 heart-beating is on by default,
      //      both values defaulting to 10000 ms)
      //   this.stompClient.heartbeat.incoming = 0;
      //     (client does not receive heartbeats from the server)
    },
    // Closes the STOMP connection if a client exists.
    closeWebsocket() {
      if (this.stompClient !== null) {
        this.stompClient.disconnect(() => {
          console.log("关闭连接");
        });
      }
    },
    // Sends the form content to the application destination.
    onSubmit() {
      // The raw text is sent as the body. The original author's commented-out
      // alternative was: JSON.stringify({ content: this.content })
      this.stompClient.send(
        "/websocket/app/websocket/bpm/runFlow",
        {},
        this.content
      );
    },
  },
  destroyed() {
    // Cancel any pending reconnect and close the connection when the page is destroyed.
    clearTimeout(socketTimer);
    this.closeWebsocket();
  },
};
</script>

<style>
#app {
  font-family: Avenir, Helvetica, Arial, sans-serif;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  color: #2c3e50;
}
</style>
请注意,客户端订阅用户专属消息时,需要在订阅地址前加上用户目的地前缀/user(即配置类中setUserDestinationPrefix设置的值),例如上文的/user/websocket/topic/websocket/bpm/runFlow,否则接收失败。
完整代码地址
GitHub - wangsilingwsl/vue-stomp
参考:Spring Boot + WebSocket With STOMP Tutorial | Dariawan