聊聊ShardingSphere是怎么进行sql重写的

本文主要研究一下ShardingSphere进行sql重写的原理

prepareStatement

org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java

public final class ShardingSphereConnection extends AbstractConnectionAdapter {@Overridepublic PreparedStatement prepareStatement(final String sql) throws SQLException {return new ShardingSpherePreparedStatement(this, sql);}//......
}    

ShardingSphereConnection的prepareStatement创建的是ShardingSpherePreparedStatement

ShardingSpherePreparedStatement

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter {@Getterprivate final ShardingSphereConnection connection;public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null);}private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,final String[] columns) throws SQLException {if (Strings.isNullOrEmpty(sql)) {throw new EmptySQLException().toSQLException();}this.connection = connection;metaDataContexts = connection.getContextManager().getMetaDataContexts();SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);hintValueContext = sqlParserRule.isSqlCommentParseEnabled() ? new HintValueContext() : SQLHintUtils.extractHint(sql).orElseGet(HintValueContext::new);this.sql = sqlParserRule.isSqlCommentParseEnabled() ? sql : SQLHintUtils.removeHint(sql);statements = new ArrayList<>();parameterSets = new ArrayList<>();SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType()));sqlStatement = sqlParserEngine.parse(this.sql, true);sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData(), sqlStatement, connection.getDatabaseName());parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);executor = new DriverExecutor(connection);JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext());batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getDatabaseName());kernelProcessor = new KernelProcessor();statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData());trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable();statementManager = new StatementManager();}//......
}    

ShardingSpherePreparedStatement继承了AbstractPreparedStatementAdapter,其构造器主要是通过SQLParserEngine解析sql得到SQLStatement,创建DriverExecutor、BatchPreparedStatementExecutor、KernelProcessor、StatementManager;这里即使useServerPrepStmts=true,也不会触发mysql server的prepare操作

executeUpdate

    public int executeUpdate() throws SQLException {try {if (statementsCacheable && !statements.isEmpty()) {resetParameters();return statements.iterator().next().executeUpdate();}clearPrevious();QueryContext queryContext = createQueryContext();trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null);if (null != trafficInstanceId) {JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext);return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());}executionContext = createExecutionContext(queryContext);if (hasRawExecutionRule()) {Collection<ExecuteResult> executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback());return accumulate(executeResults);}return isNeedImplicitCommitTransaction(connection, executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();// CHECKSTYLE:OFF} catch (final RuntimeException ex) {// CHECKSTYLE:ONhandleExceptionInTransaction(connection, metaDataContexts);throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());} finally {clearBatch();}}private void clearPrevious() {statements.clear();parameterSets.clear();generatedValues.clear();}private ExecutionContext createExecutionContext(final QueryContext queryContext) {ShardingSphereRuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData();ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext());ExecutionContext result = kernelProcessor.generateExecutionContext(queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext());findGeneratedKey(result).ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));return result;}

这里executeUpdate会先执行clearPrevious方法,清空statements、parameterSets、generatedValues,然后createExecutionContext,这里有一步是kernelProcessor.generateExecutionContext

KernelProcessor

generateExecutionContext

shardingsphere-infra-context-5.4.0-sources.jar!/org/apache/shardingsphere/infra/connection/kernel/KernelProcessor.java

    public ExecutionContext generateExecutionContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,final ConfigurationProperties props, final ConnectionContext connectionContext) {RouteContext routeContext = route(queryContext, database, globalRuleMetaData, props, connectionContext);SQLRewriteResult rewriteResult = rewrite(queryContext, database, globalRuleMetaData, props, routeContext, connectionContext);ExecutionContext result = createExecutionContext(queryContext, database, routeContext, rewriteResult);logSQL(queryContext, props, result);return result;}

KernelProcessor的generateExecutionContext方法先创建routeContext,然后执行rewrite,最后执行createExecutionContext

rewrite

    private SQLRewriteResult rewrite(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,final ConfigurationProperties props, final RouteContext routeContext, final ConnectionContext connectionContext) {SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, globalRuleMetaData, props);return sqlRewriteEntry.rewrite(queryContext.getSql(), queryContext.getParameters(), queryContext.getSqlStatementContext(), routeContext, connectionContext, queryContext.getHintValueContext());}

rewrite主要是通过SQLRewriteEntry的rewrite方法进行的

SQLRewriteEntry

