1、 建库及测试数据插入脚本
--建增量更新目标表
-- Create table
create table EDW_T100_BAL_IU
( ID VARCHAR2(8) not null,BAL NUMBER(22,2),UPDATE_DT VARCHAR2(8)
);
-- Add comments to the table
comment on table EDW_T100_BAL_IUis '余额(增量更新方式加载)表';
-- Add comments to the columns
comment on column EDW_T100_BAL_IU.IDis '主键ID';
comment on column EDW_T100_BAL_IU.BALis '余额';
comment on column EDW_T100_BAL_IU.UPDATE_DTis '更新日期';
-- Create/Recreate primary, unique and foreign key constraints
alter table EDW_T100_BAL_IUadd constraint EDW_T100_BAL_IU_ID primary key (ID);
--建增量更新用到的临时表
-- Create table
create global temporary table TMP_T100_BAL_IU
( ID VARCHAR2(8) not null,BAL NUMBER(22,2),UPDATE_DT VARCHAR2(8) not null
)
on commit delete rows;
-- Create/Recreate primary, unique and foreign key constraints
alter table TMP_T100_BAL_IUadd constraint TMP_T100_BAL_IU_ID_UPDT primary key (ID, UPDATE_DT);
--建日志表
create table EDW_ETL_LOG_DETAIL
( PROC_NAME VARCHAR2(50),P_ETLDATE VARCHAR2(20),ETL_MEMO VARCHAR2(10),ETL_RECORD_NUM INTEGER,ERR_MSG VARCHAR2(1000),ERR_SQL VARCHAR2(1000),TABLE_NAME VARCHAR2(50),START_TIMESTAMP TIMESTAMP(6),END_TIMESTAMP TIMESTAMP(6)
);
--建源表及插入源测试数据
-- Create table
create table ODS_CMIS_YE
( ID VARCHAR2(8) not null,BAL NUMBER(22,2),ODS_DATA_DATE VARCHAR2(8) not null
);
-- Add comments to the table
comment on table ODS_CMIS_YEis 'ODS余额表(提供的是增量更新数据)';
-- Add comments to the columns
comment on column ODS_CMIS_YE.IDis '主键ID';
comment on column ODS_CMIS_YE.BALis '余额';
comment on column ODS_CMIS_YE.ODS_DATA_DATEis 'ODS数据日期';
-- Create/Recreate primary, unique and foreign key constraints
alter table ODS_CMIS_YEadd constraint ODS_CMIS_YE primary key (ID, ODS_DATA_DATE);
--insert test data
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('4', 34545.09, '20101202');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('1', 10.10, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('2', 20.30, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('3', 22.10, '20101204');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('4', 33.80, '20101205');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('5', 66.09, '20101203');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('6', 8889.08, '20101204');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('7', 2324.07, '20101205');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('8', 232449.98, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('9', 3434.99, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('1', 23.99, '20101202');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('2', 20.30, '20101202');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('1', 3333.98, '20101203');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('10', 3453252.09, '20101203');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('11', 2343432.78, '20101202');
commit;
2、 增量更新示例代码
CREATE OR REPLACE PROCEDURE P_T100_BAL_IU(P_ETLDATE IN VARCHAR2, --日期参数O_RUNSTATUS OUT NUMBER, --执行结果O_MSG OUT VARCHAR2 --错误返回) AS--Procedure Name:P_T100_BAL_IU
--Author :nisj
--Script File :P_T100_BAL_IU.SQL
/*###################################################*/
--Souce Databse.table :
--ODS_CMIS_YE ODS余额表(提供的是增量更新数据)
/*###################################################*/
--Target Databse.table :
--EDW_T100_BAL_IU 余额(增量更新方式加载)表
/*###################################################*/
--加载方式:增量更新--定义存储过程信息V_PROC_NAME VARCHAR2(50) := 'P_T100_BAL_IU';V_TABLE_NAME VARCHAR2(50) := 'EDW_T100_BAL_IU';V_START_TIMESTAMP TIMESTAMP; --加载开始时间V_END_TIMESTAMP TIMESTAMP; --加载结束时间V_RECORD_NUMBER INTEGER; --记录数--定义错误代码,错误状态V_SQLERRM VARCHAR2(1000); --异常信息V_ERR_SQL VARCHAR2(1000); --出错位置BEGIN--捕获过程开始时间SELECT SYSDATE INTO V_START_TIMESTAMP FROM DUAL;--增量更新P_ETLDATE后的数据处理--TMP_T100_BAL_IU存储的是EDW表中大于等于更新日期的所有临时数据,大于的由备份得到,等于的由加载当天数据时得到--大于跑批日期的的数据备份条件是Update_Dt > P_ETLDATE,而删除目标表时是Update_Dt >= P_ETLDATEV_ERR_SQL := 'INSERT INTO TMP_T100_BAL_IU FROM EDW_T100_BAL_IU WHERE Update_Dt > P_ETLDATE';INSERT INTO TMP_T100_BAL_IUSELECT * FROM EDW_T100_BAL_IU WHERE Update_Dt > P_ETLDATE;V_ERR_SQL := 'DELETE FROM EDW_T100_BAL_IU WHERE Update_Dt >= P_ETLDATE';DELETE FROM EDW_T100_BAL_IU WHERE Update_Dt >= P_ETLDATE;--加载当天全量数据V_ERR_SQL := 'INSERT INTO TMP_T100_BAL_IU FROM ODS_CMIS_YE';INSERT INTO TMP_T100_BAL_IU--该表无主键,但逻辑主键是(ID、UPDATE_DT)的复合主键(ID --主键ID,BAL --余额,UPDATE_DT --更新日期)SELECT a1.ID --主键ID,a1.BAL --余额,a1.ODS_DATA_DATE --ODS数据日期FROM ODS_CMIS_YE a1WHERE Ods_Data_Date = P_ETLDATE;--增量更新操作--按主键删除更新部分记录V_ERR_SQL := 'DELETE FROM EDW_T100_BAL_IU WHERE (ID) IN TMP_T100_BAL_IU';DELETE FROM EDW_T100_BAL_IUWHERE (ID)IN (SELECT IDFROM TMP_T100_BAL_IU);--插入新增和更新数据V_ERR_SQL := 'INSERT INTO EDW_T100_BAL_IU WHERE TMP_T100_BAL_IU [ID,MAX(UPDATE_DT)]';INSERT INTO EDW_T100_BAL_IU(ID --主键ID,BAL --余额,UPDATE_DT --更新日期)SELECT ID --主键ID,BAL --余额,UPDATE_DT --更新日期FROM TMP_T100_BAL_IUWHERE (ID,UPDATE_DT)IN (SELECT ID,MAX(UPDATE_DT)FROM TMP_T100_BAL_IUGROUP BY ID);--正常处理V_RECORD_NUMBER := SQL%ROWCOUNT;SELECT SYSDATE INTO V_END_TIMESTAMP FROM dual;INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,P_ETLDATE)VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,V_RECORD_NUMBER,'成功',P_ETLDATE);COMMIT;--异常处理EXCEPTION WHEN OTHERS THENBEGINROLLBACK;V_SQLERRM := SQLERRM;INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,ERR_MSG,ERR_SQL,P_ETLDATE)VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,0,'失败',V_SQLERRM,V_ERR_SQL,P_ETLDATE);O_RUNSTATUS := 1;O_MSG := 'PROGRAMMING ERROR HAPPENED';COMMIT;END;END;
/
3、 结果测试
Incremental_Update
truncate table edw_t100_bal_iu;
select * from ods_cmis_ye;
select * from edw_t100_bal_iu;
--无论批如何调,只要保证每一天都调了,edw_t100_bal_iu就会和下面的SQL得出的结果一致!
SELECT *FROM ODS_CMIS_YEWHERE ODS_DATA_DATE <= '20101205'AND (ID, ODS_DATA_DATE) IN(SELECT ID, MAX(ODS_DATA_DATE) FROM ODS_CMIS_YE GROUP BY ID);p_t100_bal_iu;
p_t100_bal_iu_new;
select * from edw_etl_log_detail order by 8 desc;
4、 总结说明
此过程代码,对于如12.5号数据已经跑完但发现12.1号那天的源数据有问题的情况,我们只要重新调度12.1的批就可以了,而不必从12.1号一直跑到当天,且能保证数据的完整性;而对于如果发现某一天数据有问题,需要从出错那一天起一直跑到当天的情况,代码如下,可根据实际情况选用。
CREATE OR REPLACE PROCEDURE P_T100_BAL_IU(P_ETLDATE IN VARCHAR2, --日期参数O_RUNSTATUS OUT NUMBER, --执行结果O_MSG OUT VARCHAR2 --错误返回) AS--Procedure Name:P_T100_BAL_IU
--Author :nisj
--Script File :P_T100_BAL_IU.SQL
/*###################################################*/
--Souce Databse.table :
--ODS_CMIS_YE ODS余额表(提供的是增量更新数据)
/*###################################################*/
--Target Databse.table :
--EDW_T100_BAL_IU 余额(增量更新方式加载)表
/*###################################################*/
--加载方式:增量更新--定义存储过程信息V_PROC_NAME VARCHAR2(50) := 'P_T100_BAL_IU';V_TABLE_NAME VARCHAR2(50) := 'EDW_T100_BAL_IU';V_START_TIMESTAMP TIMESTAMP; --加载开始时间V_END_TIMESTAMP TIMESTAMP; --加载结束时间V_RECORD_NUMBER INTEGER; --记录数--定义错误代码,错误状态V_SQLERRM VARCHAR2(1000); --异常信息V_ERR_SQL VARCHAR2(1000); --出错位置BEGIN--捕获过程开始时间SELECT SYSDATE INTO V_START_TIMESTAMP FROM DUAL;--增量更新历史回滚DELETE FROM EDW_T100_BAL_IU WHERE Update_Dt >= P_ETLDATE;--加载当天全量数据V_ERR_SQL := 'INSERT INTO TMP_T100_BAL_IU FROM ODS_CMIS_YE';INSERT INTO TMP_T100_BAL_IU(ID --主键ID,BAL --余额,UPDATE_DT --更新日期)SELECT a1.ID --主键ID,a1.BAL --余额,a1.ODS_DATA_DATE --ODS数据日期FROM ODS_CMIS_YE a1WHERE Ods_Data_Date = P_ETLDATE;--增量更新操作--按主键删除更新部分记录DELETE FROM EDW_T100_BAL_IUWHERE (ID)IN (SELECT IDFROM TMP_T100_BAL_IU);--出入新增和更新数据INSERT INTO EDW_T100_BAL_IU(ID --主键ID,BAL --余额,UPDATE_DT --更新日期)SELECT ID --主键ID,BAL --余额,UPDATE_DT --更新日期FROM TMP_T100_BAL_IU;--正常处理V_RECORD_NUMBER := SQL%ROWCOUNT;SELECT SYSDATE INTO V_END_TIMESTAMP FROM dual;INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,P_ETLDATE)VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,V_RECORD_NUMBER,'成功',P_ETLDATE);COMMIT;--异常处理EXCEPTION WHEN OTHERS THENBEGINROLLBACK;V_SQLERRM := SQLERRM;INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,ERR_MSG,ERR_SQL,P_ETLDATE)VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,0,'失败',V_SQLERRM,V_ERR_SQL,P_ETLDATE);O_RUNSTATUS := 1;O_MSG := 'PROGRAMMING ERROR HAPPENED';COMMIT;END;END;
/