二百七十四、Kettle——ClickHouse中对错误数据表中进行数据修复(实时)

一、目的

在完成数据清洗、错误数据之后,需要根据修复规则对错误数据进行修复

二、Hive中原有代码

insert into table  hurys_db.dwd_queue  partition(day)
selecta3.id,a3.device_no,a3.source_device_type,a3.sn,a3.model,a3.create_time,a3.lane_no,a3.lane_type,case when a3.queue_count between 0 and 100 then a3.queue_count else a2.avg_queue_count end as queue_count,case when a3.queue_len   between 0 and 500 then a3.queue_len  else a2.avg_queue_len  end as queue_len,case when a3.queue_head  between 0 and 500 then a3.queue_head else a2.avg_queue_head end as queue_head,case when a3.queue_tail  between 0 and 500 then a3.queue_tail else a2.avg_queue_tail end as queue_tail,a3.day
from hurys_db.dwd_queue_error as a3
right join (selecta1.device_no,a1.create_time,a1.lane_no,round(avg(queue_count),0)     avg_queue_count,round(avg(queue_len),2)       avg_queue_len,round(avg(queue_head),2)      avg_queue_head,round(avg(queue_tail),2)      avg_queue_tail
from(select
t1.device_no, t1.create_time start_time, t2.create_time, t1.lane_no,
t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_db.dwd_queue as t1
right join hurys_db.dwd_queue_error as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_noand  concat(date_sub(t2.create_time,7),substr(t2.create_time,11,10)) = t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t1.create_time start_time, t2.create_time, t1.lane_no,
t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_db.dwd_queue as t1
right join hurys_db.dwd_queue_error as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_noand  concat(date_sub(t2.create_time,14),substr(t2.create_time,11,10)) = t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t1.create_time start_time, t2.create_time, t1.lane_no,
t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_db.dwd_queue as t1
right join hurys_db.dwd_queue_error as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_noand  concat(date_sub(t2.create_time,21),substr(t2.create_time,11,10)) = t1.create_time
where t1.device_no is not null
) as a1
group by a1.device_no, a1.create_time, a1.lane_no) as a2
on a3.device_no=a2.device_no and a3.create_time=a2.create_time and a3.lane_no=a2.lane_no
where a3.day='2024-09-04'
;

三、ClickHouse中现有代码

--43、静态排队字段数据修复--修复策略:使用前三周同期数据取平均进行修复
selecta3.id,a3.device_no,a3.source_device_type,a3.sn,a3.model,a3.create_time,a3.lane_no,a3.lane_type,case when a3.queue_count between 0 and 100 then a3.queue_count else a2.avg_queue_count end as queue_count,
       case when a3.queue_len   between 0 and 500 then a3.queue_len  else cast(a2.avg_queue_len   as Decimal(10,2)) end as queue_len,case when a3.queue_head  between 0 and 500 then a3.queue_head else cast(a2.avg_queue_head  as Decimal(10,2)) end as queue_head,case when a3.queue_tail  between 0 and 500 then a3.queue_tail else cast(a2.avg_queue_tail  as Decimal(10,2)) end as queue_tail,cast(a3.day as String) day
