Streamsets-JDBC模式使用更新时间字段数据同步

StreamSets的开源地址:https://github.com/streamsets/datacollector-oss
Streamsets官网地址:https://streamsets.com/
Streamsets文档地址:https://docs.streamsets.com/portal/datacollector/3.16.x/help/index.html

我又来写Streamsets了,各种原因好久没研究Cassandra了。
本次分享主要介绍Streamsets的JDBC模式、为什么使用时间字段同步数据、遇到的问题和解决方案。
解决方案并不是最完美但也是基于当前条件下最优解,如有疑问,欢迎热烈讨论。
提供的脚本毫无保留,可直接使用。

Streamsets在3.22.2之后就闭源了,更高阶的特性已包装为平台产品。

结合周边讨论和网上的资料来看,Streamsets的活跃度不高,在网上搜的资料太少啦,又随着项目闭源,活跃度更低了,归其原因我分析Streamsets是一个大而全的数据同步工具,整合了市面上基本所有的数据源,但是每个公司不可能用到里面所有的数据源,真正能用到大部分数据源的公司,规模肯定大到不会依赖这种外部的工具,自己手写同步的自由度和效率要更好。

Streamsets对于我们的优势在于开箱即用,相比于手搓代码来实现业务细节,Streamsets将数据同步的每个阶段独立开来,将业务变动最大的数据清洗部分以处理器的形式开放出来,数据的转换和转换的实时配置并生效,直观的监控指标。

版本为Streamsets的3.16.0的离线版本,部署到内网时的最新版本为3.16.0,所以方案和问题的解决方案均以3.16.0为基础。

JDBC模式介绍:

JDBC模式的增量模式只支持新增的数据和不需要修改的数据,且官方建议的offsetColumn为PrimaryKey,如:ID。

Incremental mode
When the JDBC Query Consumer performs an incremental query, it uses the initial offset as the offset value in the first SQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
When the origin performs a subsequent query, it returns data based on the last-saved offset. You can reset the origin to use the initial offset value.
Use incremental mode for append-only tables or when you do not need to capture changes to older rows. By default, JDBC Query Consumer uses incremental mode.

SELECT * FROM <table_name> WHERE <primaryKey> > ${OFFSET} ORDER BY <primaryKey>

这样支持的场景为不断的增量数据,无法捕获数据的更新。
但是正常的业务系统一般不存在只新增不更新的场景。
全量同步模式每次加载所有的数据,当表的数据量较大时,同步所需的时间和延迟不能接受。

修改为通过update_time来捕获数据变化:

SELECT * FROM user WHERE update_time > ${OFFSET} ORDER BY update_time

在配置管道时将OffsetColumn指定为update_time,业务系统使用mybatis-plus在数据新增和更新时补充创建时间和更新时间。数据库的时间精度为秒。

使用update_time的好处是对于开发者和运维人员可读性更好,在进行历史数据的同步和数据对接时更方便。
该方案看似非常合理,业务侧只要控制好update_time的逻辑,每次数据变化时update_time是不断滚动向前的,滚动查询不断的进行数据同步。
但是too young too simple。
按照Streamsets的处理逻辑,在两种场景下会丢数据。
分别是当单次同步的数据量超过maxBatchSize时,概率性丢数据和并发写入数据库时概率性丢数据。
这两种丢数据的场景是不可控的,时间不可控,完全看运气。但是不确定往往是最可怕的。

为什么会丢数据?

第一种场景:单次同步的数据量超过maxBatchSize
Offset的更新逻辑和jdbc-protolib源码中的逻辑:
origin会当根据sql查询的数据读取不超过配置的maxBatchSize的数量,并将最新的update_time赋值给offset。

// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {// ...try (Connection connection = dataSource.getConnection()) {if (null == resultSet || resultSet.isClosed()) {// 执行查询sql语句resultSet = statement.executeQuery(preparedQuery);}// 超过maxBatchSize的数据不发送到下一阶段,留到下次操作时处理。while (continueReading(rowCount, batchSize) && (haveNext = resultSet.next())) {final Record record = processRow(resultSet, rowCount);if (null != record) {// 记录下数据batchMaker.addRecord(record);}// 更新offsetif (isIncrementalMode) {nextSourceOffset = resultSet.getString(offsetColumn);} else {nextSourceOffset = initialOffset;}// 后续收尾工作}}return nextSourceOffset;}

结合Streamsets的Offset的更新逻辑和jdbc-protolib源码中的逻辑,当一秒内出现多条数据时,会因为精度问题导致数据丢失。

