Flink流批一体计算(14):PyFlink Tabel API之SQL查询

举个例子

查询 source 表,同时执行计算

# 通过 Table API 创建一张表:
source_table = table_env.from_path("datagen")
# 或者通过 SQL 查询语句创建一张表:
source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id + 1, source_table.data)

Table API 查询

Table 对象有许多方法,可以用于进行关系操作。

这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。

这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。

Table API 文档描述了流和批处理上所有支持的 Table API 操作。

以下示例展示了一个简单的 Table API 聚合查询:

from pyflink.table import Environmentsettings, TableEnvironment
# 通过 batch table environment 来执行查询
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])
# 计算所有来自法国客户的收入
revenue = orders \.select(orders.name, orders.country, orders.revenue) \.where(orders.country == 'FRANCE') \.group_by(orders.name) \.select(orders.name, orders.revenue.sum.alias('rev_sum'))
revenue.to_pandas()

Table API 也支持行操作的 API, 这些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation.

以下示例展示了一个简单的 Table API 基于行操作的查询

from pyflink.table import Environmentsettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd# 通过 batch table environment 来执行查询
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),result_type=DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.BIGINT())]),func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()

SQL 查询

Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。SQL 支持Flink 对流和批处理。

下面示例展示了一个简单的 SQL 聚合查询:

from pyflink.table import Environmentsettings, TableEnvironment# 通过 stream table environment 来执行查询env_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='8','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='11')""")table_env.execute_sql("""CREATE TABLE print_sink (id BIGINT,data_sum TINYINT) WITH ('connector' = 'print')""")table_env.execute_sql("""INSERT INTO print_sinkSELECT id, sum(data) as data_sum FROM(SELECT id / 2 as id, data FROM random_source)WHERE id > 1GROUP BY id""").wait()

Table API 和 SQL 的混合使用

Table API 中的 Table 对象和 SQL 中的 Table 可以自由地相互转换。

下面例子展示了如何在 SQL 中使用 Table 对象:

create_temporary_view(view_path, table)  将一个 `Table` 对象注册为一张临时表,类似于 SQL 的临时表。

# 创建一张 sink 表来接收结果数据
table_env.execute_sql("""CREATE TABLE table_sink (id BIGINT,data VARCHAR) WITH ('connector' = 'print')
""")
# 将 Table API 表转换成 SQL 中的视图
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 将 Table API 表的数据写入结果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

下面例子展示了如何在 Table API 中使用 SQL 表:

sql_query(query)   执行一条 SQL 查询,并将查询的结果作为一个 `Table` 对象。

# 创建一张 SQL source 表
table_env.execute_sql("""CREATE TABLE sql_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='4','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='7')
""")# 将 SQL 表转换成 Table API 表
table = table_env.from_path("sql_source")
# 或者通过 SQL 查询语句创建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 将表中的数据写出
table.to_pandas()

优化

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GCGabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 调用rebalance操作,使数据分区均匀。

缓冲区超时设置

由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。

当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。

示例可以参考如下:

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/42915.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT实现天气预报

