【Phoenix】phoenix实现每个Primarykey主键保留N版本数据,CDC数据记录为Changelog格式

一、背景:

CDC数据中包含了,数据的变更过程。当CDC写入传统数据库最终每一个primary key下会保存一条数据。当然可以使用特殊手段保存多分记录但是显然造成了数据膨胀。
另外数据湖Hudi(0.13.1)是不支持保存所有Changelog其Compaction机制会清除所有旧版本的内容。Iceberg支持TimeTravel,能查到某个时间点的数据状态,但是不能列举的单条记录的Change过程。
所以目前只能手动实现。
其实,实现思路很简单,将原PrimaryKey+Cdc的 ts_ms 一起作为新表的 PrimaryKey就可以了。但需要注意的是一条数据可能变更很多次,但一般需要保存近几次的变更,所以就需要删除部分旧变更记录。ts_ms 就是CDC数据中记录的日志实际产生的时间,具体参见debezium 。如果原表primarykey是联合主键,即有多个字段共同组成,则最好将这些字段拼接为一个字符串,方便后续关联。

本文思路
CDC --写入-> Phoenix + 定期删除旧版本记录

CDC数据写入略过,此处使用SQL模拟写入。

二、Phoenix旧版记录删除(DEMO)

phoenix doc

bin/sqlline.py www.xx.com:2181
-- 直接创建phoenix表
create table TEST.TEST_VERSION(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
) VERSIONS=5;

再去hbase shell中查看,hbase 关联表已经有phoenix创建了。

hbase(main):032:0> desc "TEST:TEST_VERSION"
Table TEST:TEST_VERSION is ENABLED
TEST:TEST_VERSION, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRe
gionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|80
5306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.builder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix
.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', T
TL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPE
N => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
-- 在phoenix中向表插入数据
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:00:00'),'zhangsan');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:00:00'),'lisi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 12:00:00'),'wangwu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 13:00:00'),'zhaoliu');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 14:00:00'),'liuqi');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 15:00:00'),'sunba');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 07:00:00'),'sunyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 08:00:00'),'chaoyang');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:00:00'),'xuri');
UPSERT INTO TEST.TEST_VERSION(ID,TS,NAME) VALUES('rk002',TO_TIMESTAMP('2020-01-01 09:30:00'),'chenxi');
-- OK再查询一下数据插入情况
SELECT * FROM TEST.TEST_VERSION;

以下假设每个PrimaryKey需要保留最新的3版本数据。所以红色框内是需要删除的数据。
在这里插入图片描述

现在需要使用row_number的函数给每个primarykey的不通version数据标识。但是phoenix并没有开窗函数。只有agg聚合函数。
phoenix对SQL的限制还是比较多的如:
(1)join 非等值连接不支持,如on a.id>s.id 是不支持的,也不支持数组比较连接,如on a.id = ARRAY[1,2,3]。 会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)where exists 格式的非等值连接不支持。select ... from A where exists (select 1 from B where A.id>B.id) 是不支持的。会报错:Error: Does not support non-standard or non-equi correlated-subquery conditions. (state=,code=0)
(2)没有开窗window函数
(3)DELETE FROM不支持JOIN

最终发下有一下函数可用
(1)NTH_VALUE 获取分组排序的第N个值。 返回原值的类型。
(2)FIRST_VALUESLAST_VALUES 获取分区排序后的前、后的N个值,返回ARRAY类型。
此三个函数官网doc中,案例是这样的 FIRST_VALUES( name, 3 ) WITHIN GROUP (ORDER BY salary DESC) 是全局分组,而实际使用中是需要搭配 GROUP BY 使用的。

所以可以获取到

-- 方案一:使用NTH_VALUE获取阈值
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES-- 方案二:使用FIRST_VALUES获取到一个ARRAY 
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS);

由于phoenix支持行子查询,以下是官方案例。这样就能绕过不使用DELETE … JOIN了。

Row subqueries
A subquery can return multiple fields in one row, which is considered returning a row constructor. The row constructor on both sides of the operator (IN/NOT IN, EXISTS/NOT EXISTS or comparison operator) must contain the same number of values, like in the below example:
SELECT column1, column2
FROM t1
WHERE (column1, column2) IN(SELECT column3, column4FROM t2WHERE column5 = ‘nowhere’);
This query returns all pairs of (column1, column2) that can match any pair of (column3, column4) in the second table after being filtered by condition: column5 = ‘nowhere’.

