代码如下:
package com.weilanaoli.ruge.vlink.flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.logging.Level;
import java.util.logging.Logger;class MysqlExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("xx.xx.xx.xx") //输入地址.port(3306) //输入端口.databaseList("xx") //输入库名.tableList("xx.test") //输入表名.username("xx") //输入用户名.password("xxxx") //输入密码.startupOptions(StartupOptions.initial()) //读取binlog策略,这个启动选项有五种.deserializer(new JsonDebeziumDeserializationSchema()) //配置不要锁表,但是数据一致性不是精准一次,会变成最少一次.build();//配置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {@Overridepublic void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {try {System.out.println("processElement=====" + value);}catch (Exception e) {e.printStackTrace();}}});dataStreamSource.print("原始数据=====");env.execute("Print MySQL Snapshot + Binlog");}
}
运行结果如下: