Streamsets-JDBC模式offset变化逻辑和如何向下传递offset

Streamsets的版本为3.16.0 离线版

offset在jdbc模式中起到非常关键的作用,是滚动查询的基础,offset的准确直接影响数据同步的质量。
本文主要分享一下JDBC Query Consumer中的offset,包括变化逻辑、存储方式、处理器如何获取到最新的offset。
源:JDBC Query Consumer
处理器:Jython Evaluator

变化逻辑

管道的启动方式有三种,启动管道、重置源并启动、带参数启动。
启动管道时,会根据当前的管道ID读取offset,如果为首次启动,那以JDBC标签中设置的initial offset作为offset来进行数据同步。通过过程中会不停的将配置的offsetColumn的值赋给offset,不一定是最大的值,但一定是最后取到的值。取决于Sql Query中的排序规则。
重置源并启动时,字面意思,强制使用initial offset来同步数据。
带参数启动时,对管道添加外置参数,还没需求,没实际用过。

存储方式:

默认情况下,Streamsets使用文件对管道的offset进行持久化,地址在Streamsets的数据目录下,系统路径中找到数据路径。
数据存在的格式为:/{dataDir}/runInfo/{pipelineId}/{version:0}/offset.json
注意:源、处理器和目标不是完全独立的,目标没有结束不会更新offset。
这部分还没研究,猜测:目标执行结束后触发offset的持久化。

处理器如何获取到最新的offset。

在官方文档和网上搜索了很久,无果。offset无法直接获取。
处理器为Jython,既然使用了Jython,肯定想做更多Streamsets做不到的事情,如:标记一下数据、处理一下格式、打印一下日志等等。
但是内置的Jython模块中封装的方法较少,在看源码之前以为是开放程度很高,仔细研究了一下发现支持的方法是固定的几个,全写在注释里面了。
Jython实现方式:

    import java.sql.DriverManager as DriverManagerimport java.lang.Class as Classimport timeimport jsonurl = "jdbc:mysql://localhost:3306/db?autoReconnect=true&useSSL=false&characterEncoding=utf8"Class.forName("com.mysql.jdbc.Driver")username = "root"password = "passwd"records = sdc.recordsconn = Nonestmt = Noners = Noneif len(records) != 0:try:conn = DriverManager.getConnection(url,username,password)if conn is not None:stmt = conn.createStatement()sourceId = records[0].sourceIdstart_time = time.time()# 源码修改之后 修改之前请用split切割后获取offsetoriginDict = json.loads(sourceId)sql = originDict['preparedQuery']if sql:rs = smt.executeQuery(sql)rs.last()origin_count = rs.getRow()end_time = time.time()if len(records) != origin_count:sdc.log.info("valid error")# do somethingelse:for record in records:sdc.output.write(record)except Exception as e:raise RuntimeError(e)finally:if rs:rs.close()if stmt:stmt.close()if conn:conn.close()else:sdc.log.trace('no more data')

soureId在origin阶段中的声明,存放在每条record的header中,格式为:{sqlQuery[0-100]}::rowCount:{rowCount}:{offset}。
此格式是固定的,在不修改源码的情况下,可以稍微花点时间使用分隔符将offset给分离出来。

final String recordContext = StringUtils.substring(query.replaceAll("[\n\r]", ""), 0, 100) + "::rowCount:" + rowCount + (StringUtils.isEmpty(offsetColumn) ? "" : ":" + resultSet.getString(offsetColumn));

但是不满足需求,解决更新时间作为同步字段丢失数据的问题,为了将查询和写入错峰,同时验证数据的准确性,在处理器中再次查询origin中的sql,对比传递过来的数量和再次查询的数量是否一致。但是sourceId中的sql只有100个字符,有风险。改之。

	// final String recordContext = StringUtils.substring(query.replaceAll("[\n\r]", ""), 0, 100) + "::rowCount:" + rowCount + (StringUtils.isEmpty(offsetColumn) ? "" : ":" + resultSet.getString(offsetColumn));// maven 引入 fastjson2JSONObject jsonObject = new JSONObject();jsonObject.put("preparedQuery", preparedQuery.replaceAll("[\n\r]", ""));jsonObject.put("rowCount", rowCount);if (StringUtils.isNotEmpty(offsetColumn)) {jsonObject.put("offsetColumn", resultSet.getString(offsetColumn));}final String recordContext = jsonObject.toString();

