php如何给网站做支付接口/荥阳seo推广

php如何给网站做支付接口,荥阳seo推广,武汉建设监理网,杭州科技公司引入 Apache Hive 是基于Hadoop的数据仓库工具,它可以使用SQL来读取、写入和管理存在分布式文件系统中的海量数据。在Hive中,HQL默认转换成MapReduce程序运行到Yarn集群中,大大降低了非Java开发者数据分析的门槛,并且Hive提供命令…

引入

Apache Hive 是基于Hadoop的数据仓库工具,它可以使用SQL来读取、写入和管理存在分布式文件系统中的海量数据。在Hive中,HQL默认转换成MapReduce程序运行到Yarn集群中,大大降低了非Java开发者数据分析的门槛,并且Hive提供命令行工具和JDBC驱动程序,方便用户连接到Hive进行数据分析操作。

严格意义上,Hive并不属于计算引擎,而是建立在Hadoop生态之上的数据仓库管理工具。它将繁杂的MapReduce作业抽象成SQL,使得开发及维护成本大幅降低。得益于HDFS的存储和MapReduce的读写能力,Hive展现出了强大的兼容能力、数据吞吐能力和服务稳定性,时至今日依然是大数据架构中不可或缺的一部分。

Hive的核心特点

  • Hive是基于Hadoop的数仓工具,底层数据存储在HDFS中;

  • Hive提供标准SQL功能,支持SQL语法访问操作数据;

  • Hive适合OLAP数据分析场景,不适合OLTP数据处理场景,所以适合数据仓库构建;

  • HQL默认转换成MapReduce任务执行,也可以配置转换成Apache Spark、Apache Tez任务运行;

  • Hive中支持定义UDF、UDAF、UDTF函数扩展功能。

Hive的架构设计

Hive用户接口

访问Hive可以通过CLI、Beeline、JDBC/ODBC、WebUI几种方式。在Hive早期版本中可以使用Hive CLI来操作Hive,Hive CLI并发性能差、脚本执行能力有限并缺乏JDBC驱动支持,从Hive 4.x版本起废弃了Hive CLI推荐使用Beeline。Beeline是一个基于JDBC的Hive客户端,支持并发环境、复杂脚本执行、JDBC驱动等,在Hive集群内连接Hive可以使用Beeline方式。在Hive集群外,通过代码或者工具连接操作Hive时可以通过JDBC/ODBC方式。通过WebUI方式可以通过浏览器查看到Hive集群的一些信息。

HiveServer2服务

HiveServer2服务提供JDBC/ODBC接口,主要用于代理远程客户端对Hive的访问,是一种基于Thrift协议的服务。例如通过JDBC或者Beeline连接访问Hive时就需要启动HiveServer2服务,就算Beeline访问本机上的Hive服务也需要启动HiveServer2服务。

HiveServer2代理远程客户端对Hive操作时会涉及到操作HDFS数据,就会有操作权限问题,那么操作HDFS中数据的用户是启动HiveServer2的用户还是远程客户端的用户需要通过“hive.server2.enable.doAs” 参数决定,该参数默认为true,表示HiveServer2操作HDFS时的用户为远程客户端用户,如果设置为false表示操作HDFS数据的用户为启动HiveServer2的用户。

MetaStore服务

MetaStore服务负责存储和管理Hive元数据,为HiverServer2提供元数据访问接口。Hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(表拥有者、是否为外部表等),表的数据所在目录等。

Hive MetaStore可以将元数据存储在mysql、derby数据库中。

Hive Driver

Driver中包含解释器(SQL Parser)、编译器(Compiler)、优化器(Optimizer),负责完成HQL查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS中,并在随后有执行器(Executor)调用MapReduce执行。

对于Hive有了一个初步认识,我们下面开始梳理Hive的执行原理。

Hive的执行原理

Hive无论采用哪种调用方式,最终都会辗转到org.apache.hadoop.hive.ql.Driver类。SQL语句在Driver类中,通过Antlr框架进行解析编译,将一条SQL按照如下流程转换成最终执行的MapReduce任务。

如果直接盲目的去看Driver类的代码,会很容易看懵逼,我们需要再往前一点。

SQLOperation

