FlinkSql 如何实现数据去重?

摘要

很多时候flink消费上游kafka的数据是有重复的,因此有时候我们想数据在落盘之前进行去重,这在实际开发中具有广泛的应用场景,此处不说详细代码,只粘贴相应的flinksql

代码

--********************************************************************--
-- 创建临时表(只在当前sessoin生效的表称为临时表) DDL
CREATE TEMPORARY TABLE UserAttrSource ( `data` string,`kafkaMetaTimestamp` TIMESTAMP(3) METADATA FROM 'timestamp', -- kafka record携带的源数据时间戳,参考官网kafka connectorproctime as PROCTIME() -- 获取数据处理时间,这是flink内置支持的关键字
) WITH ('connector' = 'kafka','topic' = 'user_attri_ad_dirty_data','properties.bootstrap.servers' = 'kafka地址','scan.startup.mode' = 'timestamp', -- kafka扫描数据模式,参考官网kafka connector'scan.startup.timestamp-millis' ='1687305600000' , -- 2023-06-21 08:00:00'format' = 'raw' -- 意思是将kafka数据格式化为string
);-- 创建SINKCREATE TEMPORARY TABLE ADB (log_date DATE,`errorType` int,appId string,`errorCode` int,`errorReason` string,`deserialization` string,`originalData` string,kafkaMetaTimestamp TIMESTAMP,data_hash string,PRIMARY KEY (`data_hash`) NOT ENFORCED
)
WITH ('connector' = 'adb3.0','url' = 'jdbc:mysql://xxxx:3306/flink_data?rewriteBatchedStatements=true','tableName' = 'usr_attr_dirty', 'userName'='username','password'='password'
);
-- 去重视图, 这是关键(json_value是flink的内置函数,data_hash是数据本身的primary key)
-- 下述语句含义是:根据data_hash字段分组,按照处理时间排序,取出最新的一条数据,其他的重复数据将被抛弃
CREATE TEMPORARY VIEW quchong ASSELECT data,kafkaMetaTimestamp FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY json_value(data,'$.data_hash') ORDER BY proctime DESC) as row_numFROM UserAttrSource)WHERE row_num = 1;--  插入目标表
insert into ADB
select TO_DATE(DATE_FORMAT(kafkaMetaTimestamp,'yyyy-MM-dd') )AS log_date,json_value(data,'$.errorType' RETURNING INT) errorType,json_value(data,'$.appId' NULL ON EMPTY) appId,json_value(data,'$.errorCode'  RETURNING INT) errorCode,json_value(data,'$.errorReason' NULL ON EMPTY) errorReason,json_value(data,'$.deserialization' NULL ON EMPTY) deserialization,json_value(data,'$.originalData') originalData,kafkaMetaTimestamp,json_value(data,'$.data_hash') data_hash
from quchong;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot Mybatis 多数据源 MySQL+Oracle+Redis

一、背景 在SpringBoot Mybatis 项目中&#xff0c;需要连接 多个数据源&#xff0c;连接多个数据库&#xff0c;需要连接一个MySQL数据库和一个Oracle数据库和一个Redis 二、依赖 pom.xml <dependencies><dependency><groupId>org.springframework.boot&l…

Tortoise Git(乌龟git)常用命令总结

查看全局和本地 Git 配置 打开命令行终端&#xff08;如 Git Bash&#xff09;&#xff0c;分别执行以下命令查看全局和本地的 Git 配置信息&#xff1a; git config --global -l git config --local -l确保配置中没有任何与 SSH 相关的设置 移除全局和本地 SSH 相关配置&…

权限校验中的“双token”方案

1. 双Token中的两个token分别是什么&#xff1f; 1.1 access_token 1.2 fresh_token 2. 为什么需要双token&#xff1f;一个token不行吗&#xff1f; 答&#xff1a; 两个token的职责不同。其中&#xff0c;access_token是在每次请求的时候携带给后端进行权限校验&#xff…

DOCKER 部署 webman项目

# 设置基础镜像 FROM php:8.2-fpm# 安装必要的软件包和依赖项 RUN apt-get update && apt-get install -y \nginx \libzip-dev \libpng-dev \libjpeg-dev \libfreetype6-dev \&& rm -rf /var/lib/apt/lists/*# 安装 PHP 扩展 RUN docker-php-ext-configure gd …

【nacos】【sentinel】【gateway】docker-compose安装及web项目部署

docker安装 【centos】【docker】安装启动 docker-compose安装 【docker-compose】安装使用 配置文件 version: 2 networks: #自定义网络myapp&#xff0c;为了只有这些服务可以在该网络内相互访问myapp:driver: bridge services: #将容器抽象成服务nacos: #注册中心image…

自学TypeScript-基础、编译、类型

自学TypeScript-基础、编译、类型 TS 编译为 JS类型支持类型注解基础类型typeof 运算符高级类型class 类构造函数和实例方法继承可见性只读 类型兼容性交叉类型泛型泛型约束多个泛型泛型接口泛型类泛型工具 索引签名类型映射类型索引查询(访问)类型 类型声明文件 TypeScript 是…

旋转图片两种方法

这两种方法在旋转图像时&#xff0c;可能会产生一些不同的效果&#xff1a; rotate_image_new()旋转后的图像完全包含旋转前的内容&#xff0c;并且填充边界尽可能小 rotate_image() 保持原始图像的大小&#xff0c;并根据填充选项决定是否填充边界为白色。如果 if_fill_whit…

