学习大数据DAY61 宽表加工

目录

模型设计

加工宽表

任务调度:


大表 - 把很多数据整合起来
方便后续的明细查询和指标计算

模型设计

设计 建模
设计: excel 文档去编写
建模: 使用建模工具 PowerDesigner Navicat 在线画图工具... 把表结构给绘
制出来
共享\项目课工具\pd

加工宽表

数据层 DWS 层
dws_lijinquan.dws_xbd_mxm_memberinfo_dim_t
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t:
Python:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
import os
import re
# 宽表加工
# pyspark + spark sql
# 宽表加工
# pyspark + spark sql
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 会员检测临时表
def do_member_tmp_check():
sql = '''
SELECT
member_id AS member, -- 会员卡号
MIN(CASE WHEN detect_time = max_detect_time THEN erp_code END)
AS rec_detect_store, -- 最近检测门店
max(detect_time) AS rec_detect_date, -- 最近检测时间
count(1) AS check_count, -- 累计检测次数
min(substr(detect_time,1,10)) AS filing_date, -- 建档时间
min(CASE WHEN detect_time = min_detect_time THEN erp_code END)
AS store_name, -- 建档门店名称
max(extend) AS is_anamnesis, -- 有无既往病史
CASE WHEN COUNT(bec_chr_mbr_date) > 0 THEN 1 ELSE 0 END AS
is_chr_mbr -- 是否特慢病会员
FROM (
SELECT
*,
MIN(detect_time) OVER (PARTITION BY member_id) AS
min_detect_time,
MAX(detect_time) OVER (PARTITION BY member_id) AS
max_detect_time
FROMchange_shihaihong.his_chronic_patient_info_new
)
GROUP BY member_id
'''
df = spark.sql(sql)
df.show()
# 保存到 hive: change_shihaihong.member_check
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/member_check")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/member_check")
\
.saveAsTable("change_shihaihong.member_check")
print("写入 hive 表成功")
# 会员订单情况临时表
def do_member_tmp_sale():
@F.udf()
def handle_pay_fav_type(val1,val2,val3):
payments = {
"银行卡": val1,
"手机支付": val2,
"现金": val3
}
payment_tuples = list(payments.items())
payment_tuples.sort(key=lambda x: (-x[1], x[0])) #使用
负值来确保从大到小排序
result_strings = [method for method, _ in payment_tuples]
# 使用>符号连接字符串result = '>'.join(result_strings)
return result
sql='''
select
m.member, -- 会员卡号
count(1) as order_total, -- 总订单数
round(sum(m.precash),2) as order_amount, -- 消费总额
max(m.starttime) as last_order_date, -- 最后一单日期
min(m.starttime) as first_order_date, -- 首单日期
count(case when m.starttime >= date_sub('2018-01-01',
30) then 1 end) as order_30, -- 30 天订单量
count(case when m.starttime >= date_sub('2018-01-01',
90) then 1 end) as order_90, -- 90 天订单量
sum(case when m.starttime >= date_sub('2018-01-01', 30)
then round(m.precash,2) else 0 end) as amount_30, -- 30 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01', 90)
then round(m.precash,2) else 0 end) as amount_90, -- 90 天消费金
额
sum(case when m.starttime >= date_sub('2018-01-01',
180) then round(m.precash,2) else 0 end) as amount_180, -- 180
天消费金额
count(case when eusp.paytype = '银行卡 ' then 1 end) as
bank_count,
count(case when eusp.paytype = '手机支付' then 1 end)
as credit_card_count,
count(case when eusp.paytype = '现金 ' then 1 end) as
cash_count,
'' as pay_fav_type
from change_shihaihong.erp_u_sale_m_inc_new m
left join change_shihaihong.erp_u_sale_pay_inc_new eusp
on m.saleno = eusp.saleno -- 确保连接条件是正确的
group by m.member
'''
df = spark.sql(sql)
df = df.withColumn("pay_fav_type",
handle_pay_fav_type("bank_count","credit_card_count","cash_cou
nt"))
df.drop("bank_count")
df.drop("credit_card_count")
df.drop("cash_count")
df.show()# 保存到 Hive: change_shihaihong.member_sale
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat", "parquet") \
.option("location",
"/zhiyun/shihaihong/change/member_sale") \
.saveAsTable("change_shihaihong.member_sale")
print("临时表保存成功")
print("写入 hive 表成功")
# 宽表加工
def do_member_table():
# 主表可以列最多的那个表
# 会员的订单情况可以用子查询统计出来, 也可以使用临时表
sql = '''
with t as (select * from
change_shihaihong.crm_user_base_info_his_new )
select
t.user_id as mbr_code, -- 会员编码
t.user_type as mbr_type, -- 会员类型
t.source as mbr_resource, -- 会员来源
m.memcardno as mbr_cardno, -- 会员卡号
t.erp_code as store_code, -- 注册门店编码
t.active_time as sto_reg_date, -- 门店注册日期
"" as reg_platform, -- 注册外部平台
"" as platform_reg_date, -- 外部平台注册时间
t.name as name, -- 姓名
t.sex as gender, -- 性别
t.birthday as birthdate, -- 出生日期
t.age as age, -- 年龄
t.id_card_no as mbr_id_card, -- 身份证号
t.social_insurance_no as social_security_no, -- 社保卡号
t.education as edu_background, -- 教育背景
t.job as profession, -- 职业
"未知" as is_marriage, -- 婚姻状况
"无" as have_children, -- 是否有孩
t.address as address, -- 通信地址
"" as region, -- 区域
m.province as province, -- 省
m.city as city, -- 城市t.last_subscribe_time as cancel_date, -- 注销时间
m.tel as phone, -- 联系电话
m.handset as cell_phone, -- 手机号
t.email as email, -- 邮箱
t.wechat as wechat, -- 微信账号
t.webo as weibo, -- 微博账号
"" as alipay, -- 支付宝账号
"" as app, -- APP 账号
sale.order_total as order_total, -- 总订单数
sale.order_amount as order_amount, -- 消费总额
sale.last_order_date as last_order_date, -- 最后一单日期
sale.first_order_date as first_order_date, -- 首单日期
sale.order_30 as order_30, -- 30 天订单量
sale.order_90 as order_90, -- 90 天订单量
sale.amount_30 as amount_30, -- 30 天消费金额
sale.amount_90 as amount_90, -- 90 天消费金额
sale.amount_180 as amount_180, -- 180 天消费金额
sale.pay_fav_type as pay_fav_type, -- 付款方式偏爱排行
g.groupname as group_name, -- 会员分组
"" as ware_buy_sort, -- 药品购买排行
m.ness as sickness_motion, -- 疾病关注
check.rec_detect_store as rec_detect_store, -- 最近检测门
店
check.rec_detect_date as rec_detect_date, -- 最近检测时间
check.check_count as check_count, -- 累计检测次数
check.filing_date as filing_date, -- 建档时间
check.store_name as store_name, -- 建档门店名称
check.is_anamnesis as is_anamnesis, -- 有无既往病史
check.is_chr_mbr as is_chr_mbr, -- 是否特慢病会员
current_timestamp as etl_time, -- ETL 时间
"ETL by qinyuxiao" as comments -- 备注信息
from t
left join change_shihaihong.erp_u_memcard_reg_full_new m on
m.scrm_userid = t.user_id
left join change_shihaihong.member_sale sale on sale.member =
m.memcardno
left join change_shihaihong.member_check check on
check.member = m.memcardno
left join dwd_qinyuxiao.erp_c_memcard_class_group g on
sale.order_amount >=g.lg and sale.order_amount < g.gt
'''
df = spark.sql(sql)df.show()
# 保存
# 保存到 hive: dws_xbd_mxm_memberinfo_dim_t
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/dws/dws_xbd_mxm_memberinfo_dim_t")
os.system("""
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists dws_shihaihong location
"/zhiyun/shihaihong/dws";
'
""")
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/dws/dws_xbd_mxm_memb
erinfo_dim_t"). \
saveAsTable("dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t")
print("写入 hive 表成功")
# 验证数据
# 注意总数据量应该跟 CRM 表一致 168W 整 多一条都不行
# 计算原表的总记录数
original_count_sql = "select count(1) from
change_shihaihong.crm_user_base_info_his_new"
original_count =
spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
dws_shihaihong.dws_xbd_mxm_memberinfo_dim_t"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}")
do_member_tmp_check()
do_member_tmp_sale()
do_member_table()
print("宽表加工完成")# 部署