先看org.apache.hive.service.cli.operation.SQLOperation 类,它负责创建Driver对象、编译SQL、异步执行SQL。其中核心的就是 runInternal()方法,主要进行如下两个步骤:

  1. Driver对象创建并编译SQL,将SQL编译成Query Plan执行计划。
  2. 对QueryPaln 进行处理,转换成MR 任务执行。

runInternal() 方法源码内容如下:

  /*** 内部运行方法,用于执行SQL操作。** @throws HiveSQLException 如果在执行过程中发生Hive SQL异常。*/public void runInternal() throws HiveSQLException {// 设置操作状态为PENDINGsetState(OperationState.PENDING);// 判断是否应该异步运行boolean runAsync = shouldRunAsync();// 判断是否应该异步编译final boolean asyncPrepare = runAsync&& HiveConf.getBoolVar(queryState.getConf(),HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);// 如果不是异步编译,则同步准备查询if (!asyncPrepare) {//创建Driver对象,编译SQL//Driver经过:SQL -> AST(抽象语法树) -> QueryBlock(查询块) -> Operator(e逻辑执行计划) -> TaskTree(物理执行计划) -> QueryPlan(查询计划)prepare(queryState);}// 如果不是异步运行,则同步运行查询if (!runAsync) {runQuery();} else {// 我们将在后台线程中传递ThreadLocals,从前台(处理程序)线程传递。// 1) ThreadLocal Hive对象需要在后台线程中设置// 2) Hive中的元数据存储客户端与正确的用户相关联。// 3) 当前UGI将在元数据存储处于嵌入式模式时被元数据存储使用Runnable work = new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(),SessionState.getPerfLogger(), SessionState.get(), asyncPrepare);try {// 如果没有可用的后台线程来运行此操作,此提交将阻塞Future<?> backgroundHandle = getParentSession().submitBackgroundOperation(work);// 设置后台操作句柄setBackgroundHandle(backgroundHandle);} catch (RejectedExecutionException rejected) {// 设置操作状态为ERRORsetState(OperationState.ERROR);// 抛出HiveSQLException异常throw new HiveSQLException("The background threadpool cannot accept" +" new task for execution, please retry the operation", rejected);}}}

1.Driver对象创建并编译SQL,将SQL编译成Query Plan执行计划

其中核心的是prepare()方法,它的源码在2.x和3.x、4.x有一些区别,不过其核心功能是没变的,主要是创建Driver对象,并编译SQL,然后通过Driver将SQL最终转换成Query Plan。

prepare()方法3.x的源码如下:

  /*** 准备执行SQL查询的操作。* 此方法负责初始化Driver,设置查询超时,编译查询语句,并处理可能的异常。** @param queryState 包含查询状态信息的对象。* @throws HiveSQLException 如果在准备过程中发生Hive SQL异常。*/public void prepare(QueryState queryState) throws HiveSQLException {// 设置操作状态为运行中setState(OperationState.RUNNING);try {// 创建Driver实例,返回的Driver对象是 ReExecDriverdriver = DriverFactory.newDriver(queryState, getParentSession().getUserName(), queryInfo);// 如果查询超时时间大于0,则启动一个定时任务来取消查询if (queryTimeout > 0) {// 创建一个单线程的定时任务执行器timeoutExecutor = new ScheduledThreadPoolExecutor(1);// 创建一个定时任务,在查询超时后取消查询Runnable timeoutTask = new Runnable() {@Overridepublic void run() {try {// 获取查询IDString queryId = queryState.getQueryId();// 记录日志,查询超时并取消执行LOG.info("Query timed out after: " + queryTimeout+ " seconds. Cancelling the execution now: " + queryId);// 取消查询SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {// 记录日志,取消查询时发生错误LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);} finally {// 关闭定时任务执行器timeoutExecutor.shutdown();}}};// 安排定时任务在查询超时后执行timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);}// 设置查询显示信息queryInfo.setQueryDisplay(driver.getQueryDisplay());// 设置操作句柄信息,以便Thrift API用户可以使用操作句柄查找Yarn ATS中的查询信息String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier().toTHandleIdentifier().getGuid()).trim();driver.setOperationId(guid64);// 编译SQL查询并响应 ReExecDriver.compileAndRespond(...) -> Driver.compileAndRespond(...)response = driver.compileAndRespond(statement);// 如果响应代码不为0,则抛出异常if (0 != response.getResponseCode()) {throw toSQLException("Error while compiling statement", response);}// 设置是否有结果集setHasResultSet(driver.hasResultSet());} catch (HiveSQLException e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw e;} catch (Throwable e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw new HiveSQLException("Error running query: " + e.toString(), e);}}

2.x与3.x源码最核心的区别就是在创建Driver,其对应源码是:

driver = new Driver(queryState, getParentSession().getUserName());

而4.x与3.x源码最核心的区别如下:

  1. 利用 Java 8 的 Lambda 表达式特性,简化代码逻辑,提高代码的可读性和可维护性。
  2. 通过将 queryTimeout 的类型改为 long,支持了更大的超时值,避免了溢出问题。
  3. 在资源管理方面,对调度器的生命周期管理也进行了优化,不需要显式的关闭操作。

4.x对应源码是:

if (queryTimeout > 0L) {timeoutExecutor = Executors.newSingleThreadScheduledExecutor();timeoutExecutor.schedule(() -> {try {final String queryId = queryState.getQueryId();log.info("Query timed out after: {} seconds. Cancelling the execution now: {}", queryTimeout, queryId);SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {log.error("Error cancelling the query after timeout: {} seconds", queryTimeout, e);}return null;}, queryTimeout, TimeUnit.SECONDS);
}

DriverFactory.newDriver()方法中返回 ReExecDriver对象,该对象表示执行过程失败可重试的Driver对象,然后调用 Driver.compileAndRespond() 方法进行编译SQL。

2.对QueryPaln 进行处理,转换成MR 任务执行

BackgroundWork是一个线程,负责异步处理QueryPlan,通过submitBackgroundOperation(work)提交运行,执行到SQLOperator.BackgroundOperation.run()方法,最终调用到Driver.run() 方法。

Driver

下面我们再来Driver类,它在不同版本中也有一些差别,比如2.x版本是直接 implements CommandProcessor,而3.x和4.x版本则是implements IDriver,而IDriver 则是 extends CommandProcessor。本质是为了更好的解耦和扩展性,使得代码更加模块化,易于维护和扩展。同时,通过继承 CommandProcessor 接口,也保持了与旧版本的兼容性,确保了功能的连续性。不过其核心功能是没变的,主要包含编译、优化及执行。

为了方便理解,我们先梳理整个执行步骤如下:

  1. 通过Antlr解析SQL语法规则和语法解析,将SQL语法转换成AST(抽象语法树)

  2. 遍历AST(抽象语法树) 将其转化成Query Block(查询块,可以看成查询基本执行单元)

  3. 将Query Block(查询块) 转换成OperatorTree(逻辑执行计划),并进行优化。

  4. OperatorTree(逻辑执行计划)转换成TaskTree(物理执行计划,每个Task对应一个MR Job任务)

  5. TaskTree(物理执行计划)最终包装成Query Plan(查询计划)

简单总结执行流程如下:

SQL -> AST(抽象语法树) -> QueryBlock(查询块) -> Operator(逻辑执行计划) -> TaskTree(物理执行计划) -> QueryPlan(查询计划)

下面我们再结合SQLOperation调用的Driver类里面的核心方法,来看看底层源码是如何实现的:

compileAndRespond方法

首先第一个核心方法是

response = driver.compileAndRespond(statement);

compileAndRespond()方法2.x源码如下:

    /*** 编译给定的 SQL 命令并返回一个命令处理器响应。* 此方法调用 compileInternal 方法进行实际的编译操作,并使用编译结果创建一个命令处理器响应。** @param command 要编译的 SQL 命令* @return 包含编译结果的命令处理器响应*/public CommandProcessorResponse compileAndRespond(String command) {return createProcessorResponse(compileInternal(command, false));}

3.x和4.x会有些区别,会返回以下方法的调用结果:

coreDriver.compileAndRespond(statement);

无论哪个版本,最终compileAndRespond()方法都会调用到 compileInternal()方法,我们继续看2.x版本compileInternal()方法源码如下:

    private int compileInternal(String command, boolean deferClose) {int ret;// 获取Metrics实例,如果存在则增加等待编译操作的计数器Metrics metrics = MetricsFactory.getInstance();if(metrics != null) {metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 尝试获取编译锁,如果获取失败则返回编译锁超时错误码final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, command);if(compileLock == null) {return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();}try {// 如果Metrics实例存在,减少等待编译操作的计数器if(metrics != null) {metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 进行Hive SQL编译ret = compile(command, true, deferClose);} finally {// 无论编译结果如何,最终都要释放编译锁compileLock.unlock();}// 如果编译失败,尝试释放锁并回滚事务if(ret != 0) {try {releaseLocksAndCommitOrRollback(false, null);} catch(LockException e) {// 记录释放锁时的异常信息LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e));}}// 保存编译时的性能日志,用于WebUI展示// 执行时的性能日志由另一个线程的PerfLogger或重置后的PerfLogger完成PerfLogger perfLogger = SessionState.getPerfLogger();queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());return ret;}

3.x有一些区别,但是都是通过执行Driver.compile()方法,而4.x则是解耦了,执行的是Compiler.compile()

compile方法

核心都是compile()方法,compile()方法2.x源码如下:

/*** 编译一个新的查询,可选择重置任务ID计数器并决定是否延迟关闭。* * @param command      要编译的HiveQL查询。* @param resetTaskIds 如果为true,则重置任务ID计数器。* @param deferClose   如果为true,则在编译过程被中断时延迟关闭/销毁操作。* @return 0表示编译成功,否则返回错误代码。*/
public int compile(String command, boolean resetTaskIds, boolean deferClose) {// 获取性能日志记录器,并开始记录编译过程的性能PerfLogger perfLogger = SessionState.getPerfLogger(true);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);// 锁定驱动状态,将驱动状态设置为编译中lDrvState.stateLock.lock();try {lDrvState.driverState = DriverState.COMPILING;} finally {lDrvState.stateLock.unlock();}// 对查询命令进行变量替换command = new VariableSubstitution(new HiveVariableSource() {@Overridepublic Map<String, String> getHiveVariable() {return SessionState.get().getHiveVariables();}}).substitute(conf, command);// 存储查询字符串String queryStr = command;try {// 对查询命令进行脱敏处理,避免记录敏感数据queryStr = HookUtils.redactLogString(conf, command);} catch(Exception e) {// 若脱敏失败,记录警告信息LOG.warn("WARNING! Query command could not be redacted." + e);}// 检查编译过程是否被中断,若中断则处理中断并返回错误代码if(isInterrupted()) {return handleInterruption("at beginning of compilation."); //indicate if need clean resource}// 如果上下文不为空且解释分析状态不为运行中,则关闭现有上下文if(ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {// close the existing ctx etc before compiling a new query, but does not destroy drivercloseInProcess(false);}// 如果需要重置任务ID,则重置任务工厂的IDif(resetTaskIds) {TaskFactory.resetId();}// 获取查询IDString queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);// 保存查询信息,用于Web UI显示this.queryDisplay.setQueryStr(queryStr);this.queryDisplay.setQueryId(queryId);// 记录编译开始信息LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);// 设置查询的当前时间戳SessionState.get().setupQueryCurrentTimestamp();// 标记编译过程中是否发生错误boolean compileError = false;try {// 初始化事务管理器final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);// 移除旧的关闭hookShutdownHookManager.removeShutdownHook(shutdownRunner);// 创建新的关闭hook,用于在JVM关闭时释放锁shutdownRunner = new Runnable() {@Overridepublic void run() {try {releaseLocksAndCommitOrRollback(false, txnManager);} catch(LockException e) {// 若释放锁时发生异常,记录警告信息LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " + e.getMessage());}}};// 添加新的关闭hookShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);// 再次检查编译过程是否被中断if(isInterrupted()) {return handleInterruption("before parsing and analysing the query");}// 如果上下文为空,则创建新的上下文if(ctx == null) {ctx = new Context(conf);}// 设置上下文的重试次数、命令和HDFS清理标志ctx.setTryCount(getTryCount());ctx.setCmd(command);ctx.setHDFSCleanup(true);/*** 把 HQL命令 翻译成一个 ASTNode Tree* 封装了 ParseDriver 对 HQL 的解析工作* ParseDriver 对 command 进行词法分析和语法解析(统称为语法分析),返回一个抽象语法树AST*/// 开始记录解析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);// 解析查询命令,得到抽象语法树ASTNode tree = ParseUtils.parse(command, ctx);// 结束记录解析过程的性能perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);// 加载查询hookqueryHooks = loadQueryHooks();// 如果查询hook不为空且不为空列表,则触发查询hook的编译前操作if(queryHooks != null && !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.beforeCompile(qhc);}}// 开始记录语义分析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);// 获取语义分析器BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);// 获取语义分析hookList<HiveSemanticAnalyzerHook> saHooks = getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK, HiveSemanticAnalyzerHook.class);// 刷新元数据存储缓存,确保获取最新的元数据Hive.get().getMSC().flushCache();// 进行语义分析和计划生成if(saHooks != null && !saHooks.isEmpty()) {HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();hookCtx.setConf(conf);hookCtx.setUserName(userName);hookCtx.setIpAddress(SessionState.get().getUserIpAddress());hookCtx.setCommand(command);hookCtx.setHiveOperation(queryState.getHiveOperation());// 触发语义分析hook的预分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {tree = hook.preAnalyze(hookCtx, tree);}/*** sem 是一个 SemanticAnalyzer(语义分析器) 对象* 主要的工作是将 ASTNode 转化为 TaskTree,包括可能的 optimize,过程比较复杂** tree:  AST  抽象语法树   ===> TaskTree*        TaskTree : 物理执行计划**   把抽象语法树交给 SemanticAnalyzer 执行语法解析*   1、从 AST 转成 解析树*   2、通过解析树 再生成 QB 在查询快*   3、从 QB 树在生成 OperatorTree (Logical Plan)*   4、逻辑执行计划的优化*   5、OperatorTree转变成TaskTree*   6、再针对物理执行计划执行优化*   7、生成QueryPlan*/// 进行语义分析sem.analyze(tree, ctx);// 更新hook上下文hookCtx.update(sem);// 触发语义分析hook的后分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {hook.postAnalyze(hookCtx, sem.getAllRootTasks());}} else {// 若没有语义分析hook,直接进行语义分析sem.analyze(tree, ctx);}// 记录查询中发现的ACID文件接收器acidSinks = sem.getAcidFileSinks();// 记录语义分析完成信息LOG.info("Semantic Analysis Completed");// 验证语义分析生成的计划是否有效sem.validate();// 检查查询中是否包含ACID操作acidInQuery = sem.hasAcidInQuery();// 结束语义分析阶段的性能日志记录perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);// 检查编译过程是否被中断,如果中断则处理中断情况并返回if(isInterrupted()) {return handleInterruption("after analyzing query.");}// 根据语义分析结果和配置信息获取查询的输出模式schema = getSchema(sem, conf);/*** 把 TaskTree 生成一个 QueryPlan* 通过  Exeuctor 提交的方法,要接受的参数就是 QueryPlan*/// 根据查询字符串、语义分析器、开始时间、查询ID、操作类型和输出模式创建查询计划plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);// 设置查询字符串到配置中conf.setQueryString(queryStr);// 设置MapReduce工作流ID到配置中conf.set("mapreduce.workflow.id", "hive_" + queryId);// 设置MapReduce工作流名称到配置中conf.set("mapreduce.workflow.name", queryStr);// 如果查询计划中包含FetchTask,则对其进行初始化if(plan.getFetchTask() != null) {plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());}// 进行授权检查,如果语义分析不跳过授权且开启了授权功能if(!sem.skipAuthorization() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {try {// 开始记录授权过程的性能日志perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);// 执行授权操作doAuthorization(queryState.getHiveOperation(), sem, command);} catch(AuthorizationException authExp) {// 如果授权失败,打印错误信息并设置错误状态和返回码console.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to " + "get" + " more details.");errorMessage = authExp.getMessage();SQLState = "42000";return 403;} finally {// 结束记录授权过程的性能日志perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);}}// 如果配置中开启了记录EXPLAIN输出的功能if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 获取查询的EXPLAIN输出String explainOutput = getExplainOutput(sem, plan, tree);if(explainOutput != null) {if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 记录EXPLAIN输出到日志中LOG.info("EXPLAIN output for queryid " + queryId + " : " + explainOutput);}if(conf.isWebUiQueryInfoCacheEnabled()) {// 如果开启了Web UI查询信息缓存,将EXPLAIN计划设置到查询显示信息中queryDisplay.setExplainPlan(explainOutput);}}}// 编译成功,返回0return 0;} catch(Exception e) {// 如果编译过程中被中断,处理中断情况并返回if(isInterrupted()) {return handleInterruption("during query compilation: " + e.getMessage());}// 标记编译过程出现错误compileError = true;// 获取错误信息ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());// 构建错误消息errorMessage = "FAILED: " + e.getClass().getSimpleName();if(error != ErrorMsg.GENERIC_ERROR) {errorMessage += " [Error " + error.getErrorCode() + "]:";}// HIVE-4889if((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {errorMessage += " " + e.getCause().getMessage();} else {errorMessage += " " + e.getMessage();}if(error == ErrorMsg.TXNMGR_NOT_ACID) {errorMessage += ". Failed command: " + queryStr;}// 设置SQL状态码SQLState = error.getSQLState();// 记录下游错误信息downstreamError = e;// 打印错误信息和详细堆栈跟踪console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));// 返回错误代码return error.getErrorCode();// since it exceeds valid range of shell return values} finally {// 触发编译后的hook函数try {if(queryHooks != null && !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.afterCompile(qhc, compileError);}}} catch(Exception e) {// 如果触发hook函数时出现异常,记录警告信息LOG.warn("Failed when invoking query after-compilation hook.", e);}/*** 计算任务总耗时*/// 结束编译阶段的性能日志记录并计算耗时double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE) / 1000.00;// 获取编译过程中HMS调用的时间统计信息ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");// 设置查询显示信息中的HMS时间统计信息queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);// 检查编译过程是否被中断boolean isInterrupted = isInterrupted();if(isInterrupted && !deferClose) {// 如果被中断且不延迟关闭,关闭正在进行的操作closeInProcess(true);}// 锁定驱动状态lDrvState.stateLock.lock();try {if(isInterrupted) {// 如果被中断,根据是否延迟关闭设置驱动状态lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;} else {// 如果未被中断,根据编译是否出错设置驱动状态lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;}} finally {// 解锁驱动状态lDrvState.stateLock.unlock();}if(isInterrupted) {// 如果编译过程被中断,记录中断信息LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");} else {// 如果编译过程未被中断,记录编译完成信息LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " " + "seconds");}}
}

compile()方法在3.x和4.x有一些区别,但是都有以下几个核心方法:

  1. 2.x和3.x是通过ParseUtils.parse(command, ctx),而4.x是通过parse()将Hive SQL转换成AST(抽象语法树),即:HQL -> AST 转换;
  2. 然后无论哪个版本,都会通过BaseSemanticAnalyzer.analyze()方法将AST解析生成TaskTree(物理执行计划);
  3. 最后2.x和3.x版本都会将BaseSemanticAnalyzer传入QueryPlan构造函数来创建QueryPlan(查询计划),而4.x版本则是传入createPlan()方法创建QueryPlan。

总结

本文介绍了Hive,并通过源码梳理了Hive的执行原理,其核心正是引入篇我们提到的解析(Parsing)、校验(Validation)、优化(Optimization)和执行(Execution)

总结起来主要有以下四个步骤:

  1. 词法解析
    将SQL语法转换成AST(抽象语法树) 
    核心是parse()方法中调用的HiveLexer和HiveParser这两个类,它们分别负责SQL的词法分析和语法解析。
  2. 语义分析
    这一步是对AST进行进一步的抽象和结构化处理,通过遍历AST(抽象语法树) 将其转化成Query Block(查询块,可以看成查询基本执行单元,它包含了输入源、计算过程和输出结果这三个基本组成部分。)
    核心是通过BaseSemanticAnalyzer子类SemanticAnalyzer的analyzeInternal()方法,核心逻辑是首先将SQL语句中涉及的各类信息提取出来,并存储到QueryBlock中,在完成后,通过genOPTree()方法将Query Block(查询块) 转换成OperatorTree(逻辑执行计划)
  3. 逻辑优化
    到了第三步时,操作符树虽然已经勾勒出执行任务的先后顺序和上下游依赖,但细节还比较粗糙,例如存在重复的数据扫描、不必要的Shuffle操作等,因此还需要进行进一步优化。通过优化,Hive可以改进查询的执行计划,并生成更高效的作业图以在分布式计算框架中执行。这些优化可以提高查询的性能和效率,并减少资源开销。
    核心是通过Optimizer类的方法完成的,从源码可以看到,优化器的种类非常繁杂。总体而言,优化的目的是通过匹配相应的规则来减少MapReduce作业的数量,降低数据传输和Shuffle的数据量。
  4. 物理优化
    在逻辑优化阶段结束后,输入的SQL语句也逐步转换为优化后的逻辑计划,不过此时的逻辑计划仍然不能直接执行,还需要进一步转换成可以识别并执行的MapReduce Task,因此物理优化实际上分为两个执行步骤:首先将优化后的OperatorTree(逻辑执行计划)转换成TaskTree(物理执行计划,每个Task对应一个MR Job任务),并对物理执行计划进行一些优化,然后依次调用执行。

因为篇幅问题,本文有很多有意思的方法源码没有深入探索,比如compile()方法中的核心方法;比如Hive通过Antler实现的词法分析和语法解析,是我们去梳理hive任务血缘的核心技术等,都是很有意思的东西,感兴趣的小伙伴可以深入了解一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/69796.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络(1)基础篇

目录 1.TCP/IP 网络模型 2.键入网址--->网页显示 2.1 生成HTTP数据包 2.2 DNS服务器进行域名与IP转换 2.3 建立TCP连接 2.4 生成IP头部和MAC头部 2.5 网卡、交换机、路由器 3 Linux系统收发网络包 1.TCP/IP 网络模型 首先&#xff0c;为什么要有 TCP/IP 网络模型&a…

c++ 多线程知识汇总

一、std::thread std::thread 是 C11 引入的标准库中的线程类&#xff0c;用于创建和管理线程 1. 带参数的构造函数 template <class F, class... Args> std::thread::thread(F&& f, Args&&... args);F&& f&#xff1a;线程要执行的函数&…

LabVIEW用户界面(UI)和用户体验(UX)设计

作为一名 LabVIEW 开发者&#xff0c;满足功能需求、保障使用便捷与灵活只是基础要求。在如今这个用户体验至上的时代&#xff0c;为 LabVIEW 应用程序设计直观且具有美学感的界面&#xff0c;同样是不容忽视的关键任务。一个优秀的界面设计&#xff0c;不仅能提升用户对程序的…

【工业场景】用YOLOv8实现火灾识别

火灾识别任务是工业领域急需关注的重点安全事项,其应用场景和背景意义主要体现在以下几个方面: 应用场景:工业场所:在工厂、仓库等工业场所中,火灾是造成重大财产损失和人员伤亡的主要原因之一。利用火灾识别技术可以及时发现火灾迹象,采取相应的应急措施,保障人员安全和…

STM32_USART通用同步/异步收发器

目录 背景 程序 STM32浮空输入的概念 1.基本概念 2. STM32浮空输入的特点 3. STM32浮空输入的应用场景 STM32推挽输出详解 1. 基本概念 2. 工作原理 3. 应用场景 使能外设时钟 TXE 和 TC的区别 USART_IT_TXE USART_IT_TC 使能串口外设 中断处理函数 背景 单片…

利用IDEA将Java.class文件反编译为Java文件:原理、实践与深度解析

文章目录 引言&#xff1a;当.class文件遇到源代码缺失第一章&#xff1a;反编译技术基础认知1.1 Java编译执行原理1.2 反编译的本质1.3 法律与道德边界 第二章&#xff1a;IDEA内置反编译工具详解2.1 环境准备2.2 三步完成基础反编译2.3 高级反编译技巧2.3.1 调试模式反编译2.…

算法日记16:SC68 联通块问题(并查集)

一、题目&#xff1a; 二、题解&#xff1a; 1、看到求联通块问题&#xff0c;我们可以考虑使用DFS/并查集(在这里我们仅介绍并查集) 2、什么是并查集&#xff1f; 2.1&#xff1a;初始化&#xff1a;对于每一个点&#xff0c;我们都对其进行初始化操作pre[i]i pre[i]表示i的…

visual studio导入cmake项目后打开无法删除和回车

通过Cmakelists.txt导入的项目做删除和回车无法响应&#xff0c;需要点击项目&#xff0c;然后选择配置项目就可以了

ChartDB:一个基于Web的可视化数据库设计工具

这次给大家介绍一个可视化的数据库设计工具&#xff1a;ChartDB。 ChartDB 是一个免费开源的数据库可视化设计工具&#xff0c;支持的数据库包括 MySQL、MariaDB、PostgreSQL、Microsoft SQL Server、SQLite、ClickHouse 等。 对于已有的数据库&#xff0c;ChartDB 提供了一键…

elementUI tree树形控件 根据数据动态设置禁用,全选时不可选中禁用数据

需求 根据后端返回的数据禁用数据&#xff0c;将tree结构对应的数据设置为禁用状态&#xff0c;并且在点击全选后不可选中禁用数据。 效果 根据数据动态设置禁用 全选时不可选中禁用数据 代码 <template>...<div class"list-box"><div class&q…

Flutter 添加 iOS widget 小组件

环境 macOS 15.1 Xcode16.1 Flutter 3.27.4 前言 本篇文章主要记录&#xff0c;在Flutter 项目中如何正确地添加iOS 小组件&#xff0c;iOS 小组件 相关的知识在另一篇文章有记录。 iOS 14 widget 添加小组件 WidgetExtension 打开Xcode New -> Target 选择 iOS -> 搜…

LLM:GPT 系列

阅读原文&#xff1a; LLM&#xff1a;Qwen 系列 GPT&#xff08;Generative Pre-trained Transformer&#xff09;是生成式预训练语言模型&#xff0c;基于 Transformer 架构&#xff0c;专注于通过自回归的方式生成自然语言文本&#xff0c;即给定一个输入序列 x { x 1 , …

消息中间件:RabbitMQ镜像集群部署配置全流程

目录 1、特点 2、RabbitMQ的消息传递模式 2.1、简单模式&#xff08;Simple Mode&#xff09; 2.2、工作队列模式&#xff08;Work Queue Mode&#xff09; 2.3、发布/订阅模式&#xff08;Publish/Subscribe Mode&#xff09; 2.4、路由模式&#xff08;Routing Mode&am…

【STM32】通过HAL库Flash建立FatFS文件系统并配置为USB虚拟U盘MSC

【STM32】通过HAL库Flash建立FatFS文件系统并配置为USB虚拟U盘MSC 在先前 分别介绍了FatFS文件系统和USB虚拟U盘MSC配置 前者通过MCU读写Flash建立文件系统 后者通过MSC连接电脑使其能够被操作 这两者可以合起来 就能够实现同时在MCU、USB中操作Flash的文件系统 【STM32】通过…

打穿内网三重奏-红日7

靶机下载地址&#xff1a; 漏洞详情 (qiyuanxuetang.net) 攻击链路&#xff1a; DMZ区IP段为192.168.11.1/24 第二层网络环境IP段为192.168.52.1/24 第三层网络环境IP段为192.168.93.1/24 这里DMZ和攻击者我用的是192.168.11.1 这个网段&#xff0c;其他不变 这里我加了两张…

Vue.js 在低代码开发平台中的应用与优化

Vue.js 在低代码开发平台中的应用与优化 在数字化转型的进程中&#xff0c;低代码开发平台成为了企业快速构建应用的得力助手。而 Vue.js 作为一款广受欢迎的前端框架&#xff0c;在低代码开发平台中发挥着举足轻重的作用。它不仅提升了开发效率&#xff0c;还优化了应用的用户…

QML 快捷键与Shortcut的使用

一、效果展示 二、源码分享 import QtQuick import QtQuick.Controls import Qt.labs.qmlmodels import QtQuick.Controls.Basic import QtQuick.Layouts import QtQuick.Effects import Qt.labs.platformApplicationWindow {id:rootwidth: 1000height: 730visible: truetitle…

RocketMQ和Kafka如何实现顺序写入和顺序消费?

0 前言 先说明kafka&#xff0c;顺序写入和消费是Kafka的重要特性&#xff0c;但需要正确的配置和使用方式才能保证。本文需要解释清楚Kafka如何通过分区来实现顺序性&#xff0c;以及生产者和消费者应该如何配合。   首先&#xff0c;顺序写入。Kafka的消息是按分区追加写入…

【南方Cass】快捷键0002:合并多段线

快捷键&#xff1a;JOIN 按下快捷键JOIN&#xff0c;然后选择需要合并的对象&#xff08;多段线&#xff09;&#xff0c;按下回车即可完成合并。

Qt的isVisible ()函数介绍和判断窗口是否在当前界面显示

1、现象&#xff1a;当Qt的窗口最小化时&#xff0c;isVisible值一定是true&#xff0c;这是正常的。 解释&#xff1a;在Qt中&#xff0c;当你点击窗口的最小化按钮时&#xff0c;Qt内部不会自动调用 hide() 方或 setVisible(false) 来隐藏窗口。相反&#xff0c;它会改变窗口…