Debezium vs OGG vs Tapdata:如何实时同步 Oracle 数据到 Kafka 消息队列?

随着信息时代的蓬勃发展,企业对实时数据处理的需求逐渐成为推动业务创新和发展的重要驱动力。在这个快速变化的环境中,许多企业选择将 Oracle 数据库同步到 Kafka,以满足日益增长的实时数据处理需求。本文将深入探讨这一趋势的背后原因,并通过一个真实的客户案例来强调实时性在业务场景中的重要性。

为什么:理解 Oracle - Kafka 的实时同步需求

先来看一个典型的业务场景。实时库存管理需要迅速响应库存变化,以确保及时的补货和订单处理。通过将 Oracle 数据同步到 Kafka,可以实现实时捕获库存变更,并将这些变更事件传递给下游业务,如订单系统、报表系统等。如此一来,企业得以更加灵活、实时地管理库存,从而提高供应链效率。

为什么要同步至 Kafka?

再结合 Oracle 以及 Kafka 的自身特性,我们不难总结出 Oracle 到 Kafka 数据同步背后的实际需求,这通常涉及到满足实时数据需求、支持大规模数据处理、构建事件驱动体系结构以及提高整体系统性能等。以下是一些常见原因总结:

  • 实时数据流:作为一个分布式消息队列系统,Kafka 能够提供高吞吐量和低延迟的实时数据处理。通过将 Oracle 数据库同步到 Kafka,可以实现对数据的实时捕获和处理,使得企业能够更快地响应数据变化。

  • 高可用性:Kafka 通过分布式来实现高可用性,一个 Kafka 集群通常包含多个 Broker,每个 Broker 负责存储一部分的数据副本,这样即使某个 Broker 出现故障,其他 Broker 也可以继续工作,从而保证服务的可用性。

  • 可扩展性:Kafka 基于其分布式架构、消费者组、分区和副本机制、水平扩展能力、高并发处理能力和容错性等方式实现了可扩展性,从而能够处理大规模的消息数据,满足高吞吐量和高并发的需求。

  • 高效查询: Kafka 凭借顺序写入、索引、二分查找和内存缓存等技术,得以高效处理大规模的消息流,并保持高性能和低延迟,从而实现高效的数据读取。

  • 高并发写入: Kafka 的设计理念注重高并发的数据写入,采用了多种技术,如零拷贝技术、批量处理、消息压缩、异步处理等,提高了数据传输的效率和处理的速度,能够处理大规模的数据流。充分利用 Kafka 的高并发写入能力,有助于业务系统处理大量的写入请求,适用于需要高吞吐量的业务,比如日志记录、事件溯源等。

  • 解耦数据生产者和消费者:Kafka 的消息队列模型有助于解耦数据生产者和消费者之间的关系,可以使数据库的变更与实际数据使用者(应用程序、分析系统等)之间形成松耦合,从而提高整个系统的灵活性。

  • 支持事件驱动架构:通过将 Oracle 数据同步到 Kafka,可以构建基于事件的架构。数据库的变更可以作为事件流式传输,触发系统中其他组件的动作,从而实现更灵活、敏捷的业务流程。

  • 数据集成: Kafka 作为中间件,能够协调不同系统之间的数据流,可以轻松实现与其他数据源和目标的集成,促使系统更好地适应复杂的数据处理和交换需求。

为什么需要实时?

与此同时,数据同步的实时性在这个过程中被不断强调,还是举几个简单的例子:

  • 实时报表和监控系统: 对于需要实时监控和报表展示的业务,如运营监控、性能监控等,及时获取数据库中的数据变更是关键。通过实时同步到 Kafka,保障这些监控系统的数据时刻处于最新状态。

  • 事件驱动架构: 许多现代应用采用事件驱动架构,通过发布-订阅模型进行系统集成。在这种情况下,实时同步数据到 Kafka 是保证事件的及时传播和处理的关键。

  • 用户体验: 在需要实时交互和响应的应用中,用户期望看到最新的数据状态。例如,在在线协作或实时通讯应用中,用户需要实时看到其他用户的操作和变更。

