Calcite 通过 API 定义 RelNode 示例

1)总结

通过 RelBuilder 创建 RelNode。

2)代码示例

MyRelBuilder

import cn.com.ptpress.cdm.ds.csv.CsvSchema;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;
import org.junit.jupiter.api.Test;import java.util.List;class MyRelBuilder {/*** LogicalJoin(condition=[=($2, $8)], joinType=[inner])*   LogicalJoin(condition=[=($0, $3)], joinType=[inner])*     LogicalTableScan(table=[[csv, data]])*     LogicalTableScan(table=[[csv, data]])*   LogicalJoin(condition=[=($1, $4)], joinType=[inner])*     LogicalTableScan(table=[[csv, data]])*     LogicalTableScan(table=[[csv, data]])*/@Testpublic void joinTest() {final FrameworkConfig config = MyRelBuilder.config().build();final RelBuilder builder = RelBuilder.create(config);final RelNode left = builder.scan("data").scan("data").join(JoinRelType.INNER, "Id").build();final RelNode right = builder.scan("data").scan("data").join(JoinRelType.INNER, "Name").build();final RelNode result = builder.push(left).push(right).join(JoinRelType.INNER, "Score").build();System.out.println(RelOptUtil.toString(result));}/*** LogicalFilter(condition=[>($1, 90)])*   LogicalProject(Name=[$1], Score=[$2])*     LogicalTableScan(table=[[csv, data]])*/@Testpublic void projectWithFilterTest() {final FrameworkConfig config = MyRelBuilder.config().build();final RelBuilder builder = RelBuilder.create(config);final RelNode node = builder.scan("data").project(builder.field("Name"), builder.field("Score")).filter(builder.call(SqlStdOperatorTable.GREATER_THAN,builder.field("Score"),builder.literal(90))).build();System.out.println(RelOptUtil.toString(node));}/*** LogicalTableScan(table=[[csv, data]])*/@Testpublic void scanTest() {final FrameworkConfig config = MyRelBuilder.config().build();final RelBuilder builder = RelBuilder.create(config);final RelNode node = builder.scan("data").build();System.out.println(RelOptUtil.toString(node));}public static Frameworks.ConfigBuilder config() {final SchemaPlus rootSchema = Frameworks.createRootSchema(true);return Frameworks.newConfigBuilder().parserConfig(SqlParser.Config.DEFAULT).defaultSchema(rootSchema.add("csv", new CsvSchema("data.csv"))).traitDefs((List<RelTraitDef>) null);}
}

CsvSchema

