StarRocks 中如何做到查询超时(QueryTimeout)

背景

本文基于 StarRocks 3.1.7
主要是分析以下两种超时设置的方式:

  • SESSION 级别
    SET query_timeout = 10;SELECT sleep(20);
  • SQL 级别
  select /*+ SET_VAR(query_timeout=10) */ sleep(20); 

通过本文的分析大致可以了解到在Starrocks的FE端是如何进行Command的交互以及数据流走向,其他的命令也是可以举一反三

分析

query_timeout 命令解析

和Spark以及hive等但是解析一样,StarRocks也是采用的Anltr4进行语法的解析,
对于StarRocks来说, 对应的语法解析文件为 StarRocks.g4文件,那么其set query_time在如下的位置

setStatement: SET setVar (',' setVar)*;setVar: (CHAR SET | CHARSET | CHARACTER SET) (identifierOrString | DEFAULT)                       #setNames| NAMES (charset = identifierOrString | DEFAULT)(COLLATE (collate = identifierOrString | DEFAULT))?                                     #setNames| PASSWORD '=' (string | PASSWORD '(' string ')')                                           #setPassword| PASSWORD FOR user '=' (string | PASSWORD '(' string ')')                                  #setPassword| userVariable '=' expression                                                               #setUserVar| varType? identifier '=' setExprOrDefault                                                  #setSystemVar| systemVariable '=' setExprOrDefault                                                       #setSystemVar| varType? TRANSACTION transaction_characteristics                                          #setTransaction;

继而可以找到对应的语法解析部分为 AstBuilder.java 中

 @Overridepublic ParseNode visitSetSystemVar(StarRocksParser.SetSystemVarContext context) {NodePosition pos = createPos(context);if (context.systemVariable() != null) {VariableExpr variableDesc = (VariableExpr) visit(context.systemVariable());Expr expr = (Expr) visit(context.setExprOrDefault());return new SystemVariable(variableDesc.getSetType(), variableDesc.getName(), expr, pos);} else {Expr expr = (Expr) visit(context.setExprOrDefault());String variable = ((Identifier) visit(context.identifier())).getValue();if (context.varType() != null) {return new SystemVariable(getVariableType(context.varType()), variable, expr, pos);} else {return new SystemVariable(SetType.SESSION, variable, expr, pos);}}}

从以上所示,SET query_timeout = 10; 就会在语法层面解析为 new SystemVariable(SetType.SESSION, variable, expr, pos)

数据流向

以上只是说到了 SET query_timeout = 10 只会被解析为SystemVariable对应的java数据结构,但是一条SQL从客户端发送过来,是怎么一个数据流呢?
我们大概的捋一下:

StarRocksFE中新建QeService对象||\/new NMysqlServer(port, scheduler, sslContext)||\/new AcceptListener(connectScheduler, sslContext)||\/AcceptListener.handleEvent||\/context.startAcceptQuery(processor)||\/NMysqlChannel.startAcceptQuery||\/conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor))||\/ReadListener.handleEvent||\/connectProcessor.processOnce()||\/connectProcessor.dispatch||\/connectProcessor.handleQuery||\/stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable());||\/StmtExecutor.execute()||\/StatementPlanner.plan(parsedStmt, context)||\/StmtExecutor.handleSetStmt()||\/SetExecutor.execute // 会设置到变量的keyValue到`ConnectContext`的`SystemVariable`变量中,后续会或获取对应的值

query_timeout 怎么生效

还是定位到StarRocksFE.java中:

ExecuteEnv.setup();

该方法是整个执行环境的基础设置。其中会有ConnectScheduler的初始化:

public ConnectScheduler(int maxConnections) {this.maxConnections = new AtomicInteger(maxConnections);numberConnection = new AtomicInteger(0);nextConnectionId = new AtomicInteger(0);// Use a thread to check whether connection is timeout. Because// 1. If use a scheduler, the task maybe a huge number when query is messy.//    Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.// 2. Use a thread to poll maybe lose some accurate, but is enough to us.ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,"Connect-Scheduler-Check-Timer", true);checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);}

这里有个定时线程池去进行timeout的检查,间隔是一秒。具体的检查机制在TimeoutChecker类中:

private class TimeoutChecker extends TimerTask {@Overridepublic void run() {try {long now = System.currentTimeMillis();synchronized (ConnectScheduler.this) {//Because unregisterConnection will be callback in NMysqlChannel's close,//unregisterConnection will remove connectionMap (in the same thread)//This will result in a concurrentModifyException.//So here we copied the connectionIds to avoid removing iterator during operate iteratorArrayList<Long> connectionIds = new ArrayList<>(connectionMap.keySet());for (Long connectId : connectionIds) {ConnectContext connectContext = connectionMap.get(connectId);connectContext.checkTimeout(now);}}} catch (Throwable e) {//Catch Exception to avoid thread exitLOG.warn("Timeout checker exception, Internal error : " + e.getMessage());}}}

主要逻辑就是从connectionMap中获取对应的ConnectContext,从而调用ConnectContext.checkTimeout方法检查是否超时。
checkTimeout方法主要是通过sessionVariable.getQueryTimeoutS()获取设置的超时时间,如果超时,则调用StmtExecutor.cancel,继而调用Coordinator.cancel
所以现在就存在一个问题: 当前连接的ConnectContext什么时候被集成到 connectionMap中去的?
还是回到流程 AcceptListener.handleEvent 中去:

    connectScheduler.submit(context);...if (connectScheduler.registerConnection(context)) {MysqlProto.sendResponsePacket(context);connection.setCloseListener(streamConnection -> connectScheduler.unregisterConnection(context));} else {...

这里的submit 方法会生成context的conectionId.
registerConnection方法会把当前 ConnectionContext 的id和 ConnectionContext 组成KeyValue对并放置到connectionMap

至此 SET query_timeout = 10 整体的数据流就结束了,待在同一个连接中进行select 操作的时候,就会根据执行的长短进行超时处理了。

注意:
对于 select /*+ SET_VAR(query_timeout=10) */ sleep(20); 这种情况的解析,是通过 HintCollector来解析的。
词法解析是在StarRocksLex.g4 中,

OPTIMIZER_HINT: '/*+' .*? '*/' -> channel(2);

对于获取hint是通过HintCollectorextractHintToRight获取的:

 private void extractHintToRight(ParserRuleContext ctx) {Token semi = ctx.start;int i = semi.getTokenIndex();List<Token> hintTokens = tokenStream.getHiddenTokensToRight(i, HINT_CHANNEL);if (hintTokens != null) {contextWithTokenMap.computeIfAbsent(ctx, e -> new ArrayList<>()).addAll(hintTokens);}}

对应的解析在:SqlParser.parseWithStarRocksDialect 方法中:

  HintCollector collector = new HintCollector((CommonTokenStream) parser.getTokenStream());collector.collect(singleStatementContexts.get(idx));AstBuilder astBuilder = new AstBuilder(sessionVariable.getSqlMode(), collector.getContextWithHintMap());

AstBuilder 中会存储 hint到 hintMap 变量中,而在 visitQuerySpecification方法中

        selectList.setOptHints(extractVarHints(hintMap.get(context)));

从而在StmtExecutor.execute中会调用 optHints = selectRelation.getSelectList().getOptHints();获取对应的hint,

 if (isQuery &&((QueryStatement) parsedStmt).getQueryRelation() instanceof SelectRelation) {SelectRelation selectRelation = (SelectRelation) ((QueryStatement) parsedStmt).getQueryRelation();optHints = selectRelation.getSelectList().getOptHints();}if (optHints != null) {LOG.error("optHints: parsedStmt:" + parsedStmt.getOrigStmt() +"  "+ optHints.size());});SessionVariable sessionVariable = (SessionVariable) sessionVariableBackup.clone();for (String key : optHints.keySet()) {VariableMgr.setSystemVariable(sessionVariable,new SystemVariable(key, new StringLiteral(optHints.get(key))), true);}context.setSessionVariable(sessionVariable);

这样 hint相关的变量就设置到ConnectContextSessionVariable中了,后续的流程和之前的一致,只不过在StatExecutor执行的最后会把time_out给复原:

    SessionVariable sessionVariableBackup = context.getSessionVariable();...SessionVariable sessionVariable = (SessionVariable) sessionVariableBackup.clone();for (String key : optHints.keySet()) {VariableMgr.setSystemVariable(sessionVariable,new SystemVariable(key, new StringLiteral(optHints.get(key))), true);}context.setSessionVariable(sessionVariable);...context.setSessionVariable(sessionVariableBackup);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/55414.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java Web 之 Cookie 详解

在 JavaWeb 开发中&#xff0c;Cookie 就像网站给浏览器贴的小纸条&#xff0c;用于记录一些用户信息或状态&#xff0c;方便下次访问时识别用户身份或进行个性化服务。 也可以这么理解&#xff1a; 场景一&#xff1a;想象一下&#xff0c;你去一家咖啡店&#xff0c;店员认…

webpack信息泄露

先看看webpack中文网给出的解释 webpack 是一个模块打包器。它的主要目标是将 JavaScript 文件打包在一起,打包后的文件用于在浏览器中使用,但它也能够胜任转换、打包或包裹任何资源。 如果未正确配置&#xff0c;会生成一个.map文件&#xff0c;它包含了原始JavaScript代码的映…

VPN简述

文章目录 VPNVPN基础VPN类型 VPN VPN隧道安全 VPN基础 背景&#xff1a; 在网络传输中&#xff0c;绝大部分数据内容都是明文传输&#xff0c;存在很多安全隐患&#xff08;窃听、篡改、冒充&#xff09; 总部、分公司、办事处、出差人员、合作单位等需要访问总部网络资源 Vi…

富格林:警悟可信经验安全投资

富格林指出&#xff0c;黄金具有不错的投资价值&#xff0c;一直以来备受投资者的喜爱&#xff0c;近年来大家也纷纷加入现货黄金市场为己增值财富。但是要为投资安全护航的前提&#xff0c;是需要投资者使用合适可信的方法以及掌握相对应的投资技巧。下面富格林将总结以下可信…

【数学分析笔记】第4章第4节 复合函数求导法则及其应用(1)

4. 微分 4.4 复合函数求导法则及其应用 4.4.1 复合函数求导法则 【定理4.4.1】 u g ( x ) ug(x) ug(x)在 x x 0 xx_0 xx0​可导&#xff0c; g ( x 0 ) u ( 0 ) g(x_0)u(0) g(x0​)u(0)&#xff0c; y f ( u ) yf(u) yf(u)在 u u 0 uu_0 uu0​可导&#xff0c;则 y f …

SpringBoot+Redis+RabbitMQ完成增删改查

各部分分工职责 RabbitMQ负责添加、修改、删除的异步操作 Redis负责数据的缓存 RabbitMQ里面角色职责简单描述 RabbitMQ里面有几个角色要先分清以及他们的对应关系&#xff1a; 交换机、队列、路由键 交换机和队列是一对多 队列和路由键是多对多 然后就是消息的发送者&…

课设实验-数据结构-线性表-手机销售

题目&#xff1a; 代码&#xff1a; #include<stdio.h> #include<string.h> #define MaxSize 10 //定义顺序表最大长度 //定义手机结构体类型 typedef struct {char PMod[10];//手机型号int PPri;//价格int PNum;//库存量 }PhoType; //手机类型 //记录手机的顺序…

【HTTP(3)】(状态码,https)

【认识状态码】 状态码最重要的目的&#xff0c;就是反馈给浏览器:这次请求是否成功&#xff0c;若失败&#xff0c;则出现失败原因 常见状态码: 200:OK&#xff0c;表示成功 404:Not Found&#xff0c;浏览器访问的资源在服务器上没有找到 403:Forbidden&#xff0c;访问被…

springboot系列--web相关知识探索三

一、前言 web相关知识探索二中研究了请求是如何映射到具体接口&#xff08;方法&#xff09;中的&#xff0c;本次文章主要研究请求中所带的参数是如何映射到接口参数中的&#xff0c;也即请求参数如何与接口参数绑定。主要有四种、分别是注解方式、Servlet API方式、复杂参数、…

【案例】距离限制模型透明

开发平台&#xff1a;Unity 2023 开发工具&#xff1a;Unity ShaderGraph   一、效果展示 二、路线图 三、案例分析 核心思路&#xff1a;计算算式&#xff1a;透明值 实际距离 / 最大距离 &#xff08;实际距离 ≤ 最大距离&#xff09;   3.1 说明 | 改变 Alpha 值 在 …

计算机组成原理之无符号整数的表示和运算

无符号整数的表示 无符号整数的表示&#xff1a;无符号整数直接使用其二进制形式表示&#xff0c;所有位都是数值位&#xff0c;没有符号位。例如&#xff0c;一个8位的无符号整数可以表示的范围是从0&#xff08;00000000&#xff09;到255&#xff08;11111111&#xff09;。…

stm32f103调试,程序与定时器同步设置

在调试定时器相关代码时&#xff0c;注意到定时器的中断位总是置1&#xff0c;怀疑代码有问题&#xff0c;经过增大定时器的中断时间&#xff0c;发现定时器与代码调试并不同步&#xff0c;这一点对于调试涉及定时器的代码是非常不利的&#xff0c;这里给出keil调试stm32使定时…

自用Proteus(8.15)常用元器件图示和功能介绍(持续更新...)

文章目录 一、 前言二、新建工程&#xff08;以51单片机流水灯为例&#xff09;2.1 打开软件2.2 建立新工程2.3 创建原理图2.4 不创建PCB布版设计2.5 创建成功2.6 添加元器件2.7 原理图放置完成2.8 编写程序&#xff0c;进行仿真2.9 仿真 三、常用元器件图示和功能介绍3.1 元件…

【回眸】Tessy 单元测试软件使用指南(四)常见报错及解决方案与批量初始化的经验

前言 分析时Tessy的报错 1.fatal error: Tricore/Compilers/Compilers.h: No such file or directory 2.error: #error "Compiler unsupported" 3.warning: invalid suffix on literal;C11 requires a space between literal and string macro 4.error: unknown…

螺蛳壳里做道场:老破机搭建的私人数据中心---Centos下Docker学习01(环境准备)

1 准备工作 由于创建数据中心需要安装很多服务器&#xff0c;这些服务器要耗费很所物理物理计算资源、存储资源、网络资源和软件资源&#xff0c;作为穷学生只有几百块的n手笔记本&#xff0c;不可能买十几台服务器来搭建数据中心&#xff0c;也不愿意跑实验室&#xff0c;想躺…

第十二章--- fixed 和 setprecision 函数、round 函数、进制转换及底层逻辑

1. 保留几位小数 在C中&#xff0c;如果你想要控制输出的小数点后的位数&#xff0c;可以使用<iomanip>头文件提供的fixed和setprecision函数。这里的fixed用于设置浮点数的输出格式为定点表示法&#xff0c;而setprecision(n)则用来指定小数点后保留的位数。具体用法如…

文件上传之%00截断(00截断)以及pikachu靶场

pikachu的文件上传和upload-lab的文件上传 目录 mime type类型 getimagesize 第12关%00截断&#xff0c; 第13关0x00截断 差不多了&#xff0c;今天先学文件上传白名单&#xff0c;在网上看了资料&#xff0c;差不多看懂了&#xff0c;但是还有几个地方需要实验一下&#…

SpringBoot整合异步任务执行

同步任务&#xff1a; 同步任务是在单线程中按顺序执行&#xff0c;每次只有一个任务在执行&#xff0c;不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务&#xff0c;无法同时处理多个任务&#xff0c;响应慢&#xff0c;影响…

C++ 语言特性21 - 别名模板

一&#xff1a;概述 别名模板是 C11 引入的&#xff0c;用于为一个模板类型定义别名&#xff0c;从而简化复杂的模板类型定义。它结合了 using 关键字&#xff0c;可以对模板类型进行重新命名&#xff0c;使代码更加简洁和可读。 1. 作用 定义模板类型的别名。简化复杂的模板类…

VirtualBox+Vagrant快速搭建Centos7系统【最新详细教程】

VirtualBoxVagrant快速搭建Centos7系统 &#x1f4d6;1.安装VirtualBox✅下载VirtualBox✅安装 &#x1f4d6;2.安装Vagrant✅下载Vagrant✅安装 &#x1f4d6;3.搭建Centos7系✅初始化Vagrantfile文件生成✅启动Vagrantfile文件✅解决 vagrant up下载太慢的问题✅配置网络ip地…