from hurys_jw.dwd_queue_error as a3
right join (
selectdevice_no,start_time,lane_no,round(avg(queue_count),0)     avg_queue_count,round(avg(queue_len) ,2)      avg_queue_len,round(avg(queue_head),2)      avg_queue_head,round(avg(queue_tail),2)      avg_queue_tail
from(
select
t1.device_no, t2.create_time start_time,t2.create_time_7 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(selectdevice_no,lane_no,create_time,(create_time - interval 7 day) create_time_7,(create_time - interval 14 day)create_time_14,(create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_no and t2.create_time_7=t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t2.create_time start_time,t2.create_time_14 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(selectdevice_no,lane_no,create_time,(create_time - interval 7 day) create_time_7,(create_time - interval 14 day)create_time_14,(create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_no and t2.create_time_14=t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t2.create_time start_time,t2.create_time_21 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(selectdevice_no,lane_no,create_time,(create_time - interval 7 day) create_time_7,(create_time - interval 14 day)create_time_14,(create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_no and t2.create_time_21=t1.create_time
where t1.device_no is not null)
where lane_no is not null
group by device_no, start_time, lane_no) as a2
on a3.device_no=a2.device_no and a3.create_time=a2.start_time and a3.lane_no=a2.lane_no
where a3.day >= ?
;

注意:Hive中原有SQL语句和ClickHouse现有SQL语句很大不同

四、Kettle任务

方框标记是修复数据,高频率执行

下面其他几个是修复记录任务

4.1 newtime

4.2 替换NULL值

4.3 clickhouse输入

select
       a3.id,
       a3.device_no,
       a3.source_device_type,
       a3.sn,
       a3.model,
       a3.create_time,
       a3.lane_no,
       a3.lane_type,
       case when a3.queue_count between 0 and 100 then a3.queue_count else a2.avg_queue_count end as queue_count,
       case when a3.queue_len   between 0 and 500 then a3.queue_len  else cast(a2.avg_queue_len   as Decimal(10,2)) end as queue_len,
       case when a3.queue_head  between 0 and 500 then a3.queue_head else cast(a2.avg_queue_head  as Decimal(10,2)) end as queue_head,
       case when a3.queue_tail  between 0 and 500 then a3.queue_tail else cast(a2.avg_queue_tail  as Decimal(10,2)) end as queue_tail,
       cast(a3.day as String) day
from hurys_jw.dwd_queue_error as a3
right join (
select
       device_no,
       start_time,
       lane_no,
       round(avg(queue_count),0)     avg_queue_count,
       round(avg(queue_len) ,2)      avg_queue_len,
       round(avg(queue_head),2)      avg_queue_head,
       round(avg(queue_tail),2)      avg_queue_tail
from(
select
t1.device_no, t2.create_time start_time,t2.create_time_7 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(select
       device_no,lane_no,create_time,
       (create_time - interval 7 day) create_time_7,
       (create_time - interval 14 day)create_time_14,
       (create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_no and t2.create_time_7=t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t2.create_time start_time,t2.create_time_14 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(select
       device_no,lane_no,create_time,
       (create_time - interval 7 day) create_time_7,
       (create_time - interval 14 day)create_time_14,
       (create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_no and t2.create_time_14=t1.create_time
where t1.device_no is not null
union all
select
t1.device_no, t2.create_time start_time,t2.create_time_21 create_time, t1.lane_no,t1.queue_count, t1.queue_len, t1.queue_head, t1.queue_tail
from hurys_jw.dwd_queue as t1
inner join(select
       device_no,lane_no,create_time,
       (create_time - interval 7 day) create_time_7,
       (create_time - interval 14 day)create_time_14,
       (create_time - interval 21 day)create_time_21
from hurys_jw.dwd_queue_error) as t2
on t2.device_no=t1.device_no  and  t2.lane_no=t1.lane_no and t2.create_time_21=t1.create_time
where t1.device_no is not null)
where lane_no is not null
group by device_no, start_time, lane_no
    ) as a2
on a3.device_no=a2.device_no and a3.create_time=a2.start_time and a3.lane_no=a2.lane_no
where a3.day >= ?
;

4.4 字段选择

4.5 clickhouse输出

4.6 执行SQL脚本

由于是对每一天的错误进行修复,因此每次执行后需要先删除这个分区的错误数据。因为每次执行清洗后都会先执行错误数据任务!

4.7 执行任务

4.8 海豚调度

注意在DWD层静态排队数据清洗、DWD层静态排队错误数据之后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/59191.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Golang | Leetcode Golang题解之第530题二叉搜索树的最小绝对差

题目&#xff1a; 题解&#xff1a; func getMinimumDifference(root *TreeNode) int {ans, pre : math.MaxInt64, -1var dfs func(*TreeNode)dfs func(node *TreeNode) {if node nil {return}dfs(node.Left)if pre ! -1 && node.Val-pre < ans {ans node.Val -…

Android Studio打包时不显示“Generate Signed APK”提示信息

Android Studio打包时&#xff0c;默认显示“Generate Signed APK”提示信息&#xff0c;如下图所示&#xff1a; 如果在打包时不显示“Generate Signed APK”提示信息&#xff0c;解决办法是&#xff1a; Android Studio菜单栏&#xff0c;“File->Settings->Appearan…

手游和应用出海资讯:可灵AI独立APP即将上架;Rollic在英国推出芭比合并解谜手游

NetMarvel帮助游戏和应用广告主洞察全球市场、获取行业信息&#xff0c;以下为10月第四周资讯&#xff1a; ● 苹果开发全新游戏中心应用 ● Meta计划开发人工智能搜索引擎 ● 微软已拥有20个游戏IP&#xff0c;收入达10亿美元 ● OpenAI计划在12月推出其下一代前沿模型Orion ●…

js中多let与var

在 JavaScript 中&#xff0c;let 和 var 都用于声明变量&#xff0c;但它们有一些关键的区别。主要区别包括作用域、变量提升、可重复声明、以及在全局作用域中的行为。 1. 作用域&#xff08;Scope&#xff09; let&#xff1a;块级作用域。用 let 声明的变量只在其所在的代…

qt管理系统框架(好看界面、漂亮界面、好看的界面、漂亮的界面)

概述 最近一个项目用QT开发&#xff0c;然后找了美工帮设计了下界面。总算完工&#xff0c;后想一下干脆抽出一个基础框架&#xff0c;方便以后用。 功能 支持mysql、echarts。 支持加载动态权限菜单&#xff0c;轻松权限控制。 支持遮罩对话框、抽屉 支持开机启动动画界面 内…

华为云计算知识总结——及案例分享

目录 一、华为云计算基础知识二、华为云计算相关案例实战案例一&#xff1a;搭建弹性云服务器&#xff08;ECS&#xff09;并部署Web应用案例二&#xff1a;构建基于OBS的图片存储和分发系统案例三&#xff1a;基于RDS的高可用数据库应用案例四&#xff1a;使用华为云DDoS防护保…

11.1组会汇报-基于区块链的安全多方计算研究现状与展望

基础知识 *1.背书&#xff0c;这个词源来自银行票据业务&#xff0c;是指票据转让时&#xff0c;原持有人在票据背面加盖自己的印鉴&#xff0c;证明该票据真实有效、如果有问题就可以找原持有人。 区块链中的背书就好理解了。可以简单的理解为验证交易并声明此交易合法&…

【Linux】进程间通信(命名管道、共享内存、消息队列、信号量)

作者主页&#xff1a; 作者主页 本篇博客专栏&#xff1a;Linux 创作时间 &#xff1a;2024年11月2日 命名管道&#xff1a; 如果我们想在不相关的进程之间交换数据&#xff0c;可以使用FIFO文件来做这项工作&#xff0c;它经常被称为命名管道。命名管道是一种特殊类型的文…

划界与分类的艺术:支持向量机(SVM)的深度解析

划界与分类的艺术&#xff1a;支持向量机&#xff08;SVM&#xff09;的深度解析 1. 引言 支持向量机&#xff08;Support Vector Machine, SVM&#xff09;是机器学习中的经典算法&#xff0c;以其强大的分类和回归能力在众多领域得到了广泛应用。SVM通过找到最优超平面来分…

Java设计模式(代理模式整理中ing)

一、代理模式 1、代理模式定义&#xff1a; 代理模式&#xff1a;由于某些原因要给某对象提供一个代理以控制对该对象的访问&#xff0c;这时访问对象不适合或者不能够直接引用目标对象&#xff0c;代理对象作为访问对象与目标对象之间的中介进行连接调控调用。 2、代理模式的…

【含文档+源码】基于SpringBoot+Vue的新型吃住玩一体化旅游管理系统的设计与实现

开题报告 本文旨在探讨新型吃住玩一体化旅游管理系统的设计与实现。该系统融合了用户注册与登录、旅游景点管理、旅游攻略发帖、特色旅游路线推荐、附近美食推荐以及酒店客房推荐与预定等多项功能&#xff0c;旨在为游客提供全方位、一体化的旅游服务体验。在系统设计中&#…

如何卸载电脑上的软件?彻底删除第三方和系统自带软件方法!(新款)

如何卸载电脑上的软件&#xff1f;在日常使用电脑的过程中&#xff0c;我们经常会安装各种软件以满足不同的需求。然而&#xff0c;随着时间的推移&#xff0c;一些不再使用的软件可能会占用系统资源&#xff0c;影响电脑性能。因此&#xff0c;定期卸载不需要的软件是保持系统…

cocos开发QA

目录 TS相关foreach循环中使用return循环延迟动态获取类属性 Cocos相关属性检查器添加Enum属性使用Enum报错 枚举“XXX”用于其声明前实现不规则点击区域使用cc.RevoluteJoint的enable激活组件无效本地存储以及相关问题JSON.stringify(map)返回{}数据加密客户端复制文本使用客户…

LeetCode :21. 合并两个有序链表(Java)

目录 题目描述: 代码: 第一种: 第二种: 题目描述: 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1&#xff1a; 输入&#xff1a;l1 [1,2,4], l2 [1,3,4] 输出&#xff1a;[1,1,2,3,4,4]示例 2&#xff1a; …

删除的文件怎么找回?删除文件恢复全面指南

我们常常在日常生活或工作中不小心删除了重要文件&#xff0c;这样的情况可能瞬间让人感到无助。不过&#xff0c;数据恢复技术已相当成熟&#xff0c;我们可以通过多种方法来找回误删的文件。下面我们将从简单到复杂逐步讲解找回删除文件的方法&#xff0c;希望可以帮助大家在…

D57【python 接口自动化学习】- python基础之异常

day57 异常捕获 学习日期&#xff1a;20241103 学习目标&#xff1a;异常 -- 73 异常捕获&#xff1a;出现异常时&#xff0c;如何利用程序进行处理&#xff1f; 学习笔记&#xff1a; try-except代码块 # 捕获异常 num1 num10 try:num/num1except Exception as e:print(上…

【06】A-Maven项目SVN设置忽略文件

做Web项目开发时&#xff0c;运用的是Maven管理工具对项目进行管理&#xff0c;在项目构建的过程中自动生成了很多不需要SVN进行管理的文件&#xff0c;SVN在对源码进行版本管理时&#xff0c;需要将其忽略&#xff0c;本文给出了具体解决方案。 SVN设置忽略Maven项目中自动生成…

logback日志级别动态切换四种方案

生产环境中经常有需要动态修改日志级别。 现在就介绍几种方案 方案一&#xff1a;开启logback的自动扫描更新 配置如下 <?xml version"1.0" encoding"UTF-8"?> <configuration scan"true" scanPeriod"60 seconds" debug…

Linux——Ubuntu的基础操作

压缩与解压缩 gzip压缩工具 创建文件 a.c和b.c touch a.c touch b.c 压缩文件a.c和b.c gzip a.c gzip b.c 解压缩a.c.gz和b.c.gz gzip -d a.c.gz 对文件夹进行压缩 gzip -r 对文件夹进行解压缩 gzip -rd 注意&#xff1a;这只是对文件夹里所有文件进行压缩&#xff0c…

win10下MMSegmentation自定义数据集

下载1.2.1版本: Releases open-mmlab/mmsegmentation GitHub 安装环境 本地torch环境为1.9.1 pip install -U openmim mim install mmengine mim install "mmcv>=2.0.0" 报mmcv版本不匹配的问题,形如:MMCV==X.X.X is used but incompatible. Please inst…