Websocket如何分块处理数据量超大的消息体

在这里插入图片描述

若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据,此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输,而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial可以分块处理这种数据。

 interface Partial<T> extends MessageHandler {/*** Called when part of a message is available to be processed.** @param messagePart   The message part* @param last          <code>true</code> if this is the last part of*                      this message, else <code>false</code>*/void onMessage(T messagePart, boolean last);}

Partial接口中的参数last就是表示是否是最后一块分块数据。

我们即可以给WsContainer全局设置消息体的缓冲池大小,也可以给每个session单独设置消息体的缓冲池大小。发送消息体一旦超过了此大小,数据就会在服务端被分块传输解码。

 //全局设置消息体的缓冲池大小@Bean  public WebSocketContainerFactoryBean webSocketContainer(){WebSocketContainerFactoryBean factoryBean = new WebSocketContainerFactoryBean();factoryBean.setMaxTextMessageBufferSize(20);
//        factoryBean.setMaxSessionIdleTimeout(10*1000);
//        factoryBean.setMaxBinaryMessageBufferSize(1000);return factoryBean;}//session级别消息体的缓冲池大小@OnOpenpublic void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {session.setMaxTextMessageBufferSize(20);//....}      

使用JSR356注解实现消息体分块传输


@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebsocketHandler2 {private final static Logger log = LoggerFactory.getLogger(WebsocketHandler2.class);private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {session.setMaxTextMessageBufferSize(20);}@OnMessagepublic void onMessage(String partialMsg, Session session, boolean isLast) throws IOException {StringBuilder stringJoiner = dataCache.get(session.getId());if (isLast) {log.info("receive client(id={}) partial last msg=>{}", session.getId(), partialMsg);if (stringJoiner == null) {String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), partialMsg);session.getBasicRemote().sendText(msg);} else {dataCache.remove(session.getId());stringJoiner.append(partialMsg);String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner);session.getBasicRemote().sendText(msg);}} else {log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), partialMsg);if (stringJoiner == null) {stringJoiner = new StringBuilder(partialMsg);dataCache.put(session.getId(), stringJoiner);} else {stringJoiner.append(partialMsg);}}}

使用spring的WebSocketHandler实现消息体分块传输

@Component
public class WebsocketHandler1 extends TextWebSocketHandler {private final Logger log = LoggerFactory.getLogger(getClass());private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {session.setTextMessageSizeLimit(20);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {StringBuilder stringJoiner = dataCache.get(session.getId());if (message.isLast()) {log.info("receive client(id={}) partial last msg=>{}", session.getId(), message.getPayload());if (stringJoiner == null) {TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), message.getPayload()));session.sendMessage(msg);} else {dataCache.remove(session.getId());stringJoiner.append(message.getPayload());TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner));session.sendMessage(msg);}} else {log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), message.getPayload());if (stringJoiner == null) {stringJoiner = new StringBuilder(message.getPayload());dataCache.put(session.getId(), stringJoiner);} else {stringJoiner.append(message.getPayload());}}}@Overridepublic boolean supportsPartialMessages() {return true;}

注意上边的WebsocketHandler1 实现的抽象方法supportsPartialMessages其返回值必须是true,否则处理大消息体时会报错。这是因为StandardWebSocketHandlerAdapter会根据supportsPartialMessages方法返回值将我们的WebSocketHandler适配成MessageHandler.PartialMessageHandler.Whole,而supportsPartialMessages返回值是false就会适配成MessageHandler.WholeMessageHandler.Whole是无法处理分块消息体的。

	//StandardWebSocketHandlerAdapter@Overridepublic void onOpen(final javax.websocket.Session session, EndpointConfig config) {this.wsSession.initializeNativeSession(session);// The following inner classes need to remain since lambdas would not retain their// declared generic types (which need to be seen by the underlying WebSocket engine)if (this.handler.supportsPartialMessages()) {session.addMessageHandler(new MessageHandler.Partial<String>() {@Overridepublic void onMessage(String message, boolean isLast) {handleTextMessage(session, message, isLast);}});session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {@Overridepublic void onMessage(ByteBuffer message, boolean isLast) {handleBinaryMessage(session, message, isLast);}});}else {session.addMessageHandler(new MessageHandler.Whole<String>() {@Overridepublic void onMessage(String message) {handleTextMessage(session, message, true);}});session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {@Overridepublic void onMessage(ByteBuffer message) {handleBinaryMessage(session, message, true);}});}//......}

websocket底层是怎么处理分块数据的?我们在方法org.apache.tomcat.websocket.WsFrameBase#processDataText可以看到其具体的处理逻辑。
首先将接收到的二进制数据尝试解码成文本数据,若发现接收缓冲区messageBufferText容量不足则查到分块处理器,若存在分块处理器泽调用sendMessageTex(false),先处理部分数据,若不存在则直接抛出异常。