任务调度:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/61121.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#.Net筑基-模式匹配汇总

01、模式匹配概述 从C#7开始支持的 模式匹配 语法&#xff08;糖&#xff0c;挺甜&#xff09;&#xff0c;可非常灵活的对数据进行条件匹配和提取&#xff0c;经过多个版本的完善&#xff0c;已经非常强大了。 C# 支持多种模式&#xff0c;包括声明、类型、常量、关系、属性…

Python蓝桥杯刷题1

1.确定字符串是否包含唯一字符 题解&#xff1a;调用count函数计算每一个字符出现的次数&#xff0c;如果不等于1就输出no&#xff0c;并且结束循环&#xff0c;如果等于1就一直循环直到计算到最后一个字符&#xff0c;若最后一个字符也满足条件&#xff0c;则输出yes import…

Unity类银河战士恶魔城学习总结(P127 Stat ToolTip属性提示)

【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili 教程源地址&#xff1a;https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了把鼠标放到属性上面就会显示属性的作用 UI_StatToolTip.cs 这段代码实现了一个UI提示框&#xff08;ToolTip&#xff09;功能…

计算机编程中的事件驱动编程模型及其在构建响应式用户界面中的应用

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 计算机编程中的事件驱动编程模型及其在构建响应式用户界面中的应用 计算机编程中的事件驱动编程模型及其在构建响应式用户界面中…

