FlinkSQL的常用语言

FlinkSQL 常用语言指南

FlinkSQL 是 Apache Flink 提供的 SQL 接口,允许用户使用标准 SQL 或扩展的 SQL 语法来处理流式和批式数据。以下是 FlinkSQL 的常用语言元素和操作:

  1. 基本查询
-- 选择查询
SELECT * FROM table_name;-- 带条件的查询
SELECT column1, column2 FROM table_name WHERE condition;-- 分组聚合
SELECT user_id, COUNT(*) as cnt 
FROM orders 
GROUP BY user_id;
  1. 时间属性定义
-- 定义处理时间
CREATE TABLE orders (order_id STRING,product STRING,amount DOUBLE,order_time TIMESTAMP(3),-- 声明处理时间属性proc_time AS PROCTIME()
) WITH (...);-- 定义事件时间和水位线
CREATE TABLE orders (order_id STRING,product STRING,amount DOUBLE,order_time TIMESTAMP(3),-- 声明事件时间属性WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
  1. 窗口操作
-- 滚动窗口
SELECT window_start, window_end, SUM(amount) as total_amount
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end;-- 滑动窗口
SELECT window_start, window_end, user_id,SUM(amount) as total_amount
FROM TABLE(HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTES, INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, user_id;-- 会话窗口
SELECT window_start, window_end, user_id,COUNT(*) as event_count
FROM TABLE(SESSION(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end, user_id;
  1. 连接操作
-- 常规连接
SELECT o.order_id, o.product, u.user_name
FROM orders AS o
JOIN users AS u ON o.user_id = u.user_id;-- 时间区间连接
SELECT o.order_id, p.promotion_name,o.order_time,o.amount
FROM orders o
JOIN promotions p 
ON o.product_id = p.product_id
AND o.order_time BETWEEN p.start_time AND p.end_time;-- 窗口连接
SELECT o.order_id,s.shipment_id,o.order_time,s.ship_time,TIMESTAMPDIFF(HOUR, o.order_time, s.ship_time) as hours_to_ship
FROM orders o
JOIN shipments s
ON o.order_id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '1' HOUR AND s.ship_time;
  1. 常用函数

标量函数

-- 字符串函数
SELECT LOWER(name), SUBSTRING(email, 1, 5) FROM users;-- 数学函数
SELECT ABS(amount), ROUND(price, 2) FROM products;-- 时间函数
SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'),TIMESTAMPDIFF(DAY, order_time, CURRENT_TIMESTAMP)
FROM orders;

聚合函数

SELECT COUNT(*) as total_orders,SUM(amount) as total_amount,AVG(amount) as avg_amount,MAX(amount) as max_amount,MIN(amount) as min_amount
FROM orders;

窗口函数

SELECT product_id,order_time,amount,ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY order_time) as row_num,SUM(amount) OVER (PARTITION BY product_id ORDER BY order_time ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as moving_sum
FROM orders;
  1. DDL 语句
-- 创建表
CREATE TABLE orders (order_id STRING,product_id STRING,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'kafka:9092','format' = 'json'
);-- 创建视图
CREATE VIEW large_orders AS
SELECT * FROM orders WHERE amount > 1000;-- 创建函数
CREATE FUNCTION my_udf AS 'com.example.MyUDF';
  1. DML 语句
-- 插入数据
INSERT INTO target_table
SELECT * FROM source_table WHERE amount > 100;-- 更新数据 (Flink 1.12+ 支持有限)
UPDATE orders SET amount = 200 WHERE order_id = '123';-- 删除数据 (Flink 1.12+ 支持有限)
DELETE FROM orders WHERE order_id = '456';
  1. 模式匹配 (MATCH_RECOGNIZE)
SELECT *
FROM orders
MATCH_RECOGNIZE (PARTITION BY user_idORDER BY order_timeMEASURESSTART_ROW.order_id AS start_order,LAST(PRICE_DOWN.order_id) AS bottom_order,LAST(PRICE_UP.order_id) AS end_orderONE ROW PER MATCHAFTER MATCH SKIP TO LAST PRICE_UPPATTERN (START_ROW PRICE_DOWN+ PRICE_UP+)DEFINEPRICE_DOWN AS (LAST(PRICE_DOWN.amount, 1) IS NULL AND PRICE_DOWN.amount < START_ROW.amount) OR PRICE_DOWN.amount < LAST(PRICE_DOWN.amount, 1),PRICE_UP AS PRICE_UP.amount > LAST(PRICE_DOWN.amount, 1)
) MR;
  1. 配置参数
-- 设置参数
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '1000';
  1. 常用连接器配置
-- Kafka 源表
CREATE TABLE kafka_source (id INT,name STRING,event_time TIMESTAMP(3),WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'input_topic','properties.bootstrap.servers' = 'kafka:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'latest-offset','format' = 'json'
);-- JDBC 结果表
CREATE TABLE jdbc_sink (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://mysql:3306/mydb','table-name' = 'sink_table','username' = 'user','password' = 'password'
);

FlinkSQL 不断演进,这里只是举例一些常用的语句,参考官方文档可以获取最新语法和功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/77432.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring mvc异步请求 sse 大文件下载 断点续传下载Range

学习连接 异步Servlet3.0 Spring Boot 处理异步请求&#xff08;DeferredResult 基础案例、DeferredResult 超时案例、DeferredResult 扩展案例、DeferredResult 方法汇总&#xff09; spring.io mvc Asynchronous Requests 官网文档 spring.io webflux&webclient官网文…

一问看懂——支持向量机SVM(Support Vector Machine)

目录 芜湖~~~支持向量机&#xff08;SVM&#xff09; 1. 引言 2. 基本思想 3. 数学模型 3.1 超平面定义 3.2 分类间隔与目标函数 3.3 软间隔与松弛变量 4. 核函数方法&#xff08;Kernel Trick&#xff09; 4.1 核函数定义 4.2 常用核函数 5. SVM 的几种类型 6. SV…

蓝桥杯 拼数(字符串大小比较)

题目描述 设有 n 个正整数 a1​…an​&#xff0c;将它们联接成一排&#xff0c;相邻数字首尾相接&#xff0c;组成一个最大的整数。 输入格式 第一行有一个整数&#xff0c;表示数字个数 n。 第二行有 n 个整数&#xff0c;表示给出的 n 个整数 ai​。 输出格式 一个正整…

Elasticsearch 系列专题 - 第三篇:搜索与查询

搜索是 Elasticsearch 的核心功能之一。本篇将介绍如何构建高效的查询、优化搜索结果,以及调整相关性评分,帮助你充分发挥 Elasticsearch 的搜索能力。 1. 基础查询 1.1 Match Query 与 Term Query 的区别 Match Query:用于全文搜索,会对查询词进行分词。 GET /my_index/_…

本地电脑使用sshuttle命令将网络流量代理到ssh连接的电脑去实现访问受限网络

本地电脑使用sshuttle命令将网络流量代理到ssh连接的电脑去实现访问受限网络 安装使用 工作过程中, 经常会遇到, 需要访问客户内网环境的问题, 一般都需要安转各式各样的VPN客户端到本地电脑上, 软件多了也会造成困扰, 所有, 找了一款还不错的命令工具去解决这个痛点 安装 官方…

双相机结合halcon的条码检测

以下是针对提供的C#代码的详细注释和解释&#xff0c;结合Halcon库的功能和代码结构进行说明&#xff1a; --- ### **代码整体结构** 该代码是一个基于Halcon库的条码扫描类GeneralBarcodeScan&#xff0c;支持单台或双台相机的条码检测&#xff0c;并通过回调接口返回结果。…

python基础语法12-迭代器与生成器

Python 生成器与迭代器详解 在 Python 中&#xff0c;生成器和迭代器是处理大量数据时的强大工具。它们能够帮助我们节省内存&#xff0c;避免一次性加载过多数据。生成器通过 yield 关键字实现&#xff0c;允许我们逐步产生数据&#xff0c;而迭代器通过实现特定的接口&#…

公司内部建立pypi源

有一篇建立apt源的文章在这里&#xff0c;需要的可以查看&#xff1a;公司内部建立apt源-CSDN博客 server: pip install pypiserver mkdir -d pypi/packages cp test.whl pypi/packages pypi-server run --port 8080 /home/xu/pypi/packages & 网页访问&#xff1a;http:…

VMware Workstation/Player 的详细安装使用指南

以下是 VMware Workstation/Player 的完整下载、安装指南&#xff0c;包含详细步骤、常见问题及解决方法&#xff0c;以及进阶使用技巧&#xff0c;适用于 Windows 和 macOS 用户。 VMware Workstation/Player 的详细安装使用指南—目录 一、下载与安装详细指南1. 系统要求2. 下…

蓝桥杯python组考前准备

1.保留k位小数 round(10/3, 2) # 第二个参数表示保留几位小数 2.输入代替方案&#xff08;加速读取&#xff09; import sys n int(sys.stdin.readline()) # 读取整数&#xff08;不加int就是字符串&#xff09; a, b map(int, sys.stdin.readline().split()) # 一行读取多个…

【JSON2WEB】16 login.html 登录密码加密传输

【JSON2WEB】系列目录 【JSON2WEB】01 WEB管理信息系统架构设计 【JSON2WEB】02 JSON2WEB初步UI设计 【JSON2WEB】03 go的模板包html/template的使用 【JSON2WEB】04 amis低代码前端框架介绍 【JSON2WEB】05 前端开发三件套 HTML CSS JavaScript 速成 【JSON2WEB】06 JSO…

计算机网络起源

互联网的起源和发展是一个充满创新、突破和变革的历程&#xff0c;从20世纪60年代到1989年&#xff0c;这段时期为互联网的诞生和普及奠定了坚实的基础。让我们详细回顾这一段激动人心的历史。 计算机的发展与ARPANET的建立&#xff08;20世纪60年代&#xff09; 互联网的诞生…

洛谷P1824进击的奶牛简单二分

题目如下 代码如下 谢谢观看

如何建立高效的会议机制

建立高效的会议机制需做到&#xff1a;明确会议目标、制定并提前分发议程、控制会议时长、确保有效沟通与反馈、及时跟进执行情况。其中&#xff0c;明确会议目标是核心关键&#xff0c;它直接决定了会议的方向与效率。只有明确目标&#xff0c;会议才不会偏离初衷&#xff0c;…

开源AI大模型AI智能名片S2B2C商城小程序:科技浪潮下的商业新引擎

摘要&#xff1a; 本文聚焦于科技迅猛发展背景下&#xff0c;开源AI大模型、AI智能名片与S2B2C商城小程序的融合应用。通过分析元宇宙、人工智能、区块链、5G等前沿科技带来的商业变革&#xff0c;阐述开源AI大模型AI智能名片S2B2C商城小程序在整合资源、优化服务、提升用户体验…

基于大模型构建金融客服的技术调研

OpenAI-SB api接口 https://openai-sb.com/ ChatGPT与Knowledge Graph (知识图谱)分享交流 https://www.bilibili.com/video/BV1bo4y1w72m/?spm_id_from333.337.search-card.all.click&vd_source569ef4f891360f2119ace98abae09f3f 《要研究的方向和准备》 https://ww…

WSA(Windows Subsystem for Android)安装LSPosed和应用教程

windows安卓子系统WSA的Lsposed和shamiko的安装教程 WSA(Windows Subsystem for Android)安装LSPosed和应用教程 一、环境准备 在开始之前,请确保: 已经安装好WSA(Windows Subsystem for Android)已经安装好ADB工具下载好LSPosed和Shamiko框架安装包 二、连接WSA 首先需要…

辛格迪客户案例 | 河南宏途食品实施电子合约系统(eSign)

01 河南宏途食品有限公司&#xff1a;食品行业的数字化践行者 河南宏途食品有限公司&#xff08;以下简称“宏途食品”&#xff09;作为国内食品行业的创新企业&#xff0c;专注于各类食品的研发、生产和销售。公司秉承“质量为先、创新驱动、服务至上”的核心价值观&#xff…

手机静态ip地址怎么获取?方法与解析‌

而在某些特定情境下&#xff0c;我们可能需要为手机设置一个静态IP地址。本文将详细介绍手机静态IP地址详解及获取方法 一、什么是静态IP地址&#xff1f; 静态IP&#xff1a;由用户手动设置的固定IP地址&#xff0c;不会因网络重启或设备重连而改变。 动态IP&#xff1a;由路…

天下飞飞【老飞飞服务端】+客户端+数据库测试带视频教程

天下飞飞服务器搭建测试视频 天下飞飞【老飞飞服务端】客户端数据库测试带视频教程 完整安装教程。 测试环境 系统server2019 sql2022数据库 sql的安装 odbc搭建 sql加载数据库 此测试端能用于服务器搭建测试。 下载地址为&#xff1a;https://download.csdn.net/d…