系列文章目录
线上问诊:业务数据采集
线上问诊:数仓数据同步
线上问诊:数仓开发(一)
文章目录
- 系列文章目录
- 前言
- 一、Hive on yarn
- 二、数仓开发
- 1.ODS开发
- 2.DIM开发
- 3.DWD开发
- 总结
前言
上次我们已经将MYSQL的数据传送到了HDFS,但是HDFS的数据没法直接进行查看和修改。这次我们将其转入hive仓库,并进行下一步的处理。
一、Hive on yarn
hive更换引擎
更换完成后,创建我们实验需要的数据库。
-- Create the warehouse database only when it is missing so the step can be re-run.
-- Then switch to it, because the table DDL below uses unqualified table names
-- while the loader scripts address the tables as medical.<table>.
CREATE DATABASE IF NOT EXISTS medical;
USE medical;
二、数仓开发
为了实验方便,我们先将之前的数据都删掉。
修改/opt/module/mock-medical/application.yml文件,统一时间。
修改 /opt/module/maxwell/config.properties
清空maxwell数据库
medical也清空
打开之前搭建的采集通道。
# Start the services of the collection pipeline, one helper script per service.
# NOTE(review): the role of each helper is inferred from its file name only - confirm against the scripts.
# presumably the Hadoop cluster
myhadoop.sh start
# presumably ZooKeeper
zk.sh start
# presumably Kafka
kf.sh start
# presumably the collection agent that lands data on HDFS - TODO confirm
medical-f1.sh start
# presumably Maxwell
mxw.sh start
生成2023-05-01至2023-05-09的历史数据。
medical_mock.sh 9
增量表同步
medical_mysql_to_kafka_inc_init.sh all
全量表同步
medical_mysql_to_hdfs_full.sh all 2023-05-09
1.ODS开发
医生表(全量表)
-- ODS full snapshot of the doctor table.
-- Rows are tab-delimited, empty strings are read as NULL, and the table is partitioned by dt.
DROP TABLE IF EXISTS `ods_doctor_full`;
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_doctor_full`
(
    `id`               STRING         COMMENT '医生ID',
    `create_time`      STRING         COMMENT '创建时间',
    `update_time`      STRING         COMMENT '修改时间',
    `birthday`         STRING         COMMENT '出生日期',
    `consultation_fee` DECIMAL(19, 2) COMMENT '就诊费用',
    `gender`           STRING         COMMENT '性别:101.男 102.女',
    `name`             STRING         COMMENT '姓名',
    `specialty`        STRING         COMMENT '专业:详情见字典表5xx条目',
    `title`            STRING         COMMENT '职称:301. 医士 302. 医师 303. 主治医师 304. 副主任医师 305. 主任医师',
    `hospital_id`      STRING         COMMENT '所属医院'
)
    COMMENT '医生全量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/warehouse/medical/ods/ods_doctor_full/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
医院表(全量表)
-- ODS full snapshot of the hospital table.
-- Rows are tab-delimited, empty strings are read as NULL, and the table is partitioned by dt.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_hospital_full`
(
    `id`              STRING COMMENT '医院ID',
    `create_time`     STRING COMMENT '创建时间',
    `update_time`     STRING COMMENT '修改时间',
    `address`         STRING COMMENT '地址',
    `alias`           STRING COMMENT '医院别名',
    `bed_num`         BIGINT COMMENT '病床数量',
    `city`            STRING COMMENT '市',
    `department_num`  BIGINT COMMENT '科室数量',
    `district`        STRING COMMENT '区县',
    `establish_time`  STRING COMMENT '建立时间',
    `health_care_num` BIGINT COMMENT '医护人数',
    `insurance`       STRING COMMENT '是否医保',
    `level`           STRING COMMENT '医院级别,一级甲等,二级甲等....',
    `name`            STRING COMMENT '医院名称',
    `province`        STRING COMMENT '省(直辖市)'
)
    COMMENT '医院表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/warehouse/medical/ods/ods_hospital_full/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
药品表(全量表)
-- ODS full snapshot of the medicine table.
-- Rows are tab-delimited, empty strings are read as NULL, and the table is partitioned by dt.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_medicine_full`
(
    `id`            STRING         COMMENT '药品ID',
    `create_time`   STRING         COMMENT '创建时间',
    `update_time`   STRING         COMMENT '修改时间',
    `approval_code` STRING         COMMENT '药物批号',
    `dose_type`     STRING         COMMENT '剂量',
    `name`          STRING         COMMENT '药品名称',
    `name_en`       STRING         COMMENT '英文名称',
    `price`         DECIMAL(19, 2) COMMENT '药品价格',
    `specs`         STRING         COMMENT '规格',
    `trade_name`    STRING         COMMENT '商品名'
)
    COMMENT '药品表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/warehouse/medical/ods/ods_medicine_full/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
患者表(全量表)
-- ODS full snapshot of the patient table.
-- Rows are tab-delimited, empty strings are read as NULL, and the table is partitioned by dt.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_patient_full`
(
    `id`          STRING COMMENT '患者ID',
    `create_time` STRING COMMENT '创建时间',
    `update_time` STRING COMMENT '修改时间',
    `birthday`    STRING COMMENT '出生日期',
    `gender`      STRING COMMENT '性别',
    `name`        STRING COMMENT '姓名',
    `user_id`     STRING COMMENT '所属用户'
)
    COMMENT '患者表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/warehouse/medical/ods/ods_patient_full/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
