洛阳网站改版/p2p万能搜索引擎

洛阳网站改版,p2p万能搜索引擎,权威的唐山网站建设,中国室内设计网站有哪些Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义参考: 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1) 利用Paimon表做lookup join,集成mysql cdc等参考: 大数据组件(四)快速入门实时数据…

Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)

利用Paimon表做lookup join,集成mysql cdc等参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

利用Paimon的Tag兼容Hive,Branch管理等参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

今天,我们继续快速了解下最近比较火的Apache Paimon:

  • 官方文档:https://paimon.apache.org/docs/1.0/
  • 推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世

1 利用Paimon做维表join

  • 在流式处理中,Lookup Join用来从另一个表(Paimon)中“查字典”来补充流的信息。一个表需要有时间标记(处理时间属性),另一个表需要支持快速查找(查找源连接器)
  • Paimon在Flink中支持对带有主键的表和追加表进行Lookup Join操作

1.1 常规Lookup

SET 'execution.runtime-mode' = 'streaming';-- 1、用户维表
CREATE TABLE customers (id INT PRIMARY KEY NOT ENFORCED,name STRING    comment '姓名',country STRING comment '城市',zip STRING     comment '邮编'
) WITH ('connector' = 'paimon'
);-- 插入维表数据
INSERT INTO customers VALUES(1,'tom','伦敦','123'),(2,'hank','纽约','456'),(3,'小明','北京','789');-- 2、模拟订单表(数据流)
drop TEMPORARY TABLE orders_info;
CREATE TEMPORARY TABLE orders_info (order_id INT,total INT,customer_id INT,proc_time AS PROCTIME()
) WITH ('connector' = 'datagen', 'rows-per-second'='1', 'fields.order_id.kind'='sequence', 'fields.order_id.start'='1', 'fields.order_id.end'='1000000', 'fields.total.kind'='random', 'fields.total.min'='1', 'fields.total.max'='1000', 'fields.customer_id.kind'='random', 'fields.customer_id.min'='1', 'fields.customer_id.max'='3'
);-- 3、维表join
SELECT o.order_id, o.total, c.country, c.zip
FROM orders_info AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;-- 可以看到下面的结果
+----+----------+----------+-------------+---------+
| op | order_id |    total |     country |     zip |
+----+----------+----------+-------------+---------+
| +I |        1 |      179 |        纽约 |     456 |
| +I |        2 |       31 |        北京 |     789 |
| +I |        3 |      148 |        北京 |     789 |
| +I |        4 |      774 |        北京 |     789 |

1.2 同步重试Lookup

  • 如果用户维表(查找表)的数据未准备好,导致订单表(主表)的记录无法完成连接,你可以考虑使用Flink的延迟重试策略进行查找。
  • 这个功能仅适用于Flink 1.16及以上的版本。
  • 下面sql的提示(hints)设置了重试策略:
    • 'table'='c': 指定了应用查找重试策略的目标表的别名,在这个例子中是customers表,其别名为c。
    • 'retry-predicate'='lookup_miss': 定义了触发重试的条件。这里的条件是lookup_miss,意味着如果在查找过程中没有找到对应的数据(即查找缺失),就会触发重试机制。
    • 'retry-strategy'='fixed_delay': 设置了重试的策略为固定延迟,即每次重试之间会有固定的等待时间。
    • 'fixed-delay'='1s': 当采用固定延迟重试策略时,这里设定了每次重试前等待的时间长度为1秒。
    • 'max-attempts'='600': 设定最大重试次数为600次。结合上面的fixed-delay设置,这意味着系统会每隔1秒尝试一次数据查找,最多尝试600次。
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

