// Flink Table API: reading from and writing to MySQL
import org. apache. flink. connector. jdbc. table. JdbcConnectorOptions ;
import org. apache. flink. streaming. api. environment. StreamExecutionEnvironment ;
import org. apache. flink. table. api. DataTypes ;
import org. apache. flink. table. api. EnvironmentSettings ;
import org. apache. flink. table. api. Schema ;
import org. apache. flink. table. api. Table ;
import org. apache. flink. table. api. TableDescriptor ;
import org. apache. flink. table. api. TableEnvironment ;
import org.apache.flink.table.api.TableResult;

import java.util.concurrent.ExecutionException;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demo program that reads from and writes to a MySQL table through the Flink Table API
 * and the JDBC connector.
 *
 * <p>The MySQL table {@code test} (database {@code tmp} on {@code localhost:3306}) is
 * registered under the name {@code source}. The program prints its rows using the Table API,
 * inserts one row with a SQL statement, waits for that insert to finish, and then prints the
 * rows again using a SQL query.
 */
public class TableApiMysql {

    /**
     * Runs the demo in batch mode.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the insert job fails or the thread is interrupted while
     *     waiting for it
     */
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Columns of the table as seen by Flink.
        Schema schema = Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("user_name", DataTypes.STRING())
                .build();

        // NOTE(review): connection settings and credentials are hard-coded for the demo;
        // externalize them before using this anywhere but a local sandbox.
        TableDescriptor tableDescriptor = TableDescriptor.forConnector("jdbc")
                .option(JdbcConnectorOptions.URL, "jdbc:mysql://localhost:3306/tmp")
                .option(JdbcConnectorOptions.USERNAME, "root")
                .option(JdbcConnectorOptions.PASSWORD, "123456")
                .option(JdbcConnectorOptions.TABLE_NAME, "test")
                .schema(schema)
                .build();
        tableEnv.createTable("source", tableDescriptor);

        // Read variant 1: Table API expressions.
        System.out.println("select format 1: ");
        tableEnv.from("source")
                .select($("user_id"), $("user_name"))
                .execute()
                .print();

        // Only user_name is supplied; presumably user_id is generated by MySQL — TODO confirm.
        TableResult insertResult =
                tableEnv.executeSql("insert into source(user_name) select 'hello'");
        // executeSql() only submits the INSERT job. Block until it has finished so that the
        // query below sees the new row and the JVM does not exit before the write completes.
        try {
            insertResult.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                    "Interrupted while waiting for the insert job to finish", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Insert job failed", e);
        }

        // Read variant 2: plain SQL query.
        System.out.println("select format 2: ");
        Table table = tableEnv.sqlQuery("select * from source");
        TableResult secondRead = table.execute();
        secondRead.print();
    }
}