至此,我们已经大致了解了 Oracle 到 Kafka 数据实时同步的重要性,下面再来看一些常见的同步方案。

怎么做:数据同步方案对比

手动方案:配合开源工具

实现 Oracle 到 Kafka 数据实时同步的手动方案涉及多个步骤,包括设置 Oracle 数据库、配置 Kafka 环境,以及编写同步程序。下面是一个简单的手动方案示例,主要涉及使用 Debezium 开源工具实现 Oracle 数据库到 Kafka 的实时同步。
在这里插入图片描述
步骤一:准备工作

  1. 安装 Oracle 数据库: 确保已安装并正确配置 Oracle 数据库。
  2. 安装 Kafka: 安装 Kafka 并启动 ZooKeeper 服务,作为 Kafka 的依赖。
  3. 安装并配置 Debezium: Debezium 是一个开源的变更数据捕获工具,用于监听数据库变更并将其发送到 Kafka。下载并配置 Debezium Connector for Oracle。(https://debezium.io/)

步骤二:配置 Oracle 数据库

  1. 启用归档日志: 在 Oracle 数据库中,确保归档日志已启用,这是 Debezium 监听变更的必要条件。
sqlCopy code
ALTER SYSTEM SET LOG_ARCHIVE_DEST_1='LOCATION=/archivelog';
ALTER SYSTEM SET LOG_ARCHIVE_FORMAT='arch_%t_%s_%r.arc';
  1. 创建 CDC 用户: 创建一个专用于 Change Data Capture (CDC) 的用户,并授予必要的权限。
sqlCopy code
CREATE USER cdc_user IDENTIFIED BY cdc_password;
GRANT CONNECT, RESOURCE, CREATE VIEW TO cdc_user;
  1. 启用 CDC: 启用 Oracle 的 CDC 特性,并指定 CDC 用户。
sqlCopy code
EXEC DBMS_CDC_PUBLISH.CREATE_CHANGE_SET('MY_CHANGE_SET', 'CDC_USER');
EXEC DBMS_CDC_PUBLISH.ALTER_CHANGE_SET('MY_CHANGE_SET', 'ADD');
EXEC DBMS_CDC_PUBLISH.CREATE_CAPTURE('MY_CAPTURE', 'CDC_USER');
EXEC DBMS_CDC_PUBLISH.ALTER_CAPTURE('MY_CAPTURE', 'ADD');
EXEC DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE('MY_CHANGE_TABLE', 'CDC_USER', 'MY_CAPTURE', 'MY_CHANGE_SET');

步骤三:配置 Debezium 连接器

  1. 配置 Debezium Connector: 创建一个 JSON 配置文件,指定 Oracle 连接信息、监控的表等信息。
jsonCopy code
{"name": "oracle-connector", // 服务注册连接器时分配给连接器的名称。
"config": {"connector.class": "io.debezium.connector.oracle.OracleConnector", // Oracle连接器类的名称"database.server.name": "my-oracle-server", //为连接器捕获更改的 Oracle 数据库服务器标识并提供命名空间的逻辑名称"database.hostname": "your-oracle-host", //oracle实例地址"database.port": "your-oracle-port", //oracle数据库端口"database.user": "cdc_user", //oracle数据库用户"database.password":"cdc_password", //oracle数据库密码"database.dbname":"your-oracle-database",//要从中捕获更改的数据库的名称"database.out.server.name":"oracle-server", // kafka主题"table.include.list": "CDC_USER.MY_TABLE",//orcle中表进行数据监测输出数据"schema.history.internal.kafka.bootstrap.servers": "192.3.65.195:9092",//此连接器用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表"schema.history.internal.kafka.topic": "schema-changes.inventory" // 连接器写入和恢复 DDL 语句的数据库历史主题的名称}}
  1. 启动 Debezium Connector: 使用 Kafka Connect 启动 Debezium Connector。
bashCopy code
bin/connect-standalone.sh config/worker.properties config/debezium-connector-oracle.properties

步骤四:验证同步

  1. 插入数据: 在 Oracle 数据库中插入一些数据。
sqlCopy code
INSERT INTO CDC_USER.MY_TABLE (ID, NAME) VALUES (1, 'John Doe');
  1. 检查 Kafka 主题: 检查 Kafka 中是否有与表变更相关的消息。
bashCopy code
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-oracle-server.CDC_USER.MY_TABLE --from-beginning

此处应该能够看到与插入操作相关的 JSON 消息。

以上只是一个简单的示例,实际情况可能会更加复杂,具体的配置和操作可能取决于 Oracle 和 Debezium 版本的不同。在生产环境中,请务必遵循相关安全和最佳实践。

经典方案:配合 Oracle 官方工具 OGG

使用 Oracle GoldenGate (OGG) 实现 Oracle 到 Kafka 数据实时同步的方案同样包含多个步骤,以下是一个基本示例,以 OGG Classic Replicat 为例。请注意,具体配置仍然会因 Oracle GoldenGate 版本的不同而有所变化。
在这里插入图片描述
步骤一:准备工作

  1. 安装 Oracle GoldenGate: 安装并配置 Oracle GoldenGate 软件。
  2. 安装 Kafka: 安装 Kafka 并启动 ZooKeeper 服务,作为 Kafka 的依赖。

步骤二:配置 Oracle 数据库

  1. 启用归档日志: 确保 Oracle 数据库的归档日志已启用。
sqlCopy code
ALTER SYSTEM SET LOG_ARCHIVE_DEST_1='LOCATION=/archivelog';
ALTER SYSTEM SET LOG_ARCHIVE_FORMAT='arch_%t_%s_%r.arc';

步骤三:配置 OGG Extract 和 Pump

  1. 创建 OGG Extract: 配置 OGG Extract 用于捕获变更数据。
bashCopy code
cd $OGG_HOME
./ggsci
GGSCI> ADD EXTRACT ext1, TRANLOG, BEGIN NOW
GGSCI> ADD EXTTRAIL /trail/et, EXTRACT ext1
GGSCI> ADD EXTRACT dpump, EXTTRAILSOURCE /trail/et
GGSCI> ADD RMTTRAIL /trail/rt, EXTRACT dpump
  1. 配置 OGG Pump: 配置 OGG Pump 用于将捕获的变更数据传输到 Kafka。
bashCopy code
GGSCI> ADD EXTRACT pump1, EXTTRAILSOURCE /trail/rt, BEGIN NOW
GGSCI> ADD RMTTRAIL /trail/pt, EXTRACT pump1
GGSCI> ADD REPLICAT rep1, EXTTRAIL /trail/pt, SPECIALRUN

步骤四:配置 OGG Replicat 和 Kafka

  1. 编辑 OGG Replicat 参数文件: 编辑 Replicat 参数文件,配置连接信息和目标 Kafka 主题。
plaintextCopy code
REPLICAT rep1
USERID ogguser, PASSWORD oggpassword
ASSUMETARGETDEFS
MAP source_table, TARGET kafka_topic, COLMAP (...)
  1. 启动 OGG Replicat: 启动 Replicat 进程。
bashCopy code
./ggsci
GGSCI> START REPLICAT rep1

步骤五:验证同步

  1. 插入数据: 在 Oracle 数据库中插入一些数据。
sqlCopy code
INSERT INTO source_table (ID, NAME) VALUES (1, 'John Doe');
  1. 检查 Kafka 主题: 检查 Kafka 中是否有与表变更相关的消息。
bashCopy code
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_topic --from-beginning

新一代数据实时平台工具:低成本、更简单

以 Tapdata 为例,作为中国的 “Fivetran/Airbyte”, Tapdata 是一个以低延迟数据移动为核心优势构建的现代数据平台,内置 100+ 数据连接器,拥有稳定的实时采集和传输能力、秒级响应的数据实时计算能力、稳定易用的数据实时服务能力,以及低代码可视化操作等。典型用例包括数据库到数据库的复制、将数据引入数据仓库或数据湖,以及通用 ETL 处理等。

Tapdata 是一个专注于实时数据同步的工具,拥有强大且稳定的数据管道能力,可以用来替换类似于 OGG/DSG 这样的同步工具,将数据从 Oracle 、MySQL 这样的数据库同步到同构或者异构类型的数据目标。
在这里插入图片描述
以下是详细的操作教程(演示版本为 Tapdata Cloud):

步骤一:Tapdata 安装与部署

  1. 注册并登录 Tapdata Cloud

  2. 安装并部署 Tapdata: 访问 Tapdata 官方网站,获取操作指引,完成 Tapdata Agent 的安装与部署。

注册 Tapdata Cloud,即刻开启您的实时数据之旅
申请试用 Tapdata 本地部署版本

步骤二:配置数据源和目标

  1. 新建 Oracle 数据源:进入 Tapdata Cloud 连接管理页面,创建数据源 Oracle 的连接并测试通过。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  2. 新建数据目标 Kafka:重复上述操作,在数据源列表中找到 Kafka,参考连接配置帮助创建 Kafka 为数据目标的连接并测试通过:
    在这里插入图片描述
    步骤三:配置 Kafka

1.创建 Kafka Topic: 在 Kafka 中创建一个 Topic,用于接收从 Oracle 同步过来的数据。

bashCopy code
bin/kafka-topics.sh --create --topic my_oracle_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

步骤四:开启同步任务

  1. 新建数据同步任务:通过拖拉拽的方式,在 Tapdata 的可视化操作界面中,连接数据源和目标,快速创建 Oracle - Kafka 的数据同步任务
  2. 启动同步任务: 点击源节点与目标节点,分别选择待同步表和目标表后即可启动任务,Tapdata 将开始捕获 Oracle 数据库的数据及变更,并将其发送到 Kafka。

步骤五:验证同步

  1. 插入数据: 在 Oracle 数据库中插入一些数据。
sqlCopy code
INSERT INTO my_table (id, name) VALUES (1, 'John Doe');
  1. 检查 Kafka Topic: 使用 Kafka 命令行工具检查同步的数据是否已经到达 Kafka Topic。
bashCopy code
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_oracle_topic --from-beginning

如何选:综合对比,选择与自身需求更加匹配的方案

面对如此多的数据同步方案,在做调研时往往涉及多方考虑。综合来看,各个类型的方案各有优劣:

① 手动配置方案
优势:

  1. 定制性: 完全自定义的配置允许满足特定业务场景的要求。
  2. 无额外成本: 不需要额外的软件许可费用。

劣势:

  1. 复杂性: 需要手动处理所有步骤,可能会增加配置和管理的复杂性
  2. 维护难度: 对于复杂的同步需求,手动配置可能导致维护难度增加。
  3. 时间成本: 手动配置需要更多的时间和技术经验。

② 官方工具 OGG
优势:

  1. 成熟稳定: OGG 是 Oracle 提供的官方工具,经过多年的发展和改进,具有稳定性和成熟性。
  2. 可视化管理:OGG 提供了管理界面,简化了配置和监控过程。

劣势:

  1. 资金成本: 付费工具且价格较高,需要一定的成本投入。
  2. .学习成本: 具有一定的学习门槛,尤其是对于初学者而言。

③ Tapdata 方案
优势:

  1. 简化配置: Tapdata 提供了简化的配置界面,降低了配置复杂性,无论是操作还是维护都更加简单
  2. 实时监控: 提供实时监控和报警功能,方便管理和维护。
  3. 低延迟: Tapdata 强调低延迟,根据任务的 tps 对任务进行拆分,适用于对实时性要求较高的场景。
  4. 云原生:支持云版本,对云上生态融合更友好

劣势:

  1. 资金成本:Tapdata 本地部署版本,以及 Tapdata Cloud 到达一定链路数时需要支付额外费用。
  2. 资源占用:需要占用少量的数据库资源进行日志解析

考虑因素:

  • 根据需求选择: 根据具体需求和团队技能,选择适合的方案。手动配置适合对配置有深入理解的团队,OGG适合对稳定性和功能有更高要求的场景,而 Tapdata 则适用于希望快速配置和低延迟的场景。
  • 成本和效率权衡: 考虑购买费用、学习曲线和配置效率之间的权衡。
  • 生态整合: 考虑工具的生态整合,特别是与已有系统和工具的集成。

总体而言,将 Oracle 数据实时同步到 Kafka 为企业提供了更灵活、高效、实时的 数据处理和分析能力,有助于构建现代化的数据架构,适应迅速变化的业务环境。通过选择适合自身业务需求的同步方案,如 Debezium、OGG、Tapdata,并合理配置优化,企业可以更好地满足实时数据处理的需求,提升业务的竞争力和应变能力。在这其中,Tapdata 以其低延迟、易用性、可扩展性和实时监控等特点,为企业实现 Oracle 到 Kafka 的实时同步提供了可靠的解决方案。

产品优势:

  • 开箱即用与低代码可视化操作
  • 内置 100+ 数据连接器,稳定的实时采集和传输能力
  • 秒级响应的数据实时计算能力
  • 稳定易用的数据实时服务能力

【相关阅读】

  • Tapdata Connector 实用指南:云原生数仓场景之数据实时同步到 Databend
  • Tapdata Connector 实用指南:如何将 CRM 数据从 Salesforce 实时同步到 MongoDB 等其他库
  • Tapdata Connector 实用指南:实时数仓场景之数据实时同步至 ClickHouse
  • Tapdata Connector 实用指南:数据入仓场景之数据实时同步到 BigQuery

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/759538.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数学建模(熵权法 python代码 例子)

目录 介绍: 模板: 例子:择偶 极小型指标转化为极大型(正向化): 中间型指标转为极大型(正向化): 区间型指标转为极大型(正向化)&#xff1a…

力扣每日练习(3.18)补

200. 岛屿数量 岛屿是指上下左右都是被0包起来的。使用递归的方式,也就是深度优先搜索,需要确定终止条件,也就是badcase是什么情况出现的。 二叉树是递到叶子节点的时候,因为下面是空子树了;矩阵就是越界,…

基于BusyBox的imx6ull移植sqlite3到ARM板子上

1.官网下载源码 https://www.sqlite.org/download.html 下载源码解压到本地的linux环境下 2.解压并创建install文件夹 3.使用命令行配置 在解压的文件夹下打开终端,然后输入以下内容,其中arm-linux-gnueabihf是自己的交叉编译器【自己替换】 ./config…

PyTorch 深度学习(GPT 重译)(三)

六、使用神经网络拟合数据 本章内容包括 与线性模型相比,非线性激活函数是关键区别 使用 PyTorch 的nn模块 使用神经网络解决线性拟合问题 到目前为止,我们已经仔细研究了线性模型如何学习以及如何在 PyTorch 中实现这一点。我们专注于一个非常简单…

获取蓝牙Download_Linkey日志方法

::获取root权限 del bt_config.conf :retry adb root if %errorlevel% neq 0 ( echo adb root failed. Retrying... goto retry ) echo Congratulations To Adb Root For His Success.... :adb_pull adb pull /data/misc/bluedroid/bt_config.conf if %errorlevel% neq…

拷贝他人maven仓库jar包到自己本地仓库,加载maven依然提示无法下载对应依赖

所遇问题: 拷贝他人maven仓库jar包到自己本地maven仓库repository下的对应依赖位置,重新加载idea的maven依然提示无法下载对应依赖。 解决办法: 在maven->repository找到对应报错依赖路径,删除xxx.repositories 和 xxx.lastU…

websocket 中 request-line 中的URI编码问题

首先,request-line组成如下: Request-Line Method SP Request-URI SP HTTP-Version CRLF 在 rfc6455 规范的 5.1.2 Request-URI 中,有这样的描述: The Request-URI is transmitted in the format specified in section 3.2.1. …

【视频图像取证篇】模糊图像增强技术之去噪声类滤波场景应用小结

【视频图像取证篇】模糊图像增强技术之去噪声类滤波场景应用小结 模糊图像增强技术之去噪声类滤波场景应用小结—【蘇小沐】 文章目录 【视频图像取证篇】模糊图像增强技术之去噪声类滤波场景应用小结(一)去噪声类滤波器1、去块滤波器(Deblo…

不同“chatGPT”比较

通过两个问题比较不同版本的 生成式 AI 国内免费: 【通义千问】https://tongyi.aliyun.com/qianwen 【文心一言】https://yiyan.baidu.com 【豆包】https://www.doubao.com/chat 【360 智脑】https://chat.360.com/chat 归属主体: 【阿里-通义千问-免费-国内可访…

32.768K晶振X1A000141000300适用于无人驾驶汽车电子设备

科技的发展带动电子元器件的发展电子元器件-“晶振”为现代的科技带来了巨大的贡献,用小小的身体发挥着大大的能量。 近两年无人驾驶汽车热度很高,不少汽车巨头都已入局。但这项技术的难度不小,相信在未来几年里,无人驾驶汽车这项…

webpack从零开始搭建vue项目

webpack一步一步搭建vue项目 前提:node、git(可选)已安装。node我使用的版本是 16.13.1。本文基本上都是基础配置,更多特殊配置请看其他博客。 本项目包含内容: webapck vue sass postcss babel eslint typescript 项目源码地址&#xff1…

软考高级:软件架构风格-独立构件风格概念和例题

作者:明明如月学长, CSDN 博客专家,大厂高级 Java 工程师,《性能优化方法论》作者、《解锁大厂思维:剖析《阿里巴巴Java开发手册》》、《再学经典:《Effective Java》独家解析》专栏作者。 热门文章推荐&am…

MySQL 索引的分类和优化

​ 优质博文:IT-BLOG-CN 索引是什么 : MySQL 官方对索引的定义:索引(Index)是帮助 MySQL 高效获取数据的数据结构。可以得到索引的本质:索引是数据结构。索引的目的在于提高查询效率。可以简单理解为&#…

力扣爆刷第101天之hot100五连刷91-95

力扣爆刷第101天之hot100五连刷91-95 文章目录 力扣爆刷第101天之hot100五连刷91-95一、62. 不同路径二、64. 最小路径和三、5. 最长回文子串四、1143. 最长公共子序列五、72. 编辑距离 一、62. 不同路径 题目链接:https://leetcode.cn/problems/unique-paths/desc…

实现防抖函数并支持第一次立刻执行(vue3 + ts环境演示)

1、先看一效果: 2、实现思路: 使用定时器setTimeout和闭包实现常规防抖功能;增加immediate字段控制第一次是否执行一次函数(true or false);增加一个flag标识,在第一次执行时,将标…

【go从入门到精通】for循环控制

前言 Go 语言提供了 for 循环语句,用于重复执行一段程序逻辑,直到循环条件不再满足时终止。 循环可以用于迭代各种数据结构(例如切片、数组、映射或字符串)中的元素 本文将很基础的for循环语法,循环嵌套&#…

二、阅读器的开发(初始)-- 1、阅读器简介及开发准备工作

1、阅读器工作原理及开发流程 1.1阅读器工作原理简介 电子书(有txt、pdf、epub、mobi等格式)->解析(书名、作者、目录、封面、章节等)->(通过阅读器引擎)渲染 -> 功能(字号、背景色、…

PHP页面如何实现设置独立访问密码

PHP网页如果需要查看信息必须输入密码,验证后才可显示出内容的代码如何实现? 对某些php页面设置单独的访问密码,如果密码不正确则无法查看内容,相当于对页面进行了一个加密。 如何实现这个效果,详细教程可以参考:PHP页面如何实现…

谁将主导未来AI市场?Claude3、Gemini、Sora与GPT-4的技术比拼

【最新增加Claude3、Gemini、Sora、GPTs讲解及AI领域中的集中大模型的最新技术】 2023年随着OpenAI开发者大会的召开,最重磅更新当属GPTs,多模态API,未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义,不亚…

Milvus笔记

一、创建操作 1、python版本 from pymilvus import Collection, FieldSchema, DataType, CollectionSchema, connections from pymilvus.orm import utility, dbfrom knowledge_brain.milvus_sink import milvus_sink from study.connect import Connectclass MilvusOperatC:…