ssh访问远程宿主机的VMWare中NAT模式下的虚拟机

ssh访问远程宿主机的VMWare中NAT模式下的虚拟机 1.虚拟机端配置 1.1设置虚拟机的网络为NAT模式 1.2设置虚拟网络端口映射(NAT) 点击主菜单的编辑-虚拟网络编辑器&#xff1a; 启动如下对话框&#xff0c;选中NAT模式的菜单项&#xff0c;并点击NAT设置&#xff1a; 点击添…

计算机毕设 基于深度学习的植物识别算法 - cnn opencv python

文章目录 0 前言1 课题背景2 具体实现3 数据收集和处理3 MobileNetV2网络4 损失函数softmax 交叉熵4.1 softmax函数4.2 交叉熵损失函数 5 优化器SGD6 最后 0 前言 &#x1f525; 这两年开始毕业设计和毕业答辩的要求和难度不断提升&#xff0c;传统的毕设题目缺少创新和亮点&a…

WSL Opencv with_ffmpeg conan1.60.0

我是ubuntu18. self.options[“opencv”].with_ffmpeg True 关键是gcc版本需要conan支持&#xff0c;比如我的是&#xff1a; compilergcc compiler.version7.5 此外还需要安装系统所需库&#xff1a; https://qq742971636.blog.csdn.net/article/details/132559789 甚至来…

宏基官网下载的驱动怎么安装(宏基笔记本如何安装系统)

本文为大家介绍宏基官网下载的驱动怎么安装宏基笔记本驱动(宏基笔记本如何安装系统)&#xff0c;下面和小编一起看看详细内容吧。 宏碁笔记本怎么一键更新驱动 1. 单击“开始”&#xff0c;然后选择“所有程序”。 2. 单击Acer&#xff0c;然后单击Acer eRecovery Management。…

ChatGPT 制作可视化柱形图突出显示第1名与最后1名

对比分析柱形图的用法。在图表中显示最大值与最小值。 像这样的动态图表的展示只需要给ChatGPT,AIGC,OpenAI 发送一个指令就可以了, 人工智能会快速的写出HTML与JS代码来实现。 请使用HTML,JS,Echarts完成一个对比分析柱形图,在图表中突出显示第1名和最后1名用单独一种不…

Vue 项目性能优化 — 实践指南

前言 Vue 框架通过数据双向绑定和虚拟 DOM 技术&#xff0c;帮我们处理了前端开发中最脏最累的 DOM 操作部分&#xff0c; 我们不再需要去考虑如何操作 DOM 以及如何最高效地操作 DOM&#xff1b;但 Vue 项目中仍然存在项目首屏优化、Webpack 编译配置优化等问题&#xff0c;所…

leetcode分类刷题:哈希表(Hash Table)(一、数组交集问题)

1、当需要快速判断某元素是否出现在序列中时&#xff0c;就要用到哈希表了。 2、本文针对的总结题型为给定两个及多个数组&#xff0c;求解它们的交集。接下来&#xff0c;按照由浅入深层层递进的顺序总结以下几道题目。 3、以下题目需要共同注意的是&#xff1a;对于两个数组&…

idea查找maven所有依赖

文章目录 idea自带的依赖结构图idea安装maven helper插件 idea自带的依赖结构图 缺点是只有依赖&#xff0c;没有版本 idea安装maven helper插件 settings–>plugins–>搜索maven helper并安装 安装后打开pom.xml文件会有依赖解析 勾选conflict就是有冲突的依赖选中…

(笔记三)opencv图像基础操作

强调&#xff1a;本文只为学习记录做笔记 详细可参考opencv官网 &#xff1a;https://docs.opencv.org/4.1.1/d0/d86/tutorial_py_image_arithmetics.html &#xff08;1&#xff09;将cv2的BGR模式改为RGB模式 #!/usr/bin/env python # -*- coding:utf-8 -*- ""&q…

springboot中使用ElasticSearch

引入依赖 修改我们的pom.xml&#xff0c;加入spring-boot-starter-data-elasticsearch <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>编写配…

React 项目中引入msal验证以及部分报错处理

功能实现 如何在React 项目中引入msal身份验证&#xff0c; 微软在官网有提供文档支持&#xff0c;文档包含示例和具体使用的教程&#xff0c;地址如下&#xff1a; https://learn.microsoft.com/zh-cn/azure/active-directory/develop/tutorial-v2-nodejs-webapp-msal 照着文…

TP DP PP 并行训练方法介绍

这里写目录标题 张量并行TP流水线并行 PPnaive模型并行GPipePipeDream 数据并行DPFSDP 张量并行TP 挖坑 流水线并行 PP 经典的流水线并行范式有Google推出的Gpipe&#xff0c;和微软推出的PipeDream。两者的推出时间都在2019年左右&#xff0c;大体设计框架一致。主要差别为…

npm stomp.js 的消息断连报警

stomp.js的新版本是可以支持MQ断连后&#xff0c;自动重连&#xff0c;但是如果是直接物理断网了&#xff0c;那久无论如何都没有办法重连&#xff0c;为了不影响业务&#xff0c;可以实现一个MQ断连后&#xff0c;重连三次&#xff0c;然后还是连不上的话&#xff0c;就启动告…