ROS第九梯:ROS+VSCode+Python+C++自定义消息发布和订阅

首先,Python版本的ROS项目和C++版本的ROS项目前期创建功能包的步骤基本一致,具体可参考第二章。 费一步:新建msg文件 在功能包(data_input)目录下创建一个msg文件夹,并在msg文件夹下创建一个名为Box的msg文件,具体如下图所示: 该msg文件为一个用于描述3D Box的文件,…

selenium元素定位---元素点击交互异常解决方法

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 1、异常原因 在编写ui自动化时&#xff0c;执行报错元素无法点击&#xff1a;ElementClickInterceptedException 具体报错&#xff1a;selenium.common.exc…

Front Panel Window Bounds 与 Front Panel Window Bounds 的区别与应用

在LabVIEW中&#xff0c;Front Panel Window Bounds 和 Front Panel WindowBounds 是两个不同的属性节点&#xff0c;用于描述前面板窗口的位置和大小。它们的区别主要体现在它们表示的是窗口的不同部分&#xff0c;具体如下&#xff1a; 1 Window Bounds&#xff1a;调整整个…

H.265流媒体播放器EasyPlayer.js播放器出现加载视频等待画面时长过长的原因排查

在数字媒体时代&#xff0c;用户体验是衡量播放器性能的关键指标之一。EasyPlayer.js网页web无插件播放器作为一款流行的Web视频播放器&#xff0c;其加载速度和响应时间直接影响着用户的观看体验。 1、问题描述 加载视频等待画面时长过长。 2、可能的原因&#xff1a; 检查下…

计算机网络-MSTP基础实验一(单域多实例)

前面我们已经大致了解了MSTP的基本概念和工作原理&#xff0c;但是我自己也觉得MSTP的理论很复杂不结合实验是很难搞懂的&#xff0c;今天来做一个配套的小实验以及一些配置命令。 一、网络拓扑 单域多实例拓扑 基本需求&#xff1a;SW1为VLAN10的网关&#xff0c;SW2为VLAN20的…

