Websocket如何分块处理数据量超大的消息体

在这里插入图片描述

若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据,此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输,而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial可以分块处理这种数据。

 interface Partial<T> extends MessageHandler {/*** Called when part of a message is available to be processed.** @param messagePart   The message part* @param last          <code>true</code> if this is the last part of*                      this message, else <code>false</code>*/void onMessage(T messagePart, boolean last);}

Partial接口中的参数last就是表示是否是最后一块分块数据。

我们即可以给WsContainer全局设置消息体的缓冲池大小,也可以给每个session单独设置消息体的缓冲池大小。发送消息体一旦超过了此大小,数据就会在服务端被分块传输解码。

 //全局设置消息体的缓冲池大小@Bean  public WebSocketContainerFactoryBean webSocketContainer(){WebSocketContainerFactoryBean factoryBean = new WebSocketContainerFactoryBean();factoryBean.setMaxTextMessageBufferSize(20);
//        factoryBean.setMaxSessionIdleTimeout(10*1000);
//        factoryBean.setMaxBinaryMessageBufferSize(1000);return factoryBean;}//session级别消息体的缓冲池大小@OnOpenpublic void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {session.setMaxTextMessageBufferSize(20);//....}      

使用JSR356注解实现消息体分块传输


@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebsocketHandler2 {private final static Logger log = LoggerFactory.getLogger(WebsocketHandler2.class);private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {session.setMaxTextMessageBufferSize(20);}@OnMessagepublic void onMessage(String partialMsg, Session session, boolean isLast) throws IOException {StringBuilder stringJoiner = dataCache.get(session.getId());if (isLast) {log.info("receive client(id={}) partial last msg=>{}", session.getId(), partialMsg);if (stringJoiner == null) {String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), partialMsg);session.getBasicRemote().sendText(msg);} else {dataCache.remove(session.getId());stringJoiner.append(partialMsg);String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner);session.getBasicRemote().sendText(msg);}} else {log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), partialMsg);if (stringJoiner == null) {stringJoiner = new StringBuilder(partialMsg);dataCache.put(session.getId(), stringJoiner);} else {stringJoiner.append(partialMsg);}}}

使用spring的WebSocketHandler实现消息体分块传输

@Component
public class WebsocketHandler1 extends TextWebSocketHandler {private final Logger log = LoggerFactory.getLogger(getClass());private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {session.setTextMessageSizeLimit(20);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {StringBuilder stringJoiner = dataCache.get(session.getId());if (message.isLast()) {log.info("receive client(id={}) partial last msg=>{}", session.getId(), message.getPayload());if (stringJoiner == null) {TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), message.getPayload()));session.sendMessage(msg);} else {dataCache.remove(session.getId());stringJoiner.append(message.getPayload());TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner));session.sendMessage(msg);}} else {log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), message.getPayload());if (stringJoiner == null) {stringJoiner = new StringBuilder(message.getPayload());dataCache.put(session.getId(), stringJoiner);} else {stringJoiner.append(message.getPayload());}}}@Overridepublic boolean supportsPartialMessages() {return true;}

注意上边的WebsocketHandler1 实现的抽象方法supportsPartialMessages其返回值必须是true,否则处理大消息体时会报错。这是因为StandardWebSocketHandlerAdapter会根据supportsPartialMessages方法返回值将我们的WebSocketHandler适配成MessageHandler.PartialMessageHandler.Whole,而supportsPartialMessages返回值是false就会适配成MessageHandler.WholeMessageHandler.Whole是无法处理分块消息体的。

	//StandardWebSocketHandlerAdapter@Overridepublic void onOpen(final javax.websocket.Session session, EndpointConfig config) {this.wsSession.initializeNativeSession(session);// The following inner classes need to remain since lambdas would not retain their// declared generic types (which need to be seen by the underlying WebSocket engine)if (this.handler.supportsPartialMessages()) {session.addMessageHandler(new MessageHandler.Partial<String>() {@Overridepublic void onMessage(String message, boolean isLast) {handleTextMessage(session, message, isLast);}});session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {@Overridepublic void onMessage(ByteBuffer message, boolean isLast) {handleBinaryMessage(session, message, isLast);}});}else {session.addMessageHandler(new MessageHandler.Whole<String>() {@Overridepublic void onMessage(String message) {handleTextMessage(session, message, true);}});session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {@Overridepublic void onMessage(ByteBuffer message) {handleBinaryMessage(session, message, true);}});}//......}

websocket底层是怎么处理分块数据的?我们在方法org.apache.tomcat.websocket.WsFrameBase#processDataText可以看到其具体的处理逻辑。
首先将接收到的二进制数据尝试解码成文本数据,若发现接收缓冲区messageBufferText容量不足则查到分块处理器,若存在分块处理器泽调用sendMessageTex(false),先处理部分数据,若不存在则直接抛出异常。