字典表(全量表)
-- ODS full snapshot of the dictionary (code lookup) table.
-- Rows are tab-delimited, empty strings are read as NULL, and the table is partitioned by dt.
-- The value column now carries a COMMENT like every other column in the ODS layer.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_dict_full`
(
    `id`          STRING COMMENT '编码ID',
    `create_time` STRING COMMENT '创建时间',
    `update_time` STRING COMMENT '修改时间',
    `value`       STRING COMMENT '编码值'
)
    COMMENT '字典表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/warehouse/medical/ods/ods_dict_full/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
用户表(全量表)
-- ODS full snapshot of the user table.
-- Rows are tab-delimited, empty strings are read as NULL, and the table is partitioned by dt.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_user_full`
(
    `id`              STRING COMMENT '用户ID',
    `create_time`     STRING COMMENT '创建时间',
    `update_time`     STRING COMMENT '修改时间',
    `email`           STRING COMMENT '电邮',
    `hashed_password` STRING COMMENT '密码',
    `telephone`       STRING COMMENT '电话',
    `username`        STRING COMMENT '用户名'
)
    COMMENT '用户全量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/warehouse/medical/ods/ods_user_full/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
就诊表(增量表)
-- ODS incremental table for consultation change records.
-- Each row is a JSON document read through JsonSerDe: change type, timestamp, new row image and old values.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_consultation_inc`
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<
        id: STRING,
        create_time: STRING,
        update_time: STRING,
        consultation_fee: DECIMAL(16, 2),
        description: STRING,
        diagnosis: STRING,
        rating: STRING,
        user_id: STRING,
        review: STRING,
        patient_id: STRING,
        doctor_id: STRING,
        status: STRING
    > COMMENT '变更后数据',
    `old`  MAP<STRING, STRING> COMMENT '旧值'
)
    COMMENT '就诊表增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/medical/ods/ods_consultation_inc/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
处方开单表(增量表)
-- ODS incremental table for prescription change records.
-- Each row is a JSON document read through JsonSerDe: change type, timestamp, new row image and old values.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_prescription_inc`
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<
        id: STRING,
        create_time: STRING,
        update_time: STRING,
        instruction: STRING,
        status: STRING,
        total_amount: DECIMAL(16, 2),
        consultation_id: STRING,
        doctor_id: STRING,
        patient_id: STRING
    > COMMENT '变更后数据',
    `old`  MAP<STRING, STRING> COMMENT '旧值'
)
    COMMENT '处方表增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/medical/ods/ods_prescription_inc/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
处方开单详情表(增量表)
-- ODS incremental table for prescription detail change records.
-- Each row is a JSON document read through JsonSerDe: change type, timestamp, new row image and old values.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_prescription_detail_inc`
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<
        id: STRING,
        create_time: STRING,
        update_time: STRING,
        count: STRING,
        instruction: STRING,
        medicine_id: STRING,
        prescription_id: STRING
    > COMMENT '变更后数据',
    `old`  MAP<STRING, STRING> COMMENT '旧值'
)
    COMMENT '处方详情表增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/medical/ods/ods_prescription_detail_inc/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
支付表(增量表)
-- ODS incremental table for payment change records.
-- Each row is a JSON document read through JsonSerDe: change type, timestamp, new row image and old values.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_payment_inc`
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<
        id: STRING,
        create_time: STRING,
        update_time: STRING,
        payment_amount: DECIMAL(16, 2),
        status: STRING,
        consultation_id: STRING,
        prescription_id: STRING,
        user_id: STRING
    > COMMENT '变更后数据',
    `old`  MAP<STRING, STRING> COMMENT '旧值'
)
    COMMENT '支付表增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/medical/ods/ods_payment_inc/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
医生表(增量表)
-- ODS incremental table for doctor change records.
-- Each row is a JSON document read through JsonSerDe: change type, timestamp, new row image and old values.
-- consultation_fee uses DECIMAL(19, 2) to match ods_doctor_full, dim_doctor_full
-- and dwd_doctor_register_inc, which all declare the same column with that precision.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_doctor_inc`
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<
        id: STRING,
        create_time: STRING,
        update_time: STRING,
        birthday: STRING,
        consultation_fee: DECIMAL(19, 2),
        gender: STRING,
        name: STRING,
        specialty: STRING,
        title: STRING,
        hospital_id: STRING
    > COMMENT '变更后数据',
    `old`  MAP<STRING, STRING> COMMENT '旧值'
)
    COMMENT '医生增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/medical/ods/ods_doctor_inc/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
用户表(增量表)
-- ODS incremental table for user change records.
-- Each row is a JSON document read through JsonSerDe: change type, timestamp, new row image and old values.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_user_inc`
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<
        id: STRING,
        create_time: STRING,
        update_time: STRING,
        email: STRING,
        hashed_password: STRING,
        telephone: STRING,
        username: STRING
    > COMMENT '变更后数据',
    `old`  MAP<STRING, STRING> COMMENT '旧值'
)
    COMMENT '用户增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/medical/ods/ods_user_inc/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
患者表(增量表)
-- ODS incremental table for patient change records.
-- Each row is a JSON document read through JsonSerDe: change type, timestamp, new row image and old values.
-- The table comment previously repeated the user table label and now names the patient table.
CREATE EXTERNAL TABLE IF NOT EXISTS `ods_patient_inc`
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<
        `id`: STRING,
        `create_time`: STRING,
        `update_time`: STRING,
        `birthday`: STRING,
        `gender`: STRING,
        `name`: STRING,
        `user_id`: STRING
    > COMMENT '变更后数据',
    `old`  MAP<STRING, STRING> COMMENT '旧值'
)
    COMMENT '患者增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/medical/ods/ods_patient_inc/'
    TBLPROPERTIES ('compression.codec' = 'org.apache.hadoop.io.compress.GzipCodec');
