文章目录
- 前言
- 一、开启归档日志
- 二、创建flinkcdc专属用户
- 2.1 对于Oracle 非CDB数据库,执行如下sql
- 2.2 对于Oracle CDB数据库,执行如下sql
- 三、指定oracle表、库级启用
- 四、使用flink-connector-oracle-cdc实现数据库同步
- 4.1 引入pom依赖
- 4.2 Java主代码
- 4.3 json转换为row
前言
Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式实现数据库同步,同时也提供了Flink CDC Source Connector API。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
本文通过flink-connector-oracle-cdc来实现Oracle数据库的数据同步。
一、开启归档日志
1)数据库服务器终端,使用sysdba角色连接数据库
sqlplus / as sysdba
或
sqlplus /nolog
CONNECT sys/password AS SYSDBA;
2)检查归档日志是否开启
archive log list;
(“Database log mode: No Archive Mode”,日志归档未开启)
(“Database log mode: Archive Mode”,日志归档已开启)
3)启用归档日志
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
注意:
启用归档日志需要重启数据库。
归档日志会占用大量的磁盘空间,应定期清除过期的日志文件
4)启动完成后重新执行 archive log list; 查看归档打开状态
二、创建flinkcdc专属用户
2.1 对于Oracle 非CDB数据库,执行如下sql
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
2.2 对于Oracle CDB数据库,执行如下sql
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
GRANT LOGMINING TO flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
三、指定oracle表、库级启用
-- 指定表启用补充日志记录:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 为数据库的所有表启用补充日志记录:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 指定数据库启用(最小)补充日志记录:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
四、使用flink-connector-oracle-cdc实现数据库同步
4.1 引入pom依赖
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
4.2 Java主代码
package test.datastream.cdc.oracle;

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;

import java.util.Properties;

/**
 * Example Flink job that streams change events from an Oracle table via
 * flink-connector-oracle-cdc, converts the JSON events to {@link Row}s,
 * batches them in 5-second processing-time windows, and writes the batches
 * to a JDBC sink.
 */
public class OracleCdcExample {

    public static void main(String[] args) throws Exception {
        // Debezium option: emit numeric columns as strings instead of binary decimals.
        Properties debeziumProps = new Properties();
        debeziumProps.setProperty("decimal.handling.mode", "string");

        SourceFunction<String> oracleSource =
                OracleSource.<String>builder()
                        // .startupOptions(StartupOptions.latest()) // start from the latest offset
                        .url("jdbc:oracle:thin:@localhost:1521:orcl")
                        .port(1521)
                        .database("ORCL")                // database to monitor
                        .schemaList("c##flink_user")     // schema to monitor
                        .tableList("c##flink_user.TEST2") // table to monitor
                        .username("c##flink_user")
                        .password("flinkpw")
                        .debeziumProperties(debeziumProps)
                        // converts each SourceRecord into a JSON string
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism 1 keeps change events in commit order.
        DataStreamSource<String> events = env.addSource(oracleSource).setParallelism(1);

        // JSON change event -> Row (with RowKind set from the CDC op field).
        SingleOutputStreamOperator<Row> rows = events.flatMap(new CdcString2RowMap());

        // Collect rows into 5-second processing-time windows for batch writing.
        SingleOutputStreamOperator<Row[]> batches =
                rows.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .process(new CacheDataAllWindowFunction());

        // Batch synchronization to the target database.
        batches.addSink(new DbCdcSinkFunction(null));

        env.execute();
    }
}
4.3 json转换为row
package test.datastream.cdc.oracle.function;

import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

/**
 * Parses a CDC change event (JSON string) and converts it into a Flink {@link Row}.
 *
 * <p>The operation type ("c" insert / "u" update / "d" delete / "r" snapshot read)
 * determines which image (before/after) is used and which {@link RowKind} is set.
 * Column positions in the output Row are fixed by the mapping built in {@link #open}.
 */
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {

    /** Arity of the output Row; must match the number of mapped columns. */
    private static final int COLUMN_COUNT = 8;

    /** Column name -> position in the output Row. */
    private final Map<String, Integer> columnMap = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID", 0);
        columnMap.put("NAME", 1);
        columnMap.put("DESCRIPTION", 2);
        columnMap.put("AGE", 3);
        columnMap.put("CREATE_TIME", 4);
        columnMap.put("SCORE", 5);
        columnMap.put("C_1", 6);
        columnMap.put("B_1", 7);
    }

    /**
     * Converts one JSON change event to a Row and emits it.
     *
     * @param s         raw CDC event JSON
     * @param collector output collector
     */
    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: " + s);
        VsConfiguration conf = VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);

        Row row;
        if (CdcConstants.OP_C.equals(op)) {
            // insert: use the after image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        } else if (CdcConstants.OP_U.equals(op)) {
            // update: use the after image
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        } else if (CdcConstants.OP_D.equals(op)) {
            // delete: use the before image
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        } else {
            // "r" (snapshot read): treat as insert, use the after image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }

    /**
     * Builds a Row from one record image, placing each value at its mapped position.
     * Columns without a mapping are skipped (previously this caused an NPE via
     * null-index unboxing); a null CREATE_TIME is left as null instead of NPE-ing
     * in the timestamp conversion.
     */
    private Row convertToRow(VsConfiguration data) {
        Row row = new Row(COLUMN_COUNT);
        for (String key : data.getKeys()) {
            Integer index = columnMap.get(key);
            if (index == null) {
                // unknown column: ignore rather than fail the job
                continue;
            }
            Object value = data.get(key);
            if ("CREATE_TIME".equals(key) && value != null) {
                // epoch-long date -> java.sql.Timestamp
                value = long2Timestamp((Long) value);
            }
            row.setField(index, value);
        }
        return row;
    }

    /**
     * Converts an epoch value to a Timestamp. Divides by 1000, i.e. assumes the
     * source value is in microseconds while Timestamp takes milliseconds
     * -- NOTE(review): consistent with Debezium MicroTimestamp, but confirm
     * against the actual connector output.
     */
    private static Timestamp long2Timestamp(Long time) {
        return new Timestamp(time / 1000);
    }
}