【源码】Sharding-JDBC源码分析之SQL中读写分离动态策略、数据库发现规则及DatabaseDiscoverySQLRouter路由的原理

 Sharding-JDBC系列

1、Sharding-JDBC分库分表的基本使用

2、Sharding-JDBC分库分表之SpringBoot分片策略

3、Sharding-JDBC分库分表之SpringBoot主从配置

4、SpringBoot集成Sharding-JDBC-5.3.0分库分表

5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表

6、【源码】Sharding-JDBC源码分析之JDBC

7、【源码】Sharding-JDBC源码分析之SPI机制

8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理

9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)

10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)

11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理

12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理

13、【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理

14、【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理

15、【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理

16、【源码】Sharding-JDBC源码分析之配置数据库定义的表的元数据解析原理

17、【源码】Sharding-JDBC源码分析之ShardingSphereConnection的创建原理

18、【源码】Sharding-JDBC源码分析之ShardingSpherePreparedStatement的创建原理

19、【源码】Sharding-JDBC源码分析之Sql解析的原理

20、【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由

21、【源码】Sharding-JDBC源码分析之SQL中分片键路由ShardingSQLRouter的原理

22、【源码】Sharding-JDBC源码分析之SQL中读写分离路由ReadwriteSplittingSQLRouter的原理

23、【源码】Sharding-JDBC源码分析之SQL中读写分离动态策略、数据库发现规则及DatabaseDiscoverySQLRouter路由的原理

前言

在上一篇

【源码】Sharding-JDBC源码分析之SQL中读写分离路由ReadwriteSplittingSQLRouter的原理

介绍了读写分离策略时,策略分为静态策略和动态策略,并详细介绍了静态策略的实现。本篇从源码的角度,分析动态策略的实现。读写分离规则的动态策略是结合数据库发现规则来实现的,且数据库发现规则可单独作为路由器,对应的路由器为DatabaseDiscoverySQLRouter,对应的规则对象为DatabaseDiscoveryRule。

ReadwriteSplittingStrategyFactory

在ReadwriteSplittingDataSourceRule读写分离规则类的构造方法中,通过ReadwriteSplittingStrategyFactory的newInstance()方法,创建ReadwriteSplittingStrategy读写分离策略。

ReadwriteSplittingStrategyFactory的源码如下:

package org.apache.shardingsphere.readwritesplitting.strategy;/*** 读写分离策略工厂*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ReadwriteSplittingStrategyFactory {/*** 实例化读写分离策略* @param readwriteSplittingConfig 读写分离配置* @param builtRules 配置的规则* @return*/public static ReadwriteSplittingStrategy newInstance(final ReadwriteSplittingDataSourceRuleConfiguration readwriteSplittingConfig, final Collection<ShardingSphereRule> builtRules) {// 如果没有指定静态策略return null == readwriteSplittingConfig.getStaticStrategy()// 默认创建动态策略? createDynamicReadwriteSplittingStrategy(readwriteSplittingConfig.getDynamicStrategy(), builtRules)// 创建静态策略: createStaticReadwriteSplittingStrategy(readwriteSplittingConfig.getStaticStrategy());}/*** 创建一个静态读写分离策略* @param staticConfig* @return*/private static StaticReadwriteSplittingStrategy createStaticReadwriteSplittingStrategy(final StaticReadwriteSplittingStrategyConfiguration staticConfig) {return new StaticReadwriteSplittingStrategy(staticConfig.getWriteDataSourceName(), staticConfig.getReadDataSourceNames());}/*** 创建动态读写分离策略* @param dynamicConfig 读写分离的动态策略配置* @param builtRules 配置的规则* @return*/private static DynamicReadwriteSplittingStrategy createDynamicReadwriteSplittingStrategy(final DynamicReadwriteSplittingStrategyConfiguration dynamicConfig,final Collection<ShardingSphereRule> builtRules) {// 从配置的规则中获取DynamicDataSourceContainedRule,默认只有DatabaseDiscoveryRule,即动态数据库发现规则Optional<ShardingSphereRule> dynamicDataSourceStrategy = builtRules.stream().filter(each -> each instanceof DynamicDataSourceContainedRule).findFirst();// 获取是否允许写库支持读,默认为支持boolean allowWriteDataSourceQuery = Strings.isNullOrEmpty(dynamicConfig.getWriteDataSourceQueryEnabled()) ? Boolean.TRUE : Boolean.parseBoolean(dynamicConfig.getWriteDataSourceQueryEnabled());// 创建动态策略return new DynamicReadwriteSplittingStrategy(dynamicConfig.getAutoAwareDataSourceName(), allowWriteDataSourceQuery, (DynamicDataSourceContainedRule) dynamicDataSourceStrategy.get());}
}

通过createDynamicReadwriteSplittingStrategy()方法,创建DynamicReadwriteSplittingStrategy动态读写分离策略。

DynamicReadwriteSplittingStrategy

DynamicReadwriteSplittingStrategy的源码如下:

package org.apache.shardingsphere.readwritesplitting.strategy.type;/*** 动态读写分离策略*/
@RequiredArgsConstructor
@Getter
public final class DynamicReadwriteSplittingStrategy implements ReadwriteSplittingStrategy {// 自动感知的数据源名称private final String autoAwareDataSourceName;// 是否允许写数据源进行读操作private final boolean allowWriteDataSourceQuery;// 动态数据源包含规则private final DynamicDataSourceContainedRule dynamicDataSource;/*** 获取写数据源名称* @return*/@Overridepublic String getWriteDataSource() {return dynamicDataSource.getPrimaryDataSourceName(autoAwareDataSourceName);}/*** 获取读数据源名称* @return*/@Overridepublic List<String> getReadDataSources() {return new ArrayList<>(dynamicDataSource.getReplicaDataSourceNames(autoAwareDataSourceName));}@Overridepublic Collection<String> getAllDataSources() {return Collections.singletonList(autoAwareDataSourceName);}
}

在DynamicReadwriteSplittingStrategy动态读写分离策略中,通过DynamicDataSourceContainedRule对象,获取读写分离中的读数据源和写数据源名称。其中DynamicDataSourceContainedRule对象通过构造方法传入,即从ReadwriteSplittingStrategyFactory的createDynamicReadwriteSplittingStrategy()方法中传入的,且传入的规则是从系统配置的规则中,查找实现DynamicDataSourceContainedRule接口的规则,在系统中为DatabaseDiscoveryRule对象,即数据库发现规则。

动态读写分离配置示例

rules:- !READWRITE_SPLITTINGdataSources:readwrite_ds:  rw_ds    #逻辑数据源dynamicStrategy: #动态策略autoAwareDataSourceName: awareDataSource #允许自动识别的数据源名writeDataSourceQueryEnabled:  true #允许写库进行读操作- !DB_DISCOVERYdataSources:awareDataSource:  # 组名,同dynamicStrategy的autoAwareDataSourceNamedataSourceNames: ds_$->{1..2}discoveryHeartbeatName: ds_heartbeat   #心跳检测discoveryTypeName: MySQLMGRdiscoveryHeartbeats:ds_heartbeat:props:keep-alive-cron: '0/5 * * * * ?'discoveryTypes:    # 发现类型MySQLMGR:type: MySQL.MGRprops:group-name: your_group_name

DatabaseDiscoveryRule

数据库发现规则配置最后会解析成DatabaseDiscoveryRule对象。DatabaseDiscoveryRule的源码如下:

package org.apache.shardingsphere.dbdiscovery.rule;/*** 数据库发现规则*/
public final class DatabaseDiscoveryRule implements DatabaseRule, DataSourceContainedRule, DynamicDataSourceContainedRule, ExportableRule {// DatabaseDiscoveryRuleConfiguration对象@Getterprivate final RuleConfiguration configuration;// 数据源名称,默认为logic_dbprivate final String databaseName;// 数据库发现配置的数据源及对应的数据源对象private final Map<String, DataSource> dataSourceMap;// 配置的数据库发现提供算法对象信息private final Map<String, DatabaseDiscoveryProviderAlgorithm> discoveryTypes;// 数据库规则对象。key:组名(组名需为逻辑数据源或真实数据源的名称),一组一个DatabaseDiscoveryDataSourceRule对象@Getterprivate final Map<String, DatabaseDiscoveryDataSourceRule> dataSourceRules;private final InstanceContext instanceContext;// 针对Zookeeper集群的mode的日程上下文对象(非Zookeeper为空方法实例)private final ScheduleContext scheduleContext;/*** @param databaseName 数据源名称,默认为logic_db* @param dataSourceMap 数据库发现配置的数据源及对应的数据源对象* @param ruleConfig 数据库发现规则配置对象* @param instanceContext 实例上下文*/public DatabaseDiscoveryRule(final String databaseName, final Map<String, DataSource> dataSourceMap, final DatabaseDiscoveryRuleConfiguration ruleConfig, final InstanceContext instanceContext) {configuration = ruleConfig;this.databaseName = databaseName;this.dataSourceMap = dataSourceMap;this.instanceContext = instanceContext;this.scheduleContext = ScheduleContextFactory.newInstance(instanceContext.getModeConfiguration());discoveryTypes = getDiscoveryProviderAlgorithms(ruleConfig.getDiscoveryTypes());dataSourceRules = getDataSourceRules(ruleConfig.getDataSources(), ruleConfig.getDiscoveryHeartbeats());findPrimaryReplicaRelationship(databaseName, dataSourceMap);initHeartBeatJobs();}/*** 解析配置的算法信息,根据类型创建对应的算法对象* @param discoveryTypesConfig* @return*/private static Map<String, DatabaseDiscoveryProviderAlgorithm> getDiscoveryProviderAlgorithms(final Map<String, AlgorithmConfiguration> discoveryTypesConfig) {Map<String, DatabaseDiscoveryProviderAlgorithm> result = new LinkedHashMap<>(discoveryTypesConfig.size(), 1);for (Entry<String, AlgorithmConfiguration> entry : discoveryTypesConfig.entrySet()) {result.put(entry.getKey(), ShardingSphereAlgorithmFactory.createAlgorithm(entry.getValue(), DatabaseDiscoveryProviderAlgorithm.class));}return result;}/*** 获取数据源对象,key:组名,一组一个DatabaseDiscoveryDataSourceRule对象* @param dataSources 配置的数据源集合* @param heartbeatConfig 心跳配置* @return*/private Map<String, DatabaseDiscoveryDataSourceRule> getDataSourceRules(final Collection<DatabaseDiscoveryDataSourceRuleConfiguration> dataSources,final Map<String, DatabaseDiscoveryHeartBeatConfiguration> heartbeatConfig) {Map<String, DatabaseDiscoveryDataSourceRule> result = new HashMap<>(dataSources.size(), 1);// 遍历 配置的数据源集合for (DatabaseDiscoveryDataSourceRuleConfiguration each : dataSources) {// key为组名result.put(each.getGroupName(), new DatabaseDiscoveryDataSourceRule(each, Strings.isNullOrEmpty(each.getDiscoveryHeartbeatName()) ? new Properties(): heartbeatConfig.get(each.getDiscoveryHeartbeatName()).getProps(), discoveryTypes.get(each.getDiscoveryTypeName())));}return result;}/*** 查找主副本关系* @param databaseName* @param dataSourceMap 数据源Map集合*/private void findPrimaryReplicaRelationship(final String databaseName, final Map<String, DataSource> dataSourceMap) {// 遍历配置的数据源规则for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {String groupName = entry.getKey();DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue();// 从dataSourceMap中获取数据库发现配置的数据源对象Map<String, DataSource> originalDataSourceMap = dataSourceRule.getDataSourceGroup(dataSourceMap);// 创建数据库发现引擎DatabaseDiscoveryEngine engine = new DatabaseDiscoveryEngine(dataSourceRule.getDatabaseDiscoveryProviderAlgorithm(), instanceContext.getEventBusContext());engine.checkEnvironment(databaseName, originalDataSourceMap);// 通过引擎,修改主数据源名称dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource(databaseName, groupName, entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap, dataSourceRule.getDisabledDataSourceNames()));}}/*** 从Map数据源集合的value中获取第一个数据源规则对象* @return*/public DatabaseDiscoveryDataSourceRule getSingleDataSourceRule() {return dataSourceRules.values().iterator().next();}/*** 根据数据源名称,获取数据库发现数据源规则* @param dataSourceName* @return*/public Optional<DatabaseDiscoveryDataSourceRule> findDataSourceRule(final String dataSourceName) {return Optional.ofNullable(dataSourceRules.get(dataSourceName));}@Overridepublic Map<String, Collection<String>> getDataSourceMapper() {Map<String, Collection<String>> result = new HashMap<>();for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {result.putAll(entry.getValue().getDataSourceMapper());}return result;}@Overridepublic void restartHeartBeatJob(final DataSourceStatusChangedEvent event) {PrimaryDataSourceChangedEvent dataSourceEvent = (PrimaryDataSourceChangedEvent) event;QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName());Preconditions.checkNotNull(dataSourceRule, "Can not find database discovery data source rule in database `%s`", databaseName);dataSourceRule.changePrimaryDataSourceName(qualifiedDatabase.getDataSourceName());initHeartBeatJobs();}@Overridepublic void closeSingleHeartBeatJob(final String groupName) {DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(groupName);Preconditions.checkNotNull(dataSourceRule, "Can not find database discovery data source rule in database `%s`", databaseName);scheduleContext.closeSchedule(dataSourceRule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + dataSourceRule.getGroupName());}@Overridepublic void closeAllHeartBeatJob() {for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {DatabaseDiscoveryDataSourceRule rule = entry.getValue();scheduleContext.closeSchedule(rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName());}}/*** 初始化心跳检测的任务*/private void initHeartBeatJobs() {for (Entry<String, DatabaseDiscoveryDataSourceRule> entry : dataSourceRules.entrySet()) {DatabaseDiscoveryDataSourceRule rule = entry.getValue();// 创建任务名称String jobName = rule.getDatabaseDiscoveryProviderAlgorithm().getType() + "-" + databaseName + "-" + rule.getGroupName();// 根据配置信息,创建cron任务CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),rule.getDatabaseDiscoveryProviderAlgorithm(), rule.getDisabledDataSourceNames(), instanceContext.getEventBusContext()).execute(null),rule.getHeartbeatProps().getProperty("keep-alive-cron"));// 添加到日程上下文中,根据cron,定时检测心跳scheduleContext.startSchedule(job);}}@Overridepublic String getPrimaryDataSourceName(final String dataSourceName) {return dataSourceRules.get(dataSourceName).getPrimaryDataSourceName();}/*** 通过数据源规则,从对应的逻辑数据源(组名)中获取副本数据源名称* @param dataSourceName data source name* @return*/@Overridepublic Collection<String> getReplicaDataSourceNames(final String dataSourceName) {return dataSourceRules.get(dataSourceName).getReplicaDataSourceNames();}@Overridepublic void updateStatus(final DataSourceStatusChangedEvent event) {StorageNodeDataSourceChangedEvent dataSourceChangedEvent = (StorageNodeDataSourceChangedEvent) event;DatabaseDiscoveryDataSourceRule dataSourceRule = dataSourceRules.get(dataSourceChangedEvent.getQualifiedDatabase().getGroupName());Preconditions.checkNotNull(dataSourceRule, "Can not find database discovery data source rule in database `%s`", databaseName);if (StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus())) {dataSourceRule.disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());} else {dataSourceRule.enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());}}@Overridepublic Map<String, Object> getExportData() {return Collections.singletonMap(ExportableConstants.EXPORT_DB_DISCOVERY_PRIMARY_DATA_SOURCES, exportPrimaryDataSourceMap());}private Map<String, String> exportPrimaryDataSourceMap() {Map<String, String> result = new HashMap<>(dataSourceRules.size(), 1);dataSourceRules.forEach((name, dataSourceRule) -> result.put(dataSourceRule.getGroupName(), dataSourceRule.getPrimaryDataSourceName()));return result;}@Overridepublic String getType() {return DatabaseDiscoveryRule.class.getSimpleName();}
}