数据装载脚本
vim ~/bin/medical_hdfs_to_ods.sh
#!/bin/bash
# Loads the files under /origin_data/medical/<table>/<date> on HDFS into the matching ODS partition.
# Usage: medical_hdfs_to_ods.sh <ods_table_name|all> [yyyy-MM-dd]
# The date defaults to yesterday when the second argument is omitted.

APP=medical

if [ -n "$2" ]; then
    do_date=$2
else
    do_date=$(date -d '-1 day' +%F)
fi

# Build one LOAD DATA statement per table whose source directory exists, then run them in a single hive call.
# Arguments: one or more ODS table names.
load_data() {
    local sql=""
    local i
    for i in "$@"; do
        # ${i:4} strips the leading ods_ prefix to obtain the source directory name.
        # Only load when the source path exists.
        if hadoop fs -test -e "/origin_data/$APP/${i:4}/$do_date"; then
            sql=$sql"load data inpath '/origin_data/$APP/${i:4}/$do_date' OVERWRITE into table ${APP}.$i partition(dt='$do_date');"
        fi
    done

    # Skip the hive start-up entirely when there is nothing to load.
    if [ -z "$sql" ]; then
        echo "no source path found for $do_date, nothing to load"
        return 0
    fi
    hive -e "$sql"
}

case $1 in
ods_consultation_inc | ods_dict_full | ods_doctor_full | ods_doctor_inc | ods_hospital_full | ods_medicine_full | ods_patient_full | ods_patient_inc | ods_payment_inc | ods_prescription_detail_inc | ods_prescription_inc | ods_user_full | ods_user_inc)
    load_data "$1"
    ;;
"all")
    load_data "ods_consultation_inc" "ods_dict_full" "ods_doctor_full" "ods_doctor_inc" "ods_hospital_full" "ods_medicine_full" "ods_patient_full" "ods_patient_inc" "ods_payment_inc" "ods_prescription_detail_inc" "ods_prescription_inc" "ods_user_full" "ods_user_inc"
    ;;
*)
    echo "usage: $(basename "$0") <ods_table_name|all> [yyyy-MM-dd]"
    exit 1
    ;;
esac
添加权限
chmod +x ~/bin/medical_hdfs_to_ods.sh
执行脚本
medical_hdfs_to_ods.sh all 2023-05-09
2.DIM开发
医生维度表
建表语句
-- DIM doctor dimension: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dim_doctor_full
(
    `id`               STRING         COMMENT '医生ID',
    `birthday`         STRING         COMMENT '出生日期',
    `consultation_fee` DECIMAL(19, 2) COMMENT '就诊费用',
    `gender_code`      STRING         COMMENT '性别编码:101.男 102.女',
    `gender`           STRING         COMMENT '性别',
    `name`             STRING         COMMENT '姓名',
    `specialty_code`   STRING         COMMENT '专业编码:详情见字典表5xx条目',
    `specialty_name`   STRING         COMMENT '专业名称',
    `title_code`       STRING         COMMENT '职称编码:301. 医士 302. 医师 303. 主治医师 304. 副主任医师 305. 主任医师',
    `title_name`       STRING         COMMENT '职称名称',
    `hospital_id`      STRING         COMMENT '所属医院ID'
)
    COMMENT '医生维度表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dim/dim_doctor_full/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
数据装载
-- Load the doctor dimension for 2023-05-09.
-- The name keeps its first character and every following character becomes an asterisk.
-- Gender, specialty and title codes are resolved to labels through three joins on the dictionary snapshot.
insert overwrite table dim_doctor_full partition (dt = '2023-05-09')
select doc.id,
       birthday,
       consultation_fee,
       gender              gender_code,
       gender_dic.value    gender,
       name,
       specialty           specialty_code,
       specialty_dic.value specialty_name,
       title               title_code,
       title_dic.value     title_name,
       hospital_id
from (
    select id,
           birthday,
           consultation_fee,
           gender,
           concat(substr(name, 1, 1), regexp_replace(substr(name, 2), '.', '*')) name,
           specialty,
           title,
           hospital_id
    from ods_doctor_full
    where dt = '2023-05-09'
) doc
left join (
    select id, value
    from ods_dict_full
    where dt = '2023-05-09'
) gender_dic
    on doc.gender = gender_dic.id
left join (
    select id, value
    from ods_dict_full
    where dt = '2023-05-09'
) specialty_dic
    on doc.specialty = specialty_dic.id
left join (
    select id, value
    from ods_dict_full
    where dt = '2023-05-09'
) title_dic
    on doc.title = title_dic.id;
医院维度表
建表语句
-- DIM hospital dimension: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dim_hospital_full
(
    `id`              STRING COMMENT '医院ID',
    `address`         STRING COMMENT '地址',
    `alias`           STRING COMMENT '医院别名',
    `bed_num`         BIGINT COMMENT '病床数量',
    `city`            STRING COMMENT '所在城市',
    `department_num`  BIGINT COMMENT '科室数量',
    `district`        STRING COMMENT '所属区县',
    `establish_time`  STRING COMMENT '建立时间',
    `health_care_num` BIGINT COMMENT '医护人数',
    `insurance`       STRING COMMENT '是否医保',
    `level`           STRING COMMENT '医院级别,一级甲等,二级甲等....',
    `name`            STRING COMMENT '医院名称',
    `province`        STRING COMMENT '所属省(直辖市)'
)
    COMMENT '医院维度表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dim/dim_hospital_full/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
数据装载
-- Load the hospital dimension for 2023-05-09 as a straight column projection of the ODS snapshot.
insert overwrite table dim_hospital_full partition (dt = '2023-05-09')
select id,
       address,
       alias,
       bed_num,
       city,
       department_num,
       district,
       establish_time,
       health_care_num,
       insurance,
       level,
       name,
       province