shardingsphere-infra-rewrite-5.4.0-sources.jar!/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java

    /*** Rewrite.* * @param sql SQL* @param params SQL parameters* @param sqlStatementContext SQL statement context* @param routeContext route context* @param connectionContext connection context* @param hintValueContext hint value context* * @return route unit and SQL rewrite result map*/public SQLRewriteResult rewrite(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, params, sqlStatementContext, routeContext, connectionContext, hintValueContext);SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class);DatabaseType protocolType = database.getProtocolType();Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();return routeContext.getRouteUnits().isEmpty()? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext): new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext);}private SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext,final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) {SQLRewriteContext result = new SQLRewriteContext(database.getName(), database.getSchemas(), sqlStatementContext, sql, params, connectionContext, hintValueContext);decorate(decorators, result, routeContext, hintValueContext);result.generateSQLTokens();return result;}private void decorate(final Map<ShardingSphereRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext,final RouteContext routeContext, final HintValueContext hintValueContext) {if (hintValueContext.isSkipSQLRewrite()) {return;}for (Entry<ShardingSphereRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {entry.getValue().decorate(entry.getKey(), props, sqlRewriteContext, routeContext);}}

SQLRewriteEntry的rewrite方法,先通过createSQLRewriteContext来创建SQLRewriteContext,这里通过decorate方法遍历decorators,挨个执行SQLRewriteContextDecorator的decorate方法;最后通过GenericSQLRewriteEngine或者RouteSQLRewriteEngine进行rewrite

SQLRewriteContextDecorator

org/apache/shardingsphere/infra/rewrite/context/SQLRewriteContextDecorator.java

@SingletonSPI
public interface SQLRewriteContextDecorator<T extends ShardingSphereRule> extends OrderedSPI<T> {/*** Decorate SQL rewrite context.** @param rule rule* @param props ShardingSphere properties* @param sqlRewriteContext SQL rewrite context to be decorated* @param routeContext route context*/void decorate(T rule, ConfigurationProperties props, SQLRewriteContext sqlRewriteContext, RouteContext routeContext);
}

SQLRewriteContextDecorator定义了decorate方法,它有诸如ShardingSQLRewriteContextDecorator、EncryptSQLRewriteContextDecorator的实现类

EncryptSQLRewriteContextDecorator

org/apache/shardingsphere/encrypt/rewrite/context/EncryptSQLRewriteContextDecorator.java

/*** SQL rewrite context decorator for encrypt.*/
public final class EncryptSQLRewriteContextDecorator implements SQLRewriteContextDecorator<EncryptRule> {@Overridepublic void decorate(final EncryptRule encryptRule, final ConfigurationProperties props, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();if (!containsEncryptTable(encryptRule, sqlStatementContext)) {return;}Collection<EncryptCondition> encryptConditions = createEncryptConditions(encryptRule, sqlRewriteContext);if (!sqlRewriteContext.getParameters().isEmpty()) {Collection<ParameterRewriter> parameterRewriters = new EncryptParameterRewriterBuilder(encryptRule,sqlRewriteContext.getDatabaseName(), sqlRewriteContext.getSchemas(), sqlStatementContext, encryptConditions).getParameterRewriters();rewriteParameters(sqlRewriteContext, parameterRewriters);}Collection<SQLTokenGenerator> sqlTokenGenerators = new EncryptTokenGenerateBuilder(encryptRule,sqlStatementContext, encryptConditions, sqlRewriteContext.getDatabaseName()).getSQLTokenGenerators();sqlRewriteContext.addSQLTokenGenerators(sqlTokenGenerators);}private Collection<EncryptCondition> createEncryptConditions(final EncryptRule encryptRule, final SQLRewriteContext sqlRewriteContext) {SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext();if (!(sqlStatementContext instanceof WhereAvailable)) {return Collections.emptyList();}Collection<WhereSegment> whereSegments = ((WhereAvailable) sqlStatementContext).getWhereSegments();Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments();return new EncryptConditionEngine(encryptRule, sqlRewriteContext.getSchemas()).createEncryptConditions(whereSegments, columnSegments, sqlStatementContext, sqlRewriteContext.getDatabaseName());}private boolean containsEncryptTable(final EncryptRule encryptRule, final SQLStatementContext sqlStatementContext) {for (String each : sqlStatementContext.getTablesContext().getTableNames()) {if (encryptRule.findEncryptTable(each).isPresent()) {return true;}}return false;}private void rewriteParameters(final SQLRewriteContext sqlRewriteContext, final Collection<ParameterRewriter> parameterRewriters) {for (ParameterRewriter each : parameterRewriters) {each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());}}@Overridepublic int getOrder() {return EncryptOrder.ORDER;}@Overridepublic Class<EncryptRule> getTypeClass() {return EncryptRule.class;}
}

