1. 简单订单统计
假设有以下两个订单流数据,数据字段分别为用户ID、购买的商品名称、商品数量。
数据流A:
1L,"尺子",3
1L,"铅笔",4
3L,"橡皮",2
数据流B:
2L,"手表",3
2L,"笔记本",3
4L,"计算器",1
目标:合并两个流的数据,并筛选出商品数量大于2的订单数据。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{$,EnvironmentSettings}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api_/**
* Flink SQL统计订单流数据
* 知识点:DataStream转为Table、视图,Table转为DataStream
*/
object FlinkSQLDemo{def main(args:Array[String]):Unit={//创建流执行环境val env=StreamExecutionEnvironment.getExecutionEnvironment//创建EnvironmentSettings实例并设置参数val settings=EnvironmentSettings.newInstance() //创建一个用于创建EnvironmentSettings实例的构建器.useBlinkPlanner() //将Blink计划器设置为所需的模块(默认).inStreamingMode() //设置组件以流模式工作,默认启用.build() //创建一个不可变的EnvironmentSettings实例//构建流式表执行环境StreamTableEnvironmentval tableEnv: StreamTableEnvironment=StreamTableEnvironment.create(env,settings)//构建订单数据流Aval orderStreamA:DataStream[Order]=env.fromCollection(List(Order(1L,"尺子",3),Order(1L,"铅笔",4),Order(3L,"橡皮",2)))//构建订单数据流Bval orderStreamA:DataStream[Order]=env.fromCollection(List(Order(2L,"手表",3),Order(2L,"笔记本",3),Order(4L,"计算器",1)))//将DataStream转为Table,并指定Table的所有字段val tableA: Table=tableEnv.fromDataStream(orderStreamA,$"user",$"product",$"amount")//将Table的schema以摘要格式打印到控制台tableA.printSchema()//(// 'user' BIGINT,// 'product' STRING,// 'user' INT,//)//将DataStream转为视图,视图名称为tableB,并指定视图的所有字段tableEnv.createTemporaryView("tableB",orderStreamB,$("user"),$("product"),$("amount"))//执行SQL查询,合并查询结果println("tableA默认表名:"+tableA.toString)val resultTable:Table=tableEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount>2" +"UNION ALL "+"SELECT * FROM tableB WHERE amount > 2")//将结果Table转为仅追加流val dataStreamResult=tableEnv.toAppendStream[Order](resultTable)//将流打印到控制台dataStreamResult.print()//触发程序执行env.execute()}
}//创建订单样例
case class Order(user:Long,product:String,amount:Int)
在IDEA本地执行上述代码,控制台输出结果如下:
1> Order(1,铅笔,4)
11> Order(2,笔记本,3)
10> Order(2,手表,3)
12> Order(1,尺子,3)