from ods_hospital_full
where dt = '2023-05-09';
药品维度表
建表语句
-- DIM medicine dimension: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dim_medicine_full
(
    `id`            STRING         COMMENT '药品ID',
    `approval_code` STRING         COMMENT '药物批号',
    `dose_type`     STRING         COMMENT '剂量',
    `name`          STRING         COMMENT '药品名称',
    `name_en`       STRING         COMMENT '英文名称',
    `price`         DECIMAL(19, 2) COMMENT '药品价格',
    `specs`         STRING         COMMENT '规格',
    `trade_name`    STRING         COMMENT '商品名'
)
    COMMENT '药品维度表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dim/dim_medicine_full/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
数据装载
-- Load the medicine dimension for 2023-05-09 as a straight column projection of the ODS snapshot.
insert overwrite table dim_medicine_full partition (dt = '2023-05-09')
select id,
       approval_code,
       dose_type,
       name,
       name_en,
       price,
       specs,
       trade_name
from ods_medicine_full
where dt = '2023-05-09';
患者维度表
建表语句
-- DIM patient dimension: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dim_patient_full
(
    `id`          STRING COMMENT '患者ID',
    `birthday`    STRING COMMENT '出生日期',
    `gender_code` STRING COMMENT '性别编码',
    `gender`      STRING COMMENT '性别',
    `name`        STRING COMMENT '姓名',
    `user_id`     STRING COMMENT '所属用户'
)
    COMMENT '患者维度表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dim/dim_patient_full/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
数据装载
-- Load the patient dimension for 2023-05-09.
-- The name keeps its first character and every following character becomes an asterisk.
-- The gender code is resolved to its label through the dictionary snapshot.
insert overwrite table dim_patient_full partition (dt = '2023-05-09')
select patient.id,
       birthday,
       gender    gender_code,
       dic.value gender,
       name,
       user_id
from (
    select id,
           birthday,
           gender,
           concat(substr(name, 1, 1), regexp_replace(substr(name, 2), '.', '*')) name,
           user_id
    from ods_patient_full
    where dt = '2023-05-09'
) patient
left join (
    select id, value
    from ods_dict_full
    where dt = '2023-05-09'
) dic
    on patient.gender = dic.id;
用户维度表
建表语句
-- DIM user dimension: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dim_user_full
(
    `id`        STRING COMMENT '用户ID',
    `email`     STRING COMMENT '电邮',
    `telephone` STRING COMMENT '电话',
    `username`  STRING COMMENT '用户名'
)
    COMMENT '用户维度表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dim/dim_user_full/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
数据装载
-- Load the user dimension for 2023-05-09 with masked contact data.
-- email: the local part is replaced by an asterisk and only the domain is kept.
-- telephone: numbers matching the mobile pattern keep their first three digits, anything else becomes null.
insert overwrite table dim_user_full partition (dt = '2023-05-09')
select id,
       concat('*@', split(email, '@')[1]) email,
       if(telephone regexp '^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$',
          concat(substr(telephone, 1, 3), '*'),
          null)                           telephone,
       username
from ods_user_full
where dt = '2023-05-09';
数据装载脚本
vim ~/bin/medical_ods_to_dim.sh
#!/bin/bash
# Rebuilds DIM-layer partitions from the ODS full snapshots for one business date.
# Usage: medical_ods_to_dim.sh <dim_table_name|all> <yyyy-MM-dd>

APP=medical

# $2 must be quoted: with an empty unquoted $2 the test collapses to [ -n ], which is always true,
# so the missing-date branch could never run.
if [ -n "$2" ]; then
    do_date=$2
else
    echo 请传入日期参数
    exit 1
fi

# Doctor dimension: masks the name and resolves gender, specialty and title codes via the dictionary.
dim_doctor_full="
insert overwrite table ${APP}.dim_doctor_full partition (dt = '$do_date')
select doc.id,
       birthday,
       consultation_fee,
       gender              gender_code,
       gender_dic.value    gender,
       name,
       specialty           specialty_code,
       specialty_dic.value specialty_name,
       title               title_code,
       title_dic.value     title_name,
       hospital_id
from (
    select id,
           birthday,
           consultation_fee,
           gender,
           concat(substr(name, 1, 1), regexp_replace(substr(name, 2), '.', '*')) name,
           specialty,
           title,
           hospital_id
    from ${APP}.ods_doctor_full
    where dt = '$do_date'
) doc
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) gender_dic
    on doc.gender = gender_dic.id
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) specialty_dic
    on doc.specialty = specialty_dic.id
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) title_dic
    on doc.title = title_dic.id;
"

# Hospital dimension: straight projection of the ODS snapshot.
dim_hospital_full="
insert overwrite table ${APP}.dim_hospital_full partition (dt = '$do_date')
select id,
       address,
       alias,
       bed_num,
       city,
       department_num,
       district,
       establish_time,
       health_care_num,
       insurance,
       level,
       name,
       province
from ${APP}.ods_hospital_full
where dt = '$do_date';
"

# Medicine dimension: straight projection of the ODS snapshot.
dim_medicine_full="
insert overwrite table ${APP}.dim_medicine_full partition (dt = '$do_date')
select id,
       approval_code,
       dose_type,
       name,
       name_en,
       price,
       specs,
       trade_name
from ${APP}.ods_medicine_full
where dt = '$do_date';
"

# Patient dimension: masks the name and resolves the gender code via the dictionary.
dim_patient_full="
insert overwrite table ${APP}.dim_patient_full partition (dt = '$do_date')
select patient.id,
       birthday,
       gender    gender_code,
       dic.value gender,
       name,
       user_id
