使用Flink CDC实现 Oracle数据库数据同步(非SQL)

文章目录

  • 前言
  • 一、开启归档日志
  • 二、创建flinkcdc专属用户
    • 2.1 对于Oracle 非CDB数据库,执行如下sql
    • 2.2 对于Oracle CDB数据库,执行如下sql
  • 三、指定oracle表、库级启用
  • 四、使用flink-connector-oracle-cdc实现数据库同步
    • 4.1 引入pom依赖
    • 4.1 Java主代码
    • 4.1 json转换为row


前言

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式实现数据库同步,同时也提供了Flink CDC Source Connector API。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
本文通过flink-connector-oracle-cdc来实现Oracle数据库的数据同步。


一、开启归档日志

1)数据库服务器终端,使用sysdba角色连接数据库

 sqlplus / as sysdba
或
sqlplus /nolog
CONNECT sys/password AS SYSDBA;

2)检查归档日志是否开启

archive log list;

(“Database log mode: No Archive Mode”,日志归档未开启)
(“Database log mode: Archive Mode”,日志归档已开启)
3)启用归档日志

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

注意:
启用归档日志需要重启数据库。
归档日志会占用大量的磁盘空间,应定期清除过期的日志文件
4)启动完成后重新执行 archive log list; 查看归档打开状态

二、创建flinkcdc专属用户

2.1 对于Oracle 非CDB数据库,执行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;GRANT CREATE SESSION TO flinkuser;GRANT SET CONTAINER TO flinkuser;GRANT SELECT ON V_$DATABASE to flinkuser;GRANT FLASHBACK ANY TABLE TO flinkuser;GRANT SELECT ANY TABLE TO flinkuser;GRANT SELECT_CATALOG_ROLE TO flinkuser;GRANT EXECUTE_CATALOG_ROLE TO flinkuser;GRANT SELECT ANY TRANSACTION TO flinkuser;GRANT LOGMINING TO flinkuser;GRANT ANALYZE ANY TO flinkuser;GRANT CREATE TABLE TO flinkuser;-- need not to execute if set scan.incremental.snapshot.enabled=true(default)GRANT LOCK ANY TABLE TO flinkuser;GRANT ALTER ANY TABLE TO flinkuser;GRANT CREATE SEQUENCE TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;GRANT SELECT ON V_$LOG TO flinkuser;GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;GRANT SELECT ON V_$LOGFILE TO flinkuser;GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

2.2 对于Oracle CDB数据库,执行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;GRANT LOGMINING TO flinkuser CONTAINER=ALL;GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;-- need not to execute if set scan.incremental.snapshot.enabled=true(default)GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;

三、指定oracle表、库级启用

-- 指定表启用补充日志记录:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;-- 为数据库的所有表启用
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;-- 指定数据库启用补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

四、使用flink-connector-oracle-cdc实现数据库同步

4.1 引入pom依赖

 <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>2.4.0</version></dependency>

4.1 Java主代码