1.3 异步重试Lookup

  • 在同步重试机制下,如果处理某一条记录时遇到了问题(例如查找失败),系统会按照设定的重试策略反复尝试直到成功或者达到最大重试次数。在这期间,这条记录后面的其他记录即使没有问题也无法被处理,因为整个处理流程被阻塞了。这会导致数据处理的整体效率低下,甚至可能造成作业延迟或超时。
  • 使用异步加上allow_unordered,可以在某些记录在查找时缺失也不会再阻塞其他记录。
  • 下面sql配置项解释:
    • 'output-mode'='allow_unordered': 设置输出模式允许无序,意味着当查找失败时不会阻塞其他记录的处理,适合于不需要严格顺序的场景。
    • 'lookup.async'='true': 启用异步查找,提高效率,避免因等待某个查找结果而阻塞其它查找操作。
    • 'lookup.async-thread-number'='16': 设置用于异步查找的线程数为16,这可以加速查找过程,特别是在高并发情况下。
  • 注意:如果主表(orders)是CDC流,allow_unordered会被Flink SQL忽略(只支持追加流),流作业可能会被阻塞。可以尝试使用Paimon的audit_log系统表功能来绕过这个问题(将CDC流转为追加流)。
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

1.4 max_pt功能

  • 在传统的数据仓库中,每个分区通常维护最新的完整数据,因此这种分区表只需要连接最新的分区即可。Paimon特别为此场景开发了max_pt功能。

  • 通过max_pt,Lookup节点能够自动刷新并查询最新分区的数据,确保所使用的客户信息始终是最新的,而不需要手动干预来确定或切换到最新的数据分区。

  • 这种方法特别适用于需要频繁更新和查询最新数据的场景,可以大大提高数据处理的效率和准确性。

-- 1、分区用户维表
drop table if exists customers;
CREATE TABLE customers (id INT,name STRING,country STRING,zip STRING,dt STRING,PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt);-- 插入数据(维表关联时候,只查找'2025-02-19'分区数据)
INSERT INTO customers VALUES
(1, 'Alice', 'USA', '10001', '2025-02-18'),
(2, 'Bob', 'UK', '20002', '2025-02-18'),
(3, 'Charlie', 'Germany', '30003', '2025-02-18'),
(1, 'Alice', 'USA', '10002', '2025-02-19'), -- 更新了 Alice 的邮编
(4, 'David', 'France', '40004', '2025-02-19');-- 2、订单信息表(解析kafka数据)
CREATE TEMPORARY TABLE orders (order_id BIGINT,customer_id INT,total DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_topic','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','properties.group.id' = 'testordersGroup','scan.startup.mode' = 'earliest-offset','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true'
);-- 创建topic
/opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_topic --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 
-- 启动命令行生产者,生产数据
/opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_topic --bootstrap-server centos01:9092
{"order_id":1001,"customer_id":1,"total":50.0}
{"order_id":1002,"customer_id":2,"total":30.0}
{"order_id":1004,"customer_id":4,"total":20.0}-- 'lookup.dynamic-partition'='max_pt()': 这个选项指示查找节点自动定位并使用最新(最大)的分区。max_pt()函数帮助系统识别最新的分区,确保只查询最新的数据。
-- 'lookup.dynamic-partition.refresh-interval'='1 h': 设置了查找节点刷新最新分区的时间间隔为1小时。这意味着系统会每隔1小时检查一次是否有新的分区可用,并自动更新到最新分区的数据。SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;+----+----------------------+--------------+--------------------------------+--------------------------------+
| op |             order_id |        total |                        country |                            zip |
+----+----------------------+--------------+--------------------------------+--------------------------------+
| +I |                 1001 |        50.00 |                            USA |                          10002 |
| +I |                 1004 |        20.00 |                         France |                          40004 |

  • 可以运行一个Flink流式作业来启动针对该paimon维表的查询服务。当QueryService存在时,Flink查找连接(Lookup Join)会优先从该服务获取数据,这将有效提高查询性能。

  • 可以通过调用sys.query_service系统函数来实现:
    CALL sys.query_service('paimon_db.customers', 4); -- 设置并行度为4或者通过下面action包开启

    <FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-1.0.0.jar \query_service \--warehouse <warehouse-path> \--database <database-name> \--table <table-name> \[--parallelism <parallelism>] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
    
  • 查询服务能够在内存中缓存频繁访问的数据,并以高并发的方式提供这些数据,减少了磁盘I/O操作和网络延迟的影响。

