[自研开源] MyData 数据集成之数据过滤 v0.7.2

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

概述

本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。

使用场景

业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗数据权限控制

  1. 数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;

    在这里插入图片描述

    例如:接口返回的某字段值不能为空、字段值长度在指定范围等;

    以下代码是 提供数据 类型的任务执行过程:

    // 提供数据
    case MdConstant.DATA_PRODUCER:// 调用api 获取jsonString json = ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据jobDataService.parseData(taskInfo, json);// 根据条件过滤数据jobDataFilterService.doFilter(taskInfo);// 保存业务数据jobDataService.saveTaskData(taskInfo);// 更新环境变量jobVarService.saveVarValue(taskInfo, json);break;
    

    jobDataFilterService.doFilter 是对数据的预过滤处理,详见 JobDataFilterService.java

    public void doFilter(TaskInfo task) {Assert.notNull(task);// 获取业务数据List<Map> dataList = task.getProduceDataList();// 获取配置的过滤条件List<BizDataFilter> dataFilters = task.getDataFilters();if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {return;}// 定义新的数据集合,用于存储 过滤后的数据List<Map> filterDatas = ListUtil.toList();// 遍历数据,并进行过滤dataList.forEach(data -> {boolean isCorrect = false;for (BizDataFilter filter : dataFilters) {String key = filter.getKey();Object filterValue = filter.getValue();String op = filter.getOp();// 当数据中 不包含 过滤的字段名,则执行下一项过滤if (!data.containsKey(key)) {continue;}// 当数据中 指定字段的值 无效,则过滤该数据Object dataValue = data.get(key);if (ObjectUtil.isNull(dataValue)) {isCorrect = true;break;}// 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {break;}String cDataValue = dataValue.toString();String cFilterValue = filterValue.toString();// 根据op类型,过滤数据switch (op) {case MdConstant.DATA_OP_EQ:// 等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);break;case MdConstant.DATA_OP_NE:// 不等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);break;case MdConstant.DATA_OP_GT:// 大于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);break;case MdConstant.DATA_OP_GTE:// 大于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);break;case MdConstant.DATA_OP_LT:// 小于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);break;case MdConstant.DATA_OP_LTE:// 小于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);break;default:throw new RuntimeException("JobDataFilter: 不支持的过滤操作");}}// 当 未被过滤,则添加到过滤结果if (isCorrect) {filterDatas.add(data);}});task.setProduceDataList(filterDatas);task.appendLog("过滤前的业务数据:{}", dataList);task.appendLog("过滤条件:{}", dataFilters);task.appendLog("过滤后的业务数据:{}", filterDatas);
    }
    

    注:目前0.7版本暂时实现了关系运算,后续增加函数处理;

  2. 数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;

    在这里插入图片描述

    以下代码是 消费数据 类型任务的执行过程:

    // 消费数据
    case MdConstant.DATA_CONSUMER:List<BizDataFilter> filters = taskInfo.getDataFilters();if (CollUtil.isNotEmpty(filters)) {// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());}String dataCode = taskInfo.getDataCode();if (StrUtil.isNotEmpty(dataCode)) {// 根据过滤条件 查询数据List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数jobDataService.convertData(taskInfo);}// 调用api传输数据ApiUtil.write(taskInfo);break;
    

    bizDataDAO.list 方法支持按配置条件查询数据,详见 BizDataDAO.java

    public List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);Query query = new Query();// 遍历数据过滤条件if (CollUtil.isNotEmpty(bizDataFilters)) {// mongodb的查询条件集合List<Criteria> criteriaList = CollUtil.newArrayList();for (BizDataFilter bizDataFilter : bizDataFilters) {// 条件keyString key = bizDataFilter.getKey();// 条件操作String op = bizDataFilter.getOp();// 条件值Object value = bizDataFilter.getValue();// 根据条件操作类型 调用mongodb对应的查询方法Criteria criteria = Criteria.where(key);switch (op) {case MdConstant.DATA_OP_EQ:criteria.is(value);break;case MdConstant.DATA_OP_NE:criteria.ne(value);break;case MdConstant.DATA_OP_GT:criteria.gt(value);break;case MdConstant.DATA_OP_GTE:criteria.gte(value);break;case MdConstant.DATA_OP_LT:criteria.lt(value);break;case MdConstant.DATA_OP_LTE:criteria.lte(value);break;default:throw new RuntimeException("BizDataDAO: 不支持的过滤操作");}// 存入mongodb的查询条件集合criteriaList.add(criteria);}// mongodb查询条件集合 加入查询中query.addCriteria(new Criteria().andOperator(criteriaList));}// 执行查询return mongoTemplate.find(query, Map.class, dataCode);}
    