package test.datastream.cdc.oracle;import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;import java.util.Properties;public class OracleCdcExample {public static void main(String[] args) throws Exception {Properties properties = new Properties();//数字类型数据 转换为字符properties.setProperty("decimal.handling.mode", "string");SourceFunction<String> sourceFunction = OracleSource.<String>builder()
//                .startupOptions(StartupOptions.latest()) // 从最晚位点启动.url("jdbc:oracle:thin:@localhost:1521:orcl").port(1521).database("ORCL") // monitor XE database.schemaList("c##flink_user") // monitor inventory schema.tableList("c##flink_user.TEST2") // monitor products table.username("c##flink_user").password("flinkpw").debeziumProperties(properties).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);// use parallelism 1 for sink to keep message orderingSingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new CacheDataAllWindowFunction());//批量同步winStream.addSink(new DbCdcSinkFunction(null));env.execute();}
}

4.1 json转换为row

package test.datastream.cdc.oracle.function;import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;/*** @desc cdc json解析,并转换为Row*/
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {private Map<String,Integer> columnMap =new HashMap<>();@Overridepublic void open(Configuration parameters) throws Exception {columnMap.put("ID",0);columnMap.put("NAME",1);columnMap.put("DESCRIPTION",2);columnMap.put("AGE",3);columnMap.put("CREATE_TIME",4);columnMap.put("SCORE",5);columnMap.put("C_1",6);columnMap.put("B_1",7);}@Overridepublic void flatMap(String s, Collector<Row> collector) throws Exception {System.out.println("receive: "+s);VsConfiguration conf=VsConfiguration.from(s);String op = conf.getString(CdcConstants.K_OP);VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);Row row =null;if(CdcConstants.OP_C.equals(op)){//插入,使用after数据row = convertToRow(after);row.setKind(RowKind.INSERT);}else if(CdcConstants.OP_U.equals(op)){//更新,使用after数据row = convertToRow(after);row.setKind(RowKind.UPDATE_AFTER);}else if(CdcConstants.OP_D.equals(op)){//删除,使用before数据row = convertToRow(before);row.setKind(RowKind.DELETE);}else {//r 操作,使用after数据row = convertToRow(after);row.setKind(RowKind.INSERT);}collector.collect(row);}private Row convertToRow(VsConfiguration data){Set<String> keys = data.getKeys();int size = keys.size();Row row=new Row(8);int i=0;for (String key:keys) {Integer index = this.columnMap.get(key);Object value=data.get(key);if(key.equals("CREATE_TIME")){//long日期转timestampvalue=long2Timestamp((Long)value);}row.setField(index,value);}return row;}private static  java.sql.Timestamp long2Timestamp(Long time){Timestamp timestamp = new Timestamp(time/1000);System.out.println(timestamp);return timestamp;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/36965.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker Desktop 简易操作指南 (Windows, macOS, Linux)

1. 下载最新版本 Docker Desktop https://www.docker.com/products/docker-desktop/ 2.启动 Docker Desktop 3.常用命令&#xff08;在 cmd 或 Terminal 中执行&#xff09; #列出所有镜像&#xff08;Images&#xff09; docker images #列出所有容器&#xff08;Containers&…

OpenSSL/3.3.0: error:0A00018A:SSL routines::dh key too small

php curl解决办法: curl_setopt($ch, CURLOPT_SSL_CIPHER_LIST, ‘DEFAULTSECLEVEL1’); python 解决办法: from twisted.internet.ssl import AcceptableCiphers from scrapy.core.downloader import contextfactory contextfactory.DEFAULT_CIPHERS AcceptableCiphers.from…

CSS 核心知识点 - grid

思维导图 参考网址: https://developer.mozilla.org/zh-CN/docs/Web/CSS/CSS_grid_layout 一、什么是 grid&#xff1f; CSS Grid布局是在CSS3规范中引入的一种新的布局方式&#xff0c;旨在解决传统布局方法&#xff08;如浮动、定位、表格布局&#xff09;存在的许多问题。C…

Spring Boot 集成 MyBatis-Plus 总结

Spring Boot 集成 MyBatis-Plus 总结 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在Java开发中&#xff0c;Spring Boot以其简洁和高效的特点&#xff0c;…

Oh My Zsh Git 插件

以下是一些常见的别名和它们对应的 Git 命令&#xff1a; g: gitga: git addgaa: git add --allgapa: git add --patchgau: git add --updategb: git branchgba: git branch -agbd: git branch -dgbda: git branch --no-color --merged | command grep -vE “^(||*|\s*(main|m…

第十九站:Java钛蓝——区块链技术的新探索

在区块链技术的新探索中&#xff0c;Java作为一门成熟的编程语言&#xff0c;正在通过Hyperledger Fabric和Web3j等技术实现其在区块链领域的应用。以下是对这些技术的简要介绍和如何使用Java源代码与它们进行交互的讲解。 Hyperledger Fabric Hyperledger Fabric是一个由Lin…

React.js 全面解析:从基础到实战案例

引言&#xff1a; React.js&#xff0c;由Facebook推出并维护的开源JavaScript库&#xff0c;以其组件化思想、虚拟DOM技术和声明式编程风格&#xff0c;成为构建用户界面的首选工具之一。本文将系统性地介绍React的基础概念、核心特性&#xff0c;并通过实际案例展示基础属性…

DataWhale-吃瓜教程学习笔记(四)

学习视频&#xff1a;第3章-二分类线性判别分析_哔哩哔哩_bilibili 西瓜书对应章节&#xff1a; 3.4 文章目录 - 算法原理- 损失函数推导-- 异类样本中心尽可能远-- 同类样本方差尽可能小-- 综合 知识点补充 - 二范数二范数&#xff08;2-norm&#xff09;详解定义几何意义性质…

vue3中省市区联动在同一个el-form-item中咋么设置rules验证都不为空的效果

在开发中出现如下情况&#xff0c;在同一个el-form-item设置了省市区三级联动的效果 <el-form-item label"地区" prop"extraProperties.Province"><el-row :gutter"20"><el-col :span"12"><el-select v-model&qu…

OpenHarmony开发实战:HDF驱动开发流程

概述 HDF&#xff08;Hardware Driver Foundation&#xff09;驱动框架&#xff0c;为驱动开发者提供驱动框架能力&#xff0c;包括驱动加载、驱动服务管理、驱动消息机制和配置管理。并以组件化驱动模型作为核心设计思路&#xff0c;让驱动开发和部署更加规范&#xff0c;旨在…

Unity3D Excel表格数据处理模块详解

一、引言 在Unity3D开发中&#xff0c;我们经常需要处理大量的数据&#xff0c;这些数据可能是游戏配置、角色属性、道具信息等。Excel表格作为一种常见的数据存储方式&#xff0c;具有结构清晰、易于编辑的特点&#xff0c;因此被广泛应用于游戏开发中。本文将详细介绍如何在…

四川赤橙宏海商务信息咨询有限公司抖音开店靠谱吗?

在数字化浪潮席卷全球的今天&#xff0c;电商行业正以前所未有的速度发展。而在这个大潮中&#xff0c;四川赤橙宏海商务信息咨询有限公司凭借其专业的团队和前瞻性的战略眼光&#xff0c;专注于抖音电商服务&#xff0c;为广大商家提供了一站式解决方案&#xff0c;成为了行业…

面经-常用框架

1.Spring 1.1什么是Spring框架&#xff1f; Spring 是⼀种轻量级开发框架&#xff0c;旨在提⾼开发⼈员的开发效率以及系统的可维护性。 Spring 的 6 个特征:核⼼技术&#xff0c;测试&#xff0c;数据访问&#xff0c;Web⽀持&#xff0c;集成&#xff0c;语⾔ 1.2列举⼀些重…

Ubuntu20.04安装LibTorch并完成高斯溅射环境搭建

0. 简介 最近受到优刻得的使用邀请&#xff0c;正好解决了我在大模型和自动驾驶行业对GPU的使用需求。UCloud云计算旗下的Compshare的GPU算力云平台。他们提供高性价比的4090 GPU&#xff0c;按时收费每卡2.6元&#xff0c;月卡只需要1.7元每小时&#xff0c;并附带200G的免费…

接口自动化测试-项目实战

什么是接口自动化测试&#xff1a;使用工具或代码代替人对接口进行测试 测试项目结构&#xff08;python包&#xff09; 1、接口api包 2、script:业务脚本 3、data:数据 4、config.py :配置文件 5、reporter:报告 错误问题&#xff1a; 1、未打印任何东西。添加pip ins…

走马灯封装

走马灯功能需求&#xff1a; 支持定时切换&#xff1b;支持左右按钮切换&#xff08;根据鼠标是否在切换组件内展示和隐藏左右切换按钮&#xff09;&#xff1b;支持底部标识切换&#xff1b; 走马灯 完整代码如下&#xff1a; /*** class 走马灯*/import react, { Compone…

C语言 指针——缓冲区溢出与缓冲区溢出攻击

目录 缓冲区溢出攻击 缓冲区溢出攻击实例 字符串的安全输入方法​编辑 防止缓冲区溢出的两个要点 缓冲区溢出攻击 网络黑客常针对系统和程序自身存在的漏洞&#xff0c;编写相应的攻击程序  对缓冲区溢出漏洞的攻击 —— 最常见  几乎占到了网络攻击次数的一半以上…

Android (已解决)Gradle 编译失败 Unsupported class file major version 61

文章目录 一、报错原因二、解决方法 一、报错原因 新版本的 Android Studio 默认使用的是 Java 17 LTS&#xff0c;而这个历史项目的 Gradle 版本很低&#xff0c;不支持高版本的 Java。 具体原因&#xff1a;Java 17 (major version 61) 编译的 class 文件&#xff0c;如果在…

逆向学习汇编篇:指令的操作

本节课在线学习视频&#xff08;网盘地址&#xff0c;保存后即可免费观看&#xff09;&#xff1a; ​​https://pan.quark.cn/s/660c759dea95​​ 在逆向工程中&#xff0c;深入理解汇编语言的指令操作是至关重要的。汇编指令是计算机硬件与软件之间的桥梁&#xff0c;它们直…

DevEco Studio有时会多出来.js和.map文件,导致项目不能运行

1、问题 在使用DevEco的时候有时候会出现啥都没干&#xff0c;但是在项目的目录下会自动生成和文件同名的.js和.js.map文件&#xff0c;至于为什么会生成目前我也不知道&#xff0c;如果想要更深了解可以到论坛讨论&#xff1a;华为开发者论坛。生成.js和.js.map文件优…