4.1 构造方法

在构造方法中,执行如下:

1)记录配置对象、数据库名、配置的数据源、示例上下文;

2)创建日程上下文,用于心跳检测;

3)通过配置的发现规则,创建对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,存放在Map中,key为配置的提供者名称;

3.1)通过配置的发现类型的 type 值,使用SPI,获取对应的 DatabaseDiscoveryProviderAlgorithm 算法对象;

3.2)可配置的type值包括:

3.2.1)openGauss.NORMAL_REPLICATION:openGauss数据库的普通复制(主从),对应的算法对象为:OpenGaussNormalReplicationDatabaseDiscoveryProviderAlgorithm;

3.2.2)MySQL.MGR:MySQL是MGR机制,对应的算法对象为:MGRMySQLDatabaseDiscoveryProviderAlgorithm;

3.2.3)MySQL.NORMAL_REPLICATION:MySQL的普通复制(主从),对应的算法对象为:MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm;

4)通过配置的发现数据源、心跳检测规则,创建DatabaseDiscoveryDataSourceRule对象,存放在Map中,key为组名;

5)查找 DatabaseDiscoveryDataSourceRule 数据源中的主副本关系;

5.1)遍历 4)中的 DatabaseDiscoveryDataSourceRule;

5.2)执行DatabaseDiscoveryDataSourceRule的getDataSourceGroup()方法,从当前系统配置的数据源中查找当前数据库发现规则中配置的数据源DataSource的Map对象;

5.3)创建DatabaseDiscoveryEngine引擎;

5.4)通过DatabaseDiscoveryEngine引擎,进行环境检测,即检测对应数据源的有效性。通过对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,连接数据源,进行检测;

5.5)自动检测数据库发现中配置的数据源中的主数据源,并记录。通过对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,连接数据源,获取当前对应组的主数据源;

4.2 获取主数据源

通过当前分片或单表转换后的数据源名,匹配对应数据库发现规则的组名,找到 DatabaseDiscoveryDataSourceRule,获取主数据源名称。

4.3 获取副本数据源

通过当前分片或单表转换后的数据源名,匹配对应数据库发现规则的组名,找到 DatabaseDiscoveryDataSourceRule,获取数据库发现规则中配置的数据源中,可用且不是主数据源的其他数据源。

MySQL的MGR

简介

MySQL MGR(MySQL Group Replication)是MySQL官方在MySQL 5.7.17版本中以插件形式推出的主从复制高可用技术,可以为MySQL集群系统提供数据冗余和故障转移能力。

MGR基于原生的主从复制,将各节点归入到一个组中,通过组内节点的通信协商(组通信协议基于Paxos算法),实现数据的强一致性、故障探测、冲突检测、节点加组、节点离组等功能。它使用Paxos分布式算法来提供节点间的分布式协调,要求组中大多数节点在线才能达到法定票数,从而对一个决策做出一致的决定。

工作模式

MGR可以以单主模式或多主模式运行,通过group_replication_single_primary_mode=[ON|OFF]变量指定工作模式。组内所有成员都必须运行相同的工作模式。

  • 单主模式:从复制组中众多个MySQL节点中自动选举一个master节点,只有master节点可以写,其他节点自动设置为read only。当master节点故障时,会自动选举一个新的master节点,选举成功后,它将设置为可写,其他slave将指向这个新的master。

  • 多主模式:复制组中的任何一个节点都可以写,因此没有master和slave的概念。只要突然故障的节点数量不太多,这个多主模型就能继续可用。但需要注意的是,多主模式下可能存在数据冲突和一致性问题,因此在使用时需要谨慎。

与普通主从复制的区别

MySQL组复制分单主模式和多主模式。如果仅使用MySQL主从模式,MySQL主从模式的复制技术仅解决了数据同步的问题,如果 master 宕机,意味着数据库管理员需要介入,应用系统可能需要修改数据库连接地址或者重启才能实现。组复制在数据库层面上做到了,只要集群中大多数主机可用,则服务可用,例如有一个3台的服务器集群,则允许其中1台宕机。

常用SQL语句

1)查看集群成员及其状态

SELECT * FROM performance_schema.replication_group_members;

此语句用于显示集群中所有成员的信息,包括成员ID、主机名、端口号、状态、角色和版本等。

2)检查MySQL服务器上group_replication插件的状态

SELECT PLUGIN_STATUS FROM information_schema.PLUGINS WHERE PLUGIN_NAME='group_replication';

这个查询会返回group_replication插件的当前状态,状态值可能是以下几种之一:

ACTIVE:表示插件已启用并正在运行。
INACTIVE:表示插件已安装但未启用。
DISABLED:表示插件已被禁用(在某些MySQL版本中,这个状态可能不被明确使用,插件可能只显示为INACTIVE,但通过设置disabled_plugins系统变量来禁用)。

3)检查MySQL服务器上MGR是否以单主模式执行

SELECT VARIABLE_VALUE FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'