//org.apache.tomcat.websocket.server.WsFrameBaseprivate boolean processDataText() throws IOException {// Copy the available data to the bufferTransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);while (!TransformationResult.END_OF_FRAME.equals(tr)) {//...}messageBufferBinary.flip();boolean last = false;// Frame is fully received// Convert bytes to UTF-8while (true) {CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText,last);if (cr.isError()) {throw new WsIOException(new CloseReason(CloseCodes.NOT_CONSISTENT,sm.getString("wsFrame.invalidUtf8")));} else if (cr.isOverflow()) {// Ran out of space in text buffer - flush it//尝试解码时发现接收缓冲区messageBufferText容量不足//调用sendMessageTex(false),先处理部分数据。if (usePartial()) {   //查找分块处理器messageBufferText.flip();sendMessageText(false);messageBufferText.clear();} else { //没有分块处理器,就会抛出异常throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG,sm.getString("wsFrame.textMessageTooBig")));}} else if (cr.isUnderflow() && !last) {// End of frame and possible message as well.if (continuationExpected) {// If partial messages are supported, send what we have// managed to decodeif (usePartial()) {messageBufferText.flip();sendMessageText(false);messageBufferText.clear();}messageBufferBinary.compact();newFrame();// Process next framereturn true;} else {// Make sure coder has flushed all outputlast = true;}} else {// End of messagemessageBufferText.flip();//处理最后一块消息sendMessageText(true);newMessage();return true;}}}      //确定是否支持分块处理private boolean usePartial() {if (Util.isControl(opCode)) {return false;} else if (textMessage) {return textMsgHandler instanceof MessageHandler.Partial;} else {// Must be binaryreturn binaryMsgHandler instanceof MessageHandler.Partial;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/61305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【澜舟科技-注册/登录安全分析报告】

前言 由于网站注册入口容易被机器执行自动化程序攻击&#xff0c;存在如下风险&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露&#xff0c;不符合国家等级保护的要求。短信盗刷带来的拒绝服务风险 &#xff0c;造成用户无法登陆、注册&#xff0c;大量收到垃圾短信的…

uni-app快速入门(十)--常用内置组件(下)

本文介绍uni-app的textarea多行文本框组件、web-view组件、image图片组件、switch开关组件、audio音频组件、video视频组件。 一、textarea多行文本框组件 textarea组件在HTML 中相信大家非常熟悉&#xff0c;组件的官方介绍见&#xff1a; textarea | uni-app官网uni-app,un…

Tomcat 如何管理 Session

Tomcat 如何管理 Session 我们知道&#xff0c;Tomcat 中每一个 Context 容器对应一个 Web 应用&#xff0c;而 Web 应用之间的 Session 应该是独立的&#xff0c;因此 Session 的管理肯定是 Context 级的&#xff0c;也就是一个 Context 一定关联多个 Session。 Tomcat 中主…

鸿蒙NEXT开发-用户通知服务的封装和文件下载通知

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 专栏地址: https://blog.csdn.net/qq_56760790/…

01 IP路由基础

一、路由器是怎么转发数据包 • 当数据包到达路由器之后&#xff0c;根据数据包的目的 IP 地址&#xff0c;查找 路由表&#xff0c;并根据路由表中相应的路由所指示出接口还有下一跳 指导数据包在网络中的转发。 • 如果路由器路由表没有路由怎么办&#xff1f; -------- 将数…

Android studio 呼叫盒app

一、权限文件 0.gradle切换国内源 #Fri Nov 08 15:46:05 CST 2024 distributionBaseGRADLE_USER_HOME distributionPathwrapper/dists distributionUrlhttps://mirrors.cloud.tencent.com/gradle/gradle-8.4-bin.zip zipStoreBaseGRADLE_USER_HOME zipStorePathwrapper/dists1…

[Admin] Dashboard Filter for Mix Report Types

Background RevOps team has built a dashboard for sales team to track team members’ performance, but they’re blocked by how to provide a manager view based on sales’ hierarchy. Therefore, they seek for dev team’s help to clear their blocker. From foll…

2024年人工智能技术赋能网络安全应用测试:广东盈世在钓鱼邮件识别场景荣获第三名!

近期&#xff0c;2024年国家网络安全宣传周“网络安全技术高峰论坛主论坛暨粤港澳大湾区网络安全大会”在广州成功举办。会上&#xff0c;国家计算机网络应急技术处理协调中心公布了“2024年人工智能技术赋能网络安全应用测试结果”。结果显示&#xff0c;广东盈世计算机科技有…

Java进阶四-异常,File