最终实现删除 除N个较新的以外的所有旧版本数据, SQL如下:

-- NTH_VALUE方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,NTH_VALUE(TS,3) WITHIN GROUP (ORDER BY TS DESC) THRES FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < Z.THRES
);-- FIRST_VALUES方式
DELETE FROM TEST.TEST_VERSION
WHERE (ID,TS) IN (
SELECT A.ID,A.TS FROM TEST.TEST_VERSION A 
INNER JOIN (
SELECT ID,FIRST_VALUES(TS,3) WITHIN GROUP (ORDER BY TS DESC) TSS FROM TEST.TEST_VERSION GROUP BY ID) Z ON A.ID=Z.ID
WHERE A.TS < ALL(Z.TSS)
);

删除后效果:
在这里插入图片描述

三、探索

3.1 Phoenix的Row Timestamp 探索

Phoenix的Row Timestamp是为了在meta中更快检索数据而设置的。不能实现hbase 中的versions 数据在phoenix中展现。
如下测试案例:
phoenix建表,并插入数据:

create table TEST.TEST_ROW_TIMESTAMP(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS ROW_TIMESTAMP)
) VERSIONS=5;UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 09:30:00'),'windows');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 10:30:00'),'mac');
UPSERT INTO TEST.TEST_ROW_TIMESTAMP(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'linux');

在hbase中查询表:

hbase(main):050:0> desc 'TEST:TEST_ROW_TIMESTAMP'
Table TEST:TEST_ROW_TIMESTAMP is ENABLED
TEST:TEST_ROW_TIMESTAMP, {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|', coprocessor$3
=> '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|index.b
uilder=org.apache.phoenix.index.PhoenixIndexBuilder,org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec', METADATA => {'OWNER' => 'dcetl'}}
COLUMN FAMILIES DESCRIPTION
{NAME => '0', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'FAST_DIFF', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICAT
ION_SCOPE => '0', BLOOMFILTER => 'NONE', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0235 secondshbase(main):049:0> scan 'TEST:TEST_ROW_TIMESTAMP'
ROW                                                            COLUMN+CELLrk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577871000000, value=xrk001\x00\x80\x00\x01o`p\xC1\xC0\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577871000000, value=windowsrk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x00\x00\x00\x00, timestamp=1577874600000, value=xrk001\x00\x80\x00\x01o`\xA7\xB0@\x00\x00\x00\x00              column=0:\x80\x0B, timestamp=1577874600000, value=macrk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x00\x00\x00\x00, timestamp=1577878200000, value=xrk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x00           column=0:\x80\x0B, timestamp=1577878200000, value=linux
3 row(s)
Took 0.0072 seconds

如上查询结果,我们希望在hbase中只有一行数据,并保存为对多个版本,但实际查询到了多条数据,timestamp做为hbase表的rowkey的一部分了。phoenix在创建表时候没有使用hbase多版本保存机制。

3.2 phoenix 和 hbase表结构不一致

先创建hbase Table

create 'TEST:TEST_DIF_TS',{NAME => 'COLS', VERSIONS => 3}
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhangsan'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189085000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','lisi'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189090000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','wangwu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189095000
put 'TEST:TEST_DIF_TS','001', 'COLS:NAME','zhaoliu'
put 'TEST:TEST_DIF_TS','001', 'COLS:TS', 1695189105000get 'TEST:TEST_DIF_TS','001',{COLUMN=>'COLS:NAME',VERSIONS=>3}
# 结果:
COLUMN                                             CELLCOLS:NAME                                         timestamp=1695784642879, value=zhaoliuCOLS:NAME                                         timestamp=1695784642857, value=wangwuCOLS:NAME                                         timestamp=1695784642830, value=lisi

创建Phoenix Table

create table TEST.TEST_DIF_TS(
ID VARCHAR NOT NULL,
TS TIMESTAMP NOT NULL,
NAME VARCHAR,
CONSTRAINT my_pk PRIMARY KEY (ID,TS)
);
UPSERT INTO TEST.TEST_DIF_TS(ID,TS,NAME) VALUES('rk001',TO_TIMESTAMP('2020-01-01 11:30:00'),'XXX');0: jdbc:phoenix:...> select * from  TEST.TEST_DIF_TS;
+--------+--------------------------+-------+
|   ID   |            TS            | NAME  |
+--------+--------------------------+-------+
| rk001  | 2020-01-01 11:30:00.000  | XXX   |
+--------+--------------------------+-------+

再翻查hbase Table数据

hbase(main):004:0> scan 'TEST:TEST_DIF_TS'
ROW                                                COLUMN+CELL001                                               column=COLS:NAME, timestamp=1695784642879, value=zhaoliu001                                               column=COLS:TS, timestamp=1695784643741, value=1695189105000rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x00\x00\x00\x00, timestamp=1695786568345, value=x00rk001\x00\x80\x00\x01o`\xDE\x9E\xC0\x00\x00\x00\x column=0:\x80\x0B, timestamp=1695786568345, value=XXX

可以看到Phoenix只能查询到自己插入的数据,但是hbase可以查询到phoenix,所以phoenix会把不符合自己表结构的数据过滤掉。phoenix的会将自己所有的primary key字段拼接后作为hbase 的rowkey存入hbase。

参考文章:

Phoenix实践 —— Phoenix SQL常用基本语法总结小记
Phoenix 对 Hbase 中表的映射
phoenix使用详解
Phoenix 简介及使用方式
phoenix创建映射表和创建索引、删除索引、重建索引

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/89229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cross Attention和 Self- Attention 的区别?

Cross Attention和Self-Attention都是深度学习中常用的注意力机制&#xff0c;用于处理序列数据&#xff0c;其中Self-Attention用于计算输入序列中每个元素之间的关系&#xff0c;Cross Attention则是计算两个不同序列中的元素之间的关系。它们的主要区别在于计算注意力分数时…

算法 只出现一次的两个数字-(哈希+异或)

牛客网: BM52 题目: 数组中仅2个数字出现1次&#xff0c;其余出现2次 思路: 出现2次的数字异或结果为0&#xff0c;另外两个不同的数字异或结果res不为0&#xff0c;异或结果的二进制位必与其中一个相同&#xff0c;求出二进制位为1的pos, 遍历数组&#xff0c;所有此位置为1…

安卓recovery流程分析(编译、界面、图片)

目录 recovery 界面菜单 recovery 界面操作 recovery 启动流程 recovery 编译makefile recovery 图片大小 ramdisk、boot.img、recovery.img之间的关系 authordaisy.skye的博客_CSDN博客-嵌入式,Qt,Linux领域博主 recovery 界面菜单 recovery 界面显示 android recoveryuse …

LINUX -SQL笔记(自学用)

1.安装 sudo apt-get install mysql-server sudo mysql -u root -p2.关系模型 在关系数据库中&#xff0c;一张表中的每一行数据被称为一条记录。一条记录就是由多个字段组成的。 每一条记录都包含若干定义好的字段。同一个表的所有记录都有相同的字段定义。 对于关系表&#…

TCP协议和UDP协议

TCP通信原理 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是一种传输层协议&#xff0c;它主要负责点对点的数据传输TCP 主要特点是面向连接的&#xff0c;也就是说&#xff0c;在数据传输之前&#xff0c;它需要先建立一个连接。连接建…

uniapp实现表格冻结

效果图如下&#xff1a; 思路&#xff1a; 1.由于APP项目需要&#xff0c;起初想去插件市场直接找现成的&#xff0c;结果找了很久没找到合适的&#xff08;有的不支持vue2有的不能都支持APP和小程序&#xff09; 2.后来&#xff0c;就只能去改uni-table源码了&#xff0c;因…

Unity制作旋转光束

Unity制作旋转光束 大家好&#xff0c;我是阿赵。 这是一个在很多游戏里面可能都看到过的效果&#xff0c;在传送门、魔法阵、角色等脚底下往上散发出一束拉丝形状的光&#xff0c;然后在不停的旋转。 这次来在Unity引擎里面做一下这种效果。 一、准备材料 需要准备的素材很简…

leetCode 62.不同路径 动态规划 + 空间复杂度优化

62. 不同路径 - 力扣&#xff08;LeetCode&#xff09; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xf…

登录业务实现 - token登录鉴权

登录业务实现&#xff1a; 登录成功/失败实现 -> pinia管理用户数据及数据持久化 -> 不同登录状态的模板适配 -> 请求拦截器携带token&#xff08;登录鉴权&#xff09; -> 退出登录实现 -> token失效&#xff08;401响应拦截&#xff09; 1. 登录成…

crypto:RSA

题目 利用代码跑一下解码 import gmpy2 e 17 p 473398607161 q 4511491 d gmpy2.invert(e,(p-1)*(q-1)) print(d)总结 RSA&#xff08;Rivest-Shamir-Adleman&#xff09;是一种非对称加密算法&#xff0c;常用于数据加密和数字签名。它基于两个大素数的乘积难以分解的数…

(数组 / 字符串) 55. 跳跃游戏 ——【Leetcode每日一题】

❓ 55. 跳跃游戏 难度&#xff1a;中等 给你一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标&#xff0c;如果可以&#xff0c;返回 true &#xff1b;否则&#xff…

【基于Qt和OpenCV的多线程图像识别应用】

基于Qt和OpenCV的多线程图像识别应用 前言多线程编程为什么需要多线程Qt如何实现多线程线程间通信 图像识别项目代码项目结构各部分代码 项目演示小结 前言 这是一个简单的小项目&#xff0c;使用Qt和OpenCV构建的多线程图像识别应用程序&#xff0c;旨在识别图像中的人脸并将…

2024华为校招面试真题汇总及其解答(一)

1. 我问你点java基础的问题吧,你平时都用什么集合啊,都什么情况下使用 在 Java 中,常用的集合有以下几种: List:有序集合,可以重复,常用实现类有 ArrayList、LinkedList、Vector。Set:无序集合,不能重复,常用实现类有 HashSet、TreeSet。Map:键值对集合,键不能重复…

软件可靠性基础

软件可靠性基础 软件可靠性基本概念串并联系统可靠性计算软件可靠性测试软件可靠性建模软件可靠性管理软件可靠性设计容错&#xff0c;检错的技术 选择题考基本概念&#xff08;MTBF&#xff09;&#xff0c;很少考 非重点 软件可靠性基本概念 这个章节中&#xff0c;第一个…

盲盒电商模式:神秘感+趣味性,轻松实现社交购物

最近&#xff0c;一种新型的电商模式——盲盒电商模式&#xff0c;开始在市场上崭露头角。这种模式的出现&#xff0c;不仅为消费者带来了全新的购物体验&#xff0c;也为商家拓展了销售渠道。本文将详细介绍盲盒电商模式及其玩法&#xff0c;帮助大家更好地了解这一新兴商业模…

【数据库——MySQL】(6)查询(1)

目录 1. 数据库查询1.1 输出项为列名1.2 输出项为表达式1.3 输出内容变换1.4 消除输出项的重复行1.5 聚合函数 2. 查询条件&#xff1a;逻辑条件2.1 比较运算2.2 模式匹配2.3 范围限定2.4 空值判断 3. 分组3.1 基本分组3.2 分组汇总 4. 分组后筛选5. 输出行排序5.1 ORDER BY5.2…

为什么SQL预编译可以防止SQL注入攻击

前言 防范SQL注入攻击是每一位做后端开发的程序员必须会的基本功。本文介绍其中一种防范攻击的方法&#xff1a;SQL预编译。 本文大部分内容引用自这篇文章&#xff0c;部分内容有修改。 注入例子 先简单回顾下SQL注入攻击的过程&#xff0c;假设有一个SQL语句&#xff1a; …

【面试必备】缓存的更新策略-缓存雪崩-缓存穿透-缓存预热-缓存击穿~

目录 1、什么是缓存 2、为什么使用Redis作为MySQL的缓存 3、缓存的更新策略 3.1、策略一&#xff1a;定期生成 3.2、策略二&#xff1a;实时生成 内存淘汰策略【面试重点】 4、缓存预热(Cache preheating)【面试重点】 5、缓存穿透(Cache penetration)【面试重点】 6、…

java 入门-使用eclipse、javaFX、SceneBuilder进行图形界面开发

个人是 一直在开C# CS端开发 &#xff0c; 目前 公司的软件 基本都使用了java作开发。 为了更好适应环境&#xff0c;我也只能再次学习这个陌生的开发工具。 java 的开发界面非常不友好 &#xff0c;对于我这样的初学者只能是借助插件来进行界面与后台联动&#xff0c; 上网度…