rewriteParameters是通过ParameterRewriter进行rewrite,主要是修改ParameterBuilder;而具体sql语句的修改则通过sqlTokenGenerators进行

SQLToken

@RequiredArgsConstructor
@Getter
public abstract class SQLToken implements Comparable<SQLToken> {private final int startIndex;@Overridepublic final int compareTo(final SQLToken sqlToken) {return startIndex - sqlToken.startIndex;}
}

SQLToken它有诸如InsertValuesToken、SubstitutableColumnNameToken、InsertColumnsToken之类的实现类

RouteSQLRewriteEngine

    /*** Rewrite SQL and parameters.** @param sqlRewriteContext SQL rewrite context* @param routeContext route context* @return SQL rewrite result*/public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F);for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {Collection<RouteUnit> routeUnits = entry.getValue();if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {sqlRewriteUnits.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));} else {addSQLRewriteUnits(sqlRewriteUnits, sqlRewriteContext, routeContext, routeUnits);}}return new RouteSQLRewriteResult(translate(sqlRewriteContext.getSqlStatementContext().getSqlStatement(), sqlRewriteUnits));}private void addSQLRewriteUnits(final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits, final SQLRewriteContext sqlRewriteContext,final RouteContext routeContext, final Collection<RouteUnit> routeUnits) {for (RouteUnit each : routeUnits) {sqlRewriteUnits.put(each, new SQLRewriteUnit(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each)));}}private Map<RouteUnit, SQLRewriteUnit> translate(final SQLStatement sqlStatement, final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits) {Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(sqlRewriteUnits.size(), 1F);for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteUnits.entrySet()) {DatabaseType storageType = storageTypes.get(entry.getKey().getDataSourceMapper().getActualName());String sql = translatorRule.translate(entry.getValue().getSql(), sqlStatement, protocolType, storageType);SQLRewriteUnit sqlRewriteUnit = new SQLRewriteUnit(sql, entry.getValue().getParameters());result.put(entry.getKey(), sqlRewriteUnit);}return result;}

addSQLRewriteUnits是往sqlRewriteUnits添加SQLRewriteUnit,最后translate方法构建SQLRewriteUnit;SQLRewriteUnit包含了更改之后的sql以及对应改动后的参数

useDriverToExecuteUpdate

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

    private int useDriverToExecuteUpdate() throws SQLException {ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();cacheStatements(executionGroupContext.getInputGroups());return executor.getRegularExecutor().executeUpdate(executionGroupContext,executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());}private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getDatabaseName()));} private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {each.getInputs().forEach(eachInput -> {statements.add((PreparedStatement) eachInput.getStorageResource());parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());});}replay();}private void replay() throws SQLException {replaySetParameter();for (Statement each : statements) {getMethodInvocationRecorder().replay(each);}}private void replaySetParameter() throws SQLException {for (int i = 0; i < statements.size(); i++) {replaySetParameter(statements.get(i), parameterSets.get(i));}}protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> params) throws SQLException {setParameterMethodInvocations.clear();addParameters(params);for (PreparedStatementInvocationReplayer each : setParameterMethodInvocations) {each.replayOn(preparedStatement);}}private void addParameters(final List<Object> params) {int i = 0;for (Object each : params) {int index = ++i;setParameterMethodInvocations.add(preparedStatement -> preparedStatement.setObject(index, each));}}

useDriverToExecuteUpdate方法会执行createExecutionGroupContext(会执行prepare方法),cacheStatements这里主要是把eachInput.getStorageResource()真正的PrepareStatement赋值到ShardingSpherePreparedStatement的statements变量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()赋值到parameterSets,然后执行replay方法通过PreparedStatementInvocationReplayer把修改后的变量replay到真正的PrepareStatement
该方法委托给executor.getRegularExecutor().executeUpdate,最后一个参数为callback,即createExecuteUpdateCallback

DriverExecutionPrepareEngine.prepare