//org.apache.tomcat.websocket.server.WsFrameBaseprivate boolean processDataText() throws IOException {// Copy the available data to the bufferTransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);while (!TransformationResult.END_OF_FRAME.equals(tr)) {//...}messageBufferBinary.flip();boolean last = false;// Frame is fully received// Convert bytes to UTF-8while (true) {CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText,last);if (cr.isError()) {throw new WsIOException(new CloseReason(CloseCodes.NOT_CONSISTENT,sm.getString("wsFrame.invalidUtf8")));} else if (cr.isOverflow()) {// Ran out of space in text buffer - flush it//尝试解码时发现接收缓冲区messageBufferText容量不足//调用sendMessageTex(false),先处理部分数据。if (usePartial()) {   //查找分块处理器messageBufferText.flip();sendMessageText(false);messageBufferText.clear();} else { //没有分块处理器,就会抛出异常throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG,sm.getString("wsFrame.textMessageTooBig")));}} else if (cr.isUnderflow() && !last) {// End of frame and possible message as well.if (continuationExpected) {// If partial messages are supported, send what we have// managed to decodeif (usePartial()) {messageBufferText.flip();sendMessageText(false);messageBufferText.clear();}messageBufferBinary.compact();newFrame();// Process next framereturn true;} else {// Make sure coder has flushed all outputlast = true;}} else {// End of messagemessageBufferText.flip();//处理最后一块消息sendMessageText(true);newMessage();return true;}}}      //确定是否支持分块处理private boolean usePartial() {if (Util.isControl(opCode)) {return false;} else if (textMessage) {return textMsgHandler instanceof MessageHandler.Partial;} else {// Must be binaryreturn binaryMsgHandler instanceof MessageHandler.Partial;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ELMo模型介绍:深度理解语言模型的嵌入艺术

ELMo模型介绍&#xff1a;深度理解语言模型的嵌入艺术 引言 在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;词嵌入&#xff08;word embedding&#xff09;是将词汇或短语从词汇表映射到向量的数学表示&#xff0c;这些向量能够捕捉词汇之间的语义和语法关系。E…

【澜舟科技-注册/登录安全分析报告】

前言 由于网站注册入口容易被机器执行自动化程序攻击&#xff0c;存在如下风险&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露&#xff0c;不符合国家等级保护的要求。短信盗刷带来的拒绝服务风险 &#xff0c;造成用户无法登陆、注册&#xff0c;大量收到垃圾短信的…

C++ 编程基础(8)模版 | 8.3、类模版

文章目录 一、类模版1、定义2、模板参数3、模版的实例化4、模版的特化5、模版成员函数6、总结 前言&#xff1a; 这里是引用 一、类模版 1、定义 类模板的定义以template关键字开始&#xff0c;后面跟着一个模板参数列表&#xff08;用尖括号< >包围&#xff09;。模板参…

HarmonyOS NEXT应用开发实战:十二、远场通信RCP简单好用的模块化封装

在进行HarmonyOS的应用开发中&#xff0c;我们常常需要进行网络通信。然而&#xff0c;原始的远场通信&#xff08;RCP&#xff09;使用方式较为繁琐&#xff0c;让人感到不够便捷。作为一位前期从事小程序开发的开发者&#xff0c;我深受小程序网络访问的简单性和便利性的吸引…

uni-app快速入门(十)--常用内置组件(下)

本文介绍uni-app的textarea多行文本框组件、web-view组件、image图片组件、switch开关组件、audio音频组件、video视频组件。 一、textarea多行文本框组件 textarea组件在HTML 中相信大家非常熟悉&#xff0c;组件的官方介绍见&#xff1a; textarea | uni-app官网uni-app,un…

Tomcat 如何管理 Session

Tomcat 如何管理 Session 我们知道&#xff0c;Tomcat 中每一个 Context 容器对应一个 Web 应用&#xff0c;而 Web 应用之间的 Session 应该是独立的&#xff0c;因此 Session 的管理肯定是 Context 级的&#xff0c;也就是一个 Context 一定关联多个 Session。 Tomcat 中主…

Flink vs Spark

Flink vs Spark Flink和Spark都是大数据处理领域的热门分布式计算框架&#xff0c;它们有各自的特点和优势&#xff0c;适用于不同的场景。本文对两者进行对比。 一、技术理念与架构 Flink&#xff1a; 基于事件驱动&#xff0c;面向流的处理框架。支持真正的流计算&#xff0c…

