若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据,此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输,而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial
可以分块处理这种数据。
interface Partial<T> extends MessageHandler {/*** Called when part of a message is available to be processed.** @param messagePart The message part* @param last <code>true</code> if this is the last part of* this message, else <code>false</code>*/void onMessage(T messagePart, boolean last);}
Partial
接口中的参数last
就是表示是否是最后一块分块数据。
我们即可以给WsContainer全局设置消息体的缓冲池大小,也可以给每个session单独设置消息体的缓冲池大小。发送消息体一旦超过了此大小,数据就会在服务端被分块传输解码。
//全局设置消息体的缓冲池大小@Bean public WebSocketContainerFactoryBean webSocketContainer(){WebSocketContainerFactoryBean factoryBean = new WebSocketContainerFactoryBean();factoryBean.setMaxTextMessageBufferSize(20);
// factoryBean.setMaxSessionIdleTimeout(10*1000);
// factoryBean.setMaxBinaryMessageBufferSize(1000);return factoryBean;}//session级别消息体的缓冲池大小@OnOpenpublic void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {session.setMaxTextMessageBufferSize(20);//....}
使用JSR356注解实现消息体分块传输
@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebsocketHandler2 {private final static Logger log = LoggerFactory.getLogger(WebsocketHandler2.class);private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {session.setMaxTextMessageBufferSize(20);}@OnMessagepublic void onMessage(String partialMsg, Session session, boolean isLast) throws IOException {StringBuilder stringJoiner = dataCache.get(session.getId());if (isLast) {log.info("receive client(id={}) partial last msg=>{}", session.getId(), partialMsg);if (stringJoiner == null) {String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), partialMsg);session.getBasicRemote().sendText(msg);} else {dataCache.remove(session.getId());stringJoiner.append(partialMsg);String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner);session.getBasicRemote().sendText(msg);}} else {log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), partialMsg);if (stringJoiner == null) {stringJoiner = new StringBuilder(partialMsg);dataCache.put(session.getId(), stringJoiner);} else {stringJoiner.append(partialMsg);}}}
使用spring的WebSocketHandler实现消息体分块传输
@Component
public class WebsocketHandler1 extends TextWebSocketHandler {private final Logger log = LoggerFactory.getLogger(getClass());private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {session.setTextMessageSizeLimit(20);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {StringBuilder stringJoiner = dataCache.get(session.getId());if (message.isLast()) {log.info("receive client(id={}) partial last msg=>{}", session.getId(), message.getPayload());if (stringJoiner == null) {TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), message.getPayload()));session.sendMessage(msg);} else {dataCache.remove(session.getId());stringJoiner.append(message.getPayload());TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner));session.sendMessage(msg);}} else {log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), message.getPayload());if (stringJoiner == null) {stringJoiner = new StringBuilder(message.getPayload());dataCache.put(session.getId(), stringJoiner);} else {stringJoiner.append(message.getPayload());}}}@Overridepublic boolean supportsPartialMessages() {return true;}
注意上边的WebsocketHandler1
实现的抽象方法supportsPartialMessages
其返回值必须是true,否则处理大消息体时会报错。这是因为StandardWebSocketHandlerAdapter
会根据supportsPartialMessages
方法返回值将我们的WebSocketHandler适配成MessageHandler.Partial
或MessageHandler.Whole
,而supportsPartialMessages
返回值是false
就会适配成MessageHandler.Whole
,MessageHandler.Whole
是无法处理分块消息体的。
//StandardWebSocketHandlerAdapter@Overridepublic void onOpen(final javax.websocket.Session session, EndpointConfig config) {this.wsSession.initializeNativeSession(session);// The following inner classes need to remain since lambdas would not retain their// declared generic types (which need to be seen by the underlying WebSocket engine)if (this.handler.supportsPartialMessages()) {session.addMessageHandler(new MessageHandler.Partial<String>() {@Overridepublic void onMessage(String message, boolean isLast) {handleTextMessage(session, message, isLast);}});session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {@Overridepublic void onMessage(ByteBuffer message, boolean isLast) {handleBinaryMessage(session, message, isLast);}});}else {session.addMessageHandler(new MessageHandler.Whole<String>() {@Overridepublic void onMessage(String message) {handleTextMessage(session, message, true);}});session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {@Overridepublic void onMessage(ByteBuffer message) {handleBinaryMessage(session, message, true);}});}//......}
websocket底层是怎么处理分块数据的?我们在方法org.apache.tomcat.websocket.WsFrameBase#processDataText
可以看到其具体的处理逻辑。
首先将接收到的二进制数据尝试解码成文本数据,若发现接收缓冲区messageBufferText容量不足则查到分块处理器,若存在分块处理器泽调用sendMessageTex(false),先处理部分数据,若不存在则直接抛出异常。
//org.apache.tomcat.websocket.server.WsFrameBaseprivate boolean processDataText() throws IOException {// Copy the available data to the bufferTransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);while (!TransformationResult.END_OF_FRAME.equals(tr)) {//...}messageBufferBinary.flip();boolean last = false;// Frame is fully received// Convert bytes to UTF-8while (true) {CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText,last);if (cr.isError()) {throw new WsIOException(new CloseReason(CloseCodes.NOT_CONSISTENT,sm.getString("wsFrame.invalidUtf8")));} else if (cr.isOverflow()) {// Ran out of space in text buffer - flush it//尝试解码时发现接收缓冲区messageBufferText容量不足//调用sendMessageTex(false),先处理部分数据。if (usePartial()) { //查找分块处理器messageBufferText.flip();sendMessageText(false);messageBufferText.clear();} else { //没有分块处理器,就会抛出异常throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG,sm.getString("wsFrame.textMessageTooBig")));}} else if (cr.isUnderflow() && !last) {// End of frame and possible message as well.if (continuationExpected) {// If partial messages are supported, send what we have// managed to decodeif (usePartial()) {messageBufferText.flip();sendMessageText(false);messageBufferText.clear();}messageBufferBinary.compact();newFrame();// Process next framereturn true;} else {// Make sure coder has flushed all outputlast = true;}} else {// End of messagemessageBufferText.flip();//处理最后一块消息sendMessageText(true);newMessage();return true;}}} //确定是否支持分块处理private boolean usePartial() {if (Util.isControl(opCode)) {return false;} else if (textMessage) {return textMsgHandler instanceof MessageHandler.Partial;} else {// Must be binaryreturn binaryMsgHandler instanceof MessageHandler.Partial;}}