org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java

    public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits,final ExecutionGroupReportContext reportContext) throws SQLException {return prepare(routeContext, Collections.emptyMap(), executionUnits, reportContext);}public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Map<String, Integer> connectionOffsets, final Collection<ExecutionUnit> executionUnits,final ExecutionGroupReportContext reportContext) throws SQLException {Collection<ExecutionGroup<T>> result = new LinkedList<>();for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) {String dataSourceName = entry.getKey();List<SQLUnit> sqlUnits = entry.getValue();List<List<SQLUnit>> sqlUnitGroups = group(sqlUnits);ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;result.addAll(group(dataSourceName, connectionOffsets.getOrDefault(dataSourceName, 0), sqlUnitGroups, connectionMode));}return decorate(routeContext, result, reportContext);}protected List<ExecutionGroup<T>> group(final String dataSourceName, final int connectionOffset, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException {List<ExecutionGroup<T>> result = new LinkedList<>();List<C> connections = databaseConnectionManager.getConnections(dataSourceName, connectionOffset, sqlUnitGroups.size(), connectionMode);int count = 0;for (List<SQLUnit> each : sqlUnitGroups) {result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode));}return result;}private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final ConnectionMode connectionMode) throws SQLException {List<T> result = new LinkedList<>();for (SQLUnit each : sqlUnits) {result.add((T) sqlExecutionUnitBuilder.build(new ExecutionUnit(dataSourceName, each), statementManager, connection, connectionMode, option, databaseTypes.get(dataSourceName)));}return new ExecutionGroup<>(result);}

group方法调用遍历SQLUnit执行createExecutionGroup,而后者则执行sqlExecutionUnitBuilder.build;这里databaseConnectionManager.getConnections获取的connection是通过真正driver获取的connection(com.mysql.jdbc.Driver)

PreparedStatementExecutionUnitBuilder

org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java

    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager,final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {PreparedStatement preparedStatement = createPreparedStatement(executionUnit, statementManager, connection, connectionMode, option, databaseType);return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement);}private PreparedStatement createPreparedStatement(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection,final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException {return (PreparedStatement) statementManager.createStorageResource(executionUnit, connection, connectionMode, option, databaseType);}

PreparedStatementExecutionUnitBuilder的build方法这里才真正创建PreparedStatement

StatementManager

org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java

    public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option,final DatabaseType databaseType) throws SQLException {Statement result = cachedStatements.get(new CacheKey(executionUnit, connectionMode));if (null == result || result.isClosed() || result.getConnection().isClosed()) {String sql = executionUnit.getSqlUnit().getSql();if (option.isReturnGeneratedKeys()) {result = null == option.getColumns() || 0 == option.getColumns().length? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS): connection.prepareStatement(sql, option.getColumns());} else {result = connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability());}cachedStatements.put(new CacheKey(executionUnit, connectionMode), result);}return result;}

createStorageResource则是通过connection.prepareStatement来创建真正的PrepareStatement,而此时传入的sql也是经过重写之后的sql

createExecuteUpdateCallback

org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java

    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown) {@Overrideprotected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {return ((PreparedStatement) statement).executeUpdate();}@Overrideprotected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {return Optional.empty();}};}

createExecuteUpdateCallback创建的JDBCExecutorCallback,其executeSQL方法则是通过((PreparedStatement) statement).executeUpdate()来执行,即委托给了真正的PreparedStatement

小结

  • ShardingSphereConnection的prepareStatement创建的是ShardingSpherePreparedStatement,它在ShardingSpherePreparedStatement的executeUpdate的时候进行sql重写,然后prepare,最后执行的时候是通过JDBCExecutorCallback,其executeSQL方法则是通过((PreparedStatement) statement).executeUpdate()来执行,即委托给了真正的PreparedStatement
  • rewriteParameters是通过ParameterRewriter进行rewrite,主要是修改ParameterBuilder;而具体sql语句的修改则通过sqlTokenGenerators进行
  • PreparedStatementExecutionUnitBuilder的build方法这里才真正创建PreparedStatement:它通过StatementManager.createStorageResource则是通过connection.prepareStatement来创建真正的PrepareStatement,而此时传入的sql也是经过重写之后的sql
  • useDriverToExecuteUpdate方法会执行createExecutionGroupContext(会执行prepare方法),cacheStatements这里主要是把eachInput.getStorageResource()真正的PrepareStatement赋值到ShardingSpherePreparedStatement的statements变量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()赋值到parameterSets,然后执行replay方法通过PreparedStatementInvocationReplayer把修改后的变量replay到真正的PrepareStatement

