Flinksql--订单宽表

参考: https://chbxw.blog.csdn.net/article/details/115078261 (datastream 实现)

一、ODS

模拟订单表及订单明细表

CREATE TABLE orders (order_id STRING,user_id STRING,order_time TIMESTAMP(3),-- 定义事件时间及 Watermark(允许5秒乱序)WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'chb1:9092',-- 如果source被多个任务使用,不在定义时指定group.id-- 通过hint指定  OPTIONS('properties.group.id'='test_group2')  注意是group.id 是点不是下划线-- 'properties.group.id' = 'flink-sql-group-orders',  -- 消费者组 ID'scan.startup.mode' = 'earliest-offset','format' = 'json'
);CREATE TABLE order_details (detail_id STRING,order_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,detail_time TIMESTAMP(3),-- 定义事件时间及 Watermark(允许5秒乱序)WATERMARK FOR detail_time AS detail_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'order_details','properties.bootstrap.servers' = 'chb1:9092',-- 'properties.group.id' = 'flink-sql-group-order_details',  -- 消费者组 ID'scan.startup.mode' = 'earliest-offset','format' = 'json'
);-- 造数据
insert into order_details values ('d001', 'o001', 'car', 5000, 1, now());
insert into orders values('o001', 'u001', now());insert into orders values('o003', 'u003', now());insert into order_details values ('d003', 'o003', 'water', 2, 12, now());
insert into order_details values ('d003', 'o003', 'food', 50, 3, now());

二、DWD 订单和订单明细关联


-- sink
CREATE TABLE dwd_trd_order (detail_id STRING,order_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,detail_time TIMESTAMP(3),user_id STRING,order_time TIMESTAMP(3),-- 定义事件时间及 Watermark(允许5秒乱序)WATERMARK FOR detail_time AS detail_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'dwd_trd_order','properties.bootstrap.servers' = 'chb1:9092','scan.startup.mode' = 'earliest-offset','format' = 'json'
);insert into dwd_trd_order
SELECT d.detail_id,o.order_id,d.product_id,d.price,d.quantity,d.detail_time,user_id,order_time
FROM orders o
JOIN order_details d 
ON o.order_id = d.order_id
AND d.detail_time BETWEEN o.order_time AND o.order_time + INTERVAL '10' MINUTE;

报错:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The query contains more than one rowtime attribute column [detail_time, order_time] for writing into table 'default_catalog.default_database.dwd_trd_order'.
Please select the column that should be used as the event-time timestamp for the table sink by casting all other columns to regular TIMESTAMP or TIMESTAMP_LTZ.

在 Flink SQL 中,每个表只能有一个 行时间属性(rowtime attribute) 用于定义事件时间(Event Time)。当写入目标表时,若查询结果包含多个行时间属性字段(如 order_timedetail_time),会导致冲突。以下是解决方案:


1. 问题定位

错误信息表明目标表 dwd_trd_order 在写入时检测到多个行时间属性字段(detail_timeorder_time)。
根本原因是:JOIN 后的结果保留了双表的事件时间字段,且未被转换为普通时间戳


2. 解决方案

方案一:仅保留一个行时间属性

在查询中选择一个时间字段作为事件时间,将其他时间字段转为普通 TIMESTAMP 类型。
假设目标表 dwd_trd_order 使用 order_time 作为事件时间:

INSERT INTO dwd_trd_order
SELECT o.order_id,o.user_id,d.product_id,d.price,d.quantity,o.order_time,  -- 保留为行时间属性(需与目标表定义一致)CAST(d.detail_time AS TIMESTAMP(3)) AS detail_time  -- 转为普通时间戳
FROM orders o
JOIN order_details d ON o.order_id = d.order_id;
方案二:调整目标表定义

若业务需要同时保留两个时间字段,需在目标表 DDL 中 仅定义一个行时间属性,其他字段转为普通时间戳:

CREATE TABLE dwd_trd_order (order_id STRING,user_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,order_time TIMESTAMP(3),  -- 行时间属性detail_time TIMESTAMP(3),  -- 普通时间戳WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND  -- 仅一个事件时间
) WITH (...);

3. 关键步骤说明

  1. 检查目标表 DDL
    确保目标表仅有一个 WATERMARK 定义,且对应字段为行时间属性。

  2. 转换多余的行时间属性
    在查询中使用 CAST 将非主时间字段转为普通 TIMESTAMPTIMESTAMP_LTZ

    CAST(detail_time AS TIMESTAMP(3))  -- 转为非行时间属性
    
  3. 验证查询结果
    使用 DESCRIBE 确认查询结果的字段类型:

    DESCRIBE (SELECT ... FROM ...);
    