from (
    select id,
           birthday,
           gender,
           concat(substr(name, 1, 1), regexp_replace(substr(name, 2), '.', '*')) name,
           user_id
    from ${APP}.ods_patient_full
    where dt = '$do_date'
) patient
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) dic
    on patient.gender = dic.id;
"

# User dimension: masks email and telephone.
# The digit class is written as [0-9] instead of \\d: inside a double-quoted shell string \\d
# reaches Hive as \d, which Hive unescapes to a plain d, so the pattern would never match.
dim_user_full="
insert overwrite table ${APP}.dim_user_full partition (dt = '$do_date')
select id,
       concat('*@', split(email, '@')[1]) email,
       if(telephone regexp '^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])[0-9]{8}\$',
          concat(substr(telephone, 1, 3), '*'),
          null)                           telephone,
       username
from ${APP}.ods_user_full
where dt = '$do_date';
"

case $1 in
dim_doctor_full | dim_hospital_full | dim_medicine_full | dim_patient_full | dim_user_full)
    # ${!1} expands the variable whose name equals the first argument.
    hive -e "${!1}"
    ;;
"all")
    hive -e "$dim_doctor_full$dim_hospital_full$dim_medicine_full$dim_patient_full$dim_user_full"
    ;;
*)
    echo "非法参数!!!"
    exit 1
    ;;
esac
添加权限
chmod +x ~/bin/medical_ods_to_dim.sh
3.DWD开发
开启动态分区(非严格模式)
-- Session setting used by the first-day DWD loads below, which write with partition (dt)
-- and derive the dt value from each row.
set hive.exec.dynamic.partition.mode=nonstrict;
交易域问诊事务事实表
建表语句
-- DWD fact table for consultations: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_trade_consultation_inc
(
    `id`                STRING         COMMENT '问诊ID',
    `consultation_time` STRING         COMMENT '问诊时间',
    `consultation_fee`  DECIMAL(16, 2) COMMENT '问诊费用',
    `doctor_id`         STRING         COMMENT '医生id',
    `patient_id`        STRING         COMMENT '患者ID',
    `user_id`           STRING         COMMENT '用户id'
)
    COMMENT '交易域问诊事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_trade_consultation_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
交易域问诊支付成功事务事实表
建表语句
-- DWD fact table for successful consultation payments: ORC with snappy compression, partitioned by dt.
-- The ROW FORMAT DELIMITED clause was dropped: the table is stored as ORC and
-- no other DWD table in this file declares a text delimiter.
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_trade_consultation_pay_suc_inc
(
    `id`                        STRING         COMMENT '问诊ID',
    `consultation_pay_suc_time` STRING         COMMENT '诊金支付成功时间',
    `consultation_fee`          DECIMAL(16, 2) COMMENT '问诊费用',
    `doctor_id`                 STRING         COMMENT '医生ID',
    `patient_id`                STRING         COMMENT '患者ID',
    `user_id`                   STRING         COMMENT '用户ID'
)
    COMMENT '交易域问诊支付成功事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_trade_consultation_pay_suc_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
交易域处方开单事务事实表
-- DWD fact table for issued prescription lines: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_trade_prescription_inc
(
    `id`                STRING         COMMENT '处方明细ID',
    `prescription_time` STRING         COMMENT '处方开具时间',
    `count`             BIGINT         COMMENT '剂量',
    `medicine_id`       STRING         COMMENT '药品ID',
    `prescription_id`   STRING         COMMENT '处方ID',
    `total_amount`      DECIMAL(16, 2) COMMENT '处方总金额',
    `consultation_id`   STRING         COMMENT '问诊ID',
    `doctor_id`         STRING         COMMENT '医生ID',
    `patient_id`        STRING         COMMENT '患者ID'
)
    COMMENT '交易域处方开单事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_trade_prescription_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
交易域处方开单支付成功事务事实表
-- DWD fact table for paid prescription lines: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_trade_prescription_pay_suc_inc
(
    `id`                        STRING         COMMENT '处方明细ID',
    `prescription_pay_suc_time` STRING         COMMENT '处方支付成功时间',
    `count`                     BIGINT         COMMENT '剂量',
    `medicine_id`               STRING         COMMENT '药品ID',
    `prescription_id`           STRING         COMMENT '处方ID',
    `total_amount`              DECIMAL(16, 2) COMMENT '处方总金额',
    `consultation_id`           STRING         COMMENT '问诊ID',
    `doctor_id`                 STRING         COMMENT '医生ID',
    `patient_id`                STRING         COMMENT '患者ID'
)
    COMMENT '交易域处方开单支付成功事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_trade_prescription_pay_suc_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
医生域医生注册事务事实表
-- DWD fact table for doctor registrations: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_doctor_register_inc
(
    `id`               STRING         COMMENT '医生ID',
    `register_time`    STRING         COMMENT '注册时间',
    `birthday`         STRING         COMMENT '出生日期',
    `consultation_fee` DECIMAL(19, 2) COMMENT '就诊费用',
    `gender_code`      STRING         COMMENT '性别编码:101.男 102.女',
    `gender`           STRING         COMMENT '性别',
    `name`             STRING         COMMENT '姓名',
    `specialty_code`   STRING         COMMENT '专业编码:详情见字典表5xx条目',
    `specialty_name`   STRING         COMMENT '专业名称',
    `title_code`       STRING         COMMENT '职称编码:301. 医士 302. 医师 303. 主治医师 304. 副主任医师 305. 主任医师',
    `title_name`       STRING         COMMENT '职称名称',
    `hospital_id`      STRING         COMMENT '所属医院'
)
    COMMENT '医生域医生注册事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_doctor_register_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