ShardingSpherePreparedStatement实现了java.sql.PreparedStatement接口,其sql属性是用户传入的sql,即未经过重写的sql,而实际execute的时候,会触发sql重写(包括重写sql语句及参数),最后会通过connection.prepareStatement(传入重写之后的sql)来创建真正的PrepareStatement,然后有一步replay操作,把重写后的参数作用到真正的PrepareStatement,最后通过((PreparedStatement) statement).executeUpdate()来触发执行
至此我们可以得到sql重写的一个基本思路:通过实现java.sql.PreparedStatement接口伪装一个PreparedStatement类,其创建和set参数先内存缓存起来,之后在execute的时候进行sql重写,创建真正的PreparedStatement,replay参数,执行execute方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67664.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue项目配置MongoDB的增删改查操作

在Vue中配置MongoDB的增删改查操作&#xff0c;需要先安装mongoose模块来连接MongoDB数据库。 1. 在Vue项目的根目录中&#xff0c;使用命令行安装mongoose模块&#xff1a; npm install mongoose --save 2. 找到启动node的app.js文件&#xff08;我这里是在server文件中&…

processflow流程图多人协作预热

前言 在线上办公如火如荼的今天&#xff0c;多人协作功能是每个应用绕不开的门槛。processflow在线流程图&#xff08;前身基于drawio二次开发&#xff09;沉寂两年之久&#xff0c;经过长时间设计开发&#xff0c;调整&#xff0c;最终完成了多人协作的核心模块设计。废话不多…

【ES】笔记-Map介绍与API

Map介绍与API Map实列 Map ES6提供了Map数据结构。它类似于对象&#xff0c;也是键值对的集合。但是“键”的范围不限于字符串&#xff0c;各种类型的值(包括对象)都可以当作键。Map也实现了iterator接口&#xff0c;所以可以使用【扩展运算符】和【for…of…】进行遍历。Map的…

.net项目部署Docker

1、项目生成的bin目录下创建Dockerfile文件 #运行环境描述&#xff0c;此处是用的Net5构建镜像 FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build #复制文件到 docker容器中的app文件夹中 COPY . /app #设置工作目录为 app 文件夹&#xff0c;要和上面一致哦 WORKDIR /app #设…

【深度学习】实验07 使用TensorFlow完成逻辑回归

文章目录 使用TensorFlow完成逻辑回归1. 环境设定2. 数据读取3. 准备好placeholder4. 准备好参数/权重5. 计算多分类softmax的loss function6. 准备好optimizer7. 在session里执行graph里定义的运算 附&#xff1a;系列文章 使用TensorFlow完成逻辑回归 TensorFlow是一种开源的…

vlan笔记

在一个LAN中可能有很多设备节点&#xff0c;有很多协议使用广播&#xff0c;每次整个网络中所有设备都要处理广播&#xff0c;效率低。 一个LAN中用一个或多个二层交换机switch连接&#xff0c;交换机对广播透明。 在交换机上配置VLAN&#xff0c;把物理网络划分为多个逻辑网…

肖sir__设计测试用例方法之场景法04_(黑盒测试)

设计测试用例方法之场景法 1、场景法主要是针对测试场景类型的&#xff0c;顾也称场景流程分析法。 2、流程分析是将软件系统的某个流程看成路径&#xff0c;用路径分析的方法来设计测试用例。根据流程的顺序依次进行组合&#xff0c;使得流程的各个分支能走到。 举例说明&…

网易低代码引擎Tango正式开源

一、Tango简介 Tango 是一个用于快速构建低代码平台的低代码设计器框架,借助 Tango 只需要数行代码就可以完成一个基本的低代码平台前端系统的搭建。Tango 低代码设计器直接读取前端项目的源代码,并以源代码为中心,执行和渲染前端视图,并为用户提供低代码可视化搭建能力,…

uniapp从零到一的学习商城实战

涵盖的功能&#xff1a; 安装开发工具HBuilder&#xff1a;HBuilderX-高效极客技巧 创建项目步骤&#xff1a; 1.右键-项目&#xff1a; 2.选择vue2和默认模板&#xff1a; 3.完整的项目目录&#xff1a; 微信开发者工具调试&#xff1a; 1.安装微信开发者工具 2.打开…