2 集成Mysql CDC

  • 可以通过 Flink SQL 或者 Flink DataStream API 将 Flink CDC 数据写入 Paimon 中,也可以通过Paimon 提供的 CDC 工具来完成入湖。那这两种方式有什么区别呢?

    在这里插入图片描述

  • 上图是使用 Flink SQL 来完成入湖,简单,但是当源表添加新列后,同步作业不会同步新的列,下游 Paimon 表也不会增加新列。

在这里插入图片描述

  • 上图是使用 Paimon CDC 工具来同步数据,可以看到,当源表发生列的新增后,流作业会自动新增列的同步,并传导到下游的 Paimon 表中,完成 Schema Evolution 的同步

  • Paimon CDC 工具也提供了整库同步:

    • 一个作业同步多张表,以低成本的方式同步大量小表
    • 作业里同时自动进行 Schema Evolution
    • 新表将会被自动进行同步,你不用重启作业,全自动完成
  • 推荐阅读:Flink + Paimon 数据 CDC 入湖最佳实践

2.1 MySQL一张表同步到Paimon一张表

2.1.1 环境准备

# mysql-cdc相关jar包的下载地址,同时还需要mysql-connector
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-mysql-cdc/3.1.1/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-common/3.1.1/# 在flink的lib目录添加下面的jar包
[root@centos01 lib]# ll /opt/apps/flink-1.16.0/lib/
-rw-r--r--. 1 root root     259756 Feb 19 15:13 flink-cdc-common-3.1.1.jar
-rw-r--r--. 1 root root   21286022 Feb 19 11:40 flink-cdc-pipeline-connector-mysql-3.1.1.jar
-rw-r--r--. 1 root root     388680 Feb 19 10:57 flink-connector-mysql-cdc-3.1.1.jar
-rw-r--r--. 1 root root    2475087 Feb 19 10:58 mysql-connector-java-8.0.27.jar
......# 开启MySQL Binlog并重启MySQL
[root@centos01 ~]# vim /etc/my.cnf
#添加如下配置信息,开启`cdc_db`数据库的Binlog
#数据库id
server-id = 1
##启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型
binlog_format=row
##启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=cdc_db[root@centos01 ~]# systemctl restart mysqld# 启动hadoop和yarn
[root@centos01 ~]# start-all.sh
# 启动Hive
[root@centos01 ~]# nohup hive --service metastore  2>&1 & 
[root@centos01 ~]# nohup  hive --service hiveserver2   2>&1 & 
# yarn-session模式
# http://centos01:8088/cluster中可以看到Flink session cluster的job ID
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/yarn-session.sh -d
# 启动Flink的sql-client
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/sql-client.sh -s yarn-session
  • 使用Hive的元数据
Flink SQL> CREATE CATALOG paimon_hive_catalog WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://centos01:9083'
);Flink SQL> USE CATALOG paimon_hive_catalog;
Flink SQL> create database if not exists paimon_db;
Flink SQL> use paimon_db;# 设置显示模式
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SET 'execution.runtime-mode' = 'batch';

2.1.2 案例详解

  • 如果指定的Paimon表不存在,将自动创建表。其schema将从所有指定的MySQL表派生;
  • 如果Paimon 表已存在,则其schema将与所有指定MySQL表的schema进行比较;
  • 仅支持同步具有主键的MySQL表
  • 也可MySQL多张表同步到Paimon一张表,可以查看官网示例。
