目录
Table API 案例
Table API 连接操作
Table API 是批处理和流处理的统一的关系型 API。Table API 的查询不需要修改代码就可以采用批输入或流输入来运行。Table API 是 SQL 语言的超集,并且是针对 Apache Flink 专门设计的。Table API 集成了 Scala,Java 和 Python 语言的 API。Table API 的查询是使用 Java,Scala 或 Python 语言嵌入的风格定义的,有诸如自动补全和语法校验的 IDE 支持,而不是像普通 SQL 一样使用字符串类型的值来指定查询。
Table API 案例
1. 示例:订单分组计数
下面的例子中假定有一张叫 Orders 的表,表中有属性 (orderId, dept, amount, orderTime) 。orderTime 字段是流任务中的逻辑时间属性或是批任务中的普通时间戳字段。
按照部门分组,统计每个部门的单量(数据来源于 CSV 文件)。
具体代码实现如下:
import org.apache.flink.table.api.*;import java.net.URLDecoder;import static org.apache.flink.table.api.Expressions.$;public class TableApiGroupExample {public static void main(String[] args) throws Exception {EnvironmentSettings settings = EnvironmentSettings.newIn