异常 概念&#xff1a;代表程序出现的问题。 目的&#xff1a;程序出现了异常我们应该如何处理。 最高父类&#xff1a;Exception 异常分为两类 编译时异常&#xff1a;没有继承RuntimeException的异常,直接继承与Exception,编译阶段就会错误提示。运行时异常:RuntimeExc…

ERROR TypeError: AutoImport is not a function

TypeError: AutoImport is not a function 原因&#xff1a;unplugin-auto-import 插件版本问题 Vue3基于Webpack&#xff0c;在vue.config.js中配置 当unplugin-vue-components版本小于0.26.0时&#xff0c;使用以下写法 const { defineConfig } require("vue/cli-se…

Elasticsearch:更好的二进制量化(BBQ)对比乘积量化(PQ)

作者&#xff1a;来自 Elastic Benjamin Trent 为什么我们选择花时间研究更好的二进制量化而不是在 Lucene 和 Elasticsearch 中进行生产量化。 我们一直在逐步使 Elasticsearch 和 Lucene 的向量搜索变得更快、更实惠。我们的主要重点不仅是通过 SIMD 提高搜索速度&#xff0…

检查课程是否有效

文章目录 概要整体架构流程技术细节小结 概要 这是一个微服务内部接口&#xff0c;当用户学习课程时&#xff0c;可能需要播放课程视频。此时提供视频播放功能的媒资系统就需要校验用户是否有播放视频的资格。所以&#xff0c;开发媒资服务&#xff08;tj-media&#xff09;的…

红外遥控报警器设计(模电课设)

一、设计要求 利用NE555p芯片设计制作报警器。要求当有人遮挡红外光时发出报警信号&#xff0c;无人遮挡红外光时报警器不工作&#xff0c;即不发声。 二、元器件 555芯片&#xff1a;NE555P 集成运放&#xff1a;LM358 三级管&#xff1a;2N1711 蜂鸣器&#xff1a;HY-30…

Spring MVC——针对实习面试

目录 Spring MVC什么是Spring MVC&#xff1f;简单介绍下你对Spring MVC的理解&#xff1f;Spring MVC的优点有哪些&#xff1f;Spring MVC的主要组件有哪些&#xff1f;Spring MVC的工作原理或流程是怎样的&#xff1f;Spring MVC常用注解有哪些&#xff1f; Spring MVC 什么是…

机器学习(贝叶斯算法,决策树)

朴素贝叶斯分类 贝叶斯分类理论 假设现有两个数据集&#xff0c;分为两类 我们现在用p1(x,y)表示数据点(x,y)属于类别1(图中红色圆点表示的类别)的概率&#xff0c;用p2(x,y)表示数据点(x,y)属于类别2(图中蓝色三角形表示的类别)的概率&#xff0c;那么对于一个新数据点(x,y)…

题目讲解18 有效的括号

原题链接&#xff1a; 20. 有效的括号 - 力扣&#xff08;LeetCode&#xff09; 思路分析&#xff1a; 第一步&#xff1a;先搭建一个数据结构——栈。 typedef char STDataType; typedef struct Stack {STDataType* arr;int top, capacity; } Stack;//初始化 void StackIn…

HarmonyOS笔记5:ArkUI框架的Navigation导航组件

ArkUI框架的Navigation导航组件 在移动应用中需要在不同的页面进行切换跳转。这种切换和跳转有两种方式&#xff1a;页面路由和Navigation组件实现导航。HarmonyOS推荐使用Navigation实现页面跳转。在本文中在HarmonyOS 5.0.0 Release SDK (API Version 12 Release)版本下&…

【C++】第九节:list

1、list的介绍及使用 1.1 list的介绍 list - C 参考 1.2 list的使用 1.2.1 list的构造 void TestList1() {list<int> l1; // 构造空的l1list<int> l2(4, 100); // l2中包含4个值为100的元素list<int> l3(l2.begin(), l2.end()); // 用l2的[begin(),end())…

Idea中创建和联系MySQL等数据库

备注&#xff1a;电脑中要已下好自己需要的MySQL数据库软件 MySQL社区版下载链接&#xff1a; https://dev.mysql.com/downloads/installer/ 优点&#xff1a; 1.相比与在命令行中管理数据库&#xff0c;idea提供了图形化管理&#xff0c;简单明了&#xff1b; 2.便于与后端…

Linux_shell脚本if语句详细教程

前言 在 Linux Shell 脚本中&#xff0c;if 语句用于基于条件执行命令或代码块。它的基本语法结构如下&#xff1a; if 条件; then# 如果条件为真时执行的代码 elif 另一个条件; then# 如果另一个条件为真时执行的代码 else# 如果所有条件都不成立时执行的代码 fi一、if 语句…