鸿蒙NEXT开发-用户通知服务的封装和文件下载通知

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 专栏地址: https://blog.csdn.net/qq_56760790/…

01 IP路由基础

一、路由器是怎么转发数据包 • 当数据包到达路由器之后&#xff0c;根据数据包的目的 IP 地址&#xff0c;查找 路由表&#xff0c;并根据路由表中相应的路由所指示出接口还有下一跳 指导数据包在网络中的转发。 • 如果路由器路由表没有路由怎么办&#xff1f; -------- 将数…

Android studio 呼叫盒app

一、权限文件 0.gradle切换国内源 #Fri Nov 08 15:46:05 CST 2024 distributionBaseGRADLE_USER_HOME distributionPathwrapper/dists distributionUrlhttps://mirrors.cloud.tencent.com/gradle/gradle-8.4-bin.zip zipStoreBaseGRADLE_USER_HOME zipStorePathwrapper/dists1…

[Admin] Dashboard Filter for Mix Report Types

Background RevOps team has built a dashboard for sales team to track team members’ performance, but they’re blocked by how to provide a manager view based on sales’ hierarchy. Therefore, they seek for dev team’s help to clear their blocker. From foll…

网络技术-路由协议

路由协议是网络中确保数据包能够有效地从源节点传递到目的节点的重要机制。以下是常见的几种路由协议&#xff1a; 一、根据算法分类 1.距离向量路由协议&#xff08;Distance Vector Routing Protocol&#xff09; RIP&#xff08;Routing Information Protocol&#xff09;&…

2024年人工智能技术赋能网络安全应用测试:广东盈世在钓鱼邮件识别场景荣获第三名!

近期&#xff0c;2024年国家网络安全宣传周“网络安全技术高峰论坛主论坛暨粤港澳大湾区网络安全大会”在广州成功举办。会上&#xff0c;国家计算机网络应急技术处理协调中心公布了“2024年人工智能技术赋能网络安全应用测试结果”。结果显示&#xff0c;广东盈世计算机科技有…

Java进阶四-异常,File

异常 概念&#xff1a;代表程序出现的问题。 目的&#xff1a;程序出现了异常我们应该如何处理。 最高父类&#xff1a;Exception 异常分为两类 编译时异常&#xff1a;没有继承RuntimeException的异常,直接继承与Exception,编译阶段就会错误提示。运行时异常:RuntimeExc…

Gin 框架中的路由

1、路由概述 路由(Routing)是由一个 URI(或者叫路径)和一个特定的 HTTP 方法(GET、POST 等) 组成的,涉及到应用如何响应客户端对某个网站节点的访问。 RESTful API 是目前比较成熟的一套互联网应用程序的 API 设计理论,所以我们设计我们的路 由的时候建议参考 …

ERROR TypeError: AutoImport is not a function

TypeError: AutoImport is not a function 原因&#xff1a;unplugin-auto-import 插件版本问题 Vue3基于Webpack&#xff0c;在vue.config.js中配置 当unplugin-vue-components版本小于0.26.0时&#xff0c;使用以下写法 const { defineConfig } require("vue/cli-se…

Elasticsearch:更好的二进制量化(BBQ)对比乘积量化(PQ)

作者&#xff1a;来自 Elastic Benjamin Trent 为什么我们选择花时间研究更好的二进制量化而不是在 Lucene 和 Elasticsearch 中进行生产量化。 我们一直在逐步使 Elasticsearch 和 Lucene 的向量搜索变得更快、更实惠。我们的主要重点不仅是通过 SIMD 提高搜索速度&#xff0…

检查课程是否有效

文章目录 概要整体架构流程技术细节小结 概要 这是一个微服务内部接口&#xff0c;当用户学习课程时&#xff0c;可能需要播放课程视频。此时提供视频播放功能的媒资系统就需要校验用户是否有播放视频的资格。所以&#xff0c;开发媒资服务&#xff08;tj-media&#xff09;的…

红外遥控报警器设计(模电课设)

一、设计要求 利用NE555p芯片设计制作报警器。要求当有人遮挡红外光时发出报警信号&#xff0c;无人遮挡红外光时报警器不工作&#xff0c;即不发声。 二、元器件 555芯片&#xff1a;NE555P 集成运放&#xff1a;LM358 三级管&#xff1a;2N1711 蜂鸣器&#xff1a;HY-30…

英语fault和false的区别

"fault" 和 "false" 在英语中虽然都与错误或问题有关&#xff0c;但它们的含义和用法有很大的不同。下面详细解释这两个词的区别&#xff1a; 1. Fault 定义&#xff1a;错误、缺陷、责任、故障。特点&#xff1a; 错误或缺陷&#xff1a;指某物或某事存…