前言
上一篇文章中学习了Calcite基本概念,其中框架的核心能力是通过统一的Sql访问不同来源的数据。这篇文章中将通过一个简单的例子学习如何实现改功能。 最终通过sql来访问Java List中的数据。
准备工作
maven依赖
<dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId><version>1.36.0</version></dependency>
数据源定义
PersonList维护了一个Person对象列表,我们将通过sql访问PersonList中的数据
public class PersonList {private List<Person> personList;private PersonList(List<Person> personList) {this.personList = personList;}public static PersonList create() {return new PersonList(Lists.newArrayList());}public void addPerson(Person person) {personList.add(person);}public Iterator<Person> getIterator() {return personList.iterator();}
}public class Person {private Long id;private String name;private Integer age;public static Person createRandomly() {Person person = new Person();person.setId((long) ((int) ((Math.random() * 1000000000) + 1)));person.setName("Person" + person.getId());person.setAge((int) ((Math.random() * 100) + 1));return person;}// 省略getter setter
}
核心对象介绍
为了适配多源数据,根据上一篇文章,我们需要扩展实现Calcite中的Schema与Table。
schema
在Calcite 中,Schema 是一个非常核心的概念,它代表了一种数据组织方式,用于定义数据源的结构。Schema 可以被理解为数据库中的一个模式(schema),它包含了表(tables)、视图(views)、类型(types)和其他数据库对象的集合。在 Calcite 中,Schema 是数据访问的逻辑组织和层次结构的基础。
-
数据源抽象:Schema 为不同类型的数据源提供了一个统一的抽象层。不论数据存储在何处(如 JDBC 数据库、文件、内存中的集合等),都可以通过定义相应的 Schema 来实现对这些数据的查询和操作。
-
查询解析和执行:在执行 SQL 查询时,Calcite 需要知道每个表的结构,包括列名、数据类型等信息。Schema 提供了这些信息,使 Calcite 能够解析 SQL 语句并生成相应的执行计划。
-
数据访问的统一入口:通过定义 Schema,开发者可以在不改变查询逻辑的情况下,更换后端数据源。这提供了很高的灵活性,使得系统能够适应不同的数据存储需求。
-
支持多数据源和数据联合:Calcite 能够同时访问多个数据源,并且可以在一个查询中联合多个数据源的数据。每个数据源都由一个 Schema 描述,Calcite 负责处理数据源之间的通信和数据转换。
-
扩展性和自定义:开发者可以实现自定义的 Schema,对数据访问逻辑进行定制。这对于特殊的数据格式或特定的性能优化非常有用。
table
Table 是一个核心概念,它代表了可查询的数据集合。在数据库术语中,一个表通常是数据的行和列的集合,而在 Calcite 中,Table 接口更加通用,可以表示不仅仅是传统的表格数据,还可以表示流数据或任何其他形式的结构化数据。
在 Calcite 的架构中,Table 是一个接口,定义在 org.apache.calcite.schema 包中。它提供了对数据的抽象视图,不关心数据的存储方式或物理格式。通过实现不同类型的 Table 接口,Calcite 能够支持多种数据源,如 JDBC 数据库、CSV 文件、内存中的数据结构等。
Calcite 中的 Table 可以具体化为以下几种类型,每种类型都有其特定的用途和功能:
- ScannableTable:这种类型的表支持全表扫描。当查询需要访问表中的所有数据时,可以使用此类型的表。
- FilterableTable:这种表支持在表的实现层面上应用过滤条件。这意味着不需要将整个数据集加载到内存中,而是可以推送过滤逻辑到数据源(如果数据源支持这样的操作),从而提高查询效率。
- ProjectableFilterableTable:结合了 FilterableTable 和投影(选择特定列)的能力。这允许在读取数据时仅返回查询所需的列,减少数据传输和处理的开销。
- TranslatableTable:这种类型的表可以被翻译成特定的执行计划。这是 Calcite 中非常强大的功能,因为它允许开发者自定义如何将 SQL 查询转换为对底层数据源的操作。
代码实现
结构描述
- 扩展实现Schema,对我们要访问的数据源进行抽象
- 扩展实现ScannableTable访问数据集
- 创建Table实例,关联实际数据集
- 创建Schema实列,维护Table实例
- 将创建的Schema加入Calcite
- 创建连接对象
- 执行Sql
- 遍历结果
实现代码
schema与table
public class ListSchema extends AbstractSchema {Map<String, Table> tableMap = new HashMap<>();public void addTable(String name, Table table) {tableMap.put(name, table);}public ListSchema() {}@Overrideprotected Map<String, Table> getTableMap() {return tableMap;}
}public class TableForList extends AbstractTable implements ScannableTable{private PersonList personList;public TableForList(PersonList personList) {this.personList = personList;}@Overridepublic Enumerable<Object[]> scan(DataContext root) {return new DefaultEnumerable<Object[]>() {@Overridepublic Enumerator<Object[]> enumerator() {return null;}@NotNull@Overridepublic Iterator<Object[]> iterator() {Iterator<Person> iterator = personList.getIterator();return new Iterator<Object[]>() {@Overridepublic boolean hasNext() {return iterator.hasNext();}// 迭代返回每行数据@Overridepublic Object[] next() {Person next = iterator.next();return new Object[]{next.getId(),next.getName(),next.getAge()};}};}};}@Overridepublic RelDataType getRowType(RelDataTypeFactory typeFactory) {return typeFactory.builder().add("id", typeFactory.createSqlType(SqlTypeName.BIGINT)).add("name",typeFactory.createSqlType(SqlTypeName.VARCHAR)).add("age",typeFactory.createSqlType(SqlTypeName.INTEGER)).build();}}
主流程
@Test
public void test() throws Exception{PersonList personList = PersonList.create();personList.addPerson(Person.createRandomly());personList.addPerson(Person.createRandomly());personList.addPerson(Person.createRandomly());Table tableForList = new TableForList(personList);ListSchema listSchema = new ListSchema();listSchema.addTable("MyTable", tableForList);Properties info = new Properties();Connection connection = DriverManager.getConnection("jdbc:calcite:", info);CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);SchemaPlus rootSchema = calciteConnection.getRootSchema();rootSchema.add("listSchema", listSchema);Statement statement = calciteConnection.createStatement();ResultSet resultSet = statement.executeQuery("select * from \"listSchema\".\"MyTable\"");while (resultSet.next()) {int columnCount = resultSet.getMetaData().getColumnCount();for (int i = 1; i <= columnCount; i++) {String value = resultSet.getString(i);System.out.printf("%s ",value);}System.out.println();}
}
异常处理清单
1.SqlValidatorException: Object ‘HTTPSCHEMA’ not found; did you mean ‘httpSchema’?
原sql写为 “select * from listSchema.MyTable”, 异常指出在执行 SQL 查询时遇到了问题。错误明确指出了问题所在:Object ‘HTTPSCHEMA’ not found; did you mean ‘httpSchema’?。这意味着 Calcite 在尝试查找名为 HTTPSCHEMA 的对象时失败了,这通常是因为对象名称的大小写不匹配。3种解决方法如下:
- 使用引号:如果你想明确指定标识符的大小写,可以在 SQL 查询中使用双引号将标识符括起来。例如,如果 schema 名称是小写的 listSchema,这样写查询:
select * from “listSchema”.“MyTable”;
- 配置 Calcite 的大小写敏感性:在创建 Calcite 连接时设置 unquotedCasing 来定义对于未加引号的标识符的大小写处理方式。默认情况下,Calcite 将未加引号的标识符视为大写。如果希望 Calcite 忽略标识符的大小写,可以设置 caseSensitive 为 false。
Properties info = new Properties();
info.setProperty(“caseSensitive”, “false”);
Connection connection = DriverManager.getConnection(“jdbc:calcite:”, info);
- 连接参数中设置词法规则,这里我们设置info参数lex,根据官方手册可以有这些参数,BIG_QUERY, JAVA, MYSQL, MYSQL_ANSI, ORACLE (default), SQL_SERVER., 我们设置为JAVA即可
Properties info = new Properties();Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
2. 打印结果 java.sql.SQLException: invalid column ordinal: 0
打印结果代码为,最初实现时,i = 0 开始打印。由于框架是从下标1开始访问,因此i=0 导致输出错误
for (int i = 1; i <= columnCount; i++) {String value = resultSet.getString(i);System.out.printf("%s ",value);}
总结
实现多源访问,了解框架结构及核心类最终实现