开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673
概述
本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。
使用场景
业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗
、数据权限控制
;
-
数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;
例如:接口返回的某字段值不能为空、字段值长度在指定范围等;
以下代码是
提供数据
类型的任务执行过程:// 提供数据 case MdConstant.DATA_PRODUCER:// 调用api 获取jsonString json = ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据jobDataService.parseData(taskInfo, json);// 根据条件过滤数据jobDataFilterService.doFilter(taskInfo);// 保存业务数据jobDataService.saveTaskData(taskInfo);// 更新环境变量jobVarService.saveVarValue(taskInfo, json);break;
jobDataFilterService.doFilter
是对数据的预过滤处理,详见 JobDataFilterService.javapublic void doFilter(TaskInfo task) {Assert.notNull(task);// 获取业务数据List<Map> dataList = task.getProduceDataList();// 获取配置的过滤条件List<BizDataFilter> dataFilters = task.getDataFilters();if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {return;}// 定义新的数据集合,用于存储 过滤后的数据List<Map> filterDatas = ListUtil.toList();// 遍历数据,并进行过滤dataList.forEach(data -> {boolean isCorrect = false;for (BizDataFilter filter : dataFilters) {String key = filter.getKey();Object filterValue = filter.getValue();String op = filter.getOp();// 当数据中 不包含 过滤的字段名,则执行下一项过滤if (!data.containsKey(key)) {continue;}// 当数据中 指定字段的值 无效,则过滤该数据Object dataValue = data.get(key);if (ObjectUtil.isNull(dataValue)) {isCorrect = true;break;}// 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {break;}String cDataValue = dataValue.toString();String cFilterValue = filterValue.toString();// 根据op类型,过滤数据switch (op) {case MdConstant.DATA_OP_EQ:// 等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);break;case MdConstant.DATA_OP_NE:// 不等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);break;case MdConstant.DATA_OP_GT:// 大于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);break;case MdConstant.DATA_OP_GTE:// 大于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);break;case MdConstant.DATA_OP_LT:// 小于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);break;case MdConstant.DATA_OP_LTE:// 小于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);break;default:throw new RuntimeException("JobDataFilter: 不支持的过滤操作");}}// 当 未被过滤,则添加到过滤结果if (isCorrect) {filterDatas.add(data);}});task.setProduceDataList(filterDatas);task.appendLog("过滤前的业务数据:{}", dataList);task.appendLog("过滤条件:{}", dataFilters);task.appendLog("过滤后的业务数据:{}", filterDatas); }
注:目前0.7版本暂时实现了关系运算,后续增加函数处理;
-
数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;
以下代码是
消费数据
类型任务的执行过程:// 消费数据 case MdConstant.DATA_CONSUMER:List<BizDataFilter> filters = taskInfo.getDataFilters();if (CollUtil.isNotEmpty(filters)) {// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());}String dataCode = taskInfo.getDataCode();if (StrUtil.isNotEmpty(dataCode)) {// 根据过滤条件 查询数据List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数jobDataService.convertData(taskInfo);}// 调用api传输数据ApiUtil.write(taskInfo);break;
bizDataDAO.list
方法支持按配置条件查询数据,详见 BizDataDAO.javapublic List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);Query query = new Query();// 遍历数据过滤条件if (CollUtil.isNotEmpty(bizDataFilters)) {// mongodb的查询条件集合List<Criteria> criteriaList = CollUtil.newArrayList();for (BizDataFilter bizDataFilter : bizDataFilters) {// 条件keyString key = bizDataFilter.getKey();// 条件操作String op = bizDataFilter.getOp();// 条件值Object value = bizDataFilter.getValue();// 根据条件操作类型 调用mongodb对应的查询方法Criteria criteria = Criteria.where(key);switch (op) {case MdConstant.DATA_OP_EQ:criteria.is(value);break;case MdConstant.DATA_OP_NE:criteria.ne(value);break;case MdConstant.DATA_OP_GT:criteria.gt(value);break;case MdConstant.DATA_OP_GTE:criteria.gte(value);break;case MdConstant.DATA_OP_LT:criteria.lt(value);break;case MdConstant.DATA_OP_LTE:criteria.lte(value);break;default:throw new RuntimeException("BizDataDAO: 不支持的过滤操作");}// 存入mongodb的查询条件集合criteriaList.add(criteria);}// mongodb查询条件集合 加入查询中query.addCriteria(new Criteria().andOperator(criteriaList));}// 执行查询return mongoTemplate.find(query, Map.class, dataCode);}
配置操作
-
创建任务过程请参考 使用手册
-
在创建任务界面中,添加数据过滤条件
如下图过滤条件是 salary > 600
-
执行任务后 通过日志详情可以看到数据入库前预清洗;