目标表 DDL(仅一个行时间属性)
CREATE TABLE dwd_trd_order (order_id STRING,user_id STRING,product_id STRING,price DECIMAL(10,2),quantity INT,order_time TIMESTAMP(3),  -- 行时间属性detail_time TIMESTAMP(3),  -- 普通时间戳WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
写入数据的 SQL(转换多余时间字段)
INSERT INTO dwd_trd_order
SELECT o.order_id,o.user_id,d.product_id,d.price,d.quantity,o.order_time,  -- 保留为行时间属性CAST(d.detail_time AS TIMESTAMP(3)) AS detail_time  -- 转为普通时间戳
FROM orders o
JOIN order_details d ON o.order_id = d.order_id;

三、DWS

CREATE TABLE dws_trd_order (window_start TIMESTAMP(3),window_end TIMESTAMP(3),product_num bigint,uv bigint,total_amount DECIMAL(10,2)
) WITH ('connector' = 'kafka','topic' = 'dws_trd_order','properties.bootstrap.servers' = 'chb1:9092','scan.startup.mode' = 'earliest-offset','format' = 'json'
);-- dws 
insert into dws_trd_order
SELECTwindow_start, window_end,COUNT(1) AS product_num,COUNT(DISTINCT user_id) AS uv,SUM(price * quantity) AS total_amount
FROM TABLE(CUMULATE(TABLE dwd_trd_order, DESCRIPTOR(detail_time), INTERVAL '5' SECOND, INTERVAL '1' DAY)
)
GROUP BY window_start, window_end;

有个问题: 为什么窗口结束时间从 2025-04-02 20:48:50.000 开始???


dwd_trd_order 表的时间如下order_time              detail_time2025-04-02 20:06:01.281 2025-04-02 20:07:35.4942025-04-02 20:50:27.975 2025-04-02 20:50:33.2332025-04-02 20:50:27.975 2025-04-02 20:50:34.405累计窗口运算如下selectwindow_start, window_end,count(1) product_num,count(distinct user_id) uv,sum(price*quantity) as total_amountfrom TABLE(CUMULATE(TABLE dwd_trd_order, DESCRIPTOR(detail_time ), INTERVAL '5' SECOND, INTERVAL '1' DAY)
)
group by window_start,window_end;
为什么窗口结束时间从 2025-04-02 20:48:50.000 开始???window_start              window_end                    product_num                   uv                             total_amount2025-04-02 00:00:00.000 2025-04-02 20:48:50.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:48:55.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:00.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:05.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:10.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:15.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:20.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:25.000                    1                    1                                  5000.002025-04-02 00:00:00.000 2025-04-02 20:49:30.000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/899890.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

粒子滤波介绍

目录 粒子滤波的主要流程可以分为以下 5 个步骤: 粒子滤波(PF) vs. ESKF(误差状态卡尔曼滤波) 粒子滤波的主要流程可以分为以下 5 个步骤: 初始化(Initialization) 生成 N 个粒子&…

一场国际安全厂商的交流会议简记

今天参与了一场国际安全厂商A公司组织的交流会议 与会有国际TOP企业跨境企业 还有国内一些头部商业公司。 A公司很有意思介绍了自己是怎么做安全运营中心SOC的。 介绍了很多内容,包括他们自己的员工量/设备量/事件量/SOC中心人员量,其中人员量只有个位数…

Java面试黄金宝典30

1. 请详细列举 30 条常用 SQL 优化方法 定义 SQL 优化是指通过对 SQL 语句、数据库表结构、索引等进行调整和改进,以提高 SQL 查询的执行效率,减少系统资源消耗,提升数据库整体性能的一系列操作。 要点 从索引运用、查询语句结构优化、数据…

花洒洗澡完毕并关闭后过段时间会突然滴水的原因探究

洗澡完毕后的残留水 在洗澡的过程中,我们通常会使用到大量的水。这些水会通过花洒管子到达花洒顶喷流出。由于大顶喷花洒的喷头较大,关闭后里面的存水会更多。 气压失衡后的滴水 当花洒关闭后,内部的水管和花洒头中仍存有一定量的水。由于…

QSettings用法实战(相机配置文件的写入和读取)

很多情况,在做项目开发的时候,将参数独立出来是比较好的方法 例如:相机的曝光次数、曝光时长等参数,独立成ini文件,用户可以在外面修改即可生效,无需在动代码重新编译等工作 QSettings便可以实现该功能 内…

运维培训班之最佳选择(The best Choice for Operation and Maintenance Training Courses)

运维培训班之最佳选择 从面试官的角度聊聊培训班对运维的帮助,同时给培训班出身的运维一些建议~ 谈到运维(尤其是零基础非科班转行的运维)找工作,培训班是个不可回避的讨论热点。虽然本人也做过兼职运维培训老师,多少…

网络安全与防护策略

随着信息技术的飞速发展,互联网已成为现代社会不可或缺的一部分。从日常生活到企业运营,几乎所有活动都离不开网络。然而,网络的开放性和广泛性也使得网络安全问题愈发严峻。无论是个人数据泄露,还是大规模的网络攻击,…

LLM 分词器Tokenizer 如何从 0 到 1 训练出来

写在前面 大型语言模型(LLM)处理的是人类的自然语言,但计算机本质上只能理解数字。Tokenizer(分词器) 就是架在自然语言和计算机数字表示之间的一座至关重要的桥梁。它负责将我们输入的文本字符串分解成模型能够理解的最小单元——Token,并将这些 Token 转换成对应的数字…

【ArcGIS微课1000例】0142:如何从谷歌地球保存高清影像图片

文章目录 一、选取影像区域1. 搜索地图区域2. 导入矢量范围二、添加输出图层三、保存高清影像1. 地图选项2. 输出分辨率3. 保存图像四、注意事项一、选取影像区域 首先需要选取影像区域,可通过以下方式快速定位。 1. 搜索地图区域 在搜索框内输入关键词,例如青海湖,点击【…

Unity注册表修改分辨率:探索幕后设置与手动调控

Unity注册表修改分辨率:探索幕后设置与手动调控 在Unity开发中,调整分辨率和显示模式是开发过程中常见的需求,尤其是当我们打包并运行应用时,可能会遇到显示模式不符合预期的情况。Unity在首次运行时会自动保存这些设置&#xff…

外部流输入的 Layer

在 Android 的 SurfaceFlinger 体系中,外部流输入的 Layer 通常通过 Sideband Stream 或 BufferQueue 机制传递给 SurfaceFlinger,然后由 HWC(Hardware Composer)或 OpenGL ES 进行合成。 1. 什么是外部流输入的 Layer&#xff1f…

31-体测管理系统

介绍 技术: 基于 B/S 架构 SpringBootMySQLvueelementui 环境: Idea mysql maven jdk1.8 node 用户端功能 1.系统首页展示轮播图及公告信息 2.测试项目:展示可以参加测试的项目列表 3.公告信息:公告信息列表及详情 可进行点赞和收藏 4.在线留言 5.个人…

NVR接入录像回放平台EasyCVR视频系统守护舌尖上的安全,打造“明厨亮灶”云监管平台

一、方案背景 近年来,餐饮行业食品安全和卫生等问题频发,比如后厨卫生脏乱差等,持续引发关注,这些事情导致连锁反应,使其收益遭受损失。同时,给消费者造成了心理和生理上的伤害。 加强餐饮行业的监管成为…

Python办公自动化(3)对Excel的操作

1.读取excel文件 1.安装工具 终端下载读取excel文档的工具库: pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple xlrd 若对版本有特殊需求: 删除当前版本:pip3 uninstall xlrd 下载所需要的版本:pip3 install -i htt…

go语言:开发一个最简单的用户登录界面

1.用deepseek生成前端页面&#xff1a; 1.提问&#xff1a;请你用html帮我设计一个用户登录页面&#xff0c;要求特效采用科技感的背景渲染加粒子流动&#xff0c;用css、div、span标签&#xff0c;并给出最终合并后的代码。 生成的完整代码如下&#xff1a; <!DOCTYPE h…

blender二次元上色

前&#xff1a; 后&#xff1a;&#xff08;脸自己会发光) 参考&#xff1a;05-模型导入与材质整理_哔哩哔哩_bilibili

Mysql+Demo 获取当前日期时间的方式

记录一下使用Mysql获取当前日期时间的方式 获取当前完整的日期时间有常见的四种方式&#xff0c;获取得到的默认格式(mysql的格式标准)是 %Y-%m-%d %H:%i:%s其它格式 %Y-%m-%d %H:%i:%s.%f方式一&#xff1a;now()函数 select now();mysql> select now(); -------------…

C#核心学习(六)面向对象--封装(5)静态成员及静态构造函数和静态类 以及和常量的区别

目录 一、什么是静态的&#xff1f;什么是常量&#xff1f; 1. ​静态&#xff08;Static&#xff09;​ 2. ​常量&#xff08;const&#xff09;​ 二、类中的静态成员有什么用&#xff1f; 1. ​共享数据 2. ​工具方法与全局配置 3. ​单例模式 三、静态类和静态成…

FreeRTOS源码下载分享

FreeRTOS源码下载分享 官网下载太慢了&#xff0c;分享下FreeRTOSv202411 FreeRTOSv202411.00.zip 链接: https://pan.baidu.com/s/1P4sVS5WroYEl0WTlPD7GXg 提取码: g6aq

2025年win10使用dockerdesktop安装k8s

一、写作背景 百度了一圈&#xff0c; 要么教程老&#xff0c;很多操作步骤冗余&#xff0c; 要么跑不通&#xff0c;或者提供的链接失效等情况。 二、看前须知 1、安装过程使用的AI辅助&#xff0c; 因为参考的部分博客卡柱了。 2、如果操作过程中遇到卡顿&#xff0c; …