在源的事件中获取offset:

在源的配置中勾选“制造事件”,添加处理器和目标。
sdc发送给下一阶段的数据和发送给处理事件的处理器的协议是一样的,但是数据内容不同,即同一个方法在不同的阶段返回的数据内容不同。
发送给事件处理器的数据中有offset、query、rows和timestamp。
默认发向事件处理器的事件有两种,分别为jdbc-query-success和jdbc-query-failure。

records = sdc.recordsif len(records) != 0:try:offset = records[0].value['offset']query= records[0].value['query']rows= records[0].value['rows']for record in records:sdc.output.write(record)except Exception as e:raise RuntimeError(e)else:sdc.log.trace('no more data')

通道示意图:
在这里插入图片描述

结语:

Streamsets在网上的资料实在是太少了,官方文档也只是讲基本的使用,开发过程中要大胆猜测。
希望这篇文章对你有帮助,我的实现方案也未必完美,欢迎新的思路和idea,此致敬礼。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/847201.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

coze扣子自定义插件使用方式1

1,模型中的工具描述 2,大模型调用,触发接口:

Outlook 邮箱使用技巧

良好时间管理的基本原则 Outlook 是帮你管理电子邮件、日历、联系人和任务的工具。 因此,不论是在沟通还是在时间管理中,它都占据着中心位置。 为充分利用 Outlook,我们提出了一些基本原则: 减少阅读邮件的位置。 如果你使用的是…

机器学习的热门领域及应用趋势

机器学习的热门领域及应用趋势 近年来,机器学习(Machine Learning, ML)已经成为科技领域的热门话题,其在各个行业的应用越来越广泛和深入。本文将详细介绍当前机器学习的几个热门领域,以及人们在这些领域中使用的机器…

文件同步软件,PanguFlow局域网横着走

说到文件同步,它对企业来说及其重要,因为有了文件同步,这个文件数据它才能备份起来,才能用来抵抗自然灾害,比如服务器硬盘损坏了,你要是不备份,这损失可就大了,一款合适的文件同步软…

第二讲笔记:隐私计算助力数据要素流通

