系列文章目录
线上问诊:业务数据采集
线上问诊:数仓数据同步
线上问诊:数仓开发(一)
线上问诊:数仓开发(二)
线上问诊:数仓开发(三)
文章目录
- 系列文章目录
- 前言
- 一、ADS
- 1.交易主题
- 1.交易综合统计
- 2.各医院交易统计
- 3.各性别患者交易统计
- 4.各年龄段患者交易统计
- 2.医生主题
- 1.医生变动统计
- 3.用户主题
- 1.用户变动统计
- 4.评价主题
- 1.评价综合统计
- 2.各医院评价统计
- 5.数据装载脚本
- 一、报表数据导出
- 1.MySQL建库建表
- 1.创建数据库
- 2.创建表
- 2.数据导出
- 1.DataX配置文件生成脚本
- 2.执行配置文件生成器
- 3.编写每日导出脚本
- 总结
前言
这次我们继续进行数仓的开发,应该能写完。
一、ADS
1.交易主题
1.交易综合统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_trade_stats
(`dt` STRING COMMENT '统计日期',`recent_days` BIGINT COMMENT '统计周期: 最近1,7,30日',`consultation_amount` DECIMAL(16, 2) COMMENT '问诊金额',`consultation_count` BIGINT COMMENT '问诊次数',`consultation_pay_suc_amount` DECIMAL(16, 2) COMMENT '问诊支付成功金额',`consultation_pay_suc_count` BIGINT COMMENT '问诊支付成功次数',`prescription_amount` DECIMAL(16, 2) COMMENT '处方金额',`prescription_count` BIGINT COMMENT '处方次数',`prescription_pay_suc_amount` DECIMAL(16, 2) COMMENT '处方支付成功金额',`prescription_pay_suc_count` BIGINT COMMENT '处方支付成功次数'
) COMMENT '交易综合统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_trade_stats';
2.各医院交易统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_hospital_trade_stats
(`dt` STRING COMMENT '统计日期',`recent_days` BIGINT COMMENT '统计周期: 最近1,7,30日',`hospital_id` STRING COMMENT '医院ID',`hospital_name` STRING COMMENT '医院名称',`consultation_amount` DECIMAL(16, 2) COMMENT '问诊金额',`consultation_count` BIGINT COMMENT '问诊次数',`consultation_pay_suc_amount` DECIMAL(16, 2) COMMENT '问诊支付成功金额',`consultation_pay_suc_count` BIGINT COMMENT '问诊支付成功次数',`prescription_amount` DECIMAL(16, 2) COMMENT '处方金额',`prescription_count` BIGINT COMMENT '处方次数',`prescription_pay_suc_amount` DECIMAL(16, 2) COMMENT '处方支付成功金额',`prescription_pay_suc_count` BIGINT COMMENT '处方支付成功次数'
) COMMENT '各医院交易统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_hospital_trade_stats';
3.各性别患者交易统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_gender_trade_stats
(`dt` STRING COMMENT '统计日期',`recent_days` BIGINT COMMENT '统计周期: 最近1,7,30日',`gender_code` STRING COMMENT '患者性别编码',`gender` STRING COMMENT '患者性别',`consultation_amount` DECIMAL(16, 2) COMMENT '问诊金额',`consultation_count` BIGINT COMMENT '问诊次数',`consultation_pay_suc_amount` DECIMAL(16, 2) COMMENT '问诊支付成功金额',`consultation_pay_suc_count` BIGINT COMMENT '问诊支付成功次数',`prescription_amount` DECIMAL(16, 2) COMMENT '处方金额',`prescription_count` BIGINT COMMENT '处方次数',`prescription_pay_suc_amount` DECIMAL(16, 2) COMMENT '处方支付成功金额',`prescription_pay_suc_count` BIGINT COMMENT '处方支付成功次数'
) COMMENT '各性别患者交易统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_gender_trade_stats';
4.各年龄段患者交易统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_age_group_trade_stats
(`dt` STRING COMMENT '统计日期',`recent_days` BIGINT COMMENT '统计周期: 最近1,7,30日',`age_group` STRING COMMENT '患者年龄段',`consultation_amount` DECIMAL(16, 2) COMMENT '问诊金额',`consultation_count` BIGINT COMMENT '问诊次数',`consultation_pay_suc_amount` DECIMAL(16, 2) COMMENT '问诊支付成功金额',`consultation_pay_suc_count` BIGINT COMMENT '问诊支付成功次数',`prescription_amount` DECIMAL(16, 2) COMMENT '处方金额',`prescription_count` BIGINT COMMENT '处方次数',`prescription_pay_suc_amount` DECIMAL(16, 2) COMMENT '处方支付成功金额',`prescription_pay_suc_count` BIGINT COMMENT '处方支付成功次数'
) COMMENT '各年龄段患者交易统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_age_group_trade_stats';
数据装载
2.医生主题
1.医生变动统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_doctor_change_stats(`dt` STRING COMMENT '统计日期',`recent_days` BIGINT COMMENT '统计周期: 最近1,7,30日',`new_doctor_count` BIGINT COMMENT '新增医生数',`activated_doctor_count` BIGINT COMMENT '激活医生数',`active_doctor_count` BIGINT COMMENT '活跃医生数'
) COMMENT '医生变动统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_doctor_change_stats';
3.用户主题
1.用户变动统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_user_change_stats(`dt` STRING COMMENT '统计日期',`recent_days` BIGINT COMMENT '统计周期: 最近1,7,30日',`new_user_count` BIGINT COMMENT '新增用户数',`new_patient_count` BIGINT COMMENT '新增患者数'
) COMMENT '用户变动统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_user_change_stats';
4.评价主题
1.评价综合统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_review_stats(`dt` STRING COMMENT '统计日期',`review_user_count` BIGINT COMMENT '评价人数',`review_count` BIGINT COMMENT '评价次数',`good_review_rate` DECIMAL(16,2) COMMENT '好评率'
) COMMENT '用户变动统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_review_stats';
2.各医院评价统计
建表语句
CREATE EXTERNAL TABLE IF NOT EXISTS ads_hospital_review_stats(`dt` STRING COMMENT '统计日期',`hospital_id` STRING COMMENT '医院ID',`hospital_name` STRING COMMENT '医院名称',`review_user_count` BIGINT COMMENT '评价人数',`review_count` BIGINT COMMENT '评价次数',`good_review_rate` DECIMAL(16,2) COMMENT '好评率'
) COMMENT '各医院评价统计'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'LOCATION '/warehouse/medical/ads/ads_hospital_review_stats';
5.数据装载脚本
vim ~/bin/medical_dws_to_ads.sh
#!/bin/bashAPP=medicalif [ -n $2 ]
then do_date=$2
else echo "请传入日期参数!!!"exit
fiads_trade_stats="
insert overwrite table ${APP}.ads_trade_stats
select dt,recent_days,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from ${APP}.ads_trade_stats
union
select '$do_date' dt,consul.recent_days,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from (select 1 recent_days,sum(consultation_amount) consultation_amount,sum(consultation_count) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_1dwhere dt = '$do_date'unionselect recent_days,sum(if(recent_days = 7, consultation_amount_7d, consultation_amount_30d)) consultation_amount,sum(if(recent_days = 7, consultation_count_7d, consultation_count_30d)) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days) consulleft join(select 1 recent_days,sum(consultation_pay_suc_amount) consultation_pay_suc_amount,sum(consultation_pay_suc_count) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_1dwhere dt = '$do_date'unionselect recent_days,sum(if(recent_days = 7, consultation_pay_suc_amount_7d,consultation_pay_suc_amount_30d)) consultation_pay_suc_amount,sum(if(recent_days = 7, consultation_pay_suc_count_7d,consultation_pay_suc_count_30d)) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days) consul_pay_sucon consul.recent_days = consul_pay_suc.recent_daysleft join(select 1 recent_days,sum(prescription_amount) prescription_amount,sum(prescription_count) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_1dwhere dt = '$do_date'unionselect recent_days,sum(if(recent_days = 7, prescription_amount_7d, prescription_amount_30d)) prescription_amount,sum(if(recent_days = 7, prescription_count_7d, prescription_count_30d)) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days) prescriptionon consul.recent_days = prescription.recent_daysleft join(select 1 recent_days,sum(prescription_pay_suc_amount) prescription_pay_suc_amount,sum(prescription_pay_suc_count) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_1dwhere dt = '$do_date'unionselect recent_days,sum(if(recent_days = 7, prescription_pay_suc_amount_7d,prescription_pay_suc_amount_30d)) prescription_pay_suc_amount,sum(if(recent_days = 7, prescription_pay_suc_count_7d,prescription_pay_suc_count_30d)) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days) prescription_pay_sucon consul.recent_days = prescription_pay_suc.recent_days;
"
ads_hospital_trade_stats="
insert overwrite table ${APP}.ads_hospital_trade_stats
select dt,recent_days,hospital_id,hospital_name,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from ${APP}.ads_hospital_trade_stats
union
select '$do_date' dt,consul.recent_days,consul.hospital_id,consul.hospital_name,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from (select 1 recent_days,hospital_id,hospital_name,sum(consultation_amount) consultation_amount,sum(consultation_count) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_1dwhere dt = '$do_date'group by hospital_id,hospital_nameunionselect recent_days,hospital_id,hospital_name,sum(if(recent_days = 7, consultation_amount_7d, consultation_amount_30d)) consultation_amount,sum(if(recent_days = 7, consultation_count_7d, consultation_count_30d)) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,hospital_id,hospital_name) consulleft join(select 1 recent_days,hospital_id,hospital_name,sum(consultation_pay_suc_amount) consultation_pay_suc_amount,sum(consultation_pay_suc_count) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_1dwhere dt = '$do_date'group by hospital_id,hospital_nameunionselect recent_days,hospital_id,hospital_name,sum(if(recent_days = 7, consultation_pay_suc_amount_7d,consultation_pay_suc_amount_30d)) consultation_pay_suc_amount,sum(if(recent_days = 7, consultation_pay_suc_count_7d,consultation_pay_suc_count_30d)) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,hospital_id,hospital_name) consul_pay_sucon consul.recent_days = consul_pay_suc.recent_daysand consul.hospital_id = consul_pay_suc.hospital_idand consul.hospital_name = consul_pay_suc.hospital_nameleft join(select 1 recent_days,hospital_id,hospital_name,sum(prescription_amount) prescription_amount,sum(prescription_count) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_1dwhere dt = '$do_date'group by hospital_id,hospital_nameunionselect recent_days,hospital_id,hospital_name,sum(if(recent_days = 7, prescription_amount_7d, prescription_amount_30d)) prescription_amount,sum(if(recent_days = 7, prescription_count_7d, prescription_count_30d)) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,hospital_id,hospital_name) prescriptionon consul.recent_days = prescription.recent_daysand consul.hospital_id = prescription.hospital_idand consul.hospital_name = prescription.hospital_nameleft join(select 1 recent_days,hospital_id,hospital_name,sum(prescription_pay_suc_amount) prescription_pay_suc_amount,sum(prescription_pay_suc_count) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_1dwhere dt = '$do_date'group by hospital_id,hospital_nameunionselect recent_days,hospital_id,hospital_name,sum(if(recent_days = 7, prescription_pay_suc_amount_7d,prescription_pay_suc_amount_30d)) prescription_pay_suc_amount,sum(if(recent_days = 7, prescription_pay_suc_count_7d,prescription_pay_suc_count_30d)) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,hospital_id,hospital_name) prescription_pay_sucon consul.recent_days = prescription_pay_suc.recent_daysand consul.hospital_id = prescription_pay_suc.hospital_idand consul.hospital_name = prescription_pay_suc.hospital_name;
"
ads_gender_trade_stats="
insert overwrite table ${APP}.ads_gender_trade_stats
select dt,recent_days,gender_code,gender,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from ${APP}.ads_gender_trade_stats
union
select '$do_date' dt,consul.recent_days,consul.gender_code,consul.gender,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from (select 1 recent_days,gender_code,gender,sum(consultation_amount) consultation_amount,sum(consultation_count) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_1dwhere dt = '$do_date'group by gender_code,genderunionselect recent_days,gender_code,gender,sum(if(recent_days = 7, consultation_amount_7d, consultation_amount_30d)) consultation_amount,sum(if(recent_days = 7, consultation_count_7d, consultation_count_30d)) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,gender_code,gender) consulleft join(select 1 recent_days,gender_code,gender,sum(consultation_pay_suc_amount) consultation_pay_suc_amount,sum(consultation_pay_suc_count) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_1dwhere dt = '$do_date'group by gender_code,genderunionselect recent_days,gender_code,gender,sum(if(recent_days = 7, consultation_pay_suc_amount_7d,consultation_pay_suc_amount_30d)) consultation_pay_suc_amount,sum(if(recent_days = 7, consultation_pay_suc_count_7d,consultation_pay_suc_count_30d)) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,gender_code,gender) consul_pay_sucon consul.recent_days = consul_pay_suc.recent_daysand consul.gender_code = consul_pay_suc.gender_codeand consul.gender = consul_pay_suc.genderleft join(select 1 recent_days,gender_code,gender,sum(prescription_amount) prescription_amount,sum(prescription_count) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_1dwhere dt = '$do_date'group by gender_code,genderunionselect recent_days,gender_code,gender,sum(if(recent_days = 7, prescription_amount_7d, prescription_amount_30d)) prescription_amount,sum(if(recent_days = 7, prescription_count_7d, prescription_count_30d)) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,gender_code,gender) prescriptionon consul.recent_days = prescription.recent_daysand consul.gender_code = prescription.gender_codeand consul.gender = prescription.genderleft join(select 1 recent_days,gender_code,gender,sum(prescription_pay_suc_amount) prescription_pay_suc_amount,sum(prescription_pay_suc_count) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_1dwhere dt = '$do_date'group by gender_code,genderunionselect recent_days,gender_code,gender,sum(if(recent_days = 7, prescription_pay_suc_amount_7d,prescription_pay_suc_amount_30d)) prescription_pay_suc_amount,sum(if(recent_days = 7, prescription_pay_suc_count_7d,prescription_pay_suc_count_30d)) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,gender_code,gender) prescription_pay_sucon consul.recent_days = prescription_pay_suc.recent_daysand consul.gender_code = prescription_pay_suc.gender_codeand consul.gender = prescription_pay_suc.gender;
"
ads_age_group_trade_stats="
insert overwrite table ${APP}.ads_age_group_trade_stats
select dt,recent_days,age_group,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from ${APP}.ads_age_group_trade_stats
union
select '$do_date' dt,consul.recent_days,consul.age_group,consultation_amount,consultation_count,consultation_pay_suc_amount,consultation_pay_suc_count,prescription_amount,prescription_count,prescription_pay_suc_amount,prescription_pay_suc_count
from (select 1 recent_days,age_group,sum(consultation_amount) consultation_amount,sum(consultation_count) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_1dwhere dt = '$do_date'group by age_groupunionselect recent_days,age_group,sum(if(recent_days = 7, consultation_amount_7d, consultation_amount_30d)) consultation_amount,sum(if(recent_days = 7, consultation_count_7d, consultation_count_30d)) consultation_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,age_group) consulleft join(select 1 recent_days,age_group,sum(consultation_pay_suc_amount) consultation_pay_suc_amount,sum(consultation_pay_suc_count) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_1dwhere dt = '$do_date'group by age_groupunionselect recent_days,age_group,sum(if(recent_days = 7, consultation_pay_suc_amount_7d,consultation_pay_suc_amount_30d)) consultation_pay_suc_amount,sum(if(recent_days = 7, consultation_pay_suc_count_7d,consultation_pay_suc_count_30d)) consultation_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_consultation_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,age_group) consul_pay_sucon consul.recent_days = consul_pay_suc.recent_daysand consul.age_group = consul_pay_suc.age_groupleft join(select 1 recent_days,age_group,sum(prescription_amount) prescription_amount,sum(prescription_count) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_1dwhere dt = '$do_date'group by age_groupunionselect recent_days,age_group,sum(if(recent_days = 7, prescription_amount_7d, prescription_amount_30d)) prescription_amount,sum(if(recent_days = 7, prescription_count_7d, prescription_count_30d)) prescription_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,age_group) prescriptionon consul.recent_days = prescription.recent_daysand consul.age_group = prescription.age_groupleft join(select 1 recent_days,age_group,sum(prescription_pay_suc_amount) prescription_pay_suc_amount,sum(prescription_pay_suc_count) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_1dwhere dt = '$do_date'group by age_groupunionselect recent_days,age_group,sum(if(recent_days = 7, prescription_pay_suc_amount_7d,prescription_pay_suc_amount_30d)) prescription_pay_suc_amount,sum(if(recent_days = 7, prescription_pay_suc_count_7d,prescription_pay_suc_count_30d)) prescription_pay_suc_countfrom ${APP}.dws_trade_hospital_gender_age_group_prescription_pay_suc_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'group by recent_days,age_group) prescription_pay_sucon consul.recent_days = prescription_pay_suc.recent_daysand consul.age_group = prescription_pay_suc.age_group;
"
ads_doctor_change_stats="
insert overwrite table ${APP}.ads_doctor_change_stats
select dt,recent_days,new_doctor_count,activated_doctor_count,active_doctor_count
from ${APP}.ads_doctor_change_stats
union
select '$do_date' dt,new.recent_days,new_doctor_count,activated_doctor_count,active_doctor_count
from (select recent_days,count(*) new_doctor_countfrom ${APP}.dwd_doctor_register_inc lateral view explode(array(1, 7, 30)) tmp as recent_dayswhere dt >= date_add('$do_date', -recent_days + 1)group by recent_days) newleft join(select recent_days,count(*) activated_doctor_countfrom ${APP}.dws_trade_doctor_consultation_td lateral view explode(array(1, 7, 30)) tmp as recent_dayswhere dt = '$do_date'and first_consultation_dt >= date_add('$do_date', -recent_days + 1)group by recent_days) activatedon new.recent_days = activated.recent_daysleft join(select 1 recent_days,count(*) active_doctor_countfrom ${APP}.dws_trade_doctor_consultation_1dwhere dt = '$do_date'and consultation_count >= 2unionselect recent_days,count(*) active_doctor_countfrom ${APP}.dws_trade_doctor_consultation_nd lateral view explode(array(7, 30)) tmp as recent_dayswhere dt = '$do_date'and ((recent_days = 7 and consultation_count_7d >= 2)or (recent_days = 30 and consultation_count_30d >= 2))group by recent_days) activeon new.recent_days = active.recent_days;
"
ads_user_change_stats="
insert overwrite table ${APP}.ads_user_change_stats
select dt,recent_days,new_user_count,new_patient_count
from ${APP}.ads_user_change_stats
union
select '$do_date' dt,new_user.recent_days,new_user_count,new_patient_count
from (select recent_days,count(*) new_user_countfrom ${APP}.dwd_user_register_inc lateral view explode(array(1, 7, 30)) tmp as recent_dayswhere dt >= date_add('$do_date', -recent_days + 1)group by recent_days) new_userleft join(select recent_days,count(*) new_patient_countfrom ${APP}.dwd_user_patient_add_inc lateral view explode(array(1, 7, 30)) tmp as recent_dayswhere dt >= date_add('$do_date', -recent_days + 1)group by recent_days) new_patienton new_user.recent_days = new_patient.recent_days;
"
ads_review_stats="
insert overwrite table ${APP}.ads_review_stats
select dt,review_user_count,review_count,good_review_rate
from ${APP}.ads_review_stats
union
select '$do_date' dt,review_user_count,review_count,good_review_rate
from (select count(distinct user_id) review_user_countfrom ${APP}.dws_interaction_hospital_user_review_tdwhere dt = '$do_date') user_countleft join(select sum(review_count) review_count,sum(good_review_count) / sum(review_count) good_review_ratefrom ${APP}.dws_interaction_hospital_review_tdwhere dt = '$do_date') review_stats;
"
ads_hospital_review_stats="
insert overwrite table ${APP}.ads_hospital_review_stats
select dt,hospital_id,hospital_name,review_user_count,review_count,good_review_rate
from ${APP}.ads_hospital_review_stats
union
select '$do_date' dt,user_count.hospital_id,user_count.hospital_name,review_user_count,review_count,good_review_rate
from (select hospital_id,hospital_name,count(user_id) review_user_countfrom ${APP}.dws_interaction_hospital_user_review_tdwhere dt = '$do_date'group by hospital_id,hospital_name) user_countleft join(select hospital_id,hospital_name,review_count,good_review_count / review_count good_review_ratefrom ${APP}.dws_interaction_hospital_review_tdwhere dt = '$do_date') review_statson user_count.hospital_id = review_stats.hospital_idand user_count.hospital_name = review_stats.hospital_name;
"case $1 in ads_trade_stats | ads_hospital_trade_stats | ads_gender_trade_stats | ads_age_group_trade_stats | ads_doctor_change_stats | ads_user_change_stats | ads_review_stats | ads_hospital_review_stats)hive -e "${!1}";;"all")hive -e "$ads_trade_stats$ads_hospital_trade_stats$ads_gender_trade_stats$ads_age_group_trade_stats$ads_doctor_change_stats$ads_user_change_stats$ads_review_stats$ads_hospital_review_stats";;"*")echo "非法参数!!!";;
esac
数据装载
medical_dws_to_ads.sh all 2023-05-09
找个表看一下数据就行。
一、报表数据导出
1.MySQL建库建表
1.创建数据库
CREATE DATABASE IF NOT EXISTS medical_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
2.创建表
1.交易综合统计
CREATE TABLE `ads_trade_stats` (`dt` date NOT NULL COMMENT '统计日期',`recent_days` bigint NOT NULL COMMENT '统计周期: 最近1,7,30日',`consultation_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊金额',`consultation_count` bigint DEFAULT NULL COMMENT '问诊次数',`consultation_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊支付成功金额',`consultation_pay_suc_count` bigint DEFAULT NULL COMMENT '问诊支付成功次数',`prescription_amount` decimal(16,2) DEFAULT NULL COMMENT '处方金额',`prescription_count` bigint DEFAULT NULL COMMENT '处方次数',`prescription_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '处方支付成功金额',`prescription_pay_suc_count` bigint DEFAULT NULL COMMENT '处方支付成功次数',PRIMARY KEY (`dt`,`recent_days`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='交易综合统计';
2.各医院交易统计
CREATE TABLE `ads_hospital_trade_stats` (`dt` date NOT NULL COMMENT '统计日期',`recent_days` bigint NOT NULL COMMENT '统计周期: 最近1,7,30日',`hospital_id` varchar(255) NOT NULL COMMENT '医院ID',`hospital_name` varchar(255) NOT NULL COMMENT '医院名称',`consultation_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊金额',`consultation_count` bigint DEFAULT NULL COMMENT '问诊次数',`consultation_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊支付成功金额',`consultation_pay_suc_count` bigint DEFAULT NULL COMMENT '问诊支付成功次数',`prescription_amount` decimal(16,2) DEFAULT NULL COMMENT '处方金额',`prescription_count` bigint DEFAULT NULL COMMENT '处方次数',`prescription_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '处方支付成功金额',`prescription_pay_suc_count` bigint DEFAULT NULL COMMENT '处方支付成功次数',PRIMARY KEY (`dt`,`recent_days`,`hospital_id`,`hospital_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='各医院交易统计';
3.各性别患者交易统计
CREATE TABLE `ads_gender_trade_stats` (`dt` date NOT NULL COMMENT '统计日期',`recent_days` bigint NOT NULL COMMENT '统计周期: 最近1,7,30日',`gender_code` varchar(255) NOT NULL COMMENT '患者性别编码',`gender` varchar(255) NOT NULL COMMENT '患者性别',`consultation_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊金额',`consultation_count` bigint DEFAULT NULL COMMENT '问诊次数',`consultation_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊支付成功金额',`consultation_pay_suc_count` bigint DEFAULT NULL COMMENT '问诊支付成功次数',`prescription_amount` decimal(16,2) DEFAULT NULL COMMENT '处方金额',`prescription_count` bigint DEFAULT NULL COMMENT '处方次数',`prescription_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '处方支付成功金额',`prescription_pay_suc_count` bigint DEFAULT NULL COMMENT '处方支付成功次数',PRIMARY KEY (`dt`,`recent_days`,`gender_code`,`gender`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='各性别患者交易统计';
4.各年龄段患者交易统计
CREATE TABLE `ads_age_group_trade_stats` (`dt` date NOT NULL COMMENT '统计日期',`recent_days` bigint NOT NULL COMMENT '统计周期: 最近1,7,30日',`age_group` varchar(255) NOT NULL COMMENT '患者年龄段',`consultation_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊金额',`consultation_count` bigint DEFAULT NULL COMMENT '问诊次数',`consultation_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '问诊支付成功金额',`consultation_pay_suc_count` bigint DEFAULT NULL COMMENT '问诊支付成功次数',`prescription_amount` decimal(16,2) DEFAULT NULL COMMENT '处方金额',`prescription_count` bigint DEFAULT NULL COMMENT '处方次数',`prescription_pay_suc_amount` decimal(16,2) DEFAULT NULL COMMENT '处方支付成功金额',`prescription_pay_suc_count` bigint DEFAULT NULL COMMENT '处方支付成功次数',PRIMARY KEY (`dt`,`recent_days`,`age_group`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='各年龄段患者交易统计';
5.医生变动统计
CREATE TABLE `ads_doctor_change_stats` (`dt` date NOT NULL COMMENT '统计日期',`recent_days` bigint NOT NULL COMMENT '统计周期: 最近1,7,30日',`new_doctor_count` bigint DEFAULT NULL COMMENT '新增医生数',`activated_doctor_count` bigint DEFAULT NULL COMMENT '激活医生数',`active_doctor_count` bigint DEFAULT NULL COMMENT '活跃医生数',PRIMARY KEY (`dt`,`recent_days`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='医生变动统计';
6.用户变动统计
CREATE TABLE `ads_user_change_stats` (`dt` date NOT NULL COMMENT '统计日期',`recent_days` bigint NOT NULL COMMENT '统计周期: 最近1,7,30日',`new_user_count` bigint DEFAULT NULL COMMENT '新增用户数',`new_patient_count` bigint DEFAULT NULL COMMENT '新增患者数',PRIMARY KEY (`dt`,`recent_days`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='用户变动统计';
7.评价综合统计
CREATE TABLE `ads_review_stats` (`dt` date NOT NULL COMMENT '统计日期',`review_user_count` bigint DEFAULT NULL COMMENT '评价人数',`review_count` bigint DEFAULT NULL COMMENT '评价次数',`good_review_rate` decimal(16,2) DEFAULT NULL COMMENT '好评率',PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='用户变动统计';
8.各医院评价统计
CREATE TABLE `ads_hospital_review_stats` (`dt` date NOT NULL COMMENT '统计日期',`hospital_id` varchar(255) NOT NULL COMMENT '医院ID',`hospital_name` varchar(255) NOT NULL COMMENT '医院名称',`review_user_count` bigint DEFAULT NULL COMMENT '评价人数',`review_count` bigint DEFAULT NULL COMMENT '评价次数',`good_review_rate` decimal(16,2) DEFAULT NULL COMMENT '好评率',PRIMARY KEY (`dt`,`hospital_id`,`hospital_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='各医院评价统计';
2.数据导出
1.DataX配置文件生成脚本
vim /opt/module/gen_datax_config/configuration.properties
mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
mysql.database.import=medical
# 从HDFS导出进入的 MySQL 数据库名称
mysql.database.export=medical_report
mysql.tables.import=dict,doctor,hospital,medicine,patient,user
# MySQL 库中需要导出的表,空串表示导出库的所有表
mysql.tables.export=
is.seperated.tables=0
hdfs.uri=hdfs://hadoop102:8020
import_out_dir=/opt/module/datax/job/medical/import
# DataX 导出配置文件存放路径
export_out_dir=/opt/module/datax/job/medical/export
2.执行配置文件生成器
java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar
3.编写每日导出脚本
vim ~/bin/medical_hdfs_to_mysql.sh
#!/bin/bashDATAX_HOME=/opt/module/dataxhandle_path(){for file in `hadoop fs -ls -R $1 | awk '{print $8}'`dohadoop fs -test -z $fileif [[ $? -eq 0 ]]then echo "文件 $file 大小为零,正在删除..."hadoop fs -rm -f -r $filefidone
}export_data(){export_dir=$1datax_config=$2echo "正在校验目录 $export_dir ..."handle_path $export_dircount=`hadoop fs -count $export_dir | awk '{print $2}'`if [[ $count -eq 0 ]]then echo "目录为空,跳过"elseecho "正在处理目录 $export_dir ..."$DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config >$DATAX_HOME/job/medical/export.log 2>&1if [[ $? -ne 0 ]]then echo "执行出错,日志如下 ..."cat $DATAX_HOME/job/medical/export.logfifi
}case $1 inads_trade_stats | ads_hospital_trade_stats | ads_gender_trade_stats | ads_age_group_trade_stats | ads_doctor_change_stats | ads_user_change_stats | ads_review_stats | ads_hospital_review_stats)export_data /warehouse/medical/ads/$1 $DATAX_HOME/job/medical/export/medical_report.$1.json;;"all")for tab in ads_trade_stats ads_hospital_trade_stats ads_gender_trade_stats ads_age_group_trade_stats ads_doctor_change_stats ads_user_change_stats ads_review_stats ads_hospital_review_statsdo export_data /warehouse/medical/ads/${tab} $DATAX_HOME/job/medical/export/medical_report.${tab}.jsondone;;"*")echo "非法参数!!!";;
esac
添加权限
chmod +x ~/bin/medical_hdfs_to_mysql.sh
数据装载
medical_hdfs_to_mysql.sh all
总结
数仓开发到这里就结束了。