用户域用户注册事务事实表
-- DWD fact table for user registrations: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_user_register_inc
(
    `id`            STRING COMMENT '用户ID',
    `register_time` STRING COMMENT '注册日期',
    `email`         STRING COMMENT '邮箱地址',
    `telephone`     STRING COMMENT '手机号',
    `username`      STRING COMMENT '用户名'
)
    COMMENT '用户域用户注册事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_user_register_inc/'
    TBLPROPERTIES ('orc.compress' = 'snappy');
用户域患者登记事务事实表
-- DWD fact table for patient enrolments: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_user_patient_add_inc
(
    `id`          STRING COMMENT '患者ID',
    `add_time`    STRING COMMENT '登记时间',
    `birthday`    STRING COMMENT '生日',
    `gender_code` STRING COMMENT '性别编码',
    `gender`      STRING COMMENT '性别',
    `name`        STRING COMMENT '姓名',
    `user_id`     STRING COMMENT '所属用户ID'
)
    COMMENT '用户域患者登记事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_user_patient_add_inc'
    TBLPROPERTIES ('orc.compress' = 'snappy');
互动域用户评价事务事实表
-- DWD fact table for user reviews: ORC with snappy compression, partitioned by dt.
-- STORED AS ORC and LOCATION are separate clauses again (the source had them fused into one token).
CREATE EXTERNAL TABLE IF NOT EXISTS dwd_interaction_review_inc
(
    `id`          STRING COMMENT '问诊ID',
    `review_time` STRING COMMENT '评价时间',
    `rating`      STRING COMMENT '评分',
    `doctor_id`   STRING COMMENT '医生ID',
    `patient_id`  STRING COMMENT '病人ID',
    `user_id`     STRING COMMENT '用户ID'
)
    COMMENT '互动域用户评价事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/medical/dwd/dwd_interaction_review_inc'
    TBLPROPERTIES ('orc.compress' = 'snappy');
首日装载脚本
vim ~/bin/medical_ods_to_dwd_init.sh
#!/bin/bash
# First-day load of the DWD fact tables from the bootstrap-insert records in the ODS incremental tables.
# Every statement writes with dynamic partitions: dt is derived from the event time of each row.
# Usage: medical_ods_to_dwd_init.sh <dwd_table_name|all> <yyyy-MM-dd>

APP=medical

# $2 must be quoted: with an empty unquoted $2 the test collapses to [ -n ], which is always true,
# so the missing-date branch could never run.
if [ -n "$2" ]; then
    do_date=$2
else
    echo "请传入日期参数!!!"
    exit 1
fi

# Consultations, partitioned by creation date.
dwd_trade_consultation_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_trade_consultation_inc partition (dt)
select data.id,
       data.create_time consultation_time,
       data.consultation_fee,
       data.doctor_id,
       data.patient_id,
       data.user_id,
       date_format(data.create_time, 'yyyy-MM-dd') dt
from ${APP}.ods_consultation_inc
where dt = '$do_date'
  and type = 'bootstrap-insert';
"

# Paid consultations: every status except 201 and 202.
# The partition is derived from update_time, the same column that feeds consultation_pay_suc_time,
# in line with the prescription pay-success and review loads. The previous version used create_time.
dwd_trade_consultation_pay_suc_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_trade_consultation_pay_suc_inc partition (dt)
select data.id,
       data.update_time consultation_pay_suc_time,
       data.consultation_fee,
       data.doctor_id,
       data.patient_id,
       data.user_id,
       date_format(data.update_time, 'yyyy-MM-dd') dt
from ${APP}.ods_consultation_inc
where dt = '$do_date'
  and type = 'bootstrap-insert'
  and data.status <> '201'
  and data.status <> '202';
"

# Prescription lines joined to their prescription header, partitioned by issue date.
dwd_trade_prescription_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_trade_prescription_inc partition (dt)
select detail.id,
       prescription_time,
       count,
       medicine_id,
       prescription_id,
       total_amount,
       consultation_id,
       doctor_id,
       patient_id,
       date_format(prescription_time, 'yyyy-MM-dd') dt
from (
    select data.id,
           data.create_time prescription_time,
           data.count,
           data.medicine_id,
           data.prescription_id
    from ${APP}.ods_prescription_detail_inc
    where dt = '$do_date'
      and type = 'bootstrap-insert'
) detail
left join (
    select data.id,
           data.total_amount,
           data.consultation_id,
           data.doctor_id,
           data.patient_id
    from ${APP}.ods_prescription_inc
    where dt = '$do_date'
      and type = 'bootstrap-insert'
) info
    on detail.prescription_id = info.id;
"

# Paid prescription lines: only headers with status 203, partitioned by payment date.
dwd_trade_prescription_pay_suc_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_trade_prescription_pay_suc_inc partition (dt)
select detail.id,
       prescription_pay_suc_time,
       count,
       medicine_id,
       prescription_id,
       total_amount,
       consultation_id,
       doctor_id,
       patient_id,
       date_format(prescription_pay_suc_time, 'yyyy-MM-dd') dt
from (
    select data.id,
           data.count,
           data.medicine_id,
           data.prescription_id
    from ${APP}.ods_prescription_detail_inc
    where dt = '$do_date'
      and type = 'bootstrap-insert'
) detail
join (
    select data.id,
           data.total_amount,
           data.update_time prescription_pay_suc_time,
           data.consultation_id,
           data.doctor_id,
           data.patient_id
    from ${APP}.ods_prescription_inc
    where dt = '$do_date'
      and type = 'bootstrap-insert'
      and data.status = '203'
) info
    on detail.prescription_id = info.id;
"

