[自研开源] MyData 数据集成之数据过滤 v0.7.2

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

概述

本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。

使用场景

业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗数据权限控制

  1. 数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;

    在这里插入图片描述

    例如:接口返回的某字段值不能为空、字段值长度在指定范围等;

    以下代码是 提供数据 类型的任务执行过程:

    // 提供数据
    case MdConstant.DATA_PRODUCER:// 调用api 获取jsonString json = ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据jobDataService.parseData(taskInfo, json);// 根据条件过滤数据jobDataFilterService.doFilter(taskInfo);// 保存业务数据jobDataService.saveTaskData(taskInfo);// 更新环境变量jobVarService.saveVarValue(taskInfo, json);break;
    

    jobDataFilterService.doFilter 是对数据的预过滤处理,详见 JobDataFilterService.java

    public void doFilter(TaskInfo task) {Assert.notNull(task);// 获取业务数据List<Map> dataList = task.getProduceDataList();// 获取配置的过滤条件List<BizDataFilter> dataFilters = task.getDataFilters();if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {return;}// 定义新的数据集合,用于存储 过滤后的数据List<Map> filterDatas = ListUtil.toList();// 遍历数据,并进行过滤dataList.forEach(data -> {boolean isCorrect = false;for (BizDataFilter filter : dataFilters) {String key = filter.getKey();Object filterValue = filter.getValue();String op = filter.getOp();// 当数据中 不包含 过滤的字段名,则执行下一项过滤if (!data.containsKey(key)) {continue;}// 当数据中 指定字段的值 无效,则过滤该数据Object dataValue = data.get(key);if (ObjectUtil.isNull(dataValue)) {isCorrect = true;break;}// 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {break;}String cDataValue = dataValue.toString();String cFilterValue = filterValue.toString();// 根据op类型,过滤数据switch (op) {case MdConstant.DATA_OP_EQ:// 等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);break;case MdConstant.DATA_OP_NE:// 不等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);break;case MdConstant.DATA_OP_GT:// 大于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);break;case MdConstant.DATA_OP_GTE:// 大于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);break;case MdConstant.DATA_OP_LT:// 小于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);break;case MdConstant.DATA_OP_LTE:// 小于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);break;default:throw new RuntimeException("JobDataFilter: 不支持的过滤操作");}}// 当 未被过滤,则添加到过滤结果if (isCorrect) {filterDatas.add(data);}});task.setProduceDataList(filterDatas);task.appendLog("过滤前的业务数据:{}", dataList);task.appendLog("过滤条件:{}", dataFilters);task.appendLog("过滤后的业务数据:{}", filterDatas);
    }
    

    注:目前0.7版本暂时实现了关系运算,后续增加函数处理;

  2. 数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;

    在这里插入图片描述

    以下代码是 消费数据 类型任务的执行过程:

    // 消费数据
    case MdConstant.DATA_CONSUMER:List<BizDataFilter> filters = taskInfo.getDataFilters();if (CollUtil.isNotEmpty(filters)) {// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());}String dataCode = taskInfo.getDataCode();if (StrUtil.isNotEmpty(dataCode)) {// 根据过滤条件 查询数据List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数jobDataService.convertData(taskInfo);}// 调用api传输数据ApiUtil.write(taskInfo);break;
    

    bizDataDAO.list 方法支持按配置条件查询数据,详见 BizDataDAO.java

    public List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);Query query = new Query();// 遍历数据过滤条件if (CollUtil.isNotEmpty(bizDataFilters)) {// mongodb的查询条件集合List<Criteria> criteriaList = CollUtil.newArrayList();for (BizDataFilter bizDataFilter : bizDataFilters) {// 条件keyString key = bizDataFilter.getKey();// 条件操作String op = bizDataFilter.getOp();// 条件值Object value = bizDataFilter.getValue();// 根据条件操作类型 调用mongodb对应的查询方法Criteria criteria = Criteria.where(key);switch (op) {case MdConstant.DATA_OP_EQ:criteria.is(value);break;case MdConstant.DATA_OP_NE:criteria.ne(value);break;case MdConstant.DATA_OP_GT:criteria.gt(value);break;case MdConstant.DATA_OP_GTE:criteria.gte(value);break;case MdConstant.DATA_OP_LT:criteria.lt(value);break;case MdConstant.DATA_OP_LTE:criteria.lte(value);break;default:throw new RuntimeException("BizDataDAO: 不支持的过滤操作");}// 存入mongodb的查询条件集合criteriaList.add(criteria);}// mongodb查询条件集合 加入查询中query.addCriteria(new Criteria().andOperator(criteriaList));}// 执行查询return mongoTemplate.find(query, Map.class, dataCode);}
    