GeoServe Web 管理界面 实现远程访问

文章目录 前言1.安装GeoServer2. windows 安装 cpolar3. 创建公网访问地址4. 公网访问Geo Servcer服务5. 固定公网HTTP地址 前言 GeoServer是OGC Web服务器规范的J2EE实现&#xff0c;利用GeoServer可以方便地发布地图数据&#xff0c;允许用户对要素数据进行更新、删除、插入…

Android Studio新版本New UI及相关设置丨遥遥领先版

1、前言 俗话说工欲善其事必先利其器嘛&#xff0c;工具用不好怎么行呢&#xff0c;借着Android Studio的更新&#xff0c;介绍一下新版本中的更新内容&#xff0c;以及日常开发中那些好用的设置。 2、关于新版本 2.1、最新正式版本 Android Studio Giraffe | 2022.3.1 Pat…

elementui el-table在有summary-method时,table数据行将合计行遮挡住了

前端使用框架&#xff1a;elementUI 使用组件&#xff1a;el-table 在表格内添加合计了合计行&#xff0c;根据业务多次调用数据渲染画面后&#xff0c;偶然导致画面变成如下图所示&#xff0c;table的数据行将合计行遮挡住了&#xff0c;且这个现象有时候好用&#xff0c;有…

Android图形-架构1

目录 引言 Android图形的关键组件&#xff1a; Android图形的pipeline数据流 BufferQueue是啥&#xff1f; 引言 Android提供用于2D和3D图形渲染的API&#xff0c;可与制造商的驱动程序实现代码交互&#xff0c;下面梳理一下Android图形的运作原理。 应用开发者通过三种方…

C++多态案例2----制作饮品

#include<iostream> using namespace std;//制作饮品的大致流程都为&#xff1a; //煮水-----冲泡-----倒入杯中----加入辅料//本案例利用多态技术&#xff0c;提供抽象类制作饮品基类&#xff0c;提供子类制作茶叶和咖啡class AbstractDrinking {public://煮水//冲水//倒…

Scala的集合操作之可变数组和不可变数组,可变List集合与不可变List集合,可变Set与不可变Set操作,可变和不可变Map集合和元组操作

Scala的集合操作之&#xff0c;可变数组和不可变数组&#xff0c;可变List集合与不可变List集合 不可变数组 /* traversable/ˈtrvəsəbl/adj.能越过的&#xff1b;可否认的*/ object Test01_ImmutableArray {def main(args: Array[String]): Unit {// 1. 创建数组val arr:…

视频监控/视频汇聚/视频云存储EasyCVR平台HLS流集成在小程序无法播放问题排查

安防视频/视频云存储/视频集中存储EasyCVR视频监控综合管理平台可以根据不同的场景需求&#xff0c;让平台在内网、专网、VPN、广域网、互联网等各种环境下进行音视频的采集、接入与多端分发。在视频能力上&#xff0c;视频云存储平台EasyCVR可实现视频实时直播、云端录像、视频…

字节前端实习的两道算法题,看看强度如何

最长严格递增子序列 题目描述 给你一个整数数组nums&#xff0c;找到其中最长严格递增子序列的长度。 子序列是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7…

flink实现kafka、doris精准一次说明

前言说明:本文档只讨论数据源为kafka的情况实现kafka和doris的精准一次写入 flink的kafka连接器已经实现了自动提交偏移量到kafka,当flink中的数据写入成功后,flink会将这批次数据的offset提交到kafka,程序重启时,kafka中记录了当前groupId消费的offset位置,开始消费时将…

文件系统与inode编号

文件描述符fd 0&1&2 Linux 进程默认情况会有3个缺省打开的文件描述符&#xff0c;分别是标准输入0&#xff0c; 标准输出1&#xff0c; 标准错误2. 0,1,2对应的物理设备一般是&#xff1a;键盘&#xff0c;显示器&#xff0c;显示器 所以输入输出还可以采用如下方式 …

中国非晶纳米晶行业市场预测与投资战略报告(2023版)

内容简介&#xff1a; 由于性能优异&#xff0c;非晶材料从20世纪80年代开始成为中国外科学界研究重点&#xff0c;目前美、日、德已经具备完善的生产规模&#xff0c;大量的非晶合金产品逐渐取代硅钢、铁氧体等。 2021年之前&#xff0c;中国铁基非晶带材企业有12家&#xf…