大纲
- Rabbitmq
- 开启STOMP支持
- 服务端
- 依赖
- 参数
- 参数映射类
- 配置类
- 逻辑处理类
- 测试
- 测试页面
- Controller
- 测试案例
在《Websocket在Java中的实践——STOMP通信的最小Demo》一文中,我们使用enableSimpleBroker启用一个内置的内存级消息代理。本文我们将使用Rabbitmq作为消息代理,这样我们的服务就可以变成分布式部署。
Rabbitmq
开启STOMP支持
在Rabbitmq所在的机器上执行下面的命令:
sudo -H -u rabbitmq bash -c "/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_stomp"
然后启动Rabbitmq
sudo service rabbitmq-server start
服务端
依赖
spring-boot-starter-websocket用于Websocket服务。
spring-boot-starter-amqp和spring-rabbit-stream都是用于Rabbitmq操作。
reactor-netty用于Broker。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-stream</artifactId>
</dependency>
<dependency><groupId>io.projectreactor.netty</groupId><artifactId>reactor-netty</artifactId><version>1.1.20</version>
</dependency>
参数
src/main/resources/application.properties
需要注意的是,rabbitmq_stomp启动后会开启61613端口。
spring.rabbitmq.host=172.30.254.255
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=fangliang
spring.rabbitmq.stomp.port=61613
还有一点需要注意,很多文章上说使用guest用户登录。但是guest用户只能在Rabbitmq所在的机器上使用,如果跨机器使用会报下列错误。而且这和是否设置guest为全域无关。所以我们使用admin账户。
Received ERROR {message=[Bad CONNECT], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[26]} session=system text/plain payload=non-loopback access denied
spring.rabbitmq.stomp.port是一个自定义参数,它只是供Broker连接Rabbitmq使用。
spring.rabbitmq.port在当前本文例子中没有使用。
参数映射类
这个类主要是映射上述参数,方便后续使用。
src/main/java/com/nyctlc/stomprbmq/component/RabbitMQProperties.java
package com.nyctlc.stomprbmq.component;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
public class RabbitMQProperties {@Value("${spring.rabbitmq.password}")private String rabbitmqPassword;public String getRabbitmqPassword() {return rabbitmqPassword;}@Value("${spring.rabbitmq.username}")private String rabbitmqUsername;public String getRabbitmqUsername() {return rabbitmqUsername;}@Value("${spring.rabbitmq.host}")private String rabbitmqHost;public String getRabbitmqHost() {return rabbitmqHost;}@Value("${spring.rabbitmq.port}")private String rabbitmqPort;public String getRabbitmqPort() {return rabbitmqPort;}@Value("${spring.rabbitmq.stomp.port}")private String rabbitmqStompPort;public String getRabbitmqStompPort() {return rabbitmqStompPort;}
}
配置类
/handshake是STOMP和Websocket建立握手的接口。
enableStompBrokerRelay(“/topic”)会订阅Rabbitmq默认的交换器amq.topic的绑定关系中定义的队列。(所以我们看到很多文章订阅的前缀使用的是“topic”,而不用其他字段,这是有渊源的)
setRelayPort方法传递的是Rabbitmq的STOMP端口,即61613。
setClientLogin、setClientPasscode、setSystemLogin和setSystemPasscode都要设置为admin及其密码,否则会报错。
src/main/java/com/nyctlc/stomprbmq/config/WebSocketConfig.java
package com.nyctlc.stomprbmq.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;import com.nyctlc.stomprbmq.component.RabbitMQProperties;@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Autowiredprivate RabbitMQProperties rabbitMQProperties;@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {registry.addEndpoint("/handshake");}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.setApplicationDestinationPrefixes("/send");registry.enableStompBrokerRelay("/topic").setRelayHost(rabbitMQProperties.getRabbitmqHost()).setRelayPort(Integer.parseInt(rabbitMQProperties.getRabbitmqStompPort())).setClientLogin(rabbitMQProperties.getRabbitmqUsername()).setClientPasscode(rabbitMQProperties.getRabbitmqPassword()).setSystemLogin(rabbitMQProperties.getRabbitmqUsername()).setSystemPasscode(rabbitMQProperties.getRabbitmqPassword());}
}
逻辑处理类
这个类的handle方法会接受/send/msg-from-user端点发来的消息,然后转发给Rabbitmq的amp.topic交换器下msg-to-user路由键对应的队列。上述代码创建的Broker会持续监听这个队列,如果收到消息,则发送给客户端。
src/main/java/com/nyctlc/stomprbmq/controller/WebSocketController.java
package com.nyctlc.stomprbmq.controller;import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;@Controller
public class WebSocketController {@MessageMapping("/msg-from-user")@SendTo("/topic/msg-to-user")public String handle(String msg) {System.out.println("Received message: " + msg);return msg;}
}
测试
测试页面
src/main/resources/static/index.html
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>STOMP over WebSocket Example with StompJs.Client</title><script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs"></script>
</head>
<body><h2>STOMP over WebSocket Example with StompJs.Client</h2><button id="connectButton">Connect</button><form id="messageForm"><input type="text" id="messageInput" placeholder="Type a message..."/><button type="submit">Send</button></form><div id="messages"></div><script>var client = null;function connect() {client = new StompJs.Client({brokerURL: 'ws://localhost:8080/handshake', // WebSocket服务端点connectHeaders: {},debug: function (str) {console.log(str);},reconnectDelay: 5000,heartbeatIncoming: 4000,heartbeatOutgoing: 4000,});client.onConnect = function(frame) {console.log('Connected: ' + frame);client.subscribe('/topic/msg-to-user', function(message) { // 订阅端点showMessageOutput(JSON.parse(message.body).content);});};client.onStompError = function(frame) {console.error('Broker reported error: ' + frame.headers['message']);console.error('Additional details: ' + frame.body);};client.activate();}function sendMessage(event) {event.preventDefault(); // 阻止表单默认提交行为var messageContent = document.getElementById('messageInput').value.trim();if(messageContent && client && client.connected) {var chatMessage = { content: messageContent };client.publish({destination: "/send/msg-from-user", body: JSON.stringify(chatMessage)}); // 发送端点document.getElementById('messageInput').value = '';}}function showMessageOutput(message) {var messagesDiv = document.getElementById('messages');var messageElement = document.createElement('div');messageElement.appendChild(document.createTextNode(message));messagesDiv.appendChild(messageElement);}document.getElementById('messageForm').addEventListener('submit', sendMessage);document.getElementById('connectButton').addEventListener('click', connect);</script>
</body>
</html>
Controller
这个Controller主要是为了让上述HTML可以通过URL访问。
src/main/java/com/nyctlc/stomprbmq/controller/FileController.java
package com.nyctlc.stomprbmq.controller;import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;@Controller
public class FileController {@GetMapping("/")public String index() {return "index"; // 返回index.html}@RequestMapping(value = "/favicon.ico")@ResponseStatus(value = HttpStatus.NO_CONTENT)public void favicon() {// No operation. Just to avoid 404 error for favicon.ico}
}
测试案例
我们在管理后台直接给这个队列发送消息,前端页面也会收到。比如我们发送{“content”:“message from management”}