Flink SQL示例

1. 简单订单统计

假设有以下两个订单流数据,数据字段分别为用户ID、购买的商品名称、商品数量。

数据流A:

1L,"尺子",3

1L,"铅笔",4

3L,"橡皮",2

数据流B:

2L,"手表",3

2L,"笔记本",3

4L,"计算器",1

目标:合并两个流的数据,并筛选出商品数量大于2的订单数据。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{$,EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api_/**
* Flink SQL统计订单流数据
* 知识点:DataStream转为Table、视图,Table转为DataStream
*/
object FlinkSQLDemo{def main(args:Array[String]):Unit={//创建流执行环境val env=StreamExecutionEnvironment.getExecutionEnvironment//创建EnvironmentSettings实例并设置参数val settings=EnvironmentSettings.newInstance() //创建一个用于创建EnvironmentSettings实例的构建器.useBlinkPlanner() //将Blink计划器设置为所需的模块(默认).inStreamingMode() //设置组件以流模式工作,默认启用.build() //创建一个不可变的EnvironmentSettings实例//构建流式表执行环境StreamTableEnvironmentval tableEnv: StreamTableEnvironment=StreamTableEnvironment.create(env,settings)//构建订单数据流Aval orderStreamA:DataStream[Order]=env.fromCollection(List(Order(1L,"尺子",3),Order(1L,"铅笔",4),Order(3L,"橡皮",2)))//构建订单数据流Bval orderStreamA:DataStream[Order]=env.fromCollection(List(Order(2L,"手表",3),Order(2L,"笔记本",3),Order(4L,"计算器",1)))//将DataStream转为Table,并指定Table的所有字段val tableA: Table=tableEnv.fromDataStream(orderStreamA,$"user",$"product",$"amount")//将Table的schema以摘要格式打印到控制台tableA.printSchema()//(// 'user' BIGINT,// 'product' STRING,// 'user' INT,//)//将DataStream转为视图,视图名称为tableB,并指定视图的所有字段tableEnv.createTemporaryView("tableB",orderStreamB,$("user"),$("product"),$("amount"))//执行SQL查询,合并查询结果println("tableA默认表名:"+tableA.toString)val resultTable:Table=tableEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount>2" +"UNION ALL "+"SELECT * FROM tableB WHERE amount > 2")//将结果Table转为仅追加流val dataStreamResult=tableEnv.toAppendStream[Order](resultTable)//将流打印到控制台dataStreamResult.print()//触发程序执行env.execute()}
}//创建订单样例
case class Order(user:Long,product:String,amount:Int)

在IDEA本地执行上述代码,控制台输出结果如下:

1> Order(1,铅笔,4)

11> Order(2,笔记本,3)

10> Order(2,手表,3)

12> Order(1,尺子,3)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/736680.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

51、WEB攻防——通用漏洞验证码识别复用调用找回密码重定向状态值

文章目录 回显状态判断用户名重定向验证码回显显示验证码简单机制验证码复用验证码智能识别验证码接口调用安全修复建议 回显状态判断 request前端判断不安全(前端接收验证的返回值来进行判断),使用burp的Response to this request可以抓取返回包~ 这种…

【JS】APIs:事件流、事件委托、其他事件、页面尺寸、日期对象与节点操作

1 事件流 捕获阶段&#xff1a;从父到子 冒泡阶段&#xff1a;从子到父 1.1 事件捕获 <body> <div class"fa"><div class"son"></div> </div> <script>const fadocument.querySelector(.fa);const sondocument.qu…

机器视觉学习(一)—— 认识OpenCV、安装OpenCV

目录 一、认识OpenCV 二、通过pip工具安装OpenCV 三、PyCharm安装OpenCV 一、认识OpenCV OpenCV&#xff08;Open Source Computer Vision Library&#xff0c;开源计算机视觉库&#xff09;是一个跨平台的计算机视觉库&#xff0c;最初由威尔斯理工学院的Gary Bradski于199…

强烈安利!FastReport 商业图形库,炫酷可视化报告开发首选~

FastReport Business Graphics .NET&#xff0c;是一款基于fastreport报表开发控件的商业图形库&#xff0c;借助 FastReport 商业图形库&#xff0c;您可以可视化不同的分层数据&#xff0c;构建业务图表以进行进一步分析和决策。利用数据呈现领域专家针对 .NET 7、.NET Core、…

NXP iMX8MM Cortex-M4 核心 GPT Capture 测试

By Toradex秦海 1). 简介 NXP i.MX8 系列处理器均为异构多核架构 SoC&#xff0c;除了可以运行 Linux 等复杂操作系统的 Cortax-A 核心&#xff0c;还包含了可以运行实时操作系统比如 FreeRTOS 的 Cortex-M 核心&#xff0c;本文就演示通过 NXP i.MX8MM 处理器集成的 Cortex-…

Node.Js编码注意事项

Node.js 中不能使用 BOM 和 DOM 的 API&#xff0c;可以使用 console 和定时器 APINode.js 中的顶级对象为 global&#xff0c;也可以用 globalThis 访问顶级对象 浏览器端js的组成 Node.js中的JavaScript组成 相比较之下发现只有console与定时器是两个API所共有的&#xff…

HarmonyOS系统开发基础环境搭建

目录 一 鸿蒙介绍&#xff1a; 1.1 HarmonyOS系统 1.2 HarmonyOS软件编程语言 二 HarmonyOS编程环境搭建 1.1 官网下载地址 1.2搭建开发流程 1.3 创建安装目录 1.4 下载DevEco Studio​编辑 1.5 下载后点击安装 1.6 自动添加桌面快捷和bin路径 ​编辑1.7 安装好运行 …

二,几何相交---4,BO算法---(1)接近性和可分离性

提了三个观点 1&#xff0c;如果一条直线&#xff08;比如竖直&#xff09;可以分开两个线段&#xff0c;则这两个线段不相交 2&#xff0c;只需要观察与隔离线相交的几个线段 3&#xff0c;从左向右扫描线只需要观察每个线段的两个端点和一些可能的相交点。

HarmonyOS (一)ArkTS起源及UI框架

目录 1 引言 2 框架 3 ArkUI 4 特点 5 总结 1 引言 Mozilla创造了JS&#xff0c;Microsoft创建了TS&#xff0c;Huawei进一步推出了ArkTS。 ArkTS是HarmonyOS优选的主力应用开发语言。它在TypeScript&#xff08;简称TS&#xff09;的基础上&#xff0c;扩展了声明式UI、…

后悔没有早点看到这份产品说明书模板

产品说明书是连接产品与消费者的桥梁&#xff0c;它对产品具有多重好处。一份设计精良、内容准确的产品说明书有助于消费者全面了解产品&#xff0c;确保用户正确使用产品&#xff1b;减少消费者因误操作导致的故障&#xff0c;降低企业的售后服务成本&#xff1b;增强消费者对…

Rust: 开源线性代数库 nalgebra

在 Rust 中使用 nalgebra 库来处理线性代数问题相对简单。以下是一个基本的示例&#xff0c;展示了如何安装 nalgebra 库、导入它&#xff0c;并使用它来进行一些基础的线性代数运算。 步骤 1: 安装 nalgebra 首先&#xff0c;你需要在你的 Rust 项目中添加 nalgebra 作为依赖…

一文带你深度了解FreeRTOS的任务切换之PendSV异常

RTOS系统的核心是任务管理&#xff0c;而任务管理的核心是任务切换&#xff0c;任务切换决定了任务的执行顺序&#xff0c;任务切换效率的高低也决定了一款系统的性能&#xff0c;尤其是对于实时操作系统。而对于想深入了解 FreeRTOS系统运行过程的同学其任务切换是必须掌握的知…

Ubuntu给AndroidStudio软件添加桌面快捷方式图标

一、进入桌面目录 cd ~/Desktop 二、创建.desktop文件 touch androidStudio.desktop 三、Vim编辑.desktop文件 [Desktop Entry] NameAndroidStudio TypeApplication Exec/home/ubuntu/opt/android-studio/bin/studio.sh Icon/home/ubuntu/opt/android-studio/bin/studio.p…

精通Python函数,深入了解*args和**kwargs

大家好&#xff0c;如果能在Python中创建适应不同场景的函数&#xff0c;而无需每次都重写它们&#xff0c;会使得操作简洁方便&#xff0c;这就是*args和**kwargs的魔力所在。就像为函数准备了一个神奇的袋子&#xff0c;可以装下任意多的参数——使代码更整洁、更灵活。 为了…

RabbitMQ、kafaka、rocketmq等消息队列MQ消息堆积如何解决

文章目录 概述解决方案消息堆积如何处理如何解决消息队列的延时以及过期失效问题&#xff1f;消息队列满了以后该怎么处理&#xff1f;有几百万消息持续积压几小时&#xff0c;怎么办&#xff1f; 概述 1.产生背景&#xff1a; 生产者投递消息的速率与我们消费者消费的速率完全…

项目管理软件:如何确保项目启动顺利?

对所有项目经理来说&#xff0c;了解如何启动项目是最关键的技能之一。项目都是从小事开始&#xff0c;逐渐发展为更大型、更复杂的。好的开始是成功的一半&#xff0c;对项目管理来说更是如此。 启动项目的 10 个简单步骤 即使是最复杂的项目&#xff0c;也可以分解成简单的…

爬虫练习:获取某网站的房价信息

一、相关网站 二、相关代码 import requests from lxml import etree import csv with open(房天下数据.csv, w, newline, encodingutf-8) as csvfile:fieldnames [名称, 地点,价格,总价,联系电话]writer csv.DictWriter(csvfile, fieldnamesfieldnames)writer.writeheader…

计算机丢失msvcp140.dll是什么意思,电脑自带dll修复安装下载

在使用电脑的过程中那个大家是不是有遇到过电脑丢失某个文件&#xff0c;导致电脑的程序不能继续运行&#xff0c;那么出现这样的问题有什么办法可以解决呢&#xff1f;其实解决办法还是有很多的&#xff01;今天这篇文章就教大家如果电脑丢失的msvcp140.dll文件那么该怎么办&a…

基于PHP的餐厅管理系统APP设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 相关技术 3 1.1 MVC 3 1.2 ThinkPHP 3 1.3 MySQL数据库 3 1.4 uni-app 4 1.5 本章小结 4 2 系统分析 5 2.1 功能需求 5 2.2 用例分析 7 2.3 非功能需求 8 2.4 本章小结 8 3 系统设计 9 3.1 系统总体设计 9 3.2 系统详细设计 10 3.3 本章小…

qt如何配置ros环境

在Qt5.7的版本可以使用bash -i -c来启动qt&#xff0c;让Qt自己识别系统环境&#xff0c;不知道为什么Qt在之后的版本&#xff0c;这样使用都失效了。因为它会默认把CMAKE_PREFIX_PATH修改掉。 网上还有安装ros插件版本的qt creator&#xff0c;感觉失去了一些灵活性。 自己测试…