-- 准备测试数据
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',`login_name` varchar(200) DEFAULT NULL COMMENT '用户名称',`name` varchar(200) DEFAULT NULL COMMENT '用户姓名',`phone_num` varchar(200) DEFAULT NULL COMMENT '手机号',`birthday` date DEFAULT NULL COMMENT '用户生日',`gender` varchar(1) DEFAULT NULL COMMENT '性别 M男,F女',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`operate_time` datetime DEFAULT NULL COMMENT '修改时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;-- ----------------------------
-- Records of user_info
-- ----------------------------
INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES 
('zhangsan', '张三', '13800001234', '1990-01-01', 'M', NOW(), NOW()),
('lisi', '李四', '13800005678', '1992-02-02', 'F', NOW(), NOW()),
('wangwu', '王五', '13800009012', '1995-03-03', 'M', NOW(), NOW()),
('zhaoliu', '赵六', '13800003456', '1998-04-04', 'F', NOW(), NOW()),
('sunqi', '孙七', '13800007890', '2000-05-05', 'M', NOW(), NOW());CREATE TABLE `orders` (`order_id` bigint(20) NOT NULL  AUTO_INCREMENT COMMENT '订单编号',`user_id` bigint(20) NOT NULL COMMENT '用户编号',`order_date` datetime DEFAULT NULL COMMENT '订单日期',`total_price` decimal(10,2) DEFAULT NULL COMMENT '订单总价',PRIMARY KEY (`order_id`) USING BTREE,KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;INSERT INTO `orders`(`user_id`, `order_date`, `total_price`) VALUES 
(1001, '2025-02-18 14:32:10', 199.99),
(1002, '2025-02-19 09:15:30', 299.50),
(1001, '2025-02-19 12:47:05', 79.99),
(1003, '2025-02-19 13:00:00', 499.00),
(1004, '2025-02-20 08:20:25', 159.99);
# 按照天进行分区
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-table \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table ods_user_info_cdc \--primary-keys pt,id \# 指定分区键,分区键是通过函数转换而来--partition_keys pt \--computed_column 'pt=date_format(operate_time, yyyyMMdd)' \--mysql-conf hostname=centos01 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=cdc_db \--mysql-conf table-name='user_info' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--table-conf bucket=2 \--table-conf changelog-producer=input \--table-conf sink.parallelism=2 
  • 我们可以进行测试了
-- 此时会自动创建paimon表(无需我们手动建表了)
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+-- 修改原始表的schema
mysql> ALTER TABLE user_info ADD COLUMN email varchar(255);
mysql> INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`, `email`)
values('hank', '汉克', '18600007890', '2000-05-05', 'M', NOW(), NOW(), 'hank@163.com');-- 注意:我们此时在原始sql-client窗口查询,`email`字段并没有加上
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  6 |       hank | 汉克 | 18600007890 | 2000-05-05 |      M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+-- 其实,此时的schema已经改变,我们查看hdfs上的schema信息会发现`email`字实际上已经添加
[root@centos01 ~]# hdfs dfs -cat /user/hive/warehouse/paimon_db.db/ods_user_info_cdc/schema/schema-1-- 因此,我们需要另外重新启动一个sql-client窗口进行查询,可以发现是正确的
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |        email |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  6 |       hank | 汉克 | 18600007890 | 2000-05-05 |      M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 | hank@163.com |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+

注:

  • 上面自动创建的paimon表是分区表,但是在Hive中不是分区表
-- 报错
0: jdbc:hive2://192.168.42.101:10000> show partitions ods_user_info_cdc ;
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Table ods_user_info_cdc is not a partitioned table (state=42000,code=1)
  • 要想在hive中也是分区表,需要在同步时候设置参数metastore.partitioned-table=true
# 同步到另一张paimon表,并设置hive分区参数
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-table \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table ods_user_info_partition_cdc \--primary-keys pt,id \--partition_keys pt \--computed_column 'pt=date_format(operate_time, yyyyMMdd)' \--mysql-conf hostname=centos01 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=cdc_db \--mysql-conf table-name='user_info' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--table-conf bucket=2 \--table-conf changelog-producer=input \--table-conf metastore.partitioned-table=true \--table-conf sink.parallelism=2 
  • 然后可以从hive中读取相关分区了
0: jdbc:hive2://192.168.42.101:10000> show partitions ods_user_info_partition_cdc ;
+--------------+
|  partition   |
+--------------+
| pt=20250219  |
+--------------+-- 读取相关分区数据
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a where  pt=20250219;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |   a.pt    |    a.email    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+
| 1     | zhangsan      | 张三      | 13800001234  | 1990-01-01  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 2     | lisi          | 李四      | 13800005678  | 1992-02-02  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | lisi@qq.com   |
| 4     | zhaoliu       | 赵六      | 13800003456  | 1998-04-04  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 5     | sunqi         | 孙七      | 13800007890  | 2000-05-05  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 6     | hank          | 汉克      | 18600007890  | 2000-05-05  | M         | 2025-02-19 18:01:43.0  | 2025-02-19 18:01:43.0  | 20250219  | hank@163.com  |
| 3     | wangwu        | 王五      | 13800009012  | 1995-03-03  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+-- 需要注意的是,如果我们修改数据,原始的hive分区原始数据可能会减少
-- 下面示例,我们修改了两条数据
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a  where pt=20250219;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |    a.email    |   a.pt    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| 2     | lisi          | 李四      | 13800005678  | 1992-02-02  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | lisi@qq.com   | 20250219  |
| 4     | zhaoliu       | 赵六      | 13800003456  | 1998-04-04  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 5     | sunqi         | 孙七      | 13800007890  | 2000-05-05  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 6     | hank          | 汉克      | 18600007890  | 2000-05-05  | M         | 2025-02-19 18:01:43.0  | 2025-02-19 18:01:43.0  | hank@163.com  | 20250219  |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
4 rows selected (0.199 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a  where pt=20250220;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-------------+-----------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |   a.email   |   a.pt    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-------------+-----------+
| 3     | wangwu        | 王五      | 13800009012  | 1995-03-03  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 12:01:18.0  | ww@163.com  | 20250220  |
| 1     | zhangsan      | 张三三     | 13800001234  | 1990-01-01  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 18:01:18.0  | NULL        | 20250220  |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-------------+-----------+
2 rows selected (0.189 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a  ;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |    a.email    |   a.pt    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| 2     | lisi          | 李四      | 13800005678  | 1992-02-02  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | lisi@qq.com   | 20250219  |
| 4     | zhaoliu       | 赵六      | 13800003456  | 1998-04-04  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 5     | sunqi         | 孙七      | 13800007890  | 2000-05-05  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 6     | hank          | 汉克      | 18600007890  | 2000-05-05  | M         | 2025-02-19 18:01:43.0  | 2025-02-19 18:01:43.0  | hank@163.com  | 20250219  |
| 3     | wangwu        | 王五      | 13800009012  | 1995-03-03  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 12:01:18.0  | ww@163.com    | 20250220  |
| 1     | zhangsan      | 张三三     | 13800001234  | 1990-01-01  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 18:01:18.0  | NULL          | 20250220  |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
  • 当然,我们在建立paimon表的时候,也可以指定参数'metastore.partitioned-table' = 'true'
Flink SQL> drop table if exists flink_p_demo;
Flink SQL> CREATE TABLE flink_p_demo (dt STRING NOT NULL,name STRING NOT NULL,amount BIGINT,PRIMARY KEY (dt, name) NOT ENFORCED 
) PARTITIONED BY (dt)
WITH ('connector' = 'paimon','changelog-producer' = 'lookup','metastore.partitioned-table' = 'true'  -- 设置为true
);
Flink SQL> insert into flink_p_demo values  ('20240725', 'apple', 3), ('20240726', 'banana', 5);0: jdbc:hive2://192.168.42.101:10000> show partitions flink_p_demo;
+--------------+
|  partition   |
+--------------+
| dt=20240725  |
| dt=20240726  |
+--------------+
0: jdbc:hive2://192.168.42.101:10000> select * from flink_p_demo a  where dt = 20240726;
+---------+-----------+-----------+
| a.name  | a.amount  |   a.dt    |
+---------+-----------+-----------+
| banana  | 5         | 20240726  |
+---------+-----------+-----------+

2.2 整库同步

  • 通过官方提供的paimon-action的jar包可以很方便的将 MySQL、Kafka、Mongo等中的数据实时摄入到Paimon中
  • 使用paimon-flink-action,采用mysql-sync-database(整库同步),并通过"–including_tables"参数选择要同步的表
  • 这种同步模式有效地节省了大量资源开销,相比每个表启动一个 Flink 任务而言,避免了资源的大量浪费。
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-database \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table-prefix "ods_" \--table-suffix "_mysql_cdc" \--mysql-conf hostname=centos01 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=cdc_db \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--table-conf bucket=2 \--table-conf changelog-producer=input \--table-conf sink.parallelism=2 \--including-tables 'user_info|orders'-- 在一个flink任务中同步多个mysql表
-- 在paimon中会自动创建多张表
Flink SQL> select * from ods_orders_mysql_cdc;
Flink SQL> select * from ods_user_info_mysql_cdc;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/70596.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧校园系统在学生学习与生活中的应用

随着科技的快速发展&#xff0c;智慧校园系统逐渐成为现代教育不可或缺的一部分。它整合了先进的信息技术、物联网技术以及人工智能等&#xff0c;旨在构建一个全面、智能、个性化的学习与生活环境。对于学生而言&#xff0c;这一系统不仅能够极大地提高学习效率&#xff0c;还…

基于Flask的京东商品信息可视化分析系统的设计与实现

【Flask】基于Flask的京东商品信息可视化分析系统的设计与实现&#xff08;完整系统源码开发笔记详细部署教程&#xff09;✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 系统能够灵活地执行SQL查询&#xff0c;提取出用于分析的关键数据指标。为了将这…

Electron通过ffi-napi调用dll导出接口

electron使用ffi-napi环境搭建 附打包好的ffi-napi可以直接放到项目目录下使用&#xff0c;避免以后麻烦 一、安装node.js Node.js官网&#xff1a;https://nodejs.org/zh-cn/download&#xff0c;选择LTS长期稳定版本即可 需要注意Node.js 区分32和64位&#xff0c;32位版…

25工程管理研究生复试面试问题汇总 工程管理专业知识问题很全! 工程管理复试全流程攻略 工程管理考研复试真题汇总

工程管理复试面试心里没底&#xff1f;别慌&#xff01;学姐手把手教你怎么应对复试&#xff01; 很多同学面对复试总担心踩坑&#xff0c;其实只要避开雷区掌握核心技巧&#xff0c;逆袭上岸完全有可能&#xff01;这份保姆级指南帮你快速锁定重点&#xff0c;时间紧迫优先背…

深蓝学院自主泊车第3次作业-IPM

目录 1 题目介绍2 求解 1 题目介绍 已知鱼眼相机的参数&#xff0c; image_width&#xff0c;表示图像的宽度image_height&#xff0c;表示图像的高度 ξ \xi ξ&#xff0c;表示鱼眼相机参数 k 1 k_1 k1​、 k 2 k_2 k2​&#xff0c;表示径向相机参数 p 1 p_1 p1​、 p 2 p…

核货宝助力连锁门店订货数字化转型升级

在竞争激烈的连锁零售行业&#xff0c;传统订货模式弊端日益凸显&#xff0c;严重制约着企业的发展。核货宝订货系统以其卓越的数字化解决方案&#xff0c;为连锁门店订货带来了全方位的变革&#xff0c;助力企业实现数字化转型升级&#xff0c;在市场中抢占先机。 一、增强总部…

AutoGen 技术博客系列 八:深入剖析 Swarm—— 智能体协作的新范式

本系列博文在掘金同步发布, 更多优质文章&#xff0c;请关注本人掘金账号&#xff1a; 人肉推土机的掘金账号 AutoGen系列一&#xff1a;基础介绍与入门教程 AutoGen系列二&#xff1a;深入自定义智能体 AutoGen系列三&#xff1a;内置智能体的应用与实战 AutoGen系列四&am…

力扣每日一题【算法学习day.132】

前言 ###我做这类文章一个重要的目的还是记录自己的学习过程&#xff0c;我的解析也不会做的非常详细&#xff0c;只会提供思路和一些关键点&#xff0c;力扣上的大佬们的题解质量是非常非常高滴&#xff01;&#xff01;&#xff01; 习题 1.统计相似字符串对的数目 题目链…

Spring AI + Ollama 实现调用DeepSeek-R1模型API

一、前言 随着人工智能技术的飞速发展&#xff0c;大语言模型&#xff08;LLM&#xff09;在各个领域的应用越来越广泛。DeepSeek 作为一款备受瞩目的国产大语言模型&#xff0c;凭借其强大的自然语言处理能力和丰富的知识储备&#xff0c;迅速成为业界关注的焦点。无论是文本生…

自学Java-AI结合GUI开发一个石头迷阵的游戏

自学Java-AI结合GUI开发一个石头迷阵的游戏 准备环节1、创建石头迷阵的界面2、打乱顺序3、控制上下左右移动4、判断是否通关5、统计移动步骤&#xff0c;重启游戏6、拓展问题 准备环节 技术&#xff1a; 1、GUI界面编程 2、二维数组 3、程序流程控制 4、面向对象编程 ∙ \bulle…

本地安装 Grafana Loki

本地安装 Grafana Loki 一、 安装 Loki1. 下载 Loki2. 创建 Loki 配置文件3. 创建 Loki 服务 二、安装 Promtail1. 下载 Promtail2. 创建 Promtail 配置文件3. 创建 Promtail 服务 三、 安装 Grafana四、启动所有服务五、添加loki 数据源1. 添加仪表板2. 日志查询面板 json 参考…

趣味数学300题1981版-十五个正方形

分析&#xff1a;移动两根变成11个正方形很简单&#xff1a; 移动4根变成15个正方形&#xff0c;分析&#xff1a; 一个田字格包含5个正方形&#xff0c;若要15个正方形需要3个田字格&#xff0c;如果3个田字格完全不重合&#xff0c;需要6*318根火柴。如果合并正方形的边&…

基于Spring Boot的兴顺物流管理系统设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

Andorid 学习 Compose UI(1):Box

今天学习和实验一下Android 的compose UI&#xff0c;写一些很小的Demo实验。下面和css 布局有点相似性。 如Box 看起来像html 当中的 div &#xff0c;compose UI 提供Modifier 很多设置。你会发现Text,Box,Row,Image等组件 都有这个属性。我们处理任务包括对齐布局&#xff0…

Selenium实战案例1:论文pdf自动下载

在上一篇文章中&#xff0c;我们介绍了Selenium的基础用法和一些常见技巧。今天&#xff0c;我们将通过中国科学&#xff1a;信息科学网站内当前目录论文下载这一实战案例来进一步展示Selenium的web自动化流程。 目录 中国科学&#xff1a;信息科学当期目录论文下载 1.网页内…

Perfectly Clear WorkBench深度解析:专业图像处理软件的高效应用

在图像处理领域,面对照片曝光不足、色彩失真、细节模糊等常见问题,一款专业且高效的图像处理软件显得尤为重要。今天,本文将为大家详细介绍Perfectly Clear WorkBench这款图像处理软件,帮助大家更好地了解并应用其功能,提升照片质量。 一、智能图像校正,解决常见问题 Pe…

使用 DistilBERT 进行资源高效的自然语言处理

DistilBERT 是 BERT 的一个更小、更快的版本&#xff0c;在减少资源消耗的同时仍能保持良好性能。对于计算能力和内存受限的环境来说&#xff0c;它是一个理想的选择。 在自然语言处理&#xff08;NLP&#xff09;中&#xff0c;像 BERT 这样的模型提供了高精度和出色的性能。然…

【后端基础】布隆过滤器原理

文章目录 一、Bloom Filter&#xff08;布隆过滤器&#xff09;概述1. Bloom Filter 的特点2. Bloom Filter 的工作原理 二、示例1. 添加与查询2. 假阳性 三、Bloom Filter 的操作1、假阳性概率2、空间效率3、哈希函数的选择 四、应用 Bloom Filter 是一种非常高效的概率型数据…

docker独立部署milvus向量数据库

milvus镜像&#xff1a;国外封锁&#xff0c;国内源也不好用。基本上所有源都不能用 首先想到阿里云服务&#xff0c;但是阿里云国外服务器便宜的300~400呢。 基于成本考虑终于装上心心念念的milvus(*^▽^*) 安装 Milvus 安装 Milvus 独立版 wget https://raw.githubuserco…

Git操作整体流程

文章目录 1.Git创建个人仓库2、Git全局配置3、Git本地管理4. Git本地管理常用命令汇总5、使用Git命令将项目提交到远程码云管理6.使用IDEA进行管理7、Idea里面的终端8、关于提交总结 1.Git创建个人仓库 打开https://gitee.com/&#xff0c;登录个人账号&#xff0c;右上角加号…