# Doctor registrations: masked name, codes resolved through the dictionary snapshot.
dwd_doctor_register_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_doctor_register_inc partition (dt)
select doc.id,
       register_time,
       birthday,
       consultation_fee,
       gender_code,
       gender_dic.value    gender,
       name,
       specialty_code,
       specialty_dic.value specialty_name,
       title_code,
       title_dic.value     title_name,
       hospital_id,
       date_format(register_time, 'yyyy-MM-dd') dt
from (
    select data.id,
           data.create_time register_time,
           data.birthday,
           data.consultation_fee,
           data.gender      gender_code,
           concat(substr(data.name, 1, 1), regexp_replace(substr(data.name, 2), '.', '*')) name,
           data.specialty   specialty_code,
           data.title       title_code,
           data.hospital_id
    from ${APP}.ods_doctor_inc
    where dt = '$do_date'
      and type = 'bootstrap-insert'
) doc
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) gender_dic
    on doc.gender_code = gender_dic.id
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) specialty_dic
    on doc.specialty_code = specialty_dic.id
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) title_dic
    on doc.title_code = title_dic.id;
"

# User registrations with masked email and telephone.
# The digit class is written as [0-9] instead of \\d: inside a double-quoted shell string \\d
# reaches Hive as \d, which Hive unescapes to a plain d, so the pattern would never match.
dwd_user_register_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_user_register_inc partition (dt)
select data.id,
       data.create_time register_time,
       concat('*@', split(data.email, '@')[1]) email,
       if(data.telephone regexp '^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])[0-9]{8}\$',
          concat(substr(data.telephone, 1, 3), '*'),
          null) telephone,
       data.username,
       date_format(data.create_time, 'yyyy-MM-dd') dt
from ${APP}.ods_user_inc
where dt = '$do_date'
  and type = 'bootstrap-insert';
"

# Patient enrolments with the gender code resolved through the dictionary snapshot.
dwd_user_patient_add_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_user_patient_add_inc partition (dt)
select patient.id,
       add_time,
       birthday,
       gender_code,
       dic.value gender,
       name,
       user_id,
       date_format(add_time, 'yyyy-MM-dd') dt
from (
    select data.id,
           data.create_time add_time,
           data.birthday,
           data.gender      gender_code,
           data.name,
           data.user_id
    from ${APP}.ods_patient_inc
    where dt = '$do_date'
      and type = 'bootstrap-insert'
) patient
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) dic
    on patient.gender_code = dic.id;
"

# Reviews: consultations with status 207, partitioned by the update date.
dwd_interaction_review_inc="
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_interaction_review_inc partition (dt)
select data.id,
       data.update_time review_time,
       data.rating,
       data.doctor_id,
       data.patient_id,
       data.user_id,
       date_format(data.update_time, 'yyyy-MM-dd') dt
from ${APP}.ods_consultation_inc
where dt = '$do_date'
  and type = 'bootstrap-insert'
  and data.status = '207';
"

case $1 in
dwd_trade_consultation_inc | dwd_trade_consultation_pay_suc_inc | dwd_trade_prescription_inc | dwd_trade_prescription_pay_suc_inc | dwd_doctor_register_inc | dwd_user_register_inc | dwd_user_patient_add_inc | dwd_interaction_review_inc)
    # ${!1} expands the variable whose name equals the first argument.
    hive -e "${!1}"
    ;;
"all")
    hive -e "$dwd_trade_consultation_inc$dwd_trade_consultation_pay_suc_inc$dwd_trade_prescription_inc$dwd_trade_prescription_pay_suc_inc$dwd_doctor_register_inc$dwd_user_register_inc$dwd_user_patient_add_inc$dwd_interaction_review_inc"
    ;;
*)
    # The pattern must stay unquoted: a quoted asterisk only matches a literal asterisk argument.
    echo "非法参数!!!"
    exit 1
    ;;
esac
添加权限
chmod +x ~/bin/medical_ods_to_dwd_init.sh
装载数据
medical_ods_to_dwd_init.sh all 2023-05-09
随便找张表,看看最后的日期是不是05-09
每日装载脚本
vim ~/bin/medical_ods_to_dwd.sh
#!/bin/bash
# Daily load of the DWD fact tables from the insert and update records in the ODS incremental tables.
# Every statement overwrites the single partition dt = <date>.
# Usage: medical_ods_to_dwd.sh <dwd_table_name|all> <yyyy-MM-dd>

APP=medical

# $2 must be quoted: with an empty unquoted $2 the test collapses to [ -n ], which is always true,
# so the missing-date branch could never run.
if [ -n "$2" ]; then
    do_date=$2
else
    echo "请传入日期参数!!!"
    exit 1
fi

# Consultations created on the day.
dwd_trade_consultation_inc="
insert overwrite table ${APP}.dwd_trade_consultation_inc partition (dt = '$do_date')
select data.id,
       data.create_time consultation_time,
       data.consultation_fee,
       data.doctor_id,
       data.patient_id,
       data.user_id
from ${APP}.ods_consultation_inc
where dt = '$do_date'
  and type = 'insert';
"

# Consultations updated to status 203 on the day.
dwd_trade_consultation_pay_suc_inc="
insert overwrite table ${APP}.dwd_trade_consultation_pay_suc_inc partition (dt = '$do_date')
select data.id,
       data.update_time consultation_pay_suc_time,
       data.consultation_fee,
       data.doctor_id,
       data.patient_id,
       data.user_id
from ${APP}.ods_consultation_inc
where dt = '$do_date'
  and type = 'update'
  and data.status = '203';
"

# Prescription lines inserted on the day, joined to the headers inserted on the same day.
dwd_trade_prescription_inc="
insert overwrite table ${APP}.dwd_trade_prescription_inc partition (dt = '$do_date')
select detail.id,
       prescription_time,
       count,
       medicine_id,
       prescription_id,
       total_amount,
       consultation_id,
       doctor_id,
       patient_id
