Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

1. TableEnvironment

创建 TableEnvironment

from pyflink.table import Environmentsettings, TableEnvironment# create a streaming TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)# or create a batch TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)

TableEnvironment 是 Table API 和 SQL 集成的核心概念。

TableEnvironment 可以用来:

  • ·创建 Table
  • ·将 Table 注册成临时表
  • ·执行 SQL 查询
  • ·注册用户自定义的 (标量,表值,或者聚合) 函数
  • ·配置作业
  • ·管理 Python 依赖
  • ·提交作业执行

创建 source 表

table_env.execute_sql("""CREATE TABLE datagen (id INT,data STRING) WITH ('connector' = 'datagen','fields.id.kind' = 'sequence','fields.id.start' = '1','fields.id.end' = '10')""")

创建 sink 表

table_env.execute_sql("""CREATE TABLE print (id INT,data STRING) WITH ('connector' = 'print')""")

2. Table

Table 是 Python Table API 的核心组件。Table 是 Table API 作业中间结果的逻辑表示。

一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。

不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。

通过列表类型的对象创建

你可以使用一个列表对象创建一张表:

from pyflink.table import Environmentsettings, TableEnvironment# 创建 批 TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])table.to_pandas()==>print(table.to_pandas())table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])print(table.to_pandas())

通过 DDL 创建

你可以通过 DDL 创建一张表,execute_sql(stmt) 执行指定的语句并返回执行结果。

执行语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。

注意,对于 "INSERT INTO" 语句,这是一个异步操作,通常在向远程集群提交作业时才需要使用。

但是,如果在本地集群或者 IDE 中执行作业时,你需要等待作业执行完成。

from pyflink.table import Environmentsettings, TableEnvironment# 创建流 TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='3','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='6')""")table = table_env.from_path("random_source")table.to_pandas()

通过 Catalog 创建

Catalog

Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。

元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。

元数据也可以是持久化的,例如 Hive Metastore 中的元数据。

Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

Catalog类型

GenericInMemoryCatalog

基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

JdbcCatalog

JdbcCatalog使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。

PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。

HiveCatalog

HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。

警告 Hive Metastore 以小写形式存储所有元数据对象名称,GenericInMemoryCatalog 区分大小写。

用户自定义 Catalog

Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。

想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。

CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。

这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。

创建 Flink 表并将其注册到 Catalog

使用 SQL DDL

用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表。

from pyflink.table.catalog import HiveCatalog# Create a HiveCatalogcatalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")# Register the catalogt_env.register_catalog("myhive", catalog)# Create a catalog databaset_env.execute_sql("CREATE DATABASE mydb WITH (...)")# Create a catalog tablet_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")# should return the tables in current catalog and database.t_env.list_tables()

通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …",都存储在 catalog 中。

你可以通过 SQL 直接访问 catalog 中的表。

使用 Java/Scala

用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。

from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
from pyflink.table.descriptors import Kafka
settings = Environmentsettings.in_batch_mode()
t_env = TableEnvironment.create(settings)
# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Create a catalog database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database)
# Create a catalog table
schema = Schema.new_builder() \.column("name", DataTypes.STRING()) \.column("age", DataTypes.INT()) \.build()  
catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka").schema(schema)// ….build())
# tables should contain "mytable"
tables = catalog.list_tables("mydb")

TableEnvironment 维护了一个使用标识符创建的表的 catalogs 映射。

Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。

如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:

from_path(path)   通过指定路径下已注册的表来创建一个表,例如通过 create_temporary_view 注册表。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/42166.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式Linux开发实操(九):CAN接口开发

前言: CAN网络在汽车中的使用可以说相当广泛。而CAN网络需要的收发器最常用的就是NXP 的TJA1042: CAN网络:

605. 种花问题

链接 假设有一个很长的花坛&#xff0c;一部分地块种植了花&#xff0c;另一部分却没有。可是&#xff0c;花不能种植在相邻的地块上&#xff0c;它们会争夺水源&#xff0c;两者都会死去。给你一个整数数组 flowerbed 表示花坛&#xff0c;由若干 0 和 1 组成&#xff0c;其中…

8/16总结

WebSocket是双向通信协议&#xff0c;模拟Socket协议&#xff0c;可以双向发送或者接收信息 而Http是单向的 WebSocket是需要浏览器和服务器握手进行建立连接的 而http是浏览器发起向服务器的连接&#xff0c;服务器预先并不知道这个连接 WebSocket在建立握手时&#xff0c;数…

Python3内置函数大全

吐血整理 Python3内置函数大全 1.abs()函数2.all()函数3.any()函数4.ascii()函数5.bin()函数6.bool()函数7.bytes()函数8.challable()函数9.chr()函数10.classmethod()函数11.complex()函数12.complie()函数13.delattr()函数14.dict()函数15.dir()函数16.divmod()函数17.enumer…

注解@JsonInclude

注解JsonInclude 1. 注解由来 JsonInclude是一个用于Java类中字段或方法的注解&#xff0c;它来自于Jackson库。Jackson库是一个用于处理JSON数据的流行开源库&#xff0c;在Java对象和JSON之间进行序列化和反序列化时经常被使用。 2. 注解示例 下面是JsonInclude注解的一个…

【kubernetes】Pod控制器