大数据-227 离线数仓 - Flume 自定义拦截器(续接上节) 采集启动日志和事件日志

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…

【langchain4j】AIservices能够实现更加灵活的chain

文章目录 AI service介绍如何工作的AiServices提供的能力支持的返回形式 简单的例子&#xff1a;接收用户消息&#xff0c;并按规定返回接收单个变量接收更多动态变量 advanced RAGChaining multiple AI Services&#xff1a;多个AiSerives合并到一起相关教程&#xff1a;[Lang…

【UGUI】背包的交互01(道具信息跟随鼠标+道具信息面板显示)

详细程序逻辑过程 初始化物品栏&#xff1a; 在 Awake 方法中&#xff0c;通过标签找到提示框和信息面板。 循环生成10个背包格子&#xff0c;并为每个格子设置图标和名称。 为每个格子添加 UInterMaager232 脚本&#xff0c;以便处理交互事件。 关闭提示框和信息面板&#…

同步互斥相关习题10道 附详解

PV操作 2016 某系统允许最多10个进程同时读文件F&#xff0c;当同时读文件F的进程不满10个时&#xff0c;欲读该文件的其他文件可立即读&#xff0c;当已有10个进程在读文件F时读&#xff0c;其他欲读文件F的进程必须等待&#xff0c;直至有进程读完后退出方可去读 在实现管…

Postman之数据提取

Postman之数据提取 1. 提取请求头\request中的数据2. 提取响应消息\response中的数据3. 通过正在表达式提取4. 提取cookies数据 本文主要讲解利用pm对象对数据进行提取操作&#xff0c;虽然postman工具的页面上也提供了一部分的例子&#xff0c;但是实际使用时不是很全面&#…

【专题】数据库原理与应用之故障恢复

1. 数据库故障恢复概述 数据库的可恢复性&#xff1a; DBMS能把数据库从被破坏、不正确的状态、恢复到最近一个正确的状态。 恢复管理任务的种类&#xff1a; 一是在未发生故障而系统正常运行时&#xff0c;采取一些必要措施为恢复工作打基础。 二是在发生故障后进行恢复处…

EXCEL 或 WPS 列下划线转驼峰

使用场景&#xff1a; 需要将下划线转驼峰&#xff0c;直接在excel或wps中第一行使用公式&#xff0c;然后快速刷整个列格式即可。全列工下划线转为格式&#xff0c;使用效果如下&#xff1a; 操作步骤&#xff1a; 第一步&#xff1a;在需要显示驼峰的一列&#xff0c;复制以…

MODBUS TCP转CANOpen网关

Modbus TCP转CANopen网关 型号&#xff1a;SG-TCP-COE-210 产品用途 本网关可以实现将CANOpen接口设备连接到MODBUS TCP网络中&#xff1b;并且用户不需要了解具体的CANOpen和Modbus TCP 协议即可实现将CANOpen设备挂载到MODBUS TCP接口的 PLC上&#xff0c;并和CANOpen设备…

分布式----Ceph部署

目录 一、存储基础 1.1 单机存储设备 1.2 单机存储的问题 1.3 商业存储解决方案 1.4 分布式存储&#xff08;软件定义的存储 SDS&#xff09; 1.5 分布式存储的类型 二、Ceph 简介 三、Ceph 优势 四、Ceph 架构 五、Ceph 核心组件 #Pool中数据保存方式支持两种类型&…

自动驾驶系列—面向自动驾驶的模型迭代:工具、平台与最佳实践

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

Spring Boot3自定义starter

1、加入必要依赖 plugins {id javaid org.springframework.boot version 3.2.6id io.spring.dependency-management version 1.1.5 } group org.example.test.starter version 1.1.0jar{enabledtrue// resolveMainClassName }java {toolchain {languageVersion JavaLanguage…