10分钟了解Flink SQL使用

Flink 是一个流处理和批处理统一的大数据框架,专门为高吞吐量和低延迟而设计。开发者可以使用SQL进行流批统一处理,大大简化了数据处理的复杂性。本文将介绍Flink SQL的基本原理、使用方法、流批统一,并通过几个例子进行实践。

1、Flink SQL基本原理

Flink SQL建立在Apache Flink之上,利用Flink的强大处理能力,使得用户可以使用SQL语句进行流数据和批数据处理。Flink SQL既支持实时的流数据处理,也支持有界的批数据处理。

Flink SQL用SQL作为处理数据的接口语言,将SQL语句转换成数据流图(Dataflow Graph),再由Flink引擎执行。

2、Flink SQL固定编码套路

使用Flink SQL时,我们通常会遵循如下编码套路,这些套路和使用Flink API的套路是一样的:

  • 环境准备:初始化一个TableEnvironment对象,它是执行Flink SQL语句的核心。这个环境可以是流数据环境,也可以是批数据环境。

  • 数据源定义:通过CREATE TABLE语句定义输入数据源(source),可以是Kafka、CSV文件等。

  • 数据处理:编写SQL语句对数据进行处理,如查询、过滤、聚合等。

  • 数据输出:通过CREATE TABLE定义输出数据源(sink),并将处理结果输出。

3、Flink SQL代码示例

以下是一个从CSV文件读取数据,通过SQL查询,再将数据输出到CSV的完整例子。

  • 先准备input.csv文件内容,如下:

1,product_A,10.5
2,product_B,20.3
3,product_C,15.8
1,product_D,12.2
2,product_A,18.7

  • 编写demo代码

编写代码之前先在pom.xml中添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version>
</dependency>

