hive left outer join 子查询临时表_基于历史数据的用户访问次数,每天新老用户,日活,周活,月活的hive计算...

最近有一个需求,统计每天的新老用户,日活,周活,月活。

我们每天的增量数据会加入到hive历史数据表中,包含用户访问网站的一些信息,字段有很多,包括用户唯一标识guid。

当然了日活,周活,月活就是一个count(distinct(guid))语句,非常常用的sql。

但是这里的问题是:

A:每天的新老用户应该怎么统计呢?
B:这还不简单,判断用户guid是否存在与历史库guid中嘛?
A:历史数据几十个T,大概一百亿行,你要每天将当日数据(2~3亿行)与历史数据几亿行进行join判断?
B:额,这个,这个,好像不行哦!

是的,历史数据里面是用户网站访问行为,同一个用户在同一天,不同的天都有可能出现,guid在历史表中会有多次。如果直接join,性能很差,实际上是做了很多不必要的工作。

解决方案:

维护一张用户表,里面有4列:guid, starttime, endtime, num,分别是用户的guid,第一次访问时间,最后一次访问时间,访问天数;
从某个状态开始,历史表中guid是唯一的;
当天数据去重后,与历史库join,如果guid在历史库出现过,则将endtime更新为当天时间,num加一;
否则,这是一个新用户,插入历史库,starttime, endtime都为当天时间,num初始值为1。

维护了这么一张用户表后,接下来就可以写hql统计业务了,计算当天新老用户时,只需要与这个历史库进行join就行了(目前为止4千万),当日guid去重后是1千多万,这样就是4千万~1千万的join了,与开始4千万~100亿的join,性能会有巨大提升。

hive历史表的设计与hive相关配置

可以看到这里hive历史表history_helper需要频繁修改,hive表支持数据修改需要在${HIVE_HOME}/conf/hive-site.xml中添加事务支持:

<property><name>hive.support.concurrency</name><value>true</value>
</property>
<property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value>
</property>
<property><name>hive.txn.manager</name><value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property><name>hive.compactor.initiator.on</name><value>true</value>
</property>
<property><name>hive.compactor.worker.threads</name><value>1</value>
</property>

为了提高查询速度,hive历史表与增量表这里都分桶,hive-xite.xml配置:

<property><name>hive.enforce.bucketing</name><value>true</value>
</property>

为了提高reduce并行度,也设置一下:

set mapred.reduce.tasks = 50;

这个最好在hive命令行配置,表明只在当前程序使用该配置,就不要配置配置文件了。

历史库建表语句:

create external table if not exists hm2.history_helper
(guid string,starttime string,endtime string,num int
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");

当天增量表,保存去重后的guid,建表语句:

create table if not exists hm2.daily_helper
(guid string,dt string
)
clustered by(guid) into 50 buckets
stored as orc TBLPROPERTIES ("transactional"="true");

思路

由于这种需要写成定时模式,所以这里用python脚本来实现,将hive查询结果保存到本地文件result.txt,然后python读取result.txt,连接数据库,保存当天的查询结果。

代码

helper.py

#!/usr/bin/python
# -*- coding:utf-8 -*-# hive更新历史用户表,日常查询,保存到MySQLimport sys
import datetime
import commands
import MySQLdb# 获取起始中间所有日期
def getDays(starttime,endtime,regx):datestart=datetime.datetime.strptime(starttime,regx)dateend=datetime.datetime.strptime(endtime,regx)days = []while datestart<=dateend:days.append(datestart.strftime(regx))datestart+=datetime.timedelta(days=1)return days# 获得指定时间的前 n 天的年、月、日,n取负数往前,否则往后
def getExacYes(day, regx, n):return (datetime.datetime.strptime(day,regx) + datetime.timedelta(days=n)).strftime(regx)# 获得距离现在天数的年、月、日,n 取值正负含义同上,昨天就是getYes(regx,-1)
def getYes(regx, n):now_time = datetime.datetime.now()yes_time = now_time + datetime.timedelta(days=n)yes_time_nyr = yes_time.strftime(regx)return yes_time_nyr# 执行hive命令
def execHive(cmd):print cmdres = commands.getstatusoutput(cmd)return res# 获得当前是星期几
def getWeek(regx):now_time = datetime.datetime.now()week = now_time.strftime(regx)return week# 格式化日期,加上双引号
def formatDate(day):return """ + day + """# 数据保存到mysql
def insertMysql(dt, path, tbName, regx):# new, dayAll, stayvalues = []with open(path) as file:line = file.readline()while line:values.append(line.strip())line = file.readline()dayAll = int(values[1])new = float(values[0])/dayAllold = 1 - new# 获取数据库连接conn = MySQLdb.connect("0.0.0.0", "statistic", "123456", "statistic")# 获取游标cursor = conn.cursor()# 查询昨天的用户人数yesDay = getExacYes(dt, regx, -1)sql = 'select dayAll from %s where dt = %s'%(tbName, formatDate(yesDay))try:cursor.execute(sql)except Exception as e:print eyesAll = int(cursor.fetchall()[0][0])stay = float(values[2]) / yesAllprint stay# 获取游标cursor2 = conn.cursor()sql = 'insert into  %svalues("%s",%f,%f,%f,%d)'%(tbName, dt, new, old, stay, dayAll)print sqltry:cursor2.execute(sql)conn.commit()except:conn.rollback()finally:conn.close()# 初始化,删除临时表,并且创建
def init():# 设置分桶环境cmd = 'source /etc/profile;hive -e 'set hive.enforce.bucketing = true;set mapred.reduce.tasks = 50;''(status,result) = execHive(cmd)# 清除当天的临时表,结果保存cmd = 'source /etc/profile;hive -e 'drop table hm2.daily_helper;''(status,result) = execHive(cmd)if status == 0:print '%s昨天临时表删除完毕...'%(day)else:print resultsys.exit(1)cmd = 'source /etc/profile;hive -e 'create table if not exists hm2.daily_helper(guid string,dt string)clustered by(guid) into 50 buckets stored as orc TBLPROPERTIES ("transactional"="true");''(status,result) = execHive(cmd)if status == 0:print '%s临时表创建完毕...'%(day)else:print resultsys.exit(1)# 主函数入口
if __name__ == '__main__':regx = '%Y-%m-%d'resultPath = '/home/hadoop/statistic/flash/helper/result.txt'days = getDays('2018-07-01','2018-07-20',regx)tbName = 'statistic_flash_dailyActive_helper'for day in days:init()# 当天数据去重后保存到临时表daily_helpercmd = 'source /etc/profile;hive -e 'insert into hm2.daily_helper select distinct(guid),dt from hm2.helper where dt = "%s" and guid is not null;''%(day)print '%s数据正在导入临时表...'%(day)(status,result) = execHive(cmd)if status == 0:print '%s数据导入临时表完毕...'%(day)else:print resultsys.exit(1)# guid存在则更新 endtime 与 numcmd = 'source /etc/profile;hive -e 'update hm2.history_helper set endtime = "%s",num = num + 1 where guid in (select guid from hm2.daily_helper);''%(day)print '正在更新endtime 与 num...'(status,result) = execHive(cmd)if status == 0:print '%s history_helper数据更新完毕'%(day)else :print resultsys.exit(1)# 当天新用户cmd = 'source /etc/profile;hive -e 'select count(1) from hm2.daily_helper where guid not in (select guid from hm2.history_helper);' > %s'%(resultPath)(status,result) = execHive(cmd)if status != 0:print resultsys.exit(1)# 不存在插入cmd = 'source /etc/profile;hive -e 'insert into hm2.history_helperselect daily.guid,dt,dt,1 from hm2.daily_helper dailywhere daily.guid not in (select guid from hm2.history_helper where guid is not null);''print '正在插入数据到history_helper表...'(status,result) = execHive(cmd)if status == 0:print '%s数据插入hm2.history_helper表完成'%(day)else:print resultsys.exit(1)# 当天总人数cmd = 'source /etc/profile;hive -e 'select count(1) from hm2.daily_helper;' >> %s'%(resultPath)(status,result) = execHive(cmd)if status != 0:print resultsys.exit(1)# 次日活跃留存cmd = 'source /etc/profile;hive -e 'select count(1) from(select guid from hm2.helper where dt = "%s" group by guid) yesinner join(select guid from hm2.helper where dt = "%s" group by guid) todaywhere yes.guid = today.guid;' >> %s'%(getExacYes(day, regx, -1), day, resultPath)(status,result) = execHive(cmd)if status != 0:print resultsys.exit(1)# 结果保存到mysqlinsertMysql(day, resultPath, tbName, regx)print '=========================%s hive 查询完毕,结果保存数据到mysql完成=============================='%(day)

这是在处理历史数据,然后就是每天定时处理了,在linux crontab里加个定时器任务就好了。

1e8581dd6c4ebea5f964f29b2e144b2a.png
微信扫码关注

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/503706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sql server2008r2 没有提示_SQL学习之旅(1)

从2020.07.28开始跟着猴子哥系统学习数据库&#xff0c;在此记录自己的sql学习与练习过程&#xff0c;希望能为以后的自己带来帮助。SQL简介练习1&#xff1a;1.如何验证mysql数据库安装成功&#xff1f;在完成环境变量的配置的情况&#xff08;在Path中添加mysql的bin目录路径…

lptv自建服务器,如何搭建自己的IPTV平台

概述IPTV平台包括鉴权服务器、EPG服务器和媒体服务器。IPTV机顶盒首先与鉴权服务器进行通信&#xff0c;鉴权成功后获取EPG服务器和UserToken。EPG服务器实现节目内容导航、播放控制等用户界面。媒体服务器以RTSP协议实现流媒体传输。机顶盒与鉴权服务器和EPG服务器之间的通信都…

mysql事务prepare_mysql之 事务prepare 与 commit 阶段分析

打开binlog选项后&#xff0c;执行事务提交命令时&#xff0c;就会进入两阶段提交模式。两阶段提交分为prepare阶段和commit两个阶段。流程如下 &#xff1a;这里面涉及到两个重要的参数&#xff1a;innodb_flush_log_at_trx_commit和sync_binlog&#xff0c;参数可以设置不同的…

order by 子查询_【框架】118:mybatis之多表高级查询

今天是刘小爱自学Java的第118天。感谢你的观看&#xff0c;谢谢你。学习内容安排如下&#xff1a;补充说明知识点&#xff1a;resultMap&#xff0c;sql片段。mybatis中的高级查询&#xff0c;即多表关联查询。查询主要分为&#xff1a;一对一&#xff0c;一对多&#xff0c;多…

小程序使用css变量,小程序使用css变量实现“换肤”方案

使用sass,stylus可以很方便的使用变量来做样式设计&#xff0c;其实css也同样可以定义变量&#xff0c;在小程序中由于原生不支持动态css语法&#xff0c;so&#xff0c;可以使用css变量来使用开发工作变简单。基础用法page {--main-bg-color: brown;}.one {color: white;backg…

笔试MySQL读写速度优化_笔试题:优化mysql数据库的方法

优化mysql数据库的方法&#xff0c;(1).数据库设计方面,这是DBA和Architect的责任,设计结构良好的数据库,必要的时候,去正规化(英文是这个:denormalize,中文翻译成啥我不知道),允许部分数据冗余,避免JOIN操作,以提高查询效率(2).系统架构设计方面,表散列,把海量数据散列到几个不…

css找某个元素的下个子元素,CSS可以检测一个元素有多less个子元素?

注&#xff1a;这个解决scheme将返回一定长度的子集&#xff0c;而不是你所要求的父元素。 希望它仍然有用。安德烈路易斯提出了一个方法&#xff1a; http : //lea.verou.me/2011/01/styling-children-based-on-their-number-with-css3/不幸的是&#xff0c;它只适用于IE9及以…

python文本聚类分析_python机器学习kmeans算法——对文本进行聚类分析

#!/usr/bin/env python#-*- coding: utf-8 -*-#File : kmeans.py#Author: 田智凯#Date : 2020/3/19#Desc :机器学习kmeans算法&#xff0c;对科技成果项目进行聚类分析 from __future__ importprint_functionimporttimefrom sklearn.feature_extraction.text importTfidfVector…

lock字段mysql_MySQL的lock tables和unlock tables的用法(转载)

早就听说lock tables和unlock tables这两个命令&#xff0c;从字面也大体知道&#xff0c;前者的作用是锁定表&#xff0c;后者的作用是解除锁定。但是具体如何用&#xff0c;怎么用&#xff0c;不太清楚。今天详细研究了下&#xff0c;总算搞明白了2者的用法。lock tables 命令…

css中变形,css3中变形处理

transfrom功能在css3 中可以使用transfrom功能实现文字或图像的旋转&#xff0c;缩放&#xff0c;倾斜&#xff0c;移动等变形处理deg是css3中使用的一种角度单位。旋转&#xff1a; 使用rotate方法&#xff0c;在参数中加入角度值&#xff0c;在角度值后要加上角度单位deg。旋…

获取 子文件夹 后缀_后期制作老司机教你一键批量生成项目文件夹

我猜你的项目工程是这样的&#xff0c;当你老板说去修改一下之前几个月的工程的时候&#xff0c;你都不知道哪个工程才是最终版呀。乱糟糟的工程而且当你打开工程的时候&#xff0c;wo艹&#xff0c;素材怎么丢失了~~不管是后期制作者还是平常我们日常工作&#xff0c;一定要养…

mysql排序行号_mysql 取得行号后再排序

一.理论准备Map是键值对的集合接口&#xff0c;它的实现类主要包括&#xff1a;HashMap,TreeMap,Hashtable以及LinkedHashMap等。TreeMap&#xff1a;基于红黑树(Red-Black tree)的 NavigableMap 实现&#xff0c;该映射根据其键的自然顺序进行排序&#xff0c;或者根据创建映射…

ffmpeg将文件转码后推向服务器,使用 Serverless 云函数 + ffmpeg 实现音视频转码服务...

核心价值视频应用、社交应用等场景下&#xff0c;用户上传的图片、音视频的总量大、频率高&#xff0c;对处理系统的实时性和并发能力都有较高的要求。例如&#xff1a;对于用户上传的视频短片&#xff0c;我们可以使用多个云函数对其分别处理&#xff0c;对应不同的清晰度(108…

python 怎么判断字符串是否有换行_JAVA中如何判断一个字符串是否换行

展开全部${rr.right_name}扩展资料 java控制台程序判断String字符e68a8462616964757a686964616f31333431373263串中只输入了一个回车&#xff1a; importjava.io.BufferedReader; importjava.io.InputStreamReader; importjava.util.Scanner; publicclassTest{ publicstaticvoi…

python提取文本中的字符串到新的txt_Python实现jieba对文本分词并写入新的文本文件,然后提取出文本中的关键词...

版权声明&#xff1a;本文为博主原创文章&#xff0c;遵循CC 4.0 BY-SA版权协议&#xff0c;转载请附上原文出处链接和本声明。 Python实现jieba对文本分词并写入新的文本文件&#xff0c;然后提取出文本中的关键词思想 先对文本进行读写操作&#xff0c;利用jieba分词对待分词…

logger 参数列表过长_[源码级解析] 巧妙解决并深度分析Linux下rm命令提示参数列表过长的问题...

在维护实习单位服务器的过程中&#xff0c;偶然发现一个有350万文件的文件夹需要清理&#xff0c;于是我习惯性执行了rm -rf ./*&#xff0c;却在数秒后被告知“参数列表过长”。在一番折腾过后&#xff0c;我终于通过取巧的办法完成了这一任务&#xff0c;也随着相关内核源码的…

collect的功能是什么?其底层如何实现的?_为什么你要用 Spring ?

前言现在Spring几乎成为了Java在企业级复杂应用开发的代名词&#xff0c;得益于Spring简单的设计哲学和其完善的生态圈&#xff0c;确实为廉颇老矣&#xff0c;尚能饭否的 Java 带来了“春天”&#xff0c;有很多同学刚接触Java就直接从Spring框架开始学习&#xff0c;导致产生…

2.3.0配置 spark_配置scala 2.11.12的spark-2.3.0 maven依赖项的问题

我在尝试在POM.xml中为spark-scala应用程序设置maven denpendency时遇到问题 .我在用 &#xff1a;SCALA版本$ scalaWelcome to Scala 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_162).SPARK版本&#xff1a;$ ./spark-shellWelcome to SPARK version 2.3.0Using Scala ver…

m3u8合并mp4软件_m3u8格式转mp4究极办法!

你们来这个号这么久了&#xff01;还没给你们分享过一些实用的干货。打今天起这个公众号将给大家推荐一些APP和实用的小软件和一些小教程。生命太短&#xff0c;没时间留给遗憾。若不是终点&#xff0c;都不要把自己留在原地&#xff0c;请一直微笑向前&#xff01;我是帮忙坏哥…

android 左移动画_android旋转动画和平移动画详解,补充说一下如果制作gif动画放到csdn博客上...

先上效果图:制作过程是先起一个模拟器&#xff0c;然后把GifCam的框拖到模拟器上面&#xff0c;点击Rec的new先&#xff0c;然后点击Rec,然后就save到本地成gif文件这里做一个左右旋转&#xff0c;上下旋转&#xff0c;和左右移动的动画&#xff0c;先自己建立一个View的类&…