package cn.com.ptpress.cdm.ds.csv;import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.Source;
import org.apache.calcite.util.Sources;import java.net.URL;
import java.util.HashMap;
import java.util.Map;public class CsvSchema extends AbstractSchema {private Map<String, Table> tableMap = new HashMap<>();private String dataFiles;public CsvSchema(String dataFile) {this.dataFiles = dataFile;}@Overrideprotected Map<String, Table> getTableMap() {//获取resources下的每隔csv文件,并为其创建CSV表结构for (String dataFile : dataFiles.split(",")) {URL url = ClassLoader.getSystemClassLoader().getResource(dataFile);assert url != null;Source source = Sources.of(url);tableMap.put(dataFile.split("\\.")[0], new CsvTable(source));}return tableMap;}
}

CsvTable

package cn.com.ptpress.cdm.ds.csv;import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Source;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;public class CsvTable extends AbstractTable implements ScannableTable {private Source source;public CsvTable(Source source) {this.source = source;}/*** 获取字段类型*/@Overridepublic RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {//保存字段和字段类型的映射List<String> names = new LinkedList<>();List<RelDataType> types = new LinkedList<>();try (BufferedReader reader = new BufferedReader(new FileReader(source.file()))) {String[] columnWithType = reader.readLine().split(" ");for (String str : columnWithType) {String name = str.split(":")[0];String type = str.split(":")[1];names.add(name);types.add(relDataTypeFactory.createSqlType(SqlTypeName.get(type)));}} catch (IOException e) {throw new RuntimeException(e);}return relDataTypeFactory.createStructType(Pair.zip(names, types));}@Overridepublic Enumerable<Object[]> scan(DataContext dataContext) {return new AbstractEnumerable<Object[]>() {@Overridepublic Enumerator<Object[]> enumerator() {return new CsvEnumerator<>(source);}};}
}

CsvEnumerator

package cn.com.ptpress.cdm.ds.csv;import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Source;
import org.apache.commons.lang3.time.FastDateFormat;import java.io.BufferedReader;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;public class CsvEnumerator<E> implements Enumerator<E> {private E current;private BufferedReader br;private List<SqlTypeName> types;CsvEnumerator(Source source) {try {this.br = new BufferedReader(source.reader());String[] columnWithType = this.br.readLine().split(" ");types = new ArrayList<>(columnWithType.length);for (String str : columnWithType) {String type = str.split(":")[1];types.add(SqlTypeName.get(type));}} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic E current() {return current;}/*** 判断是否有下一行,并更新current*/@Overridepublic boolean moveNext() {try {String line = br.readLine();if (line == null || "".equals(line.trim())) {return false;}final String[] values = line.split(",");Object[] row = new Object[values.length];for (int i = 0; i < values.length; i++) {row[i] = convert(types.get(i), values[i]);}current = (E) row;    // 如果是多列,这里要多个值} catch (IOException e) {e.printStackTrace();return false;}return true;}@Overridepublic void reset() {throw new UnsupportedOperationException("Error");}@Overridepublic void close() {try {br.close();} catch (IOException e) {e.printStackTrace();}}private static final FastDateFormat TIME_FORMAT_DATE;private static final FastDateFormat TIME_FORMAT_TIME;private static final FastDateFormat TIME_FORMAT_TIMESTAMP;static {final TimeZone gmt = TimeZone.getTimeZone("GMT");TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);}private Object convert(SqlTypeName fieldType, String string) {if (fieldType == null) {return string;}switch (fieldType) {case BOOLEAN:if (string.length() == 0) {return null;}return Boolean.parseBoolean(string);case TINYINT:if (string.length() == 0) {return null;}return Byte.parseByte(string);case SMALLINT:if (string.length() == 0) {return null;}return Short.parseShort(string);case INTEGER:if (string.length() == 0) {return null;}return Integer.parseInt(string);case BIGINT:if (string.length() == 0) {return null;}return Long.parseLong(string);case FLOAT:if (string.length() == 0) {return null;}return Float.parseFloat(string);case DOUBLE:if (string.length() == 0) {return null;}return Double.parseDouble(string);case DATE:if (string.length() == 0) {return null;}try {Date date = TIME_FORMAT_DATE.parse(string);return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);} catch (ParseException e) {return null;}case TIME:if (string.length() == 0) {return null;}try {Date date = TIME_FORMAT_TIME.parse(string);return (int) date.getTime();} catch (ParseException e) {return null;}case TIMESTAMP:if (string.length() == 0) {return null;}try {Date date = TIME_FORMAT_TIMESTAMP.parse(string);return date.getTime();} catch (ParseException e) {return null;}case VARCHAR:default:return string;}}
}

resources/data.csv

Id:VARCHAR Name:VARCHAR Score:INTEGER
1,小明,90
2,小红,98
3,小亮,95

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/125123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据-Storm流式框架(六)---Kafka介绍

Kafka简介 Kafka是一个分布式的消息队列系统(Message Queue)。 官网&#xff1a;Apache Kafka 消息和批次 kafka的数据单元称为消息。消息可以看成是数据库表的一行或一条记录。 消息由字节数组组成&#xff0c;kafka中消息没有特别的格式或含义。 消息有可选的键&#x…

Kubernetes Taint(污点) 和 Toleration(容忍)

Author&#xff1a;rab 目录 前言一、Taint&#xff08;污点&#xff09;1.1 概述1.2 查看节点 Taint1.3 标记节点 Taint1.4 删除节点 Taint 二、Toleration&#xff08;容忍&#xff09; 前言 Kubernetes 中的污点&#xff08;Taint&#xff09;和容忍&#xff08;Toleration…

3D模拟场景开发引擎

在3D工程模拟开发中&#xff0c;有一些专门的引擎和工具可供选择&#xff0c;以帮助您创建逼真的三维模拟和模型。以下是一些用于3D工程模拟的开发引擎和工具&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流…

SCADA在污水和供水系统解决方案

1. 引言 随着城市化的不断发展&#xff0c;污水和供水系统的管理变得越来越重要。为了提高运营效率和监控系统状态&#xff0c;许多污水处理厂开始使用SCADA系统。 SCADA系统具有实时数据采集、监控和控制功能&#xff0c;可以帮助污水处理厂运营人员实时了解系统的运行情况&…

Leetcode.274 H 指数

题目链接 Leetcode.274 H 指数 mid 题目描述 给你一个整数数组 c i t a t i o n s citations citations &#xff0c;其中 c i t a t i o n s [ i ] citations[i] citations[i] 表示研究者的第 i i i 篇论文被引用的次数。计算并返回该研究者的 h h h 指数。 根据维基百科…

Python 学习1 基础

文章目录 基础字符串字面量常用的值类型注释变量print语句数据类型数据类型转换标识符运算符 字符串拓展小结 2023.10.28 周六 最近打算学一下Python&#xff0c;毕竟确实简单方便&#xff0c;而且那个编程语言排名还是在第一。不过不打算靠它吃饭&#xff0c;深不深入暂且不说…

linux-vsftp虚拟多用户

目录 1.安装vsftp 2.安装DB工具&#xff0c;能转化普通文件为vsftpd识别数据库加密文件 3.创建登录虚拟用户的名单 4.加密文件 6.需要修改vsftpd的配置文件 7.修改vsftp的配置文件&#xff0c;加载支持虚拟用户模式 8.针对不同用户开启不同权限 9.重启服务 10.测试 安…

React 组件点击事件

点击事件 点击事件方式1、传统类方法&#xff08;不推荐&#xff09;2、传统类方法 16.3.0 - 自动绑定&#xff08;不推荐&#xff09;3、箭头函数3.1、类组件3.2、函数组件3.3、内联箭头函数 4、useState Hook 点击事件方式 1、传统类方法&#xff08;不推荐&#xff09; 当…

记录微调chatglm3

用于记录chatglm3的过程&#xff0c;防止忘记 需要注意的 可以使用xtuner -h查看有哪些功能可以使用。 [2023-10-31 11:40:18,643] [INFO] [real_accelerator.py:158:get_accelerator] Setting ds_accelerator to cuda (auto detect) 10/31 11:40:22 - mmengine - INFO - Ar…

软件测试之接口测试详解

首先&#xff0c;什么是接口呢&#xff1f; 接口一般来说有两种&#xff0c;一种是程序内部的接口&#xff0c;一种是系统对外的接口。 系统对外的接口&#xff1a;比如你要从别的网站或服务器上获取资源或信息&#xff0c;别人肯定不会把数据库共享给你&#xff0c;他只能给…

C++启动线程的方法

&#xff08;1&#xff09;函数指针 情况一&#xff1a;主线程有join&#xff0c;正常执行 #include <thread> #include <iostream>void work(int num) {while(num-- > 0) {std::cout << num << std::endl;} }int main() {std::thread t(work, 5);…

塔望食观察丨从“一药难求”看国内退烧药品牌是怎样炼成的

随着新冠疫情防疫的全面放开&#xff0c;感染患者不断增多&#xff0c;市民在未知的恐慌中开启了囤药模式&#xff0c;药店中的“四类药”&#xff08;退烧、止咳、抗病毒、抗生素类药品&#xff09;被一抢而空&#xff0c;尤其是以退烧类药物更为短缺&#xff0c;以解热镇痛的…

简单工厂模式

1 概念及特点 简单工厂模式属于类的创建型模式,又叫做静态工厂方法模式。 通过专门定义一个类来负责创建其他类的实例&#xff0c;被创建的实例通常都具有共同的父类。 简单工厂模式可以减少客户程序对类创建过程的依赖。 2 实现步骤 1. 提供一个工厂类 简单工厂模式的核…

针灸养生服务预约小程序的效果如何

针灸服务的市场需求度很高&#xff0c;每个城市中都有不少相关品牌&#xff0c;对商家来说&#xff0c;如何引流拓客、打造品牌是生意开展的首要条件之一&#xff0c;且主要以同城用户为主&#xff0c;或连锁情况下为各店引流&#xff0c;但传统线下模式很难实现生意拓展。 那…

缓存和数据库一致性解决方案

引入缓存提高性能 如果你的业务处于起步阶段&#xff0c;流量非常小&#xff0c;那无论是读请求还是写请求&#xff0c;直接操作数据库即可&#xff0c;这时你的架构模型是这样的&#xff1a; 但随着业务量的增长&#xff0c;你的项目请求量越来越大&#xff0c;这时如果每次都…

双十一限时优惠!沃通SSL证书、代码签名证书年度好价

2023年11月01日至11月11日&#xff0c;沃通2023“双十一限时特惠”活动&#xff0c;精选9款SSL证书、国密SSL证书、代码签名证书产品推出年度好价&#xff0c;部分SSL证书产品低至5折&#xff0c;更有EV代码签名证书爆款特惠&#xff01;多种数字证书一站式采购&#xff0c;解决…

外汇天眼:GOMAX──假网友热心教投资,高返利活动骗入金

在通讯科技如此发达的今日&#xff0c;人们愈来愈习惯透过网路交友&#xff0c;寻找志同道合的伙伴&#xff0c;甚至发展一段亲密关系。 然而&#xff0c;近年来假交友诈骗十分猖獗&#xff0c;至今已造成许多民众极大的财务损失&#xff0c;成为无法忽视的社会问题。 不久前&a…

听GPT 讲Rust源代码--library/std(6)

题图来自 Why you should use Python and Rust together[1] File: rust/library/std/src/sys/unix/thread_parking/netbsd.rs 文件netbsd.rs位于Rust源代码的rust/library/std/src/sys/unix/thread_parking目录下。该文件是Rust标准库中用于Unix操作系统的线程等待和唤醒机制的…

SQLAlchemy删除所有重复的用户|Counter类运用

Python标准库中的collections模块中的Counter类。Counter类用于计算可迭代对象中元素的出现次数&#xff0c;并以字典的形式返回结果&#xff0c;其中键是元素&#xff0c;值是该元素的出现次数。 for name, count in Counter(names).items() 是一个循环语句&#xff0c;它用于…

【小算法】C++ 时间戳转换

背景 使用 C 而不是 C 的方法来实现时间戳转换问题。 方法 简化 #include <ctime> #include <chrono> #include <iomanip> using sys_clock std::chrono::system_clock; using time_point_t sys_clock::time_point; using time_duration_t sys_clock:…