[自研开源] 数据集成之分批传输 v0.7

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

介绍

本篇基于 数据集成之任务流程 介绍任务分批传输的使用场景和配置操作。

使用场景

mydata使用API方式集成数据,当一次请求或响应 传输数据量较多时 可能无法完成、或容易对服务端造成影响,因此需要分为多次处理;

例如 常见的分页查询、导入大量数据时分批处理、集成对接时的全量同步等;

分批传输数据

业务系统与mydata集成时,在提供数据消费数据这两个方向上分别实现分批传输;

提供数据

由mydata调用应用的API获取数据,通过配置分批参数 实现一次任务内多次调用API获取完整数据,有以下两种基本的配置模式:

  • 配置了 固定参数size=10、递增参数current从1开始每次递增1、每次间隔1秒的任务;

在这里插入图片描述

  • 配置了 递增参数start从1开始每次递增100、递增参数end从100开始每次递增100、每次间隔1秒的任务;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;

  • lastProduceData记录上一次数据,用于和本次对比数据,若重复 则结束,避免死循环(理论上很少有2次完全一样的数据);

  • 若分批有异常,则复用任务3次出错 自动结束并发送邮件通知的功能;

  • 执行完一次后,自动计算递增参数值;

// 提供数据
case MdConstant.DATA_PRODUCER:// 分批模式 记录上一次数据,用于对比两次数据,若重复 则结束,避免死循环List<Map> lastProduceData = null;do {// 若启用分批,则将分批参数加入请求参数中if (taskInfo.isBatch()) {Map<String, Object> batchParam = jobBatchService.parseToMap(taskInfo);Map<String, Object> reqParams = MapUtil.union(taskInfo.getReqParams(), batchParam);taskInfo.setReqParams(reqParams);}// 调用api 获取jsonString json = ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据jobDataService.parseData(taskInfo, json);// 若没有返回数据,则结束处理if (CollUtil.isEmpty(taskInfo.getProduceDataList())) {break;}// 对比上一次数据if (lastProduceData != null) {if (CollUtil.isEqualList(lastProduceData, taskInfo.getProduceDataList())) {// 异常任务失败,邮件通知用户检查任务throw new RuntimeException("分批获取数据异常,最后两次获取的数据相同!");}}lastProduceData = taskInfo.getProduceDataList();// 根据条件过滤数据jobDataFilterService.doFilter(taskInfo);// 保存业务数据jobDataService.saveTaskData(taskInfo);// 更新环境变量jobVarService.saveVarValue(taskInfo, json);// 递增分批参数jobBatchService.incBatchParam(taskInfo);// 若启用分批,则等待间隔if (taskInfo.isBatch()) {ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);}} while (taskInfo.isBatch());break;

消费数据

由mydata通过API向应用发送数据,通过配置分批参数 限制每次向API发送的数据量,从而减少数据查询量和请求处理时间;

如下图,配置了分批数量为1000的任务,分批参数为选填,mydata将按1000为限制查询符合条件的数据,通过API请求发送给应用;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;
  • 自动管理分页参数,执行分页查询数据,发送给API;
  • 直到分页查询没有数据 自动结束;
// 消费数据
case MdConstant.DATA_CONSUMER:String dataCode = taskInfo.getDataCode();if (StrUtil.isEmpty(dataCode)) {break;}List<BizDataFilter> filters = taskInfo.getDataFilters();if (CollUtil.isNotEmpty(filters)) {// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());}int round = 0;Long skip = null;Integer limit = taskInfo.isBatch() ? taskInfo.getBatchSize() : null;do {if (taskInfo.isBatch()) {skip = (long) round * taskInfo.getBatchSize();}// 根据过滤条件 查询数据List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters, skip, limit);if (CollUtil.isEmpty(dataList)) {break;}taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数jobDataService.convertData(taskInfo);// 调用api传输数据ApiUtil.write(taskInfo);round++;// 若启用分批,则等待间隔if (taskInfo.isBatch()) {ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);}}while (taskInfo.isBatch());break;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/773160.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式下C/C++调用sqlite3简单开发

交叉编译sqlite3请关注我第一篇博文 sqlite3 交叉编译-CSDN博客 sqlite3的命令的简单使用&#xff08;增删改查&#xff0c;创建/删除表&#xff09;请关注我的上一篇博文 sqlite3嵌入式使用以及C/C代码开发-CSDN博客 一、新建文件夹 此文件夹用于放置工程&#xff0c;比如…

SQLite中的原子提交(四)

返回&#xff1a;SQLite—系列文章目录 上一篇:SQLite数据库成为内存中数据库&#xff08;三&#xff09; 下一篇&#xff1a;SQLite使用的临时文件&#xff08;二&#xff09; 1. 引言 SQLite等事务数据库的一个重要特性 是“原子提交”。 原子提交意味着所有数据库都在…

深度学习十大算法之图神经网络(GNN)

一、图神经网络的基础 图的基本概念 图是数学中的一个基本概念&#xff0c;用于表示事物间复杂的关系。在图论中&#xff0c;图通常被定义为一组节点&#xff08;或称为顶点&#xff09;以及连接这些节点的边。每个边可以有方向&#xff0c;称为有向边&#xff0c;或者没有方向…

学习笔记:MYSQL数据库基础知识

MYSQL数据库基础知识学习笔记 MYSQL基础学习数据库相关概念现主流数据库排名数据模型SQL分类SQL数据库基础操作 2024/3/27 学习资料&#xff1a;黑马程序员:MYSQL MYSQL基础学习 数据库和数据库管理系统(DBMS) 数据库: 是存储数据的集合&#xff0c;包括表、视图、索引等对象…

存内计算:释放潜能的黑科技

什么是存内计算&#xff1f; 存内计算技术是一种新型的计算架构&#xff0c;它将存储器和计算单元融合在一起&#xff0c;以实现高效的数据处理。存内计算技术的优势在于能够消除数据搬运的延迟和功耗&#xff0c;从而提高计算效率和能效比。目前&#xff0c;存内计算技术正处…

苹果Find My产品需求增长迅速,伦茨科技ST17H6x芯片供货充足

苹果的Find My功能使得用户可以轻松查找iPhone、Mac、AirPods以及Apple Watch等设备。如今Find My还进入了耳机、充电宝、箱包、电动车、保温杯等多个行业。苹果发布AirTag发布以来&#xff0c;大家都更加注重物品的防丢&#xff0c;苹果的 Find My 就可以查找 iPhone、Mac、Ai…

jupyter notebook导出含中文的pdf(LaTex安装和Pandoc、MiKTex安装)

用jupyter notebook导出pdf时&#xff0c;因为报错信息&#xff0c;需要用到Tex nbconvert failed: xelatex not found on PATH, if you have not installed xelatex you may need to do so. Find further instructions at https://nbconvert.readthedocs.io/en/latest/install…

Focal Modulation Networks聚焦调制网络

摘要 我们提出了 焦点调制网络 &#xff08;简称 FocalNets) &#xff0c;其中 自注意&#xff08; SA &#xff09;被 Focal Modulation 替换&#xff0c;这种机制 包括三个组件&#xff1a;&#xff08; 1 &#xff09;通过 depth-wise Conv 提取分级的上下文信息&#xff0…

吴恩达深度学习笔记:浅层神经网络(Shallow neural networks)3.6-3.8

目录 第一门课&#xff1a;神经网络和深度学习 (Neural Networks and Deep Learning)第三周&#xff1a;浅层神经网络(Shallow neural networks)3.6 激活函数&#xff08;Activation functions&#xff09;3.7 为什么需要非线性激活函数&#xff1f;&#xff08;why need a non…

Spring Cloud 九:服务间通信与消息队列

Spring Cloud 一&#xff1a;Spring Cloud 简介 Spring Cloud 二&#xff1a;核心组件解析 Spring Cloud 三&#xff1a;API网关深入探索与实战应用 Spring Cloud 四&#xff1a;微服务治理与安全 Spring Cloud 五&#xff1a;Spring Cloud与持续集成/持续部署&#xff08;CI/C…

Java Web-Maven

Maven是apache旗下的一个开源项目&#xff0c;是一款用于管理和构建java项目的工具 Maven的作用 1.依赖管理:方便快捷的管理项目依赖资源(jar包)&#xff0c;避免版本冲突问题 我们有的项目需要大量的jar包&#xff0c;采用手动导包的方式非常繁琐&#xff0c;并且版本升级也…

面试官:2PC和3PC有什么区别?

本文内容已收录至我的面试网站&#xff1a;www.javacn.site 在分布式事务中&#xff0c;通常使用两阶段协议或三阶段协议来保障分布式事务的正常运行&#xff0c;它也是 X/Open 公司定义的一套分布式事务标准。 X/Open 公司是由多家国际计算机厂商所组成的联盟组织&#xff0c;…

【前端】-【性能优化常识】

目录 前端性能优化指标首屏速度、白屏时间性能优化收效很大的操作&#xff1a;减少首屏资源体积收效不大或者特殊情况的优化操作 操作速度、渲染速度造成操作卡顿和渲染慢的场景性能优化 数据缓存 补充知识异步加载加载方式一&#xff1a;prefetch加载加载方式二&#xff1a;sc…

小迪安全48WEB 攻防-通用漏洞Py 反序列化链构造自动审计 bandit魔术方法

#知识点&#xff1a; 1、Python-反序列化函数使用 2、Python-反序列化魔术方法 3、Python-反序列化 POP 链构造&#xff08;payload构造&#xff09; 4、Python-自动化审计 bandit 使用 #前置知识&#xff1a; 函数使用&#xff1a; pickle.dump(obj, file) : 将对…

基于SIR模型的疫情发展趋势预测算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 基于SIR模型的疫情发展趋势预测算法.对病例增长进行SIR模型拟合分析&#xff0c;并采用模型参数拟合结果对疫情防控力度进行比较。整体思路为采用SIR微分方程模型…

免杀对抗-C2远控篇CC++SC转换格式UUID标识MAC物理IPV4地址减少熵值

参考文章&#xff1a; https://github.com/INotGreen/Bypass-AMSI https://mp.weixin.qq.com/s/oJ8eHdX8HGuk6dZv0kmFxg https://kyxiaxiang.github.io/2022/12/14/AMSIandEtw https://github.com/S3cur3Th1sSh1t/Amsi-Bypass-Powershell 文章参考&#xff1a; https://www.…

exec族

execl&#xff08;&#xff09;&#xff1b;------------------------------------------ 查看系统命令的路径&#xff1a; 只有错误时返回 -1 执行到execl 时&#xff0c;把execl 里的可执行程序的各个段&#xff08;数据、堆栈...&#xff09;替换掉本程序的a.out 实现镜像&a…

【数据分享】1929-2023年全球站点的逐年平均露点(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、能见度等指标&#xff0c;说到气象数据&#xff0c;最详细的气象数据是具体到气象监测站点的数据&#xff01; 有关气象指标的监测站点数据&#xff0c;之前我们分享过1929-2023年全球气象站…

vue3全局引入element-plus使用Message教程

文章目录 安装引入 Element Plus和组件样式示例注意安装与引入&#xff1a;按需引入&#xff1a;API 使用&#xff1a;样式问题&#xff1a;组件上下文&#xff1a;版本兼容性&#xff1a;错误处理&#xff1a; 这是 Element UI 的 Vue 3 版本。ElMessage 是 Element Plus 中的…

四年创作,心路历程

四年创作&#xff0c;心路历程 前言初识收获日常憧憬 前言 这是我在这个网站整理的笔记,有错误的地方请指出&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;神的孩子都在歌唱 前言 今天打开csdn&#xff0c;发现官方发送了一条私信,原来我已经在计算机这…