一、dynamic catalog数据库存储源码分析
dynamic catalog的实现主要涉及到两个类:CoordinatorDynamicCatalogManager、WorkerDynamicCatalogManager,这两个类的详细信息如下:
这两个类主要提供了对catalog进行增删改查的方法。在trino-435源码中,WorkerDynamicCatalogManager类并没有实现CatalogManager接口,因此需要修改该类,使其实现CatalogManager接口及接口中的方法,从而让worker节点也具备对catalog的增删改查功能。
二、JdbcCatalogStore类的具体实现
该类的详细信息如下:
在代码实现中,构造方法负责从数据库中加载catalog,并通过内部类JdbcStoredCatalog的loadProperties方法完成catalog属性的加载,具体代码如下:
public final class JdbcCatalogStoreimplements CatalogStore
{private static final Logger log = Logger.get(JdbcCatalogStore.class);private final boolean readOnly;private final Jdbi catalogsJdbi;private final Boolean isCoordinator;private final ConcurrentMap<String, StoredCatalog> catalogs = new ConcurrentHashMap<>();@Injectpublic JdbcCatalogStore(JdbcCatalogStoreConfig config, ServerConfig serverConfig){requireNonNull(config, "config is null");readOnly = config.isReadOnly();isCoordinator = serverConfig.isCoordinator();String catalogsUrl = config.getCatalogConfigDbUrl();String catalogsUser = config.getCatalogConfigDbUser();String catalogsPassword = config.getCatalogConfigDbPassword();loaderJdbcDriver(this.getClass().getClassLoader(), "com.mysql.cj.jdbc.Driver", catalogsUrl);catalogsJdbi = Jdbi.create(catalogsUrl, catalogsUser, catalogsPassword);List<String> disabledCatalogs = firstNonNull(config.getDisabledCatalogs(), ImmutableList.of());List<JdbcStoredCatalog> dbCatalogs = catalogsJdbi.withHandle(handle -> {handle.execute("CREATE TABLE IF NOT EXISTS `catalogs`( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL COMMENT 'catalog名称', `properties` text, `create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间', `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY index_name (`name`))");return handle.createQuery("SELECT name, properties FROM catalogs").mapToBean(JdbcStoredCatalog.class).list();});for (JdbcStoredCatalog catalog : dbCatalogs) {String catalogName = catalog.getName();checkArgument(!catalogName.equals(GlobalSystemConnector.NAME), "Catalog name SYSTEM is reserved for internal usage");if (disabledCatalogs.contains(catalogName)) {log.info("Skipping disabled catalog %s", catalogName);continue;}catalogs.put(catalog.getName(), catalog);}}@Overridepublic Collection<StoredCatalog> getCatalogs(){return ImmutableList.copyOf(catalogs.values());}@Overridepublic CatalogProperties 
createCatalogProperties(String catalogName, ConnectorName connectorName, Map<String, String> properties){checkModifiable();return new CatalogProperties(createRootCatalogHandle(catalogName, computeCatalogVersion(catalogName, connectorName, properties)),connectorName,ImmutableMap.copyOf(properties));}@Overridepublic void addOrReplaceCatalog(CatalogProperties catalogProperties){checkModifiable();String catalogName = catalogProperties.getCatalogHandle().getCatalogName();Properties properties = new Properties();properties.setProperty("connector.name", catalogProperties.getConnectorName().toString());properties.putAll(catalogProperties.getProperties());String stringProperties = JSONObject.toJSONString(properties);log.info("add catalog %s with properties %s", catalogName, stringProperties);JdbcStoredCatalog jdbcCatalog = new JdbcStoredCatalog(catalogName, stringProperties);if (isCoordinator) {log.info("The coordinator node catalog needs to be persisted to the database");catalogsJdbi.withHandle(handle -> {handle.createUpdate("INSERT INTO catalogs (name,properties) VALUES (:name, :properties)").bind("name", catalogName).bind("properties", stringProperties).execute();return null;});}catalogs.put(catalogName, jdbcCatalog);}@Overridepublic void removeCatalog(String catalogName){checkModifiable();if (isCoordinator) {log.info("The coordinator node catalog must support persistent deletion");catalogsJdbi.withHandle(handle -> {handle.createUpdate("DELETE FROM catalogs WHERE name = :name").bind("name", catalogName).execute();return null;});}catalogs.remove(catalogName);}private void checkModifiable(){if (readOnly) {throw new TrinoException(NOT_SUPPORTED, "Catalog store is read only");}}/*** This is not a generic, universal, or stable version computation, and can and will change from version to version without warning.* For places that need a long term stable version, do not use this code.*/static CatalogVersion computeCatalogVersion(String catalogName, ConnectorName connectorName, 
Map<String, String> properties){Hasher hasher = Hashing.sha256().newHasher();hasher.putUnencodedChars("catalog-hash");hashLengthPrefixedString(hasher, catalogName);hashLengthPrefixedString(hasher, connectorName.toString());hasher.putInt(properties.size());ImmutableSortedMap.copyOf(properties).forEach((key, value) -> {hashLengthPrefixedString(hasher, key);hashLengthPrefixedString(hasher, value);});return new CatalogVersion(hasher.hash().toString());}private static void hashLengthPrefixedString(Hasher hasher, String value){hasher.putInt(value.length());hasher.putUnencodedChars(value);}public static class JdbcStoredCatalogimplements StoredCatalog{private String name;private String properties;public JdbcStoredCatalog() {}public JdbcStoredCatalog(String name, String properties){this.name = name;this.properties = properties;}@ColumnName("properties")public String getProperties(){return properties;}public void setProperties(String properties){this.properties = properties;}@ColumnName("name")@Overridepublic String getName(){return name;}public void setName(String name){this.name = name;}@Overridepublic CatalogProperties loadProperties(){final Properties properties = convertStringToProperties(this.properties);Map<String, String> props = new HashMap<>(fromProperties(properties).entrySet().stream().collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().trim())));String connectorNameValue = props.remove("connector.name");checkState(connectorNameValue != null, "Catalog configuration %s does not contain 'connector.name'", this.name);if (connectorNameValue.indexOf('-') >= 0) {String deprecatedConnectorName = connectorNameValue;connectorNameValue = connectorNameValue.replace('-', '_');log.warn("Catalog '%s' is using the deprecated connector name '%s'. 
The correct connector name is '%s'", name, deprecatedConnectorName, connectorNameValue);}ConnectorName connectorName = new ConnectorName(connectorNameValue);CatalogHandle catalogHandle = createRootCatalogHandle(name, computeCatalogVersion(name, connectorName, props));return new CatalogProperties(catalogHandle, connectorName, ImmutableMap.copyOf(props));}}public static Properties convertStringToProperties(String json) {ObjectMapper objectMapper = new ObjectMapper();Properties properties = new Properties();try {Object jsonObject = objectMapper.readValue(json, Object.class);if (jsonObject instanceof Map) {Map<String, String> map = (Map<String, String>) jsonObject;for (Map.Entry<String, String> entry : map.entrySet()) {properties.setProperty(entry.getKey(), entry.getValue());}} else {throw new IllegalArgumentException("The JSON string should contain a Map object");}} catch (Exception e) {throw new RuntimeException(e.getMessage(), e);}return properties;}private static void loaderJdbcDriver(ClassLoader classLoader, String driverClassName, String catalogUrl) {try {final Class<?> clazz = Class.forName(driverClassName, true, classLoader);final Driver driver = (Driver) clazz.newInstance();if (!driver.acceptsURL(catalogUrl)) {log.error("Jdbc driver loading error. Driver {} cannot accept url.", driverClassName);throw new RuntimeException("Jdbc driver loading error.");}} catch (final Exception e) {throw new RuntimeException("Jdbc driver loading error.", e);}}
}