配置操作

  1. 创建任务过程请参考 使用手册

  2. 在创建任务界面中,添加数据过滤条件

    如下图过滤条件是 salary > 600
    在这里插入图片描述

  3. 执行任务后 通过日志详情可以看到数据入库前预清洗;
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/749920.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaEE——线程的等待和结束

文章目录 Thread 类及常见方法启动一个线程中断一个线程变量型中断调用 interrupt() 方法来通知观察标志位是否被清除 等待一个线程获取当前线程引用休眠当前线程 线程的状态观察线程的所有状态观察 1: 关注 NEW 、 RUNNABLE 、 TERMINATED 状态的切换 多线程带来的风险为什么会…

【大模型系列】问答理解定位(Qwen-VL/Llama2/GPT)

文章目录 1 Qwen-VL(2023, Alibaba)1.1 网络结构1.2 模型训练 2 Llama2(2023, Meta)2.1 网络结构2.1.1 MHA/GQA/MQA2.1.2 RoPE(Rotary Position Embedding, 旋转式位置编码)2.1.3 RMSNorm 2.2 推理2.2.1 集束搜索(beam search)2.2.2 RoPE外推 3 GPT系列(OpenAI) 1 Qwen-VL(2023…

android中单例模式为什么会引起内存泄漏?

单例模式使用不恰当会造成内存泄漏。因为单例的静态特性使得单例的生命周期和应用的生命周期一样长&#xff0c; 如果一个对象已经不需要使用了&#xff0c;但是单例对象还持有该对象的引用&#xff0c;那么这个对象就不能被正常回收&#xff0c;因此会导致内存泄漏。 举个例子…

【数据可视化】使用Python + Gephi,构建中医方剂关系网络图!

代码和示例数据下载 前言 在这篇文章中&#xff0c;我们将会可视化 《七版方剂学》 的药材的关系&#xff0c;我们将使用Python制作节点和边的数据&#xff0c;然后在Gephi中绘制出方剂的网络图。 Gephi是一个专门用于构建网络图的工具&#xff0c;只要你能提供节点和边的数…

机器学习算法在数据挖掘中的应用

在数据挖掘的实践中&#xff0c;各种机器学习算法都扮演着重要的角色&#xff0c;它们能够从数据中学习规律和模式&#xff0c;并用于预测、分类、聚类等任务。以下是几种常见的机器学习算法以及它们在数据挖掘任务中的应用场景和优缺点。 1. 决策树&#xff08;Decision Tree…

Golang的CSP模型讲解

一.CSP是什么 CSP 是 Communicating Sequential Process 的简称&#xff0c;中文可以叫做通信顺序进程&#xff0c;是一种并发编程模型&#xff0c;是一个很强大的并发数据模型&#xff0c;是上个世纪七十年代提出的&#xff0c;用于描述两个独立的并发实体通过共享的通讯chann…

Stable Diffusion科普文章【附升级gpt4.0秘笈】

随着人工智能技术的飞速发展&#xff0c;我们越来越多地看到计算机生成的艺术作品出现在我们的生活中。其中&#xff0c;Stable Diffusion作为一种创新的图像生成技术&#xff0c;正在引领一场艺术创作的革命。本文将为您科普Stable Diffusion的相关知识&#xff0c;带您走进这…

微信小程序睡眠X秒【while循环模式】

// 微信小程序睡眠X秒sleep(numberMillis) { var now new Date(); var exitTime now.getTime() numberMillis; while (true) { now new Date(); if (now.getTime() > exitTime) {return;}} }, // 微信小程序睡眠X秒 this.sleep(2000); 参考&#xff1a;微信小程序睡眠…

Linux/Ubuntu/Debian控制台启动的程序和terminal分离的方法-正在运行怎么关闭窗口

disown 是一个 shell 内置函数&#xff0c;它从 shell 的作业表中删除指定的作业&#xff0c;使它们免受挂起的影响。 使用方法如下&#xff1a; 首先&#xff0c;正常运行命令&#xff1a; 你的命令然后&#xff0c;按 Ctrl Z 暂停命令。 现在&#xff0c;运行&#xff…

MT1069 圆切平面

n个圆最多把平面分成几部分&#xff1f;输入圆的数量N&#xff0c;问最多把平面分成几块。比如一个圆以把一个平面切割成2块。 不考虑负数&#xff0c;0或者其他特殊情况。 格式 输入格式&#xff1a;输入为整型 输出格式&#xff1a;输出为整型 样例 1 输入&#xff1a; …

全量知识系统“全基因序列” 的百度AI答问 之1

在您所描述的框架下&#xff0c;我们可以开始探索“知识”实体的起点以及如何认识它。首先&#xff0c;让我们明确一下“实体”的定义和性质。实体&#xff0c;在哲学和许多其他学科中&#xff0c;通常被理解为存在于我们世界中的具体事物或抽象概念。它们可以是物理的&#xf…

Flink 集群部署模式

文章目录 前言一、会话模式&#xff08;Session Mode&#xff09;二、单作业模式&#xff08;Per-Job Mode&#xff09;三、应用模式&#xff08;Application Mode&#xff09; 前言 Flink支持多种集群部署模式&#xff0c;以满足不同场景和需求。以下是Flink的主要集群部署模…

springboot多模块下swaggar界面出现异常(Knife4j文档请求异常)或者界面不报错但是没有显示任何信息

继上一篇博文&#xff0c;我们解决了多模块下扫描不到子模块的原因,建议先看上一个博客了解项目结构&#xff1a; springboot 多模块启动报错Field XXX required a bean of type XXX that could not be found. 接下来我们来解决swaggar异常的原因&#xff0c;我们成功启动项目…

QML 布局管理器之ColumnLayout

一.ColumnLayout讲解 QML中的ColumnLayout是一种布局元素&#xff0c;用于在垂直列中排列其子元素。它的主要使用下列附加属性: Layout.minimumWidth Layout.minimumHeight Layout.preferredWidth Layout.preferredHeight Layout.maximumWidth Layout.maximumHeight Layout.fil…

代码随想录算法训练营 Day25|回溯算法2

216.组合总和III 思路 按照回溯算法的逻辑&#xff0c;写代码。 递归函数的参数&#xff0c;除了原有的n, k, startIndex&#xff0c;增加一个sum&#xff0c;计算当前path中的和。 终止条件用sum与n比较&#xff0c;如果满足相等&#xff0c;且满足长度为k&#xff0c;则加入…

WPF-后台设置控件Background

有时候需要在后台设置控件的背景 方法1&#xff1a; Btn_SendNeedle_Admin.Content "送针结束"; Btn_SendNeedle_Admin.Background new SolidColorBrush((Media.Color)Media.ColorConverter.ConvertFromString("#AAFFFFFF")); 方法2&#xff1a; Btn…

SqlServer2008(R2)(一)SqlServer2008(R2)经典宝藏操作收集整理

一、常见操作 1、TRUNCATE TABLE 语句 删除表数据 TRUNCATE TABLE语句比DELET删除表中的所有行更快。从逻辑上讲&#xff0c;TRUNCATE TABLE它类似于DELETE没有WHERE子句的语句。 TRUNCATE TABLE语句从表中删除所有行&#xff0c;但表结构及其列&#xff0c;约束&#xff0c;…

JSON 配置文件

JSON 配置文件的作用 JSON 是一种数据格式&#xff0c;在实际开发中&#xff0c; JSON 总是以配置文件的形式出现。小程序项目中也不例外&#xff1a;通过不同的 .json 配置文件&#xff0c;可以对小程序项目进行不同级别的配置。 小程序项目中有 4 种 json 配置文件&#xff0…

1.AD域控如何强制删除不可以用域控服务器

(1)原因需求 (2)不可用的域控不删掉造成的问题 (3)实战配置步骤 第一步:连接登录到特定服务器 第二步:选择要删除域控所在的站点和名称 第三步:执行删除命令

Ubuntu 20.04 系统如何优雅地安装NCL?

一、什么是NCL&#xff1f; NCAR Command Language&#xff08;NCL&#xff09;是由美国大气研究中心&#xff08;NCAR&#xff09;推出的一款用于科学数据计算和可视化的免费软件。 它有着非常强大的文件输入和输出功能&#xff0c;可读写netCDF-3、netCDF-4 classic、HDF4、b…