第二种场景:数据并发写入数据库时。
业务侧代码使用mybatis-plus作为ORM来处理数据的读写,当有大数据量写入数据时,如:Excel导入或高并发的数据写入。
mybatis-plus的内置处理逻辑为分批次提交,每次提交1000,所以单个线程写入的qps为1000。
以Excel导入为例,如果批量保存方法没有加@Transaction注解,会大大增加数据丢失的概率。

原因为结合mybatis的处理+没加@Transaction注解导致1000个insert语句一次性发给数据库,这1000条sql语句是以非事务的方式执行,每条数据都是一个完整的事务,执行完毕自动提交,立即可见。
这时当Streamsets触发查询操作时,时机恰好出现在一秒内的前半段,而一秒内的后半段还在数据写入,导致后半段的数据丢失。
场景分析

解决方案:

如果你拿到的是Streamsets的安装包,那第一种场景无法通过配置和升级的方式解决,因为使用的方式和增量模式的设计初衷不符。
有一份折中方案,但不保熟:
1.能力范围内update_time的精度越细越好,越细会有一定的性能损耗,但丢数据的概率大大降低。
2.评估每次同步的数据量大小,maxBatchSize的大小要大于单次同步的数据量。注意内存大小,小心OOM,(插一句:oracle的批量更新会存在连接泄露,需注意。如果有源码顺手改之。)
可以下载一份Streamsets的源码,改之。
代码如下:

// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {// ...try (Connection connection = dataSource.getConnection()) {if (null == resultSet || resultSet.isClosed()) {// 执行查询sql语句resultSet = statement.executeQuery(preparedQuery);}while ((haveNext = resultSet.next())) {if(continueReading(rowCount, batchSize)){final Record record = processRow(resultSet, rowCount);if (null != record) {// 记录下数据batchMaker.addRecord(record);}// 更新offsetif (isIncrementalMode) {nextSourceOffset = resultSet.getString(offsetColumn);} else {nextSourceOffset = initialOffset;}} else {// 当超过maxBatchSize时,继续查找最后一秒的数据。if(!nextSourceOffset.equals(initialOffset) && nextSourceOffset.equals(resultSet.getString(offsetColumn))){if(null != record) batchMaker.addRecord(record);}// 后续收尾工作}}return nextSourceOffset;}

第二种场景出现的原因是在同一秒内同时出现写入和查询操作,查询时无法取出应取出的数据。
解决的思路为错峰,通过配置手段将查询动作和写入动作错开。

// oracle
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < SYSDATE - INTERVAL '1' SECOND order by update_time;
// mysql
select * from user where update_time < '${offset}' and update_time < DATE_SUB(now(), INTERVAL 1 SECOND) order by update_time;
// dm
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < CURRENT_TIMESTAMP- INTERVAL '1' SECOND order by update_time;
// kingbase
select * from user where update_time < '${offset}' and update_time < current_timestamp - INTERVAL '1' SECOND order by update_time;

需要特别注意:因为数据库中存储的时间有可能为业务服务的时间,要保证数据库和业务服务的时区和时间要保持一致。

通道示意图:

新版Streamsets的布局,我的不长这样。streamsets-jdbc示意图
源:无特殊配置
Jython处理器:根据源传过来的数据查询目标表,对数据进行标记。
流选择器:根据数据的标记分发数据,标记为insert的走新增通道,标记为update的走修改通道。
目标:一个配置为INSERT,另一个配置为UPDATE。
Jython脚本:

import java.sql.DriverManager as DriverManager
import java.lang.Class as Class
import timeurl = "jdbc:mysql://localhost:3306/db?autoReconnect=true&useSSL=false&characterEncoding=utf8"
Class.forName("com.mysql.jdbc.Driver")
username = "root"
password = "passwd"
batch_size = 1000primary_key = "id"
table_name = "t_target"
ids = []
db_ids = set()
records = sdc.records
conn = None
stmt = None
rs = None
if len(records) != 0:try:conn = DriverManager.getConnection(url,username,password)if conn is not None:stmt = conn.createStatement()start_time = time.time()for record in records:id = record.value[primary_key]ids.append(id)num_batches = len(ids) // batch_size + (1 if len(ids) % batch_size != 0 else 0)for i in range(num_batches):start_index = i * batch_sizeend_index = min((i+1) * batch_size,len(ids))batch_ids = ids[start_index,end_index]sql = "select ' + primary_key + ' from " + table_name + " where '+ primary_key +' in ('"for j,id in enumerate(batch_ids):if j != 0:sql += "','"sql += str(id)sql += "')"rs = stmt.executeQuery(sql)while rs.next():id = rs.getString(primary_key)db_ids.add(id)end_time = time.time()sdc.log.info('from '+ table_name + 'query:' + str(len(ids)) + 'rows cost:'+str(end_time - start_time) + 's')for record in records:id = record.value[primary_key]if id in db_ids:record.value['insert_or_update'] = 'update'else:record.value['insert_or_update'] = 'insert'sdc.output.write(record)except Exception as e:raise RuntimeError(e)finally:if rs:rs.close()if stmt:stmt.close()if conn:conn.close()
else:sdc.log.trace('no more data')

