spring sharding JDBC 动态调整数据库连接
通过重写ShardingSphereDataSource类来实现
代码
package org.apache.shardingsphere.driver.jdbc.core.datasource;import com.alibaba.druid.pool.DruidDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractDataSourceAdapter;
import org.apache.shardingsphere.driver.jdbc.context.CachedDatabaseMetaData;
import org.apache.shardingsphere.driver.jdbc.context.JDBCContext;
import org.apache.shardingsphere.driver.state.DriverStateContext;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.checker.RuleConfigurationCheckerFactory;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import org.apache.shardingsphere.infra.config.database.impl.DataSourceProvidedDatabaseConfiguration;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.scope.GlobalRuleConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.builder.schema.DatabaseRulesBuilder;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderFactory;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import org.apache.shardingsphere.spring.boot.ShardingSphereAutoConfiguration;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** @author qyc* @version 1.0*/
@Slf4j
public final class ShardingSphereDataSource extends AbstractDataSourceAdapter implements AutoCloseable {private final String databaseName;private final ContextManager contextManager;private final JDBCContext jdbcContext;private ContextManagerBuilderParameter contextManagerBuilderParameter;private ModeConfiguration modeConfiguration;private Collection<RuleConfiguration> ruleConfigs;private Properties properties;public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig) throws SQLException {this.databaseName = databaseName;this.modeConfiguration = modeConfig;this.properties = new Properties();this.ruleConfigs = new LinkedList<>();contextManager = createContextManager(databaseName, modeConfig, new HashMap<>(), this.ruleConfigs, this.properties);jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));}/*** {@link ShardingSphereAutoConfiguration#shardingSphereDataSource(org.springframework.beans.factory.ObjectProvider, org.springframework.beans.factory.ObjectProvider)}** @param databaseName* @param modeConfig* @param dataSourceMap* @param ruleConfigs* @param props* @throws SQLException*/public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {checkRuleConfiguration(databaseName, ruleConfigs);this.modeConfiguration = modeConfig;this.databaseName = databaseName;contextManager = createContextManager(databaseName, modeConfig, dataSourceMap, ruleConfigs, null == props ? new Properties() : props);this.ruleConfigs = ruleConfigs;this.properties = props;jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));}/*** 更新现有的数据库配置** @param addDataSourcePropertiesMap 添加的配置* @param dropDataSourceNames 移除的配置* @param updateSources 修改的配置* @param newDataNodes 新的节点使用配置* @throws Exception */public void updateContextMetaData(final Map<String, DataSourceProperties> addDataSourcePropertiesMap, final Collection<String> dropDataSourceNames, Map<String, DataSourceProperties> updateSources, String newDataNodes) throws Exception {if (CollectionUtils.isEmpty(addDataSourcePropertiesMap) && CollectionUtils.isEmpty(dropDataSourceNames) && CollectionUtils.isEmpty(updateSources)) {if (!StringUtils.isEmpty(newDataNodes)) {this.rebuildShardingSphereRule(newDataNodes);}return;}Map<String, DataSourceProperties> refreshProperties = new HashMap<>();Map<String, DataSource> oldDataSources = new HashMap<>(this.contextManager.getDataSourceMap(databaseName));Map<String, DataSource> closeDataSources = new HashMap<>();if (!CollectionUtils.isEmpty(addDataSourcePropertiesMap)) {log.info("添加新的db:{}", addDataSourcePropertiesMap.keySet());this.contextManager.addResource(this.databaseName, addDataSourcePropertiesMap);refreshProperties.putAll(addDataSourcePropertiesMap);}if (!CollectionUtils.isEmpty(dropDataSourceNames)) {log.info("移除旧的db:{}", dropDataSourceNames);this.contextManager.dropResource(this.databaseName, dropDataSourceNames);dropDataSourceNames.forEach(s -> {closeDataSources.put(s, oldDataSources.get(s));});}if (!CollectionUtils.isEmpty(updateSources)) {log.info("更新db:{}", updateSources.keySet());this.contextManager.dropResource(this.databaseName, updateSources.keySet());this.contextManager.addResource(this.databaseName, updateSources);refreshProperties.putAll(updateSources);}log.info("重新加载shardingRule");this.rebuildShardingSphereRule(newDataNodes);log.info("重新构建managerBuildParameter");Map<String, DataSource> dataSourceMap = this.contextManager.getDataSourceMap(databaseName);this.contextManagerBuilderParameter = this.builderParameter(this.databaseName, this.modeConfiguration, dataSourceMap, this.ruleConfigs, this.properties);closeDataSources.forEach((k, closeDataSource) -> {try {log.info("关闭原dataSources:{}", k);if (closeDataSource instanceof DruidDataSource) {((DruidDataSource) closeDataSource).close();}} catch (Exception e) {log.warn("db:{}close时出现异常,异常为:{}", k, e.getMessage(), e);}});}private Map<String, DataSourceProperties> getDataSourcePropertiesMap(final Map<String, DataSource> dataSourceMap) {Map<String, DataSourceProperties> result = new LinkedHashMap<>(dataSourceMap.size(), 1);for (Map.Entry<String, DataSource> each : dataSourceMap.entrySet()) {result.put(each.getKey(), DataSourcePropertiesCreator.create(each.getValue()));}return result;}@SuppressWarnings("unchecked")private void checkRuleConfiguration(final String databaseName, final Collection<RuleConfiguration> ruleConfigs) {ruleConfigs.forEach(each -> RuleConfigurationCheckerFactory.findInstance(each).ifPresent(optional -> optional.check(databaseName, each)));}private ContextManager createContextManager(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {ContextManagerBuilderParameter parameter = this.builderParameter(databaseName, modeConfig, dataSourceMap, ruleConfigs, props);this.contextManagerBuilderParameter = parameter;return ContextManagerBuilderFactory.getInstance(modeConfig).build(parameter);}private ContextManagerBuilderParameter builderParameter(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,final Collection<RuleConfiguration> ruleConfigs, final Properties props) {ContextManagerBuilderParameter parameter = ContextManagerBuilderParameter.builder().modeConfig(modeConfig).databaseConfigs(Collections.singletonMap(databaseName, new DataSourceProvidedDatabaseConfiguration(dataSourceMap, ruleConfigs))).globalRuleConfigs(ruleConfigs.stream().filter(each -> each instanceof GlobalRuleConfiguration).collect(Collectors.toList())).props(props).instanceDefinition(new InstanceDefinition(InstanceType.JDBC)).build();return parameter;}private Optional<CachedDatabaseMetaData> createCachedDatabaseMetaData(final Map<String, DataSource> dataSources) throws SQLException {if (dataSources.isEmpty()) {return Optional.empty();}try (Connection connection = dataSources.values().iterator().next().getConnection()) {return Optional.of(new CachedDatabaseMetaData(connection.getMetaData()));}}@Overridepublic Connection getConnection() throws SQLException {return DriverStateContext.getConnection(databaseName, contextManager, jdbcContext);}@Overridepublic Connection getConnection(final String username, final String password) throws SQLException {return getConnection();}/*** Close data sources.** @param dataSourceNames data source names to be closed* @throws Exception exception*/public void close(final Collection<String> dataSourceNames) throws Exception {Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);for (String each : dataSourceNames) {close(dataSourceMap.get(each));}contextManager.close();}private void close(final DataSource dataSource) throws Exception {if (dataSource instanceof AutoCloseable) {((AutoCloseable) dataSource).close();}}@Overridepublic void close() throws Exception {close(contextManager.getDataSourceMap(databaseName).keySet());}@Overridepublic int getLoginTimeout() throws SQLException {Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);return dataSourceMap.isEmpty() ? 0 : dataSourceMap.values().iterator().next().getLoginTimeout();}@Overridepublic void setLoginTimeout(final int seconds) throws SQLException {for (DataSource each : contextManager.getDataSourceMap(databaseName).values()) {each.setLoginTimeout(seconds);}}public Set<String> getDataBaseNames() {return this.contextManager.getDataSourceMap(databaseName).keySet();}private void rebuildShardingSphereRule(String newDataNodes) {if (StringUtils.isEmpty(newDataNodes)) {return;}log.info("重新加载db映射:{}", newDataNodes);MetaDataPersistService metaDataPersistService = this.contextManager.getMetaDataContexts().getPersistService().get();Map<String, DatabaseConfiguration> databaseConfigurationMap = getDatabaseConfigMap(Stream.of(this.databaseName).collect(Collectors.toList()), metaDataPersistService, this.contextManagerBuilderParameter);databaseConfigurationMap.get(this.databaseName).getRuleConfigurations().forEach(rule -> {if (rule instanceof ShardingRuleConfiguration) {Collection<ShardingTableRuleConfiguration> tables = ((ShardingRuleConfiguration) rule).getTables();List<ShardingTableRuleConfiguration> newShardingTableRuleConfigurations = tables.stream().map(table -> {String newActualDataNodes = newDataNodes + "." + table.getLogicTable();ShardingTableRuleConfiguration shardingTableRuleConfiguration = new ShardingTableRuleConfiguration(table.getLogicTable(), newActualDataNodes);shardingTableRuleConfiguration.setDatabaseShardingStrategy(table.getDatabaseShardingStrategy());shardingTableRuleConfiguration.setTableShardingStrategy(table.getTableShardingStrategy());shardingTableRuleConfiguration.setReplaceTablePrefix(table.getReplaceTablePrefix());shardingTableRuleConfiguration.setKeyGenerateStrategy(table.getKeyGenerateStrategy());return shardingTableRuleConfiguration;}).collect(Collectors.toList());((ShardingRuleConfiguration) rule).setTables(newShardingTableRuleConfigurations);}});ConfigurationProperties props = new ConfigurationProperties(metaDataPersistService.getPropsService().load());Collection<ShardingSphereRule> build = DatabaseRulesBuilder.build(databaseName, databaseConfigurationMap.get(this.databaseName), props);List<ShardingSphereRule> oldRules = (List<ShardingSphereRule>) this.contextManager.getMetaDataContexts().getMetaData().getDatabases().get(databaseName).getRuleMetaData().getRules();oldRules.clear();oldRules.addAll(build);}private Map<String, DatabaseConfiguration> getDatabaseConfigMap(final Collection<String> databaseNames, final MetaDataPersistService metaDataPersistService,final ContextManagerBuilderParameter parameter) {Map<String, DatabaseConfiguration> result = new HashMap<>(databaseNames.size(), 1);databaseNames.forEach(each -> result.put(each, createDatabaseConfiguration(each, metaDataPersistService, parameter)));return result;}private DatabaseConfiguration createDatabaseConfiguration(final String databaseName, final MetaDataPersistService metaDataPersistService,final ContextManagerBuilderParameter parameter) {Map<String, DataSource> dataSources = this.contextManager.getDataSourceMap(databaseName);Collection<RuleConfiguration> databaseRuleConfigs = metaDataPersistService.getDatabaseRulePersistService().load(databaseName);return new DataSourceProvidedDatabaseConfiguration(dataSources, databaseRuleConfigs);}
}
通过调用重写的DataSource的updateContextMetaData方法来重新加载连接配置
* @param addDataSourcePropertiesMap 添加的配置* @param dropDataSourceNames 移除的配置 * @param updateSources 修改的配置* @param newDataNodes 新的节点使用配置
spring:shardingsphere:datasource:names: ${SHARDING_DATA_SOURCE_NAMES:db-0,db-1}db-0:type: com.alibaba.druid.pool.DruidDataSourcedriverClassName: com.mysql.jdbc.Driverurl: jdbc:mysql://${mysql.message0.host}/${mysql.message0.database}?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=falseusername: ${mysql.message0.username}password: ${mysql.message0.password}initialSize: 5 # 初始化大小minIdle: 5 # 最小maxActive: 20 # 最大maxWait: 60000 # 获取连接等待超时的时间...db-1:type: com.alibaba.druid.pool.DruidDataSourcedriverClassName: com.mysql.jdbc.Driver...
参数说明
参数名 | 说明 |
---|---|
addDataSourcePropertiesMap | 添加的db连接,key->连接id,例如配置中的db-0,db-1,value->根据普通db连接配置构建成的DataSourceProperties |
dropDataSourceNames | 移除的db连接,key->连接id,例如配置中的db-0,db-1 |
updateSources | 修改的db连接,key->连接id,例如配置中的db-0,db-1,value->根据普通db连接配置构建成的DataSourceProperties |
newDataNodes | 新的节点使用配置,将配置项中使用的的spring.shardingsphere.datasource.names值更换为该值 |
DataSourceProperties构建类
getDataSource(),传入的值为普通的db连接配置,例如
type: com.alibaba.druid.pool.DruidDataSourcedriverClassName: com.mysql.jdbc.Driverurl: jdbc:mysql://db3Ip/db3DB?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=falseusername: db3Userpassword: db3PwdinitialSize: 5 # 初始化大小minIdle: 5 # 最小maxActive: 20 # 最大maxWait: 60000 # 获取连接等待超时的时间timeBetweenEvictionRunsMillis: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒(3600000:为1小时)minEvictableIdleTimeMillis: 300000 # 配置一个连接在池中最小生存的时间,单位是毫秒validationQuery: select current_timestamp() #SELECT 1 FROM DUAL #用来检测连接是否有效的sql,要求是一个查询语句。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会其作用。testWhileIdle: true #申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。建议配置为true,不影响性能,并且保证安全性。testOnBorrow: false #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。缺省值:truetestOnReturn: false #归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。缺省值:falsepoolPreparedStatements: true #打开PSCache,并且指定每个连接上PSCache的大小#是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle。在mysql5.5以下的版本中没有PSCache功能,建议关闭掉。5.5及以上版本有PSCache,建议开启。缺省值:falsemaxPoolPreparedStatementPerConnectionSize: 20 # 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。在Druid中,不会存在Oracle下PSCache占用内存过多的问题,可以把这个数值配置大一些,比如说100。
package com.kittlen.provider.config.sharding;import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.expr.InlineExpressionParser;
import org.apache.shardingsphere.spring.boot.datasource.AopProxyUtils;
import org.apache.shardingsphere.spring.boot.util.PropertyUtil;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.jndi.JndiObjectFactoryBean;
import org.springframework.stereotype.Component;import javax.naming.NamingException;
import javax.sql.DataSource;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;/*** @author kittlen* @version 1.0*/
public class MyShardingDataSource {private static final String PREFIX = "spring.shardingsphere.datasource.";private static final String DATA_SOURCE_NAME = "name";private static final String DATA_SOURCE_NAMES = "names";private static final String DATA_SOURCE_TYPE = "type";private static final String JNDI_NAME = "jndi-name";/*** Get data source map.** @param environment spring boot environment* @return data source map*/public static Map<String, DataSource> getDataSourceMap(final Environment environment) {Map<String, DataSource> result = new LinkedHashMap<>();for (String each : getDataSourceNames(environment)) {try {result.put(each, getDataSource(environment, each));} catch (final NamingException ex) {throw new ShardingSphereException("Can't find JNDI data source.", ex);}}return result;}/*** Get data source map.** @param environment spring boot environment* @return data source map*/public static Map<String, DataSourceProperties> getDataSourcePropertiesMap(final Environment environment) {Map<String, DataSourceProperties> result = new LinkedHashMap<>();for (String each : getDataSourceNames(environment)) {try {result.put(each, getDataSourceProperties(environment, each));} catch (final NamingException ex) {throw new ShardingSphereException("Can't find JNDI data source.", ex);}}return result;}private static List<String> getDataSourceNames(final Environment environment) {StandardEnvironment standardEnv = (StandardEnvironment) environment;standardEnv.setIgnoreUnresolvableNestedPlaceholders(true);String dataSourceNames = standardEnv.getProperty(PREFIX + DATA_SOURCE_NAME);if (Strings.isNullOrEmpty(dataSourceNames)) {dataSourceNames = standardEnv.getProperty(PREFIX + DATA_SOURCE_NAMES);}return new InlineExpressionParser(dataSourceNames).splitAndEvaluate();}private static DataSource getDataSource(final Environment environment, final String dataSourceName) throws NamingException {Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, String.join("", PREFIX, dataSourceName), Map.class);Preconditions.checkState(!dataSourceProps.isEmpty(), "Wrong datasource [%s] properties.", dataSourceName);if (dataSourceProps.containsKey(JNDI_NAME)) {return getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString());}return DataSourcePoolCreator.create(new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps)));}private static DataSourceProperties getDataSourceProperties(final Environment environment, final String dataSourceName) throws NamingException {Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, String.join("", PREFIX, dataSourceName), Map.class);Preconditions.checkState(!dataSourceProps.isEmpty(), "Wrong datasource [%s] properties.", dataSourceName);if (dataSourceProps.containsKey(JNDI_NAME)) {return DataSourcePropertiesCreator.create(getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString()));}return new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps));}public static DataSourceProperties getDataSourceProperties(Map<String, Object> dataSourceProps) throws NamingException {if (dataSourceProps.containsKey(JNDI_NAME)) {return DataSourcePropertiesCreator.create(getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString()));}return new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps));}public static DataSource getDataSource(Map<String, Object> dataSourceProps) throws NamingException {if (dataSourceProps.containsKey(JNDI_NAME)) {return getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString());}return DataSourcePoolCreator.create(new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps)));}private static DataSource getJNDIDataSource(final String jndiName) throws NamingException {JndiObjectFactoryBean bean = new JndiObjectFactoryBean();bean.setResourceRef(true);bean.setJndiName(jndiName);bean.setProxyInterface(DataSource.class);bean.afterPropertiesSet();return (DataSource) AopProxyUtils.getTarget(bean.getObject());}
}