这个查询会返回一个结果集,其中包含一个列VARIABLE_VALUE,该列的值指示了group_replication_single_primary_mode的设置:

  • 如果值为ON,则表示MGR集群配置为单主模式。在这种模式下,集群会自动选举一个主节点来处理写操作,而其他节点则作为从节点,只能处理读操作(除非配置了读写分离策略)。

  • 如果值为OFF,则表示MGR集群配置为多主模式。在这种模式下,集群中的任何节点都可以处理写操作,但需要小心处理数据冲突和一致性问题。

MGRMySQLDatabaseDiscoveryProviderAlgorithm

MGRMySQLDatabaseDiscoveryProviderAlgorithm为MySQL的MGR对应的数据库发现提供者算法对象。

MGRMySQLDatabaseDiscoveryProviderAlgorithm的源码如下:

package org.apache.shardingsphere.dbdiscovery.mysql.type;/*** MySQL Group Replication 数据库发现提供算法*/
@Getter
public final class MGRMySQLDatabaseDiscoveryProviderAlgorithm implements DatabaseDiscoveryProviderAlgorithm {/*** 查询 MGR 状态*/private static final String QUERY_PLUGIN_STATUS = "SELECT PLUGIN_STATUS FROM information_schema.PLUGINS WHERE PLUGIN_NAME='group_replication'";/*** 查询单主节点模式的可用值*/private static final String QUERY_SINGLE_PRIMARY_MODE = "SELECT VARIABLE_VALUE FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_single_primary_mode'";/*** 查询组名*/private static final String QUERY_GROUP_NAME = "SELECT VARIABLE_VALUE FROM performance_schema.global_variables WHERE VARIABLE_NAME='group_replication_group_name'";/***  查询所有数据源节点信息*/private static final String QUERY_MEMBER_LIST = "SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members";/***  查询主数据源节点*/private static final String QUERY_PRIMARY_DATA_SOURCE = "SELECT MEMBER_HOST, MEMBER_PORT FROM performance_schema.replication_group_members WHERE MEMBER_ID = "+ "(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'group_replication_primary_member')";/*** 查询某个ip节点的数据源状态*/private static final String QUERY_CURRENT_MEMBER_STATE = "SELECT MEMBER_STATE FROM performance_schema.replication_group_members WHERE MEMBER_HOST=? AND MEMBER_PORT=?";private Properties props;@Overridepublic void init(final Properties props) {this.props = props;}/*** 环境检测* @param databaseName database name* @param dataSources data sources*/@Overridepublic void checkEnvironment(final String databaseName, final Collection<DataSource> dataSources) {// 创建线程池ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();Collection<CompletableFuture<Void>> completableFutures = new LinkedList<>();// 遍历检测数据源for (DataSource dataSource : dataSources) {completableFutures.add(runAsyncCheckEnvironment(databaseName, dataSource, executorService));}CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));Iterator<CompletableFuture<Void>> mgrInstancesFuture = completableFutures.stream().iterator();while (mgrInstancesFuture.hasNext()) {// 确保所有的数据源都是有效的mgrInstancesFuture.next().join();}}/*** 异步运行环境检测* @param databaseName* @param dataSource* @param executorService* @return*/private CompletableFuture<Void> runAsyncCheckEnvironment(final String databaseName, final DataSource dataSource, final ExecutorService executorService) {return CompletableFuture.runAsync(() -> {try {checkSingleDataSourceEnvironment(databaseName, dataSource);} catch (final SQLException ex) {throw new SQLWrapperException(ex);}}, executorService);}/*** 检测单数据源环境* @param databaseName* @param dataSource* @throws SQLException*/private void checkSingleDataSourceEnvironment(final String databaseName, final DataSource dataSource) throws SQLException {try (Connection connection = dataSource.getConnection();Statement statement = connection.createStatement()) {checkPluginActive(databaseName, statement);checkSinglePrimaryMode(databaseName, statement);checkGroupName(databaseName, statement);checkMemberInstanceURL(databaseName, connection.getMetaData().getURL(), statement);}}/*** 检测MGR插件状态,确认是否为ACTIVE* @param databaseName* @param statement* @throws SQLException*/private void checkPluginActive(final String databaseName, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_PLUGIN_STATUS)) {ShardingSpherePreconditions.checkState(resultSet.next() && "ACTIVE".equals(resultSet.getString("PLUGIN_STATUS")), () -> new InvalidMGRPluginException(databaseName));}}/*** 检测单主节点模式的可用值* @param databaseName* @param statement* @throws SQLException*/private void checkSinglePrimaryMode(final String databaseName, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_SINGLE_PRIMARY_MODE)) {ShardingSpherePreconditions.checkState(resultSet.next() && "ON".equals(resultSet.getString("VARIABLE_VALUE")), () -> new InvalidMGRModeException(databaseName));}}/*** 检测组名是否和配置的组名一致* @param databaseName* @param statement* @throws SQLException*/private void checkGroupName(final String databaseName, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_GROUP_NAME)) {ShardingSpherePreconditions.checkState(resultSet.next() && props.getProperty("group-name", "").equals(resultSet.getString("VARIABLE_VALUE")),() -> new InvalidMGRGroupNameConfigurationException(props.getProperty("group-name"), databaseName));}}/*** 检测url对应的数据源的地址是否为MGR集群中的节点* @param databaseName* @param url* @param statement* @throws SQLException*/private void checkMemberInstanceURL(final String databaseName, final String url, final Statement statement) throws SQLException {try (ResultSet resultSet = statement.executeQuery(QUERY_MEMBER_LIST)) {while (resultSet.next()) {if (url.contains(String.join(":", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT")))) {return;}}}throw new InvalidMGRReplicationGroupMemberException(url, databaseName);}/*** 判断dataSource是否为主节点* @param dataSource data source to be judged* @return* @throws SQLException*/@Overridepublic boolean isPrimaryInstance(final DataSource dataSource) throws SQLException {try (Connection connection = dataSource.getConnection();Statement statement = connection.createStatement();ResultSet resultSet = statement.executeQuery(QUERY_PRIMARY_DATA_SOURCE)) {if (resultSet.next()) {MySQLDataSourceMetaData metaData = new MySQLDataSourceMetaData(connection.getMetaData().getURL());return metaData.getHostname().equals(resultSet.getString("MEMBER_HOST")) && Integer.toString(metaData.getPort()).equals(resultSet.getString("MEMBER_PORT"));}}return false;}@Overridepublic ReplicaDataSourceStatus loadReplicaStatus(final DataSource replicaDataSource) throws SQLException {try (Connection connection = replicaDataSource.getConnection()) {return new ReplicaDataSourceStatus(isOnlineDataSource(connection, new MySQLDataSourceMetaData(connection.getMetaData().getURL())), 0L);}}/*** 判断当前的metaData是否为online的节点* @param connection* @param metaData* @return* @throws SQLException*/private boolean isOnlineDataSource(final Connection connection, final MySQLDataSourceMetaData metaData) throws SQLException {try (PreparedStatement preparedStatement = connection.prepareStatement(QUERY_CURRENT_MEMBER_STATE)) {preparedStatement.setString(1, metaData.getHostname());preparedStatement.setString(2, Integer.toString(metaData.getPort()));try (ResultSet resultSet = preparedStatement.executeQuery()) {return resultSet.next() && "ONLINE".equals(resultSet.getString("MEMBER_STATE"));}}}@Overridepublic String getType() {return "MySQL.MGR";}
}

在MGRMySQLDatabaseDiscoveryProviderAlgorithm类中,主要完成以下实现:

1)数据源有效性检测;