结语:

截止到此,也算一套完整的解决方案。拷贝之后可直接食用。
后面有时间会分享一些定位时发现的问题和小技巧。

  • 国产化数据库达梦和人大金仓的适配。
  • 国产化服务器加密环境的打包和部署方案。
  • 为Streamsets减负,轻量化安装包。
  • JDBC模式的性能优化小技巧。
  • 穿插一些Streamsets组件的实现原理。
  • Streamsets CDC模式的配置。
  • 手写一份Streamsets的Stage,用以支撑国产化的需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/20368.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LangChain实战技巧之四:当模型(Model)不支持Tool/Function的解决办法

文心大模型两大主力模型已全面免费&#xff0c;可参考我之前发的文章 AI菜鸟向前飞 — 今日三则AI相关新闻 但是&#xff0c;这些模型原生并不支持Tool/Function Call 如下所示&#xff1a; tool def greeting(name: str):向朋友致欢迎语return f"你好啊, {name}"…

基于LabVIEW虚拟示波器设计

随着计算机技术的发展&#xff0c;传统仪器开始向计算机化的方向发展。虚拟仪器是90年代提出的新概念。虚拟仪器技术的提出与发展&#xff0c;标志着二十一世纪自动测试与电子测量仪器领域技术发展的一个重要方向。所谓虚拟仪器&#xff0c;就是在通用的计算机平台上定义和设计…

TDR原理的介绍

目录 简介 简单定义 TDR测试原理 简介 时域和频域就像孪生兄弟一样&#xff0c;经常在测试测量领域同时出现&#xff0c;可谓是工程师们分析问题和解决问题的两大法宝。所以&#xff0c;在某些测试场景中&#xff0c;如果有时域信息的护法&#xff0c;咱们就能从时频域两个维…

【普通切换】【DC-based handover】【DAPS】协议栈分析

移动网络切换 移动通信中切换是保证终端业务的基本流程&#xff0c;而切换时延是终端(UE)不能与任何基站交互(传递)用户面数据包的最短时间。 在5G(NR)网络中当终端(UE)接收到切换命令时&#xff0c;将断开与源小区的连接向目标小区发起随机接入过程。在此期间终端(UE)的数据传…

牛客ONT45 距离是K的二叉树节点【中等 宽度优先遍历 Java/Go/PHP/C++】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/e280b9b5aabd42c9b36831e522485622 思路 图&#xff0c;队列 构件图&#xff0c;直接从target出发&#xff0c;扩展到第k层就是答案Java代码 import java.util.*;/** public class TreeNode {* int val 0;* …

架构设计之安全性属性深度剖析:从理论到实践的完美融合

文章目录 引言一、安全性属性的理论探讨1.1 定义说明1.2 安全原则1.3 安全模型1.4 安全机制 二、安全性属性的实践应用2.1 安全风险评估2.2 架构设计中的安全考虑2.3 技术手段和工具2.4 团队协作与沟通2.5 安全政策和流程2.6 合规性和标准2.7 持续监控和改进 三、理论与实践的融…

Python函数进阶

文章目录 1 函数多返回值2 函数多种传参方式2.1 位置参数2.2 关键字参数2.3 缺省参数2.4 不定长参数 3 匿名函数函数作为参数传递lambda匿名函数 1 函数多返回值 def test_return():return 1,2,3 x,y,z test_return() print(x) print(y) print(z)2 函数多种传参方式 2.1 位置参…

0基础认识C语言(理论+实操3)

所有籍籍无名的日子里 我从未看轻自己半分 小伙伴们&#xff0c;一起开始我们今天的话题吧 一、算法操作符 1.双目操作符 为何叫双目操作符呢&#xff1f;其实是因为我们进行加减乘除的时候&#xff0c;至少得需要两个数字进行这些运算&#xff0c;而这个数字就被称为操作数…

基于单片机的微型嵌入式温度测量仪的设计与实现分析

摘要 &#xff1a; 作为信息技术中重要的技术手段之一嵌入式单片机系统已经被应用到越来越多不同的行业领域中。如&#xff0c;各种手持监测设备、智能家电设备等。当前展开对单片机的微型嵌入式温度测量仪的设计和实现研究&#xff0c;从微型嵌入式单片机相关理论入手&#xf…

【实战教程】构建可复用的 Spring Boot starter 微服务组件

