/** Nested payload carried in the `f2` field of [[outer]] and [[outerV1]]. */
case class Inner(
  f3: String,
  f4: Int
)

/** Input record: a string field plus a nested [[Inner]] value. */
case class outer(
  f1: String,
  f2: Inner
)

/** Same fields as [[outer]] with one additional integer field `f3`. */
case class outerV1(
  f1: String,
  f2: Inner,
  f3: Int
)
测试代码
package com.yy.table.convert

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataType

/**
 * Demo of moving nested case-class records between the DataStream API and the
 * Table API: a `DataStream[outer]` is registered as a view, queried with SQL
 * (including access to the nested `f2.f3` / `f2.f4` fields), and one query
 * result is converted back into a `DataStream[outerV1]`.
 */
object streamPOJO2table {

  /** Input record: a string field plus a nested [[Inner]] value. */
  case class outer(f1: String, f2: Inner)

  /** Target record for the query that adds an integer column `f3`. */
  case class outerV1(f1: String, f2: Inner, f3: Int)

  /** Nested payload stored in the `f2` field. */
  case class Inner(f3: String, f4: Int)

  /**
   * Builds a two-element stream, runs the sample queries and executes the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Flink 1.13: initialise the streaming environment and its table environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Brings the implicit type information needed by `fromElements` into scope.
    import org.apache.flink.streaming.api.scala._

    val ds1: DataStream[outer] = env.fromElements(
      outer("a", Inner("b", 2)),
      outer("d", Inner("e", 4))
    )
    val table1: Table = tEnv.fromDataStream(ds1)
    // table1
    //   .execute()
    //   .print()
    // +----+--------------------------------+--------------------------------+
    // | op |                             f1 |                             f2 |
    // +----+--------------------------------+--------------------------------+
    // | +I |                              a |                   (f3=b, f4=2) |
    // | +I |                              d |                   (f3=e, f4=4) |
    // +----+--------------------------------+--------------------------------+

    // table1
    //   .print()
    // 5> +I[d, Inner(e,4)]
    // 4> +I[a, Inner(b,2)]

    // Register the stream under a name so it can be referenced from SQL.
    tEnv.createTemporaryView("view1", ds1)

    val tableResult1: TableResult =
      tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")
    tableResult1.print()
    // +----+--------------------------------+--------------------------------+-------------+
    // | op |                             f1 |                             f2 |          f3 |
    // +----+--------------------------------+--------------------------------+-------------+
    // | +I |                              a |                   (f3=b, f4=2) |         102 |
    // | +I |                              d |                   (f3=e, f4=4) |         104 |
    // +----+--------------------------------+--------------------------------+-------------+

    // Same query as above, but kept as a Table so it can be converted to a DataStream below.
    val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
    // t1.print()
    // println(t1.getResolvedSchema)
    // +----+--------------------------------+--------------------------------+-------------+
    // | op |                             f1 |                             f2 |          f3 |
    // +----+--------------------------------+--------------------------------+-------------+
    // | +I |                              a |                   (f3=b, f4=2) |         102 |
    // | +I |                              d |                   (f3=e, f4=4) |         104 |
    // +----+--------------------------------+--------------------------------+-------------+
    // 2 rows in set
    // (`f1` STRING,`f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,`f3` INT NOT NULL
    // )

    println("---- 1 -------")
    // Convert the query result table into a DataStream of outerV1.
    val o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1, classOf[outerV1])
    // o1.print()

    println("---- 2 -------")
    // Select the nested fields as separate top-level columns.
    // The result is not inspected because the `.print()` call is commented out.
    tEnv.executeSql(
      """
        |select
        |f1
        |,f2.f3
        |,f2.f4
        |from view1
        |""".stripMargin)
    // .print()
    // +----+--------------------------------+--------------------------------+--------------------------------+
    // | op |                             f1 |                             f3 |                             f4 |
    // +----+--------------------------------+--------------------------------+--------------------------------+
    // | +I |                              a |                              b |                              c |
    // | +I |                              d |                              e |                              f |
    // +----+--------------------------------+--------------------------------+--------------------------------+
    // NOTE(review): the sample output above shows c / f in column f4, whereas the
    // input data built in this method has f4 = 2 / 4 — presumably captured from an
    // earlier version of the example; confirm and refresh.

    // Re-pack the two nested fields into a single row-valued column.
    tEnv.executeSql(
      """
        |select
        |f1
        |,(f2.f3,f2.f4)
        |from view1
        |""".stripMargin)
    // .print()

    env.execute("jobName1")
  }
}