通过数据源对象,执行对应MGR的SQL查询语句,检查MGR对应的状态;

2)主节点判断;

通过数据源对象,执行MGR查询主节点的SQL语句,检查对应数据源在MGR组中的状态是否为主节点;

3)副本节点状态检测;

通过数据源对象,执行MGR查询节点的SQL语句,检查对应数据源在MGR组中的状态是否为在线;

对于数据库发现规则为MySQL.MGR的读写分离动态策略中,对应的DynamicReadwriteSplittingStrategy对象中的DynamicDataSourceContainedRule对象为DatabaseDiscoveryRule对象,其对应的提供者算法为MGRMySQLDatabaseDiscoveryProviderAlgorithm对象。

在DatabaseDiscoveryRule对象中,通过MGRMySQLDatabaseDiscoveryProviderAlgorithm对象,获取组中的主数据源。从而保证了主库的可用性。

DatabaseDiscoverySQLRouter

数据库发现规则也可以单独作为路由器进行SQL语句的数据源路由,其执行在读写分离路由之后。

DatabaseDiscoverySQLRouter的源码如下:

package org.apache.shardingsphere.dbdiscovery.route;/*** 数据库发现SQL路由器*/
public final class DatabaseDiscoverySQLRouter implements SQLRouter<DatabaseDiscoveryRule> {@Overridepublic RouteContext createRouteContext(final QueryContext queryContext, final ShardingSphereDatabase database,final DatabaseDiscoveryRule rule, final ConfigurationProperties props, final ConnectionContext connectionContext) {RouteContext result = new RouteContext();// 获取第一个DatabaseDiscoveryDataSourceRuleDatabaseDiscoveryDataSourceRule singleDataSourceRule = rule.getSingleDataSourceRule();// 路由到主数据源String dataSourceName = new DatabaseDiscoveryDataSourceRouter(singleDataSourceRule).route();// 创建路由单元,添加到路由上下文中result.getRouteUnits().add(new RouteUnit(new RouteMapper(singleDataSourceRule.getGroupName(), dataSourceName), Collections.emptyList()));return result;}@Overridepublic void decorateRouteContext(final RouteContext routeContext,final QueryContext queryContext, final ShardingSphereDatabase database, final DatabaseDiscoveryRule rule,final ConfigurationProperties props, final ConnectionContext connectionContext) {Collection<RouteUnit> toBeRemoved = new LinkedList<>();Collection<RouteUnit> toBeAdded = new LinkedList<>();for (RouteUnit each : routeContext.getRouteUnits()) {// 获取当前路由上下文的数据源名称String dataSourceName = each.getDataSourceMapper().getLogicName();// 获取数据源对应的数据库发现规则Optional<DatabaseDiscoveryDataSourceRule> dataSourceRule = rule.findDataSourceRule(dataSourceName);// 存在对应的数据库发现规则if (dataSourceRule.isPresent() && dataSourceRule.get().getGroupName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {toBeRemoved.add(each);// 获取规则中的主数据源String actualDataSourceName = new DatabaseDiscoveryDataSourceRouter(dataSourceRule.get()).route();// 创建路由单元,添加到路由上下文中toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));}}routeContext.getRouteUnits().removeAll(toBeRemoved);routeContext.getRouteUnits().addAll(toBeAdded);}@Overridepublic int getOrder() {return DatabaseDiscoveryOrder.ORDER;}@Overridepublic Class<DatabaseDiscoveryRule> getTypeClass() {return DatabaseDiscoveryRule.class;}
}

数据库发现路由器的路由规则比较简单,在RouteContext上下文装饰的方法中,遍历路由单元,获取当前路由数据源,匹配数据库发现中的组名,获取对应组中的主数据源。

注:在数据库发现规则中,所有的操作都是路由到主库。

小结

关于数据库发现规则先介绍到这里,以下做一个小结:

1)数据库发现规则可以和读写分离规则结合一起使用;