案例 Demo&#xff1a;https://gitee.com/regexpei/coding-trainee/tree/demo/20240526_starter 介绍 在 Spring Boot 中&#xff0c;starter 启动依赖就像一个“开箱即用”的工具箱&#xff0c;它包含了第三方组件的配置和依赖&#xff0c;让我们无需手动配置和添加这些组件。…

【多目标跟踪】《FlowMOT: 3D Multi-Object Tracking by Scene Flow Association》论文阅读笔记

0.论文 论文地址链接:https://arxiv.org/pdf/2012.07541v1 通过流的方式跟踪是一个比较新颖的点,所以这里比较关注运动跟踪,是如果做到流的跟踪来预测目标的位置以及ID绑定的。 FlowMOT的框架结构如下所示,本中会主要关注下运动跟踪、数据关联、ID分配、新生/消亡…

python替换占位符为变量,实现读取配置文件

文章目录 背景1、定义正则表达式2、替换变量占位符3、实现功能 背景 使用python编写小工具&#xff0c;有一个配置文件&#xff0c;希望实现类似shell命令的&#xff0c;定义变量并且使用${}或者$来引用。如果有好的建议欢迎讨论。 配置文件示例内容如下: D:\project\test\pr…

Arrays(操作数组工具类)、Lambda表达式

package exercise;import java.util.Arrays;public class ArraysDemo {public static void main(String[] args) {int[] arr {1, 2, 3, 4, 5};//将数组变成字符串System.out.println(Arrays.toString(arr));//二分查找法查找元素//细节1&#xff1a;1.数组必须是有序的 2.元素…

OpenCASCADE入门(2)——openCasCade7.6.0版本的exe方式安装,vs2017环境配置,编译和使用draw

3rd party Components | Open CASCADE Technology 目录 引出安装好vs2017和occt7.6设置环境变量 启动occt和编译关于custom.bat批处理文件双击运行 打开draw使用方式一&#xff1a;双击draw.bat批处理vs设置启动项 总结其他自定义信号和槽1.自定义信号2.自定义槽3.建立连接4.进…

路由器设置——隐藏SSID(隐藏WiFi名称)

参考来源&#xff1a; https://www.192ly.com/qiu-zhu/33315.htmlhttps://www.yunqishi.net/video/109743.html 一、什么是SSID? SSID是Service Set ldentifier的缩写&#xff0c;意思是服务集标识&#xff0c;简单来说SSID就是wifi的名字。 二、怎么隐藏SSID 将 开启SSI…

为什么要学习数据结构和算法

前言 控制专业转码学习记录&#xff0c;本科没学过这门课&#xff0c;但是要从事软件行业通过相关面试笔试基础还是要打牢固的&#xff0c;所以通过写博客记录一下。 必要性 1.越是厉害的公司&#xff0c;越是注重考察数据结构与算法这类基础知识 2.作为业务开发&#xff0c…

MMPose-RTMO推理详解及部署实现(上)

目录 前言1. 概述1.1 MMPopse1.2 MMDeploy1.3 RTMO 2. 环境配置3. Demo测试4. ONNX导出初探5. ONNX导出代码浅析6. 剔除NMS7. 输出合并8. LayerNormalization算子导出9. 动态batch的实现10. 导出修改总结11. 拓展-MMPose中导出ONNX结语下载链接参考 前言 最近在 MMPose 上看到了…

【NOIP2018普及组复赛】题2:龙虎斗

题2&#xff1a;龙虎斗 【题目描述】 轩轩和凯凯正在玩一款叫《龙虎斗》的游戏&#xff0c;游戏的棋盘是一条线段&#xff0c;线段上有 n n n 个兵营&#xff08;自左至右编号 1 ∼ n 1∼n 1∼n&#xff09;&#xff0c;相邻编号的兵营之间相隔 1 1 1 厘米&#xff0c;即棋…

【Python从入门到进阶】56、Mysql防止SQL注入及ORM库简化操作

接上篇《55、使用Python轻松操作Mysql数据库》 上一篇我们讲解了Mysql的基本链接和增删改查&#xff0c;本篇我们来介绍链接Mysql时参数化查询与防止SQL注入以及使用ORM&#xff08;对象关系映射&#xff09;库简化操作的内容。 一、参数化查询与防止SQL注入 在数据库操作中&…

雄鹰只属于天空

雄鹰只属于天空 成大事者&#xff0c;必有人生至暗时刻之经历&#xff0c;高处坠落折戟沉沙&#xff0c;在孤立无援时&#xff0c;世态炎凉人情冷落&#xff0c;饱经苦楚滋味&#xff0c;所有的热情关系一夜冰封&#xff0c;冷嘲热讽袖手旁观&#xff0c;落井下石四面楚歌&…