from (
    select data.id,
           data.create_time prescription_time,
           data.count,
           data.medicine_id,
           data.prescription_id
    from ${APP}.ods_prescription_detail_inc
    where dt = '$do_date'
      and type = 'insert'
) detail
left join (
    select data.id,
           data.total_amount,
           data.consultation_id,
           data.doctor_id,
           data.patient_id
    from ${APP}.ods_prescription_inc
    where dt = '$do_date'
      and type = 'insert'
) info
    on detail.prescription_id = info.id;
"

# Paid prescription lines: headers updated to status 203 on the day.
# Detail rows are read from the current and the previous partition, both bootstrap and regular inserts.
dwd_trade_prescription_pay_suc_inc="
insert overwrite table ${APP}.dwd_trade_prescription_pay_suc_inc partition (dt = '$do_date')
select detail.id,
       prescription_pay_suc_time,
       count,
       medicine_id,
       prescription_id,
       total_amount,
       consultation_id,
       doctor_id,
       patient_id
from (
    select data.id,
           data.count,
           data.medicine_id,
           data.prescription_id
    from ${APP}.ods_prescription_detail_inc
    where (dt = '$do_date' or dt = date_add('$do_date', -1))
      and (type = 'bootstrap-insert' or type = 'insert')
) detail
join (
    select data.id,
           data.total_amount,
           data.update_time prescription_pay_suc_time,
           data.consultation_id,
           data.doctor_id,
           data.patient_id
    from ${APP}.ods_prescription_inc
    where dt = '$do_date'
      and type = 'update'
      and data.status = '203'
) info
    on detail.prescription_id = info.id;
"

# Doctor registrations: masked name, codes resolved through the dictionary snapshot.
dwd_doctor_register_inc="
insert overwrite table ${APP}.dwd_doctor_register_inc partition (dt = '$do_date')
select doc.id,
       register_time,
       birthday,
       consultation_fee,
       gender_code,
       gender_dic.value    gender,
       name,
       specialty_code,
       specialty_dic.value specialty_name,
       title_code,
       title_dic.value     title_name,
       hospital_id
from (
    select data.id,
           data.create_time register_time,
           data.birthday,
           data.consultation_fee,
           data.gender      gender_code,
           concat(substr(data.name, 1, 1), regexp_replace(substr(data.name, 2), '.', '*')) name,
           data.specialty   specialty_code,
           data.title       title_code,
           data.hospital_id
    from ${APP}.ods_doctor_inc
    where dt = '$do_date'
      and type = 'insert'
) doc
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) gender_dic
    on doc.gender_code = gender_dic.id
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) specialty_dic
    on doc.specialty_code = specialty_dic.id
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) title_dic
    on doc.title_code = title_dic.id;
"

# User registrations with masked email and telephone.
# The digit class is written as [0-9] instead of \\d: inside a double-quoted shell string \\d
# reaches Hive as \d, which Hive unescapes to a plain d, so the pattern would never match.
dwd_user_register_inc="
insert overwrite table ${APP}.dwd_user_register_inc partition (dt = '$do_date')
select data.id,
       data.create_time register_time,
       concat('*@', split(data.email, '@')[1]) email,
       if(data.telephone regexp '^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])[0-9]{8}\$',
          concat(substr(data.telephone, 1, 3), '*'),
          null) telephone,
       data.username
from ${APP}.ods_user_inc
where dt = '$do_date'
  and type = 'insert';
"

# Patient enrolments with the gender code resolved through the dictionary snapshot.
dwd_user_patient_add_inc="
insert overwrite table ${APP}.dwd_user_patient_add_inc partition (dt = '$do_date')
select patient.id,
       add_time,
       birthday,
       gender_code,
       dic.value gender,
       name,
       user_id
from (
    select data.id,
           data.create_time add_time,
           data.birthday,
           data.gender      gender_code,
           data.name,
           data.user_id
    from ${APP}.ods_patient_inc
    where dt = '$do_date'
      and type = 'insert'
) patient
left join (
    select id, value
    from ${APP}.ods_dict_full
    where dt = '$do_date'
) dic
    on patient.gender_code = dic.id;
"

# Reviews: consultations updated to status 207 on the day.
dwd_interaction_review_inc="
insert overwrite table ${APP}.dwd_interaction_review_inc partition (dt = '$do_date')
select data.id,
       data.update_time review_time,
       data.rating,
       data.doctor_id,
       data.patient_id,
       data.user_id
from ${APP}.ods_consultation_inc
where dt = '$do_date'
  and type = 'update'
  and data.status = '207';
"

case $1 in
dwd_trade_consultation_inc | dwd_trade_consultation_pay_suc_inc | dwd_trade_prescription_inc | dwd_trade_prescription_pay_suc_inc | dwd_doctor_register_inc | dwd_user_register_inc | dwd_user_patient_add_inc | dwd_interaction_review_inc)
    # ${!1} expands the variable whose name equals the first argument.
    hive -e "${!1}"
    ;;
"all")
    hive -e "$dwd_trade_consultation_inc$dwd_trade_consultation_pay_suc_inc$dwd_trade_prescription_inc$dwd_trade_prescription_pay_suc_inc$dwd_doctor_register_inc$dwd_user_register_inc$dwd_user_patient_add_inc$dwd_interaction_review_inc"
    ;;
*)
    # The pattern must stay unquoted: a quoted asterisk only matches a literal asterisk argument.
    echo "非法参数!!!"
    exit 1
    ;;
esac
添加权限
chmod +x ~/bin/medical_ods_to_dwd.sh
总结
数仓开发的代码太多了,还是分两次记录吧。