一、介绍
在项目开发过程中,很多时候,我们不可避免的需要实现的一个功能:
服务端实时发送信息给客户端。比如实时公告、实时订单通知、实时报警推送等等,登录后的客户端需要知道与它相关的实时信息,以便进行下一步处理。
从事服务端开发的特别是C/C++开发的技术人员都知道,客户端可以通过套接口与服务端保持套接口长连接。这样就服务端就可以实时给客户端推送信息了,但是这是针对TCP的长连接,如果是针对HTTP协议(在TCP层之上的实现了超文本协议的短链接--一般情况下短链接),实现服务端与客户端通知一般有一下两种方式:
1、HTTP轮询
一般情况下,http是短链接,也就是请求响应式的,每一次请求都对应一次回复,回复完成后连接断开,这样做的好处就是不需要保持与服务端的长连接,因为HTTP协议底层还是TCP协议,服务端根据机器性能都有一个最大的套接口连接数限制。以windows为例子,如windows下TCP连接数受多个参数影响:
最大tcp连接数
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
TcpNumConnections = 0x00fffffe (Default = 16,777,214)
以上注册表信息配置单机的最大允许的TCP连接数,默认为 16M。这个数值看似很大,这个并不是限制最大连接数的唯一条件,还有其他条件会限制到TCP 连接的最大连接数。
最大动态端口数
TCP客户端和服务器连接时,客户端必须分配一个动态端口,默认情况下这个动态端口的分配范围为 1024-5000 ,也就是说默认情况下,客户端最多可以同时发起3977个Socket连接。我们可以修改如下注册表来调整这个动态端口的范围.
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
MaxUserPort = 5000 (Default = 5000, Max = 65534)
最大调整值65535,也就是最大能有6w多tcp连接。
最大TCB数量
系统为每个TCP连接分配一个TCP控制块(TCP control block or TCB),这个控制块用于缓存TCP连接的一些参数,每个TCB需要分配 0.5 KB的pagepool 和 0.5KB 的Non-pagepool,也就说,每个TCP连接会占用 1KB 的系统内存。换句话说TCP的连接数也受到系统的内存的限制。系统的最大TCB数量由如下注册表设置决定:
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]
MaxFreeTcbs = 2000 (Default = RAM dependent, but usual Pro = 1000, Srv=2000)
非Server版本,MaxFreeTcbs 的默认值为1000(64M 以上物理内存),Server 版本,这个的默认值为 2000。也就是说,默认情况下,Server版本最多同时可以建立并保持2000个TCP连接。
最大TCB Hash table数量
TCB是通过Hash table来管理的,下面注册表设置决定了这个Hash table的大小
HKEY_LOCAL_MACHINE \System \CurrentControlSet \services \Tcpip \Parameters]
MaxHashTableSize = 512 (Default = 512, Range = 64-65536)
指明分配pagepool内存的数量,也就是说,如果MaxFreeTcbs = 1000, 则pagepool的内存数量为500KB那么 MaxHashTableSize应大于500才行。这个数量越大,则Hash table的冗余度就越高,每次分配和查找TCP.这里 MaxHashTableSize被配置为比MaxFreeTcbs大4倍,这样可以大大增加TCP建立的速度。
知道了底层TCP限制之后,我们可以知道实际上长连接在普通的windows机器上最多大概是1000路左右,也就并发是1000个http长连接。如果将长连接改为http短链接,http请求完成后立即释放,那么服务端的并发就会大大增加,如果请求速度不太耗时,服务端的并发量有可能达到1w或者更大!!!c
使用HTTP轮询,就是使用HTTP短链接模式,定期与服务端进行通信主动获取服务端信息的方式实现服务端“推送”信息至客户端,它有如下特点:
避免与服务端的长连接,减低服务端压力,提升服务端的并发访问能力
客户端主动与服务端通信,需要定期与服务端进行轮询查询获取信息,但对客户端而言存在延迟,延迟时间最大为轮询时间。
服务端需要做额外的工作包保存一些实时数据,等待客户端拉取。
2、websocket
http长轮询因为存在信息延迟的问题,有时候,我们需要实时收到服务端推送的信息就无法避免使用websocket了。在前面我已经说到,websocket实际上也是http升级upgrade之后的tcp长连接,长连接的数量限制经过调整后最大能有(65525-1024)= 64501个长连接(在内存、句柄数等不设限情况下)。但实际测试,可能服务端的websocket连接数可能维持的2w左右(经过实际测试),如果改为云主机,连接数可能达到6w左右。如果需要更多了连接,我们可以考虑集群的方式,如n台高性能机器能支持最大n*6w的websocket连接!!它有如下优点:
WebSocket一次握手就可以使客户端和服务端建立长连接,并进行双向数据传输
服务端可主动向客户端发送信息,实时性很高
与HTTP协议比起来,WebSocket协议每次数据传输的头信息都较小,节约带宽
针对浏览器本身,连接后台最大的websocket数量也是有限制的,以下是我搜索到的各个浏览器支持的最大websocket连接数:
IE 6个
chrome 256个
Firefox 200个
safari 1273个(MAC版本)
超过各个浏览器最大数,后台就收不到请求。
二、 websocket实现
说明了原理之后,接下来就是如何实现websocket,这里我提供了几种实现方式
1、J2EE7自带的原始实现
服务端实现
WebSocket是JavaEE7新支持的,Javax.websocket.server包含注解、类、接口用于创建和配置服务端点;Javax.websocket包则包含服务端点和客户断电公用的注解、类、接口、异常,创建一个注解式的端点,将自己的写的类以及类中的一些方法用前面提到的包中的注解装饰,这里我提供了一个基本的websocket实现接口,提供了连接、关闭、接收消息、发送消息接口:
package com.easystudy.websocket;
import java.io.IOException;
import javax.websocket.Session;
import lombok.extern.slf4j.Slf4j;
/**
* @欢迎加入群聊,一起分享,一起合作,一起进步
* QQ交流群:961179337
* 微信账号:lixiang6153
* 微信公众号:IT技术快餐
* 电子邮箱:lixx2048@163.com
*/
@Slf4j
public abstract class BaseWS {
/**
* 终端初次连接
* @param userId userId
* @param session session
* @throws IOException IOException
*/
abstract void onOpen(Session session, Long userId) throws IOException;
/**
* 终端断开连接
*/
abstract void onClose();
/**
* 终端传递参数
* @param session session
* @param message message
*/
abstract void onMessage(String message, Session session);
/**
* 报错
* @param session session
* @param error error
*/
abstract void onError(Session session, Throwable error);
/**
* 向终端发送
* @param message message
* @throws IOException IOException
*/
abstract void sendMessage(String message) throws IOException;
void heartBeat(Long user, String signal, Session session) {
if ("ping".equalsIgnoreCase(signal)) {
try {
log.info("heart beat=====> {},user:{}, sessionId:{}", signal, user, session.getId());
session.getBasicRemote().sendText("pong");
log.info("heart beat<====> {},user:{}, sessionId:{}", "pong", user, session.getId());
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
websocket端点实现:
package com.easystudy.websocket;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
/**
* @ServerEndpoint 该注解可以将类定义成一个WebSocket服务器端,
* @OnOpen 表示有浏览器链接过来的时候被调用
* @OnClose 表示浏览器发出关闭请求的时候被调用
* @OnMessage 表示浏览器发消息的时候被调用
* @OnError 表示报错了
* @欢迎加入群聊,一起分享,一起合作,一起进步
* QQ交流群:961179337
* 微信账号:lixiang6153
* 微信公众号:IT技术快餐
* 电子邮箱:lixx2048@163.com
*/
@Component
@ServerEndpoint("/ws/msg/{userid}")
public class MessageEndPoint extends BaseWS {
// concurrent包下线程安全的Set
private static final CopyOnWriteArraySet SESSIONS = new CopyOnWriteArraySet<>();
private Session session;
@Override
@OnOpen
public void onOpen(Session session, @PathParam("userid") Long userid) {
this.session = session;
SESSIONS.add(this);
System.out.println(String.format(userid + "成功建立连接~ 当前总连接数为:%s", SESSIONS.size()));
System.out.println(this);
}
@Override
@OnClose
public void onClose() {
SESSIONS.remove(this);
System.out.println(String.format("成功关闭连接~ 当前总连接数为:%s", SESSIONS.size()));
}
@Override
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("收到客户端【" +session.getId()+ "】消息:" + message);
}
@Override
@OnError
public void onError(Session session, Throwable error) {
System.out.println("发生错误");
error.printStackTrace();
}
/**
* 指定发消息
* @param message
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群发消息: 静态方法
* @param message
*/
public static void fanoutMessage(String message) {
SESSIONS.forEach(ws -> ws.sendMessage(message));
}
}
我们监听的端点是
/ws/msg/{userid}
端点携带了一个参数userid,表示长连接的用户id,我们在对应的方法实现中通过注解引用对应参数即可。
我们使用J2EE7标准注解,必须注入相应的ServerEndpointExporter类:
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
最后,我们提供一个controller用于测试服务端往发送客户端信息:
package com.easystudy.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.easystudy.websocket.MessageEndPoint;
@RestController
@RequestMapping("/response")
public class TestController {
@GetMapping("/send")
public String reponseMsgToClient(@RequestParam(name="content", required = true)String content){
System.out.println("发送消息:[" + content + "]给客户端!");
MessageEndPoint.fanoutMessage(content);
return "消息【" +content+ "】发送成功!";
}
}
客户端实现
在实现websocket服务端之后,我们就需要实现websocket客户端,连接到服务器,接收服务端消息,实现代码如下所示:
websocket测试
WebSocket Demo
服务器回复内容:
发送
启动浏览器,打开控制台,可以看到连接到服务器字样,输入内容,点击发送,服务端后台打印接收到客户端信息并广播到客户端,客户端控制台也打印了相同字样。
2、springboot实现
除了J2EE原始实现之外,使用springboot之后,功能就更强大了,它提供了一个核心的配置类WebSocketConfigurer用于注册各种websocket端点、拦截器、处理器信息。如下我们通过继承WebSocketConfigurer配置对应端点、处理器和拦截器:
package com.easystudy.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import com.easystudy.websocket.MsgWebSocketHandler;
import com.easystudy.websocket.MsgWebSocketInterceptor;
@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 设置端点连接路径和处理器
registry.addHandler(new MsgWebSocketHandler(), "/ws/msg/{userid}")
.setAllowedOrigins("*")
// 设置拦截器
.addInterceptors(new MsgWebSocketInterceptor());
}
}
我们配置了自己的处理器处理对应端点、配置了拦截器进行信息拦截。
我这里的拦截器主要拦截请求参数,限定请求参数必须携带用户名作为连接的唯一标识:
package com.easystudy.websocket;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* 自定义拦截器拦截WebSocket请求
* @author Administrator
* QQ交流群:961179337
* 微信账号:lixiang6153
* 微信公众号:IT技术快餐
* 电子邮箱:lixx2048@163.com
*/
public class MsgWebSocketInterceptor implements HandshakeInterceptor{
/**
* 前置拦截一般用来注册用户信息,绑定 WebSocketSession
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map attributes) throws Exception {
System.out.println("前置拦截~~");
if (!(request instanceof ServletServerHttpRequest))
return true;
// 获取用户名信息
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
String path = servletRequest.getServletPath();
System.out.println("path:" + path);
String userName = servletRequest.getParameter("userName");
//String userName = (String) servletRequest.getSession().getAttribute("userName");
if (null == userName) {
userName = "lixx";
}
// 保存属性到session属性信息中
attributes.put("userName", userName);
return true;
}
/**
* 后置拦截器
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
System.out.println("后置拦截~~");
}
}
拦截器获取到对应属性之后存入到session的会话属性之中,连接之后可以通过session获取会话属性。
处理器实现:
package com.easystudy.websocket;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* QQ交流群:961179337
* 微信账号:lixiang6153
* 微信公众号:IT技术快餐
* 电子邮箱:lixx2048@163.com
*/
public class MsgWebSocketHandler implements WebSocketHandler{
private static final Map SESSIONS = new ConcurrentHashMap<>();
/**
* 建立新的 socket 连接后回调的方法
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String userName = session.getAttributes().get("userName").toString();
SESSIONS.put(userName, session);
System.out.println(String.format("成功建立连接~ userName: %s", userName));
}
/**
* 连接关闭时,回调的方法
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
System.out.println("连接已关闭,status:" + closeStatus);
}
/**
* 接收客户端发送的 Socket
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {
String msg = message.getPayload().toString();
System.out.println("接收到消息:" + msg);
}
/**
* 连接出错时,回调的方法
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("连接出错");
if (session.isOpen()) {
session.close();
}
}
/**
* 这个是 WebSocketHandler是否处理部分消息,返回 false就完事了
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 指定发消息
* @param userName
* @param message
*/
public static void sendMessage(String userName, String message) {
WebSocketSession webSocketSession = SESSIONS.get(userName);
if (webSocketSession == null || !webSocketSession.isOpen())
return;
try {
webSocketSession.sendMessage(new TextMessage(message));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群发消息
* @param message
*/
public static void fanoutMessage(String message) {
SESSIONS.keySet().forEach(us -> sendMessage(us, message));
}
}
在建立连接之后,我们通过:session.getAttributes().get("userName").toString();获取到连接时候提供的用户参数,用于后续指定用户P2P发送信息。
客户端实现代码如下:
测试
WebSocket Demo
服务器回复内容:
发送
最后测试发送信息如下:
3、socketJS实现
一些浏览器中缺少对WebSocket的支持,因此,回退选项是必要的,而Spring框架提供了基于SockJS协议的透明的回退选项。SockJS的一大好处在于提供了浏览器兼容性。优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询的方式。
SockJS是一个浏览器JavaScript库,它提供了一个类似于网络的对象。SockJS提供了一个连贯的、跨浏览器的Javascript API,它在浏览器和web服务器之间创建了一个低延迟、全双工、跨域通信通道。除此之外,spring也对socketJS提供了支持。此处实现与springboot实现相似,这里不具体介绍,只给出对应代码,实现如下(后续提供的代码是实际项目上的代码,请各位保持修改个更新,谢谢!!!)。
端点、拦截器、通道等配置如下
package com.donwait.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
/**
* 同 HTTP 在 TCP 套接字上添加 请求-响应 模型层一样,STOMP 在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义;
* (STOMP在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义)
* STOMP 帧:该帧由命令,一个或多个 头信息 以及 负载所组成。如下就是发送 数据的一个 STOMP帧:
* SEND
* destination:/app/marco
* content-length:20
*
* {\"message\":\"Marco!\"}
*
* 分析:
* A1)SEND:STOMP命令,表明会发送一些内容;
* A2)destination:头信息,用来表示消息发送到哪里;
* A3)content-length:头信息,用来表示 负载内容的 大小;
* A4)空行:
* A5)帧内容(负载)内容:
*/
@Configuration
@EnableWebSocketMessageBroker // 能够在 WebSocket 上启用 STOMP
public class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer {
/*
* 将 "/dys" 路径 注册为 STOMP 端点,即客户端在订阅或发布消息 到目的地址前,要连接该端点,
* 就是说用户发送请求 url='/项目名/dys'与 STOMP server进行连接,之后再转发到订阅url
* 端点的作用:客户端在订阅或发布消息 到目的地址前,要连接该端点
* 备注:client连接地址和发送地址是不同的,以本例为例,前者是/项目名/dys, 后者是/项目名/app/XX,先连接后发送
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 在网页上我们就可以通过这个链接 /demon/websocket ==来和服务器的WebSocket连接
// 连接:new SockJS("http://127.0.0.1:7019/websocket/dys");
registry.addEndpoint("/dys") // 开启 /dys端点
.setAllowedOrigins("*") // 允许跨域访问
.setHandshakeHandler(new HandshakeHandler()) // 握手处理器
.addInterceptors(new HandshakeInterceptor()) // 握手拦截器
.withSockJS(); // 允许使用socketJs方式访问
}
/*
* 消息传输参数配置
*/
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
registry.setMessageSizeLimit(8192) // 设置消息字节数大小
.setSendBufferSizeLimit(8192) // 设置消息缓存大小
.setSendTimeLimit(10000); // 设置消息发送时间限制毫秒
}
/*
* 输入通道参数设置
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(8) // 设置消息输入通道的线程池线程数
.maxPoolSize(16) // 最大线程数
.keepAliveSeconds(60); // 线程活动时间
registration.interceptors(createUserInterceptor()); // 注入用户入站通道拦截器
}
/*
* 输出通道参数设置
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(8)
.maxPoolSize(16);
}
/*
* 配置broker:
* 配置了一个 简单的消息代理。如果不重载,默认case下,会自动配置一个简单的内存消息代理,
* 用来处理 "/topic"为前缀的消息。但经过重载后,消息代理将会处理前缀为 "/topic" and "/queue"消息
* 分析:
* (1)应用程序的目的地 以 "/app" 为前缀,而代理的目的地以 "/topic" 和 "/queue" 作为前缀
* (2)以应用程序为目的地的消息将会直接路由到 带有 @MessageMapping注解的控制器方法中
* (3)而发送到代理上的消息,包括 @MessageMapping注解方法的返回值所形成的消息,将会路由到代理上,并最终发送到订阅这些目的地客户端
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 代理的目的地址为topic或queque(代理目的地以 /topic为前缀)
// 广播消息订阅:stompClient.subscribe('/topic/alarm', function (response)
registry.enableSimpleBroker("/topic", "/queue");
// 全局使用的消息前缀(客户端订阅路径上会体现出来):应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法.
// 客户端发送端点前缀:stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
registry.setApplicationDestinationPrefixes("/app");
// 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
// registry.setUserDestinationPrefix("/user/");
/*
// 启用了STOMP代理中继功能,并将其代理目的地前缀设置为 /topic and /queue,并将所有目的地前缀为 "/topic" or "/queue"的消息都会发送到STOMP代理中[真正消息代理activeMQ或RabbitMQ]
registry.enableStompBrokerRelay("/topic", "/queue") // 设置可以订阅的地址,也就是服务器可以发送的地址
.setRelayHost("192.168.12.18")
.setRelayPort(5672)
.setClientLogin("admin")
.setClientPasscode("admin")
.setSystemHeartbeatReceiveInterval(2000) // 设置心跳信息接收时间间隔
.setSystemHeartbeatSendInterval(2000); // 设置心跳信息发送时间间隔
// 应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法.
registry.setApplicationDestinationPrefixes("/app");
*/
}
/**
*
* @Title: createUserInterceptor
* @Description: 将客户端渠道拦截器加入spring ioc容器
* @return
*/
@Bean
public UserInterceptor createUserInterceptor() {
return new UserInterceptor();
}
}
握手拦截器配置:
package com.donwait.websocket;
import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Map attributes) throws Exception {
log.info("============握手前===========");
/*
// 解决The extension [x-webkit-deflate-frame] is not supported问题
if(request.getHeaders().containsKey("Sec-WebSocket-Extensions")) {
request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate");
}
// 检查session的值是否存在
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession(false);
String accountId = (String) session.getAttribute(Constants.SKEY_ACCOUNT_ID);
//把session和accountId存放起来
attributes.put(Constants.SESSIONID, session.getId());
attributes.put(Constants.SKEY_ACCOUNT_ID, accountId);
}
*/
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Exception ex) {
log.info("============握手后===========");
super.afterHandshake(request, response, wsHandler, ex);
}
}
用户拦截器配置:
package com.donwait.websocket;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.MessageHeaderAccessor;
import com.donwait.amqp.RabbitMQ;
import com.donwait.model.RtmpInviteInfo;
import com.donwait.model.User;
import com.donwait.protobuf.RTMP_INVITE_PARAM;
import com.donwait.redis.RtmpInviteService;
/**
* @ClassName: UserInterceptor
* @Description: 客户端渠道拦截适配器
*/
@SuppressWarnings("deprecation")
public class UserInterceptor extends ChannelInterceptorAdapter {
@Autowired
private RtmpInviteService redisRtmpInviteService;
@Autowired
private RabbitMQ rabbitMQ;
//@Autowired
//private UserCacheService userCacheService;
/**
* 获取包含在stomp中的用户信息
*/
@SuppressWarnings("rawtypes")
@Override
public Message> preSend(Message> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
Object name = ((Map) raw).get("name");
if (name instanceof LinkedList) {
// 设置当前访问器的认证用户
accessor.setUser(new User(((LinkedList) name).get(0).toString()));
}
}
}
return message;
}
@Override
public void postSend(Message> message, MessageChannel channel, boolean sent) {
StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);
// ignore non-STOMP messages like heartbeat messages
if(sha.getCommand() == null) {
return;
}
//这里的sessionId和accountId对应HttpSessionIdHandshakeInterceptor拦截器的存放key
//String sessionId = sha.getSessionAttributes().get(Constants.SESSIONID).toString();
//String accountId = sha.getSessionAttributes().get(Constants.SKEY_ACCOUNT_ID).toString();
//判断客户端的连接状态
switch(sha.getCommand()) {
case CONNECT:
connect(sha);
break;
case CONNECTED:
break;
case DISCONNECT:
disconnect(sha);
break;
default:
break;
}
}
// 连接成功
private void connect(StompHeaderAccessor sha){
System.out.println(" STOMP 连接成功:" + sha.getUser().getName());
}
// 断开连接
private void disconnect(StompHeaderAccessor sha){
System.out.println(" STOMP 连接断开" + sha.getUser().getName());
// 移除用户信息
//userCacheService.delete(sha.getUser().getName());
String strKey = String.format("rtmp_invite_info::%s_*", sha.getUser().getName());
List invite_list = redisRtmpInviteService.findByKeyEx(strKey);
if (invite_list != null) {
for(RtmpInviteInfo rtmpInviteInfo : invite_list){
// 通知接入服务器
RTMP_INVITE_PARAM.Builder builder = RTMP_INVITE_PARAM.newBuilder();
builder.setRtmpIP(rtmpInviteInfo.getRtmpIp());
builder.setRtmpPort(rtmpInviteInfo.getRtmpPort());
builder.setDevID(rtmpInviteInfo.getDevId());
builder.setProtocolType(rtmpInviteInfo.getProtoType());
builder.setStreamType(rtmpInviteInfo.getStreamType());
rabbitMQ.send(rtmpInviteInfo.getExchangeName(), rtmpInviteInfo.getRouteKey(), builder.build().toByteArray());
strKey = String.format("%s::%s_%s_%d_%d_%d", rtmpInviteInfo.getCacheName(), sha.getUser().getName(), rtmpInviteInfo.getDevId(), rtmpInviteInfo.getChannelNum().longValue(), rtmpInviteInfo.getProtoType().longValue(), rtmpInviteInfo.getStreamType().longValue());
redisRtmpInviteService.deleteByKey(strKey);
}
}
}
}
处理器代码:
package com.donwait.websocket;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HandshakeHandler extends DefaultHandshakeHandler{
public HandshakeHandler(){
log.debug("new HandshakeHandler");
}
}
配置完成之后,需要封装一个消息服务实现点对点和广播形式发送:
package com.donwait.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Service;
/**
* websocket广播推送服务
* @author Administrator
*
*/
@Service
public class MessageService {
@Autowired
SimpMessageSendingOperations sendOperation; // 消息发送模板
@Autowired
private SimpUserRegistry userRegistry; // 用户列表【连接的客户端信息】
/**
* 广播形式发送报警信息
* @param
*/
public void broadcast(String destination,String message) {
sendOperation.convertAndSend(destination, message);
System.out.println("路由:"+ destination + " 推送消息:" + message);
}
/**
* 单独发送信息给某用户
* 客户端发起连接时候必须携带用户名参数
* stompClient.connect(
* {
* name: 'lixx' // 携带客户端信息
* }
* @param
*/
public void send(String destination,String username, String message) {
for (SimpUser user : userRegistry.getUsers()) {
if (user.getName().equals(username)){
sendOperation.convertAndSendToUser(username, destination, message);
System.out.println("路由:"+ destination + " 推送消息:" + message);
break;
}
}
}
}
最后,送上测试的html客户端页面:
stomp
Welcome
发送消息
订阅用户消息/user/queue/message
订阅报警消息/topic/alarm
至此,websocket的具体介绍与实例都已送上,如果需要源码或者技术交流或者合作请联系一下方式
源码获取、合作、技术交流请获取如下联系方式:
QQ交流群:961179337
微信账号:lixiang6153
公众号:IT技术快餐
电子邮箱:lixx2048@163.com