目录 Pod控制器及其功用 pod控制器有多种类型 1、ReplicaSet ReplicaSet主要三个组件组成 2、Deployment 3、DaemonSet 4、StatefulSet 5、Job 6、Cronjob Pod与控制器之间的关系 1、Deployment 查看控制器配置 查看历史版本 2、SatefulSet 为什么要有headless&…

2023-08-18力扣每日一题

链接&#xff1a; 1388. 3n 块披萨 题意&#xff1a; 一个长度3n的环&#xff0c;选n次数字&#xff0c;每次选完以后相邻的数字会消失&#xff0c;求选取结果最大值 解&#xff1a; 这波是~~&#xff08;ctrl&#xff09;CV工程师了~~ 核心思想是选取n个不相邻的元素一定…

无涯教程-Perl - splice函数

描述 此函数从LENGTH元素的OFFSET元素中删除ARRAY元素,如果指定,则用LIST替换删除的元素。如果省略LENGTH,则从OFFSET开始删除所有内容。 语法 以下是此函数的简单语法- splice ARRAY, OFFSET, LENGTH, LISTsplice ARRAY, OFFSET, LENGTHsplice ARRAY, OFFSET返回值 该函数…

Vue 项目运行 npm install 时,卡在 sill idealTree buildDeps 没有反应

解决方法&#xff1a;切换到淘宝镜像。 以下是之前安装的 xmzs 包&#xff0c;用于控制切换淘宝镜像。 该截图是之前其他项目切换淘宝镜像的截图。 切换镜像后&#xff0c;顺利执行 npm install 。

生成国密密钥对

在线生成国密密钥对 生成的密钥对要妥善保管&#xff0c;丢失是无法找回的。

selinux

一、selinux的说明 二、selinux的工作原理 三、selinux的启动、关闭与查看 Enforcing和permissive都是临时的&#xff0c;重启还是依据配置文件中&#xff0c;禁用selinux&#xff0c;修改配置文件&#xff1a; 之后重启生效 四、selinux对linux服务的影响

SpringBoot 接口调用出现乱码解决 中文乱码

SpringBoot 接口调用出现乱码解决 package com.cxjg.mvc.util;import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.converter.HttpMessageConverter; import org.springfra…

相同数字的积木游戏

题目描述 题目描述 小华和小薇一起通过玩积木游戏学习数学。 他们有很多积木&#xff0c;每个积木块上都有一个数字&#xff0c;积木块上的数字可能相同。 小华随机拿一些积木挨着排成一排&#xff0c;请小薇找到这排积木中数字相同目所处位置最远的2块积木块&#xff0c;计算…

【JAVA】我们该如何规避代码中可能出现的错误?(一)

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️初识JAVA】 文章目录 前言三种类型的异常异常处理JAVA内置异常类Exception 类的层次 前言 异常是程序中的一些错误&#xff0c;但并不是所有的错误都是异常&#xff0c;并且错误有时候是可以避免的&…

【BASH】回顾与知识点梳理(三十三)

【BASH】回顾与知识点梳理 三十三 三十三. 认识系统服务 (daemons)33.1 什么是 daemon 与服务 (service)早期 System V 的 init 管理行为中 daemon 的主要分类 (Optional)systemd 使用的 unit 分类systemd 的配置文件放置目录systemd 的 unit 类型分类说明 33.2 透过 systemctl…

Grounding dino + segment anything + stable diffusion 实现图片编辑

目录 总体介绍总体流程 模块介绍目标检测&#xff1a; grounding dino目标分割&#xff1a;Segment Anything Model (SAM)整体思路模型结构&#xff1a;数据引擎 图片绘制 集成样例 其他问题附录 总体介绍 总体流程 本方案用到了三个步骤&#xff0c;按顺序依次为&#xff1a…

Tomcat 部署优化

Tomcat Tomcat 开放源代码web应用服务器&#xff0c;是由java代码开发的 tomcat就是处理动态请求和基于java代码的页面开发 可以在html当中写入java代码&#xff0c;tomcat可以解析html页面当中的iava&#xff0c;执行动态请求 动态页面机制有问题&#xff1a;不对tomcat进行优…

vue 使用indexDB 简单完整逻辑

1 npm npm install idb 2 代码 <template><div><p>Data: {{ data }}</p><button click"fetchData">Fetch Data</button></div> </template><script> import { openDB } from idb;export default {data() {…

eqtl-GWAS和GWAS-GWAS

目前教程中有eqtl-GWAS和GWAS-GWAS两种模式&#xff0c;其他模式比较少见&#xff0c;还未进行开发 数据类型cc为分类变量即case/control&#xff0c;quant为连续变量&#xff0c;eqtl数据默认quant coloc.abf有两个比较需要注意的点&#xff0c;就是数据集中N是代表样本量&am…

解决Windows系统远程登陆后vscdoe无法输入字符,键盘没有反应,鼠标可以点击,没有反应

文章目录 前言操作过程 前言 使用vscode编译器时&#xff0c;通过远程登录或者屏幕锁屏解锁后&#xff0c;vscode出现无法输入字符内容&#xff0c;但vscode没有死机&#xff0c;切换到其他软件的窗口再切换回来后&#xff0c;可以使用鼠标点击&#xff0c;但是只要使用键盘输…