1. MainWindow类设计的成员变量和方法 public: MainWindow(QWidget* parent nullptr); ~MainWindow(); protected: 形成文本菜单来用来右键关闭窗口 void contextMenuEvent(QContextMenuEvent* event); 鼠标被点击之后此事件被调用 void mousePressEvent(QMouseEv…

Leetcode每日一题:1444. 切披萨的方案数(2023.8.17 C++)

目录 1444. 切披萨的方案数 题目描述: 实现代码与解析: 二维后缀和 动态规划 原理思路: 1444. 切披萨的方案数 题目描述: 给你一个 rows x cols 大小的矩形披萨和一个整数 k ,矩形包含两种字符: A …

Spring(三):Spring中Bean的生命周期和作用域

前言 在 Spring 中,那些组成应用程序的主体及由 Spring IOC 容器所管理的对象,被称之为 bean。简单地讲,bean 就是由 IOC 容器初始化、装配及管理的对象,除此之外,bean 就与应用程序中的其他对象没有什么区别了。而 b…

Oracle数据库运维大全

以下是一些常见的Oracle数据库运维任务和对应的语句脚本示例: 检查数据库实例状态: SELECT instance_name, status, startup_time FROM v$instance; 查看数据库版本和补丁级别: SELECT * FROM v$version; SELECT patch_id, action, status …

LeetCode 热题 100(四):48. 旋转图像、240. 搜索二维矩阵 II、234. 回文链表

一.48. 旋转图像 题目要求:就是一个顺时针的旋转过程。 思路:观察矩阵,得出翻转前第i行的第J个元素 等于 翻转后倒数第i列的第J个元素,举例说明,第1行第2个元素为“2”,翻转后到了 倒数第1列的第2个元素…

MAC环境,在IDEA执行报错java: -source 1.5 中不支持 diamond 运算符

Error:(41, 51) java: -source 1.5 中不支持 diamond 运算符 (请使用 -source 7 或更高版本以启用 diamond 运算符) 进入设置 修改java版本 pom文件中加入 <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin&l…

vue项目预览pdf功能(解决动态文字无法显示的问题)

最近&#xff0c;因为公司项目需要预览pdf的功能&#xff0c;开始的时候找了市面上的一些pdf插件&#xff0c;都能用&#xff0c;但是&#xff0c;后面因为pdf变成了需要根据内容进行变化的&#xff0c;然后&#xff0c;就出现了需要动态生成的文字不显示了。换了好多好多的插件…

Flink安装与使用

1.安装准备工作 下载flink Apache Flink: 下载 解压 [dodahost166 bigdata]$ tar -zxvf flink-1.12.0-bin-scala_2.11.tgz 2.Flinnk的standalone模式安装 2.1修改配置文件并启动 修改&#xff0c;好像使用默认的就可以了 [dodahost166 conf]$ more flink-conf.yaml 启动 …

【办公自动化】使用Python批量生成PPT版荣誉证书

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

1.大多数是配置问题 修改rocketmq文件夹broker.conf 2.配置与集群IP或本地IPV4一样 重启 在RocketMQ独享实例中支持IPv4和IPv6双栈&#xff0c;主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的。RocketMQ的Broker端、Namesrv端和客户端都需要支持IPv4和IPv6协议&…

Python土力学与基础工程计算.PDF-螺旋板载荷试验

python 求解代码如下&#xff1a; 1. import numpy as np 2. 3. # 已知参数 4. p_a 100 # 标准压力&#xff0c; kPa 5. p np.array([25, 50, 100, 200) # 荷载&#xff0c; kPa 6. s np.array([2.88, 5.28, 9.50, 15.00) / 10 # 沉降量&#xff0c; cm 7. D 10 # 螺旋板直…

C语言:选择+编程(每日一练)

目录 选择题&#xff1a; 题一&#xff1a; 题二&#xff1a; 题三&#xff1a; 题四&#xff1a; 题五&#xff1a; 编程题&#xff1a; 题一&#xff1a;尼科彻斯定理 示例1 题二&#xff1a;等差数列 示例2 本人实力有限可能对一些地方解释和理解的不够清晰&…

Redis知识(一)

目录 Redis过期删除和内存淘汰策略&#xff1a; 过期删除策略&#xff1a; 内存淘汰策略&#xff08;解决内存过大问题&#xff09;&#xff1a; LRU和LFU以及他们在Redis里的实现 主从复制 哨兵模式 缓存 缓存雪崩 缓存击穿 缓存穿透 数据库和缓存一致性问题 Redis…

windows下redis服务启动及.bat文件中中redis服务的启动

windows windows下redis服务的启动 1、不配置环境变量 找到redis服务的安装目录进入命令行窗口并输入命令redis-server.exe redis.windows.conf2、配置环境变量 将redis安装目录配置在path环境变量中之后就可以在cmd窗口的任意位置输入redis-server命令就可以启动redis服务…

材料行业可以转IC设计后端吗?

近来有许多材料行业的小伙伴通过后台来问我对于职业规划的看法&#xff0c;甚至有些小伙伴直接点明了某个行业适不适合自己&#xff0c;那么我这边仅以近年来比较热门的数字芯片设计来展开讲讲&#xff0c;材料适不适合转行做IC呢。 对于理工科的同学而言&#xff0c;选择哪个…

Graal 编译器

一开始,我们来讲一个故事。假设有一个名为 John 的开发人员,他正在尝试编写一些高性能的 Java 代码。他遇到了一些性能和速度问题,因为他的应用需要经常从大量的数据源中获取数据,并进行计算。他尝试了许多优化工具和技术,但是仍然无法满足他的需求。在这个时候,他听说了…

公告:微信小程序备案期限官方要求

备案期限要求 1、若微信小程序未上架&#xff0c;自2023年9月1日起&#xff0c;微信小程序须完成备案后才可上架&#xff0c;备案时间1-20日不等&#xff1b; 2、若微信小程序已上架&#xff0c;请于2024年3月31日前完成备案&#xff0c;逾期未完成备案&#xff0c;平台将按照…

Android Studio实现列表展示图片

效果&#xff1a; MainActivity 类 package com.example.tabulation;import android.content.Intent; import android.os.Bundle; import android.view.View;import androidx.appcompat.app.AppCompatActivity; import androidx.recyclerview.widget.LinearLayoutManager; im…

解决 Maven 创建 Spring Boot 项目时出现 “Cannot access alimaven“ 错误的方法

系列文章目录 文章目录 系列文章目录前言一、确认 Maven 配置二、创建 Spring Boot 项目三、修改项目的 Maven 配置四、清除 Maven 本地仓库五、重新构建项目总结前言 Maven 是 Java 项目的构建工具,而 Spring Boot 则是用于快速构建 Spring 应用程序的框架。但有时,在创建 …

Redis扩容与一致性Hash算法解析

推荐阅读 AI文本 OCR识别最佳实践 AI Gamma一键生成PPT工具直达链接 玩转cloud Studio 在线编码神器 玩转 GPU AI绘画、AI讲话、翻译,GPU点亮AI想象空间 资源分享 「java、python面试题」来自UC网盘app分享&#xff0c;打开手机app&#xff0c;额外获得1T空间 https://dr…