一、介绍
Table API 和 SQL 进行基于时间的操作(比如时间窗口)时,需要定义相关的时间语义和时间数据来源的信息。因此会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。时间属性(time attributes)其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。一旦定义了时间属性,就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。时间属性的数据类型为 TIMESTAMP,类似于常规时间戳,可以直接访问并且进行计算。按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)。
二、处理时间定义
/**
 * Demonstrates three ways of declaring a processing-time attribute on a table:
 * while converting a DataStream to a Table, through the connector descriptor
 * schema, and through a CREATE TABLE DDL statement.
 */
public class TestTableProcessingTime {

    /**
     * Builds the environment, declares the processing-time attribute in each of
     * the three styles, and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Parse each CSV line into a SensorReading (id, timestamp, temperature).
        DataStream<String> inputStream = env.readTextFile("./sensor.txt");
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // valueOf replaces the deprecated boxed constructors new Long(String) / new Double(String).
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // 1. DataStream -> Table: append an extra field "pt" declared with ".proctime".
        Table sensorTable = tableEnv.fromDataStream(
                dataStream, "id, timestamp as ts, temperature, pt.proctime");

        // 2. Connector descriptor: add a TIMESTAMP(3) field "pt" and mark it with proctime().
        tableEnv.connect(new FileSystem().path("./sensor.txt"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temperature", DataTypes.DOUBLE())
                        .field("pt", DataTypes.TIMESTAMP(3)).proctime())
                .createTemporaryTable("sensorTable");

        // 3. DDL: declare "pt" as a computed column via PROCTIME().
        // The table is named dataTable (as in TestTableEventTime) so it does not reuse
        // the name "sensorTable" already registered by the descriptor above.
        // NOTE(review): the path here is '/sensor.txt' while the other definitions use
        // "./sensor.txt" -- confirm which location is intended.
        String sinkDDL = "create table dataTable ("
                + " id varchar(20) not null, "
                + " ts bigint, "
                + " temperature double, "
                + " pt AS PROCTIME() "
                + ") with ("
                + " 'connector.type' = 'filesystem', "
                + " 'connector.path' = '/sensor.txt', "
                + " 'format.type' = 'csv')";
        tableEnv.sqlUpdate(sinkDDL);

        env.execute();
    }
}
三、事件时间定义
public class TestTableEventTime { public static void main ( String [ ] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment . getExecutionEnvironment ( ) ; env. setParallelism ( 1 ) ; env. setStreamTimeCharacteristic ( TimeCharacteristic. EventTime ) ; StreamTableEnvironment tableEnv = StreamTableEnvironment . create ( env) ; DataStream < String > inputStream = env. readTextFile ( "./sensor.txt" ) ; DataStream < SensorReading > dataStream = inputStream. map ( line -> { String [ ] fields = line. split ( "," ) ; return new SensorReading ( fields[ 0 ] , new Long ( fields[ 1 ] ) , new Double ( fields[ 2 ] ) ) ; } ) . assignTimestampsAndWatermarks ( new BoundedOutOfOrdernessTimestampExtractor < SensorReading > ( Time . seconds ( 2 ) ) { @Override public long extractTimestamp ( SensorReading element) { return element. getTimestamp ( ) * 1000L ; } } ) ; Table sensorTable = tableEnv. fromDataStream ( dataStream, "id, timestamp as ts, temperature, rt.rowtime" ) ; tableEnv. connect ( new FileSystem ( ) . path ( "./sensor.txt" ) ) . withFormat ( new Csv ( ) ) . withSchema ( new Schema ( ) . field ( "id" , DataTypes . STRING ( ) ) . field ( "timestamp" , DataTypes . BIGINT ( ) ) . rowtime ( new RowTime ( ) . timestampsFromField ( "timestamp" ) . watermarksPeriodicBounded ( 1000 ) ) . field ( "temperature" , DataTypes . DOUBLE ( ) ) ) . createTemporaryTable ( "sensorTable" ) ; String sinkDDL = "create table dataTable (" + " id varchar(20) not null, " + " ts bigint, " + " temperature double, " + " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " + " watermark for rt as rt - interval '1' second" + ") with (" + " 'connector.type' = 'filesystem', " + " 'connector.path' = '/sensor.txt', " + " 'format.type' = 'csv')" ; tableEnv. sqlUpdate ( sinkDDL) ; env. execute ( ) ; }
}