1、数据要素流转与数据 2、数据外循环中的信任 焦虑 信任焦虑背后的代表性案例 内鬼门 : 2023 年 , 美国科技公司 Ubiquiti在2021年1月曝出数据泄露事 件, “攻击者”在随后的“谈判”中试 图向该企业勒索近200万美元(50比特 币&…

javacv ffmpeg使用笔记 (补充中...)

javacv ffmpeg使用笔记 一、maven依赖二、示例代码1. 获取视频时长 三、小技巧 一、maven依赖 使用javacv ffmpeg并指定classifier之后,就不需要额外安装ffmpeg软件(jar包中已经内置)了。 全量依赖包(不推荐)安装包总大…

PCIe的链路状态

目录 概述 链路训练的目的 两个概念 下面介绍LTSSM状态机 概述 PCie链路的初始化过程较为复杂,Pcie总线进行链路训练时,将初始化Pcie设备的物理层,发送接收模块和相关的链路状态信息,当链路训练成功结束后,PCIe链…

数据库 mysql 的彻底卸载

MySQL卸载步骤如下: (1)按 winr 快捷键,在弹出的窗口输入 services.msc,打开服务列表。 (2)在服务列表中, 找到 mysql 开头的所有服务, 右键停止,终止对应的…

3D摄影棚布光软件:Set A Light 3D for Mac 永久试用版

Set A Light 3D 是一款专业的灯光设计软件,可以帮助用户轻松创建逼真的灯光效果和场景。它提供了丰富的灯光模型和材质库,用户可以根据需要自由调整灯光的颜色、亮度和方向,实时预览效果。同时还支持灯光的投射、反射和阴影等高级特效&#x…

【Redis】 Java操作客户端命令——列表操作与哈希操作

文章目录 🍃前言🌴列表操作🚩lpush 和 lpop🚩rpush 和 rpop🚩lrange🚩bloop🚩brpop🚩lindex🚩linsert🚩llen 🎋哈希操作🚩hset 和 hge…

java面试题及答案2024,java2024最新面试题及答案(之二)

四、反射 57. 什么是反射? 反射主要是指程序可以访问、检测和修改它本身状态或行为的一种能力 Java反射: 在Java运行时环境中,对于任意一个类,能否知道这个类有哪些属性和方法?对于任意一个对象,能否调…

JCR一区级 | Matlab实现TCN-BiLSTM-MATT时间卷积双向长短期记忆神经网络多特征分类预测

JCR一区级 | Matlab实现TCN-BiLSTM-MATT时间卷积双向长短期记忆神经网络多特征分类预测 目录 JCR一区级 | Matlab实现TCN-BiLSTM-MATT时间卷积双向长短期记忆神经网络多特征分类预测分类效果基本介绍程序设计参考资料 分类效果 基本介绍 1.JMatlab实现TCN-BiLSTM-MATT时间卷积双…

html+CSS+js部分基础运用12

一、显示列表项的内容 编写javaScript代码实现用户登录时数据合法性校验功能,界面如图教材P338 第2题,效果如下图所示: 图1 显示列表项内容 二、日期的处理 实时显示当前时间及累计登录时间,如下图2所示。[提示window.setInt…

鸿蒙开发接口资源调度:【@ohos.workScheduler (延迟任务调度)】

延迟任务调度 本模块提供延迟任务注册、取消、查询的能力。 开发者在开发应用时,通过调用延迟任务注册接口,注册对实时性要求不高的延迟任务,该任务默认由系统安排,在系统空闲时根据性能、功耗、热等情况进行调度执行。 说明&am…

【开源三方库】Fuse.js:强大、轻巧、零依赖的模糊搜索库

1.简介 Fuse.js是一款功能强大且轻量级的JavaScript模糊搜索库,支持OpenAtom OpenHarmony(以下简称“OpenHarmony”)操作系统,它具备模糊搜索和排序等功能。该库高性能、易于使用、高度可配置,支持多种数据类型和多语…

大模型ChatGLM的部署与微调

前言:最近大模型太火了,导师让我看看能不能用到自己的实验中,就想着先微调一个chatGLM试试水,微调的过程并不难,难的的硬件条件跟不上,我试了一下lora微调,也算跑通了吧,虽然最后评估…

【问题随记】tightvnc 连接后灰屏

问题描述 刚刚入手了官方发的 OrangePi AI Pro,想用 tight vnc 来连接开发板,就不用连接屏幕那么麻烦了。结果连接后,没能显示 OrangePi AI Pro 桌面。 问题解决 看一下现有的桌面环境。 apt list --installed | grep desktop从中可以看到…

语言模型解构——Tokenizer

1. 认识Tokenizer 1.1 为什么要有tokenizer? 计算机是无法理解人类语言的,它只会进行0和1的二进制计算。但是呢,大语言模型就是通过二进制计算,让你感觉计算机理解了人类语言。 举个例子:单1,双2&#x…

【传知代码】探索视觉与语言模型的可扩展性(论文复现)

前言:在数字化时代的浪潮中,我们见证了人工智能(AI)技术的飞速发展,其中视觉与语言模型作为两大核心领域,正以前所未有的速度改变着我们的生活和工作方式。从图像识别到自然语言处理,从虚拟现实…

无人机推流/RTMP视频推拉流EasyDSS无法卸载软件是什么原因?

视频推拉流/直播点播EasyDSS平台支持音视频采集、视频推拉流、播放H.265编码视频、存储、分发等视频能力服务,在应用场景中可实现视频直播、点播、转码、管理、录像、检索、时移回看等。此外,平台还支持用户自行上传视频文件,也可将上传的点播…