配置操作

  1. 创建任务过程请参考 使用手册

  2. 在创建任务界面中,添加数据过滤条件

    如下图过滤条件是 salary > 600
    在这里插入图片描述

  3. 执行任务后 通过日志详情可以看到数据入库前预清洗;
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/749920.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaEE——线程的等待和结束

文章目录 Thread 类及常见方法启动一个线程中断一个线程变量型中断调用 interrupt() 方法来通知观察标志位是否被清除 等待一个线程获取当前线程引用休眠当前线程 线程的状态观察线程的所有状态观察 1: 关注 NEW 、 RUNNABLE 、 TERMINATED 状态的切换 多线程带来的风险为什么会…

【大模型系列】问答理解定位(Qwen-VL/Llama2/GPT)

文章目录 1 Qwen-VL(2023, Alibaba)1.1 网络结构1.2 模型训练 2 Llama2(2023, Meta)2.1 网络结构2.1.1 MHA/GQA/MQA2.1.2 RoPE(Rotary Position Embedding, 旋转式位置编码)2.1.3 RMSNorm 2.2 推理2.2.1 集束搜索(beam search)2.2.2 RoPE外推 3 GPT系列(OpenAI) 1 Qwen-VL(2023…

android中单例模式为什么会引起内存泄漏?

单例模式使用不恰当会造成内存泄漏。因为单例的静态特性使得单例的生命周期和应用的生命周期一样长&#xff0c; 如果一个对象已经不需要使用了&#xff0c;但是单例对象还持有该对象的引用&#xff0c;那么这个对象就不能被正常回收&#xff0c;因此会导致内存泄漏。 举个例子…

【数据可视化】使用Python + Gephi,构建中医方剂关系网络图!

代码和示例数据下载 前言 在这篇文章中&#xff0c;我们将会可视化 《七版方剂学》 的药材的关系&#xff0c;我们将使用Python制作节点和边的数据&#xff0c;然后在Gephi中绘制出方剂的网络图。 Gephi是一个专门用于构建网络图的工具&#xff0c;只要你能提供节点和边的数…

Stable Diffusion科普文章【附升级gpt4.0秘笈】

随着人工智能技术的飞速发展&#xff0c;我们越来越多地看到计算机生成的艺术作品出现在我们的生活中。其中&#xff0c;Stable Diffusion作为一种创新的图像生成技术&#xff0c;正在引领一场艺术创作的革命。本文将为您科普Stable Diffusion的相关知识&#xff0c;带您走进这…

Flink 集群部署模式

文章目录 前言一、会话模式&#xff08;Session Mode&#xff09;二、单作业模式&#xff08;Per-Job Mode&#xff09;三、应用模式&#xff08;Application Mode&#xff09; 前言 Flink支持多种集群部署模式&#xff0c;以满足不同场景和需求。以下是Flink的主要集群部署模…

springboot多模块下swaggar界面出现异常(Knife4j文档请求异常)或者界面不报错但是没有显示任何信息

继上一篇博文&#xff0c;我们解决了多模块下扫描不到子模块的原因,建议先看上一个博客了解项目结构&#xff1a; springboot 多模块启动报错Field XXX required a bean of type XXX that could not be found. 接下来我们来解决swaggar异常的原因&#xff0c;我们成功启动项目…

QML 布局管理器之ColumnLayout

一.ColumnLayout讲解 QML中的ColumnLayout是一种布局元素&#xff0c;用于在垂直列中排列其子元素。它的主要使用下列附加属性: Layout.minimumWidth Layout.minimumHeight Layout.preferredWidth Layout.preferredHeight Layout.maximumWidth Layout.maximumHeight Layout.fil…

SqlServer2008(R2)(一)SqlServer2008(R2)经典宝藏操作收集整理

一、常见操作 1、TRUNCATE TABLE 语句 删除表数据 TRUNCATE TABLE语句比DELET删除表中的所有行更快。从逻辑上讲&#xff0c;TRUNCATE TABLE它类似于DELETE没有WHERE子句的语句。 TRUNCATE TABLE语句从表中删除所有行&#xff0c;但表结构及其列&#xff0c;约束&#xff0c;…

Ubuntu 20.04 系统如何优雅地安装NCL?

一、什么是NCL&#xff1f; NCAR Command Language&#xff08;NCL&#xff09;是由美国大气研究中心&#xff08;NCAR&#xff09;推出的一款用于科学数据计算和可视化的免费软件。 它有着非常强大的文件输入和输出功能&#xff0c;可读写netCDF-3、netCDF-4 classic、HDF4、b…

Xinstall助力web唤起iOS,打破平台壁垒,实现无缝跳转

在移动互联网时代&#xff0c;web与App之间的跳转已成为用户日常使用中不可或缺的一部分。然而&#xff0c;对于iOS系统的用户来说&#xff0c;web唤起App的过程往往充满了挑战和不便。这时&#xff0c;Xinstall作为一款专业的移动开发者服务工具&#xff0c;为开发者们提供了解…

Lua中文语言编程源码-第一节,更改llex.c词法分析器模块, 使Lua支持中文关键词。

源码已经更新在CSDN的码库里&#xff1a; git clone https://gitcode.com/funsion/CLua.git 在src文件夹下的llex.c&#xff0c;是Lua的词法分析器模块。 增加中文保留字标识符列表&#xff0c;保留英文保留字标识符列表。 搜索“ORDER RESERVED”&#xff0c;将原始代码 …

ARM和AMD介绍

一、介绍 ARM 和 AMD 都是计算机领域中的知名公司&#xff0c;它们在不同方面具有重要的影响和地位。 ARM&#xff08;Advanced RISC Machine&#xff09;&#xff1a;ARM 公司是一家总部位于英国的公司&#xff0c;专注于设计低功耗、高性能的处理器架构。ARM 架构以其精简指…

如何在“Microsoft Visual Studio”中使用OpenCV编译应用程序

返回目录&#xff1a;OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 前一篇&#xff1a;OpenCV4.9.0在windows系统下的安装 后一篇&#xff1a; 警告&#xff1a; 本教程可以包含过时的信息。 我在这里描述的所有内容都将适用于 OpenCV 的C\C接口。我首先假…

图像处理ASIC设计方法 笔记10 插值算法的流水线架构

&#xff08;一&#xff09; 三次插值算法实现的图像旋转设计的流水线架构 传统上&#xff0c;三次插值算法实现的图像旋转设计需要三块一样的处理资源&#xff0c;为了节约资源&#xff0c;采用流水线设计&#xff0c;简单来讲就是三次插值算法共用一块资源&#xff0c;优化这…

数据结构的概念大合集02(线性表)

概念大合集02 1、线性表及其逻辑结构1.1 线性表的定义1.2 线性表的基本操作 2、线性表的顺序存储结构2.1 顺序表 3、线性表的链式存储3.1 链表3.1.1 头结点&#xff08;头指针&#xff09;&#xff0c;首指针&#xff0c;尾指针&#xff0c;尾结点3.1.2 单链表3.1.3 双链表3.1.…

软件供应链投毒 — NPM 恶意组件分析(二)

聚焦源代码安全&#xff0c;网罗国内外最新资讯&#xff01; 专栏供应链安全 数字化时代&#xff0c;软件无处不在。软件如同社会中的“虚拟人”&#xff0c;已经成为支撑社会正常运转的最基本元素之一&#xff0c;软件的安全性问题也正在成为当今社会的根本性、基础性问题。 随…

瑞熙贝通实验室安全培训考试系统

一、系统概述 瑞熙贝通实验室安全培训考试系统是一种基于互联网和人工智能技术的在线考试平台&#xff0c;旨在旨在提供实验室安全教育和考核的全面解决方案。该系统可以帮助实现实验室安全培训考试的在线化、智能化和规范化&#xff0c;提高实验室安全意识和能力&#xff0c;…

IntelliJ IDEA 面试题及答案整理,最新面试题

IntelliJ IDEA中的插件系统如何工作&#xff1f; IntelliJ IDEA的插件系统工作原理如下&#xff1a; 1、插件架构&#xff1a; IntelliJ IDEA通过插件架构扩展其功能&#xff0c;插件可以添加新的功能或修改现有功能。 2、安装和管理&#xff1a; 通过IDEA内置的插件市场下载…

第二门课:改善深层神经网络<超参数调试、正则化及优化>-超参数调试、Batch正则化和程序框架

文章目录 1 调试处理2 为超参数选择合适的范围3 超参数调试的实践4 归一化网络的激活函数5 将Batch Norm拟合进神经网络6 Batch Norm为什么会奏效&#xff1f;7 测试时的Batch Norm8 SoftMax回归9 训练一个SoftMax分类器10 深度学习框架11 TensorFlow 1 调试处理 需要调试的参…