1.1)在读写分离规则中,配置读写分离策略为动态策略及“允许自动识别的数据源名称”;

1.2)配置数据库发现规则的数据源组名,为对应的“允许自动识别的数据源名称”;

1.3)配置数据库发现规则的提供者类型,不同的提供者类型有对应的提供者算法;

1.4)数据库发现规则对应的对象为DatabaseDiscoveryRule,该对象主要执行如下:

a)创建日程上下文,用于心跳检测,根据配置的cron,定时访问数据源,确认数据源的状态;

b)通过配置的发现规则,创建对应的发现提供器算法DatabaseDiscoveryProviderAlgorithm对象,存放在Map中,key为配置的提供者名称;

c)通过配置的发现数据源、心跳检测规则,创建DatabaseDiscoveryDataSourceRule对象,存放在Map中,key为组名;

d)查找 DatabaseDiscoveryDataSourceRule 数据源中的主副本关系,确认主数据源名称;

e)提供通过组名,获取主数据源的方法。从DatabaseDiscoveryDataSourceRule中获取;

f)提供通过组名,获取副本数据源的方法。从DatabaseDiscoveryDataSourceRule中获取;

g)通过主、副数据源,结合读写分离规则,实现了读写分离;

2)数据库发现规则也可以单独作为数据源路由器使用;

在数据库发现规则路由器中,通过相应的数据库提供者算法,获取主从数据库中的主库。将SQL路由到对应组的主数据源中执行;

3)系统默认执行的数据库发现类型包括openGauss.NORMAL_REPLICATION、MySQL.MGR:MySQL、MySQL.NORMAL_REPLICATION;

3.1)NORMAL_REPLICATION:为普通的主从复制,通过对应数据库的主从操作SQL语句确认数据源的信息(如 show slave status 等);

3.2)MySQL.MGR:MySQL Group Replication 是MySQL官方在MySQL 5.7.17版本中以插件形式推出的主从复制高可用技术,在主从复制的基础上,添加了主库选举等功能,确保主库的高可用等。在ShardingSphere中,通过MGR相关的操作SQL语句确认数据源的信息;

关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63416.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024 数学建模国一经验分享

2024 数学建模国一经验分享 背景&#xff1a;武汉某211&#xff0c;专业&#xff1a;计算机科学 心血来潮&#xff0c;就从学习和组队两个方面指点下后来者&#xff0c;帮新人避坑吧 2024年我在数学建模比赛中获得了国一&#xff08;教练说论文的分数是湖北省B组第一&#xff0…

可视化数据分析系统:提升企业决策效率的重要工具

作为数字化时代的企业&#xff0c;数据的重要性对于当今企业来说不言而喻。有效的针对企业内部数据进行深度分析就成为了目前企业面临的关键所在。可视化数据分析系统因此就在这样的背景之下出现的&#xff0c;通过直观清晰的数据展示&#xff0c;可以帮助企业管理层快速提高决…

XML与HTML的区别汇总

XML的基本格式规则 主要规则&#xff1a; XML文档必须格式良好(well-formed)所有标签必须关闭标签名称区分大小写HTML内容需要转义属性值必须使用引号不能有交叉嵌套 XML声明&#xff08;可选但推荐&#xff09;&#xff1a; <?xml version"1.0" encoding&quo…

C语言 字符数组/多维数组/函数/作用域

1. 遍历数组 遍历数组:通过循环的方式来把数组中的每个元素数据进行查询 使用for循环遍历数组更多一些 数组长度计算: 数组总字节数/元素的数据类型的字节数 数组总字节数/第一个元素的字节数 数组遍历相关的案例:求和,求平均值,求最大值,求最小值,冒泡排序 2. 字符数组 …

Centos7环境下nifi单机部署

Centos7环境下nifi单机部署 前言一、安装Nifi1.1 下载并解压1.2 修改配置文件 二、启动Nifi程序三、Nifi的简单使用3.1 文件移动3.2 本地文件传到HDFS 参考博客 前言 本以为在服务器上部署nifi很简单&#xff0c;跟着教程走就好&#xff0c;但是并没有成功&#xff0c;可能是因…

c++的应用

整理思维导图周五剩下的三个笔试题利用函数重载&#xff0c;实现对整形数组的冒泡排序&#xff0c;对浮点型数组的冒泡排序整理课上内容在堆区申请一个数组的空间&#xff0c;并完成对该数组中数据的输入和输出&#xff0c;程序结束释放堆区空间 冒泡排序效果图&#xff1a; 代…

YOLOv8-ultralytics-8.2.103部分代码阅读笔记-tuner.py

tuner.py ultralytics\engine\tuner.py 目录 tuner.py 1.所需的库和模块 2.class Tuner: 1.所需的库和模块 # Ultralytics YOLO &#x1f680;, AGPL-3.0 license# 模块提供用于对象检测、实例分割、图像分类、姿势估计和多对象跟踪的 Ultralytics YOLO 模型的超参数调整…

