[自研开源] 数据集成之分批传输 v0.7

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

介绍

本篇基于 数据集成之任务流程 介绍任务分批传输的使用场景和配置操作。

使用场景

mydata使用API方式集成数据,当一次请求或响应 传输数据量较多时 可能无法完成、或容易对服务端造成影响,因此需要分为多次处理;

例如 常见的分页查询、导入大量数据时分批处理、集成对接时的全量同步等;

分批传输数据

业务系统与mydata集成时,在提供数据消费数据这两个方向上分别实现分批传输;

提供数据

由mydata调用应用的API获取数据,通过配置分批参数 实现一次任务内多次调用API获取完整数据,有以下两种基本的配置模式:

  • 配置了 固定参数size=10、递增参数current从1开始每次递增1、每次间隔1秒的任务;

在这里插入图片描述

  • 配置了 递增参数start从1开始每次递增100、递增参数end从100开始每次递增100、每次间隔1秒的任务;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;

  • lastProduceData记录上一次数据,用于和本次对比数据,若重复 则结束,避免死循环(理论上很少有2次完全一样的数据);

  • 若分批有异常,则复用任务3次出错 自动结束并发送邮件通知的功能;

  • 执行完一次后,自动计算递增参数值;

// 提供数据
case MdConstant.DATA_PRODUCER:// 分批模式 记录上一次数据,用于对比两次数据,若重复 则结束,避免死循环List<Map> lastProduceData = null;do {// 若启用分批,则将分批参数加入请求参数中if (taskInfo.isBatch()) {Map<String, Object> batchParam = jobBatchService.parseToMap(taskInfo);Map<String, Object> reqParams = MapUtil.union(taskInfo.getReqParams(), batchParam);taskInfo.setReqParams(reqParams);}// 调用api 获取jsonString json = ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据jobDataService.parseData(taskInfo, json);// 若没有返回数据,则结束处理if (CollUtil.isEmpty(taskInfo.getProduceDataList())) {break;}// 对比上一次数据if (lastProduceData != null) {if (CollUtil.isEqualList(lastProduceData, taskInfo.getProduceDataList())) {// 异常任务失败,邮件通知用户检查任务throw new RuntimeException("分批获取数据异常,最后两次获取的数据相同!");}}lastProduceData = taskInfo.getProduceDataList();// 根据条件过滤数据jobDataFilterService.doFilter(taskInfo);// 保存业务数据jobDataService.saveTaskData(taskInfo);// 更新环境变量jobVarService.saveVarValue(taskInfo, json);// 递增分批参数jobBatchService.incBatchParam(taskInfo);// 若启用分批,则等待间隔if (taskInfo.isBatch()) {ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);}} while (taskInfo.isBatch());break;

消费数据

由mydata通过API向应用发送数据,通过配置分批参数 限制每次向API发送的数据量,从而减少数据查询量和请求处理时间;

如下图,配置了分批数量为1000的任务,分批参数为选填,mydata将按1000为限制查询符合条件的数据,通过API请求发送给应用;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;
  • 自动管理分页参数,执行分页查询数据,发送给API;
  • 直到分页查询没有数据 自动结束;