示例代码如下:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSqlDemo {public static void main(String[] args) throws Exception {// 设置环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); //为了方便测试看效果,这里并行度设置为1// 使用EnvironmentSettings创建StreamTableEnvironment,明确设置为批处理模式EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode() // 设置为批处理模式,这样后续才能一次性的输出到csv中.build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// 定义输入数据源String createSourceTableDdl = "CREATE TABLE csv_source (" +" user_id INT," +" product STRING," +" order_amount DOUBLE" +") WITH (" +" 'connector' = 'filesystem'," +" 'path' = 'file:///path/input.csv'," +" 'format' = 'csv'" +")";tableEnv.executeSql(createSourceTableDdl);//        // 编写 SQL 查询
//        String query = "SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id";
//        // 执行查询并打印
//        tableEnv.executeSql(query).print();
//        env.execute("Flink SQL Demo");// 定义输出数据源String createSinkTableDdl = "CREATE TABLE csv_sink (" +" user_id INT," +" total_amount DOUBLE" +") WITH (" +" 'connector' = 'filesystem'," +" 'path' = 'file:///path/output.csv'," +" 'format' = 'csv'" +")";tableEnv.executeSql(createSinkTableDdl);// 执行查询并将结果输出到csv_sinkString query = "INSERT INTO csv_sink " +"SELECT user_id, SUM(order_amount) as total_amount " +"FROM csv_source " +"GROUP BY user_id";tableEnv.executeSql(query);
//        env.execute("Flink SQL Job");}
}

  • 执行结果如下:

4、Flink SQL做流批统一

什么是流批统一?

流批统一是大数据处理领域的一个概念,它指的是使用一套代码来同时处理流数据(Streaming)和批数据(Batching)。

流处理和批处理的区别如下:

  1. 批处理(Batch Processing):批处理是指在某一时间点处理大量数据的手段。它通常涉及到对大量静止的(不再变化的)数据集进行一次性的处理。批处理作业通常在数据集完整可用后开始执行,并且经常是在数据仓库中进行。例如,一个电商平台可能在一天结束时运行一个批处理作业来处理当天所有的交易记录。

  2. 流处理(Stream Processing):流处理是指对数据实时进行处理,通常是数据生成或接收的同时立即进行。流处理适用于连续的数据输入,这些数据一直在变化,需要立即响应。例如,社交媒体平台在接收到新的帖子时,可能会实时分析这些帖子的内容和流行趋势。

在早期,流处理和批处理通常需要不同的系统来执行。对于批处理,可能使用如Hadoop这样的框架;而对于流处理,可能使用如Apache Storm这样的框架。这就导致开发者要同时学习多种框架才能处理不同类型的数据作业。

流批统一的概念,就是将这两种数据处理方式合并到一个平台中,这样一个系统既可以处理静止的大批量数据集,也可以处理实时的数据流。这样做的优点是显而易见的:

  • 统一的API:开发人员只需要学习和使用一套工具和API,可以共享更多的代码和逻辑。

  • 维护简便:只需维护一个系统,可以减少学习成本,减轻运维压力,减少故障点。

  • 灵活的数据处理:可以根据不同的业务需求灵活选择数据处理方式。

Flink SQL流批一体的实现原理

Flink很好的实现了流批统一,可以让开发人员用相同的方式来编写批处理和流处理程序。不论是对有界(批处理)还是无界(流处理)的数据源,Flink都可以使用相同的API和处理逻辑来处理数据。

Flink 通过内置的表抽象来实现流批一体,这里的"表"可以是动态变化的(例如,来自实时数据流的表)或是静态的(例如,存储在文件或数据库中的批量数据表)。Flink SQL引擎会根据数据的实际来源自动优化执行计划。

Flink SQL的流批统一核心在于三点:

  • 统一的API和SQL语义:Flink SQL提供一致的查询构建块(如窗口、时间处理函数),这些在流处理和批处理中语义一致,确保不同模式下行为的统一性。

  • 透明的状态处理:无论是流处理还是批处理,Flink都能够保持和恢复状态,为开发者提供一致的高容错性体验。

  • 多模态存储和处理能力:Flink SQL能够访问不同存储介质的数据,这意味着相同的SQL语句可以无缝在流数据和存储的批量数据上执行。

Flink SQL流批统一的代码示例

以下是一个完整的代码示例,用Flink来实现流批统一处理。Flink同时从Kafka 和 CSV读取数据,然后合并查询再输出结果:

  • 代码示例

代码中,先配置了Flink的流处理环境和表环境,然后用DDL语句在Flink中注册了Kafka和文件系统数据源。接着执行了一个SQL查询来合并来自这两种数据源的数据,并计算总金额。最后,打印出查询结果并开始执行Flink作业。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class StreamBatchUnifiedDemo {public static void main(String[] args) throws Exception {// 设置流处理的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// Kafka 流处理表String createKafkaSourceDDL = "CREATE TABLE kafka_stream_orders (" +"order_id STRING," +"amount DOUBLE)" +"WITH (" +"'connector' = 'kafka'," +"'topic' = 'topic_test'," +"'properties.bootstrap.servers' = '10.20.1.26:9092'," +"'format' = 'json'," +"'scan.startup.mode' = 'latest-offset'" +")";tableEnv.executeSql(createKafkaSourceDDL);// 文件系统批处理表String createFilesystemSourceDDL = "CREATE TABLE file_batch_orders (" +"order_id STRING," +"amount DOUBLE)" +"WITH (" +"'connector' = 'filesystem'," +"'path' = 'file:///Users/yclxiao/Project/bigdata/flink-blog/doc/input_order.csv'," +"'format' = 'csv'" +")";tableEnv.executeSql(createFilesystemSourceDDL);// 执行统一查询,计算总金额Table resultTable = tableEnv.sqlQuery("SELECT SUM(amount) FROM (" +"SELECT amount FROM kafka_stream_orders " +"UNION ALL " +"SELECT amount FROM file_batch_orders)");// 打印结果tableEnv.toRetractStream(resultTable, Row.class).print();// 开始执行程序env.execute("Stream-Batch Unified Job");}
}

  • 执行效果

通过以上示例代码,可以看出Flink SQL的流批一体设计:相同的SQL语句可以用在流处理和批处理中,而不需要做任何修改。Flink背后的执行引擎会自动根据数据的特性(流或者批)来进行相应的优化执行。

这就是Flink SQL非常强大的地方,它减少了开发者需要写不同代码逻辑的需求,简化了复杂的数据处理流程。

5、总结

Flink SQL是一个非常强大的数据处理工具,可以应对多种复杂的数据处理场景。

本文主要介绍了Flink SQL的基本原理、编码套路、流批统一,再结合正确的代码示例进行实践。希望对你有帮助。

文章转载自:不焦躁的程序员

原文链接:https://www.cnblogs.com/mangod/p/18187474

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/836737.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】17. 进程间通信 --- 管道

1. 什么是进程间通信(进程间通信的目的) 数据传输&#xff1a;一个进程需要将它的数据发送给另一个进程 资源共享&#xff1a;多个进程之间共享同样的资源。 通知事件&#xff1a;一个进程需要向另一个或一组进程发送消息&#xff0c;通知它&#xff08;它们&#xff09;发生了…

Springboot自动装配源码分析

版本 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </par…

第六十节 Java设计模式 - 过滤器/标准模式

Java设计模式 - 过滤器/标准模式 过滤器模式使用不同的条件过滤对象。 这些标准可以通过逻辑操作链接在一起。 过滤器模式是一种结构型模式。 例子 import java.util.List; import java.util.ArrayList;class Employee {private String name;private String gender;private…

决策树学习记录

对于一个决策树的决策面&#xff1a; 他其实是在任意两个特征基础上对于所有的点进行一个分类&#xff0c;并且展示出不同类别的之间的决策面&#xff0c;进而可以很清楚的看出在这两个特征上各个数据点种类的分布。 对于多输出的问题&#xff0c;在利用人的上半张脸来恢复下半…

ICode国际青少年编程竞赛- Python-4级训练场-复杂嵌套for循环

ICode国际青少年编程竞赛- Python-4级训练场-复杂嵌套for循环 1、 for i in range(4):Dev.step(i6)for j in range(3):Dev.turnLeft()Dev.step(2)2、 for i in range(4):Dev.step(i3)for j in range(4):Dev.step(2)Dev.turnRight()Dev.step(-i-3)Dev.turnRight()3、 for i …

Windows关闭NGINX命令

1、首先用cmd进入NGINX的目录下,输入下面命令&#xff0c;查看nginx是否启动 tasklist /fi "imagename eq nginx.exe"2、关闭nginx taskkill /f /t /im nginx.exe3、启动&#xff1a;start nginx 4、重启&#xff1a;nginx -s reload

【牛客】SQL211 获取当前薪水第二多的员工的emp_no以及其对应的薪水salary

1、描述 有一个薪水表salaries简况如下&#xff1a; 请你获取薪水第二多的员工的emp_no以及其对应的薪水salary&#xff0c; 若有多个员工的薪水为第二多的薪水&#xff0c;则将对应的员工的emp_no和salary全部输出&#xff0c;并按emp_no升序排序。 2、题目建表 drop table …

ctfshow 源码审计 web301--web305

web301 在checklogin.php 发现了 $sql"select sds_password from sds_user where sds_username".$username." order by id limit 1;";在联合查询并不存在的数据时&#xff0c;联合查询就会构造一个虚拟的数据就相当于构造了一个虚拟账户&#xff0c;可以…

MSMQ消息队列

MQ是一种企业服务的消息中间节技术&#xff0c;这种技术常常伴随着企业服务总线相互使用&#xff0c;构成了企业分布式开发的一部分&#xff0c;如果考虑到消息的发送和传送之间是可以相互不联系的并且需要分布式架构&#xff0c;则可以考虑使用MQ做消息的中间价技术&#xff0…

如何给远程服务器配置代理

目录 前言 正文 更换镜像源 开始之前 安装过程 遇到的问题 尾声 &#x1f52d; Hi,I’m Pleasure1234&#x1f331; I’m currently learning Vue.js,SpringBoot,Computer Security and so on.&#x1f46f; I’m studying in University of Nottingham Ningbo China&#x1f4…

TFN CK1840B 喇叭天线 定向 18GHz~40GHz

沃比得 CK1840B 喇叭天线 定向 18GHz~40GHz 产品概述 沃比得 CK1840B喇叭天线工作频率为 18GHz~40GHz。具有频带宽&#xff0c; 性能可靠&#xff0c; 增益高等优 点&#xff0c; 是理想的 EMC 测试、电子对抗等领域的定向接收、发射天线。 应用领域 ● 电子对抗领域 ● EM…

IT服务台的演变趋势

在技术进步和用户期望变化的推动下&#xff0c;IT服务台正在经历重大变化。IT服务台的未来将主要受到以下趋势的推动&#xff1a; 先进的人工智能和认知技术 预计高级人工智能 &#xff08;AI&#xff09; 和认知技术在 IT 服务台中的集成度会更高。通过将 IT 服务台集成到 IT…

PMC高手如何玩转跨部门协作?让团队和谐共生

PMC&#xff08;生产与物料控制&#xff09;作为连接生产与供应链的关键部门&#xff0c;其与其他部门之间的协作关系显得尤为重要。本文&#xff0c;深圳天行健精益管理咨询公司分享具体方法如下&#xff1a; 首先&#xff0c;PMC需要明确自己的角色定位。作为生产与供应链之间…

Redis经典问题:数据并发竞争

大家好,我是小米!今天我们要聊的话题是在大流量系统中常见的一个问题:数据并发竞争。不管是火车票系统还是微博系统,一旦出现数据并发竞争,都可能导致用户体验下降,甚至系统崩溃。那么,我们该如何解决这个问题呢?让我们一起来深入探讨! 数据并发竞争 当我们谈论大流…

三. TensorRT基础入门-ONNX注册算子的方法

目录 前言0. 简述1. 执行一下我们的python程序2.转换swin-tiny时候出现的不兼容op的例子3. 当出现导出onnx不成功的时候&#xff0c;我们需要考虑的事情4. unsupported asinh算子5. unsupported deformable conv算子总结参考 前言 自动驾驶之心推出的 《CUDA与TensorRT部署实战…

EmotiVoice 实时语音合成TTS

参考:https://github.com/netease-youdao/EmotiVoice 测试整体速度可以 docker安装: 运行容器:默认运行了两个服务,8501 一个streamlit页面,另外8000是一个api接口服务 docker run -dp 8501:8501 -p 8250:8000 syq163/emoti-voice:latest ##gpu运行 docker run --gpus a…

第四届微调——炼丹

学习地址&#xff1a;Tutorial/xtuner/README.md at main InternLM/Tutorial GitHub 笔记 微调是一种在已有的预训练模型基础上&#xff0c;通过使用新的数据对模型进行进一步优化和调整的技术手段。它的目的是使模型能够更好地适应特定的应用场景和任务需求&#xff0c;进一…

基础ArkTS组件:帧动画,内置动画组件,跑马灯组件(HarmonyOS学习第三课【3.6】)

帧动画 帧动画也叫序列帧动画&#xff0c;其原理就是在时间轴的每帧上逐帧绘制不同的内容&#xff0c;使其连续播放而成动画。ArkUI开发框架提供了 ImageAnimator 组件实现帧动画能力&#xff0c;本节笔者介绍一下 ImageAnimator 组件的简单使用。 官方文献 说明 该组件从A…

9. SVG中的text元素

SVG (Scalable Vector Graphics) 提供了强大的文本渲染能力&#xff0c;其中<text>元素是常用 的文本操作的元素。本文将详细介绍<text>标签的基本使用方法&#xff0c;并展示如何通过<tspan>和<textPath>增强文本的表现力。 <text>标签基础 &…

防爆巡检手持终端在燃气巡检作业中的应用

在燃气巡检作业中&#xff0c;安全始终是首要考虑的因素。面对易燃易爆的燃气环境&#xff0c;传统的巡检方式已经难以满足现代安全管理的需求。随着科技的不断进步&#xff0c;防爆巡检手持终端应运而生&#xff0c;成为燃气巡检作业的得力助手。这些终端不仅具备高度的防爆性…