【仪器仪表】怎么模拟电池短路、正极开路或负极开路

最近新人需要做一个电池充放电工装的测试验证板。这种验证板需要模拟很多状态,比如电池有可能发生短路、电池的正极开路、电池负极开路、电池内阻上升、电池电压过高、电池电压过低、电池反接等等。 在规划电池短路、正极开路或负极开路的电路时,本来想用一个继电器做线路的开…

FlinkCDC实战:将 MySQL 数据同步至 ES

&#x1f4cc; 当前需要处理的业务场景: 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询. 同步数据到 es. 概述 1. 什么是 CDC 2. 什么是 Flink CDC 3. Flink CDC Connectors 和 Flink 的版本映射 实战 1. 宽表查…

使用 Glide 加载占位图或错误图时,发现它们没有应用圆角效果--问题解决

如果您在使用 Glide 加载占位图或错误图时,发现它们没有应用圆角效果,可能是因为占位图和错误图的加载方式没有使用自定义的圆角变换。以下是确保占位图和错误图都能显示圆角效果的步骤。 1. 确保自定义变换类正确 首先,确保您的 GlideRoundTransformUtil 类实现正确。以下…

Cobalt Strike 4.8 用户指南-第十二节 可拓展 PE,进程注入和后渗透

12.1、概述 Malleable C2 文件不仅仅是通信指标。Malleable C2 配置文件还能控制 Beacon 的内存特性&#xff0c;决定 Beacon 如何进行进程注入&#xff0c;并影响 Cobalt Strike 的后渗透工作。本章将介绍 Malleable C2 语言的这些扩展。 # 12.2、PE和内存指标 Malleable C…

unity 让文字变形

效果&#xff1a; using TMPro; using UnityEngine; using NaughtyAttributes;[ExecuteInEditMode] public class TMTextPerpective : MonoBehaviour {[OnValueChanged("DoPerspective")][Range(-1f, 1f)]public float CenterBias 0f;[OnValueChanged("DoPers…

NIO - selector简单介绍

一 前言 selector作为NIO当中三大组件之一&#xff0c;是处理NIO非阻塞模式下的核心组件&#xff0c;它允许一个单个线程管理多个通道。 NIO下的阻塞模式 因为对于阻塞模式下的NIO模式&#xff0c;存在很大的问题&#xff0c;即使在单线程下&#xff0c;对应的服务端也会一直进…

C语言:分支结构

C语言&#xff1a;分支结构 分支结构 问题引出 我们在程序设计往往会遇到如下的问题&#xff0c;比如下面的函数的计算 也就是我们是必须要通过一个条件的结果来选择下一步的操作&#xff0c;算法上属于一个分支结构&#xff0c;C语言中实现分支结构主要使用if语句 条件判断…

利用anzocapital昂首资本技术优化订单执行

在金融市场的深海中&#xff0c;anzocapital昂首资本作为内行&#xff0c;深知订单执行的技术缺陷如何悄然侵蚀交易者的利润。那么&#xff0c;一个懂交易的人会如何避免这些缺陷&#xff0c;确保自己的投资策略不被市场波动所左右呢? 在订单执行过程中&#xff0c;技术缺陷可…

数据库基础入门:从零开始学习数据库的核心概念

数据库是现代软件开发的核心组成部分之一&#xff0c;无论是网站、手机应用还是企业管理系统&#xff0c;都离不开数据库的支持。本文将带你从零开始&#xff0c;逐步了解数据库的基本概念和常见操作。 什么是数据库&#xff1f; 数据库&#xff08;Database&#xff09;是一个…

RTR Chaptor11 下

全局光照 定向遮蔽预计算定向遮蔽定向遮蔽的动态计算使用定向屏蔽进行着色 满反射全局光照表面预照明定向表面预照明预计算传输存储方法动态漫反射全局光照光照传播体积基于体素的方法屏幕空间方法其他方法 镜面全局光照局部环境贴图环境贴图的动态更新基于体素的方法平面反射屏…

java如何解析和生成sql?

1.什么是 JSQLParser&#xff1f; JSQLParser 是一个开源的 Java 库&#xff0c;用于解析 SQL 语句并将其转换为抽象语法树&#xff08;AST&#xff09;。它支持多种 SQL 方言&#xff0c;包括 MySQL、PostgreSQL、Oracle 和 SQL Server 等。JSQLParser 使开发者能够轻松地分析…

【Apache Paimon】-- 4 -- Flink 消费 kafka 数据,然后写入 paimon

目录 1、本地开发环境 2、kafka2paimon 实现流程 3、代码实现 3.1、项目名称 3.2、项目结构 3.3、Pom.xml 和 log4j.properties 文件 3.4、代码核心类 3.4.1、入口类:Kafka2PaimonDemo.java 3.4.2、参数解析类 3.4.2.1、JobParameterUtil.java( flink job schedule…

超越DFINE最新目标检测SOTA模型DEIM

代码地址&#xff1a;https://github.com/ShihuaHuang95/DEIM 论文地址&#xff1a;DEIM: DETR with Improved Matching for Fast Convergence 论文中文版&#xff1a;DEIM: 改进匹配的 DETR 以实现快速收敛 以下是文章的主要贡献和发现&#xff1a; DEIM框架&#xff1a;提…