// 消费数据
case MdConstant.DATA_CONSUMER:String dataCode = taskInfo.getDataCode();if (StrUtil.isEmpty(dataCode)) {break;}List<BizDataFilter> filters = taskInfo.getDataFilters();if (CollUtil.isNotEmpty(filters)) {// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());}int round = 0;Long skip = null;Integer limit = taskInfo.isBatch() ? taskInfo.getBatchSize() : null;do {if (taskInfo.isBatch()) {skip = (long) round * taskInfo.getBatchSize();}// 根据过滤条件 查询数据List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters, skip, limit);if (CollUtil.isEmpty(dataList)) {break;}taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数jobDataService.convertData(taskInfo);// 调用api传输数据ApiUtil.write(taskInfo);round++;// 若启用分批,则等待间隔if (taskInfo.isBatch()) {ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);}}while (taskInfo.isBatch());break;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/773160.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式下C/C++调用sqlite3简单开发

交叉编译sqlite3请关注我第一篇博文 sqlite3 交叉编译-CSDN博客 sqlite3的命令的简单使用&#xff08;增删改查&#xff0c;创建/删除表&#xff09;请关注我的上一篇博文 sqlite3嵌入式使用以及C/C代码开发-CSDN博客 一、新建文件夹 此文件夹用于放置工程&#xff0c;比如…

Qt实现TFTP Server和 TFTP Client(三)

3.2 Client Client包括下面3个类&#xff1a; ClientSockeTFtpClientTFtpClientWidget 3.2.1 ClientSocke ClientSocke从BaseUdp派生实现write接口. 3.2.1.1 ClientSocke定义 #include "baseudp.h"class QUdpSocket; class ClientSocket : public BaseUdp { pu…

【C++】每日一题 45 跳跃游戏

给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1] 的最…

SQLite中的原子提交(四)

返回&#xff1a;SQLite—系列文章目录 上一篇:SQLite数据库成为内存中数据库&#xff08;三&#xff09; 下一篇&#xff1a;SQLite使用的临时文件&#xff08;二&#xff09; 1. 引言 SQLite等事务数据库的一个重要特性 是“原子提交”。 原子提交意味着所有数据库都在…

深度学习十大算法之图神经网络(GNN)

一、图神经网络的基础 图的基本概念 图是数学中的一个基本概念&#xff0c;用于表示事物间复杂的关系。在图论中&#xff0c;图通常被定义为一组节点&#xff08;或称为顶点&#xff09;以及连接这些节点的边。每个边可以有方向&#xff0c;称为有向边&#xff0c;或者没有方向…

网络原理讲解

目标 网络发展史 独立模式 独立模式&#xff1a;计算机之间相互独立&#xff1b; 网络互连 随着时代的发展&#xff0c;越来越需要计算机之间互相通信&#xff0c;共享软件和数据&#xff0c;即以多个计算机协同工作来完成 业务&#xff0c;就有了网络互连。 网络互连&a…

学习笔记:MYSQL数据库基础知识

MYSQL数据库基础知识学习笔记 MYSQL基础学习数据库相关概念现主流数据库排名数据模型SQL分类SQL数据库基础操作 2024/3/27 学习资料&#xff1a;黑马程序员:MYSQL MYSQL基础学习 数据库和数据库管理系统(DBMS) 数据库: 是存储数据的集合&#xff0c;包括表、视图、索引等对象…

存内计算:释放潜能的黑科技

什么是存内计算&#xff1f; 存内计算技术是一种新型的计算架构&#xff0c;它将存储器和计算单元融合在一起&#xff0c;以实现高效的数据处理。存内计算技术的优势在于能够消除数据搬运的延迟和功耗&#xff0c;从而提高计算效率和能效比。目前&#xff0c;存内计算技术正处…

React Native获取及监听网络状态

在React Native中&#xff0c;要获取和监听网络状态&#xff0c;你可以使用react-native-netinfo库&#xff08;以前是核心库的一部分&#xff0c;但在React Native 0.60之后被移出并作为一个独立的库提供&#xff09;。以下是使用这个库来获取和监听网络状态的基本步骤&#x…

苹果Find My产品需求增长迅速,伦茨科技ST17H6x芯片供货充足

苹果的Find My功能使得用户可以轻松查找iPhone、Mac、AirPods以及Apple Watch等设备。如今Find My还进入了耳机、充电宝、箱包、电动车、保温杯等多个行业。苹果发布AirTag发布以来&#xff0c;大家都更加注重物品的防丢&#xff0c;苹果的 Find My 就可以查找 iPhone、Mac、Ai…

jupyter notebook导出含中文的pdf(LaTex安装和Pandoc、MiKTex安装)

用jupyter notebook导出pdf时&#xff0c;因为报错信息&#xff0c;需要用到Tex nbconvert failed: xelatex not found on PATH, if you have not installed xelatex you may need to do so. Find further instructions at https://nbconvert.readthedocs.io/en/latest/install…

pytorch中的torch.hub.load()

pytorch提供了torch.hub.load()函数加载模型&#xff0c;该方法可以从网上直接下载模型或是从本地加载模型。官方文档 torch.hub.load(repo_or_dir, model, *args, sourcegithub, trust_repoNone, force_reloadFalse, verboseTrue, skip_validationFalse, **kwargs)参数说明&a…

Focal Modulation Networks聚焦调制网络

摘要 我们提出了 焦点调制网络 &#xff08;简称 FocalNets) &#xff0c;其中 自注意&#xff08; SA &#xff09;被 Focal Modulation 替换&#xff0c;这种机制 包括三个组件&#xff1a;&#xff08; 1 &#xff09;通过 depth-wise Conv 提取分级的上下文信息&#xff0…

吴恩达深度学习笔记:浅层神经网络(Shallow neural networks)3.6-3.8

目录 第一门课&#xff1a;神经网络和深度学习 (Neural Networks and Deep Learning)第三周&#xff1a;浅层神经网络(Shallow neural networks)3.6 激活函数&#xff08;Activation functions&#xff09;3.7 为什么需要非线性激活函数&#xff1f;&#xff08;why need a non…

Spring Cloud 九:服务间通信与消息队列

Spring Cloud 一&#xff1a;Spring Cloud 简介 Spring Cloud 二&#xff1a;核心组件解析 Spring Cloud 三&#xff1a;API网关深入探索与实战应用 Spring Cloud 四&#xff1a;微服务治理与安全 Spring Cloud 五&#xff1a;Spring Cloud与持续集成/持续部署&#xff08;CI/C…

react native hooks 页面出现重绘问题,如何解决

在React Native应用中&#xff0c;使用Hooks导致页面出现频繁重绘或性能问题时&#xff0c;可以尝试以下策略来优化和解决问题&#xff1a; 减少不必要的状态更新&#xff1a; 使用 React.memo 高阶组件包裹那些不需要每次父组件状态改变时都重新渲染的子组件。它通过浅比较pro…

Java Web-Maven

Maven是apache旗下的一个开源项目&#xff0c;是一款用于管理和构建java项目的工具 Maven的作用 1.依赖管理:方便快捷的管理项目依赖资源(jar包)&#xff0c;避免版本冲突问题 我们有的项目需要大量的jar包&#xff0c;采用手动导包的方式非常繁琐&#xff0c;并且版本升级也…

面试官:2PC和3PC有什么区别?

本文内容已收录至我的面试网站&#xff1a;www.javacn.site 在分布式事务中&#xff0c;通常使用两阶段协议或三阶段协议来保障分布式事务的正常运行&#xff0c;它也是 X/Open 公司定义的一套分布式事务标准。 X/Open 公司是由多家国际计算机厂商所组成的联盟组织&#xff0c;…

【前端】-【性能优化常识】

目录 前端性能优化指标首屏速度、白屏时间性能优化收效很大的操作&#xff1a;减少首屏资源体积收效不大或者特殊情况的优化操作 操作速度、渲染速度造成操作卡顿和渲染慢的场景性能优化 数据缓存 补充知识异步加载加载方式一&#xff1a;prefetch加载加载方式二&#xff1a;sc…

小迪安全48WEB 攻防-通用漏洞Py 反序列化链构造自动审计 bandit魔术方法

#知识点&#xff1a; 1、Python-反序列化函数使用 2、Python-反序列化魔术方法 3、Python-反序列化 POP 链构造&#xff08;payload构造&#xff09; 4、Python-自动化审计 bandit 使用 #前置知识&#xff1a; 函数使用&#xff1a; pickle.dump(obj, file) : 将对…