生产上完成TopN统计流程

背景

现有城市信息和产品信息两张表在MySQL中,另外有用户点击产品日志以文本形式存在hdfs上,现要求统计每个个城市区域下点击量前三的产品名,具体信息见下方。

mysql> show tables;
+---------------------------------+
| Tables_in_d7                    |
+---------------------------------+
| city_info                       |
| product_info                    |
| result_product_area_clicks_top3 |
+---------------------------------+
3 rows in set (0.00 sec)mysql> desc city_info;
+-----------+--------------+------+-----+---------+-------+
| Field     | Type         | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| city_id   | int(11)      | YES  |     | NULL    |       |
| city_name | varchar(255) | YES  |     | NULL    |       |
| area      | varchar(255) | YES  |     | NULL    |       |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)mysql> select * from city_info;
+---------+-----------+------+
| city_id | city_name | area |
+---------+-----------+------+
|       1 | BEIJING   | NC   |
|       2 | SHANGHAI  | EC   |
|       3 | NANJING   | EC   |
|       4 | GUANGZHOU | SC   |
|       5 | SANYA     | SC   |
|       6 | WUHAN     | CC   |
|       7 | CHANGSHA  | CC   |
|       8 | XIAN      | NW   |
|       9 | CHENGDU   | SW   |
|      10 | HAERBIN   | NE   |
+---------+-----------+------+
10 rows in set (0.00 sec)mysql> desc product_info; 
+--------------+--------------+------+-----+---------+-------+
| Field        | Type         | Null | Key | Default | Extra |
+--------------+--------------+------+-----+---------+-------+
| product_id   | int(11)      | YES  |     | NULL    |       |
| product_name | varchar(255) | YES  |     | NULL    |       |
| extend_info  | varchar(255) | YES  |     | NULL    |       |
+--------------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)mysql> select * from product_info limit 10;  <-- product_info总数100
+------------+--------------+----------------------+
| product_id | product_name | extend_info          |
+------------+--------------+----------------------+
|          1 | product1     | {"product_status":1} |
|          2 | product2     | {"product_status":1} |
|          3 | product3     | {"product_status":1} |
|          4 | product4     | {"product_status":1} |
|          5 | product5     | {"product_status":1} |
|          6 | product6     | {"product_status":1} |
|          7 | product7     | {"product_status":1} |
|          8 | product8     | {"product_status":1} |
|          9 | product9     | {"product_status":0} |
|         10 | product10    | {"product_status":1} |
+------------+--------------+----------------------+
10 rows in set (0.00 sec)[hadoop@hadoop001 data]$ more user_click.txt 
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1(city_id),72(product_id)
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:52:26,1,68
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:17:03,1,40
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:32:07,1,21
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:26:06,1,63
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:03:11,1,60
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:43:43,1,30
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:09:58,1,96
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:18:45,1,71
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:42:39,1,8
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:24:30,1,6
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:29:49,1,26
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:24:12,1,83
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:07:50,1,62
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:19:31,1,61
95,5b8cdcb0b18645a19f4e3e34a241012e,2016-05-05 20:40:51,1,46
....
[hadoop@hadoop001 data]$ wc -l user_click.txt 
11448 user_click.txt
复制代码

解决思路

1)city_info表和product_info表通过sqoop放到Hive里面
2)通过user_click关联Hive里面的city_info和product_info
3)再使用窗口函数求分组内的TOPN将结果sqoop导入MySQL
4)shell脚本封装这个业务线的所有代码的思路,需要提及的一点,因为city_info/product_info数据变动少,所以通过其他的脚本导入,这个shell脚本不涉及,但我下面步骤依然会写出来。
5)使用crontab触发,每天凌晨2点开始执行
注意点:
a) 每次创建的临时表,在执行之前一定要先删除,要使用if not exits
b) 关键的执行要有日志输出
c) shell脚本如何解决幂等性问题

MySQL导入Hive

在sqoop部署篇讲到过怎么部署和使用sqoop,这里不在说明,直接上代码。

# 这里给出hive里的city_info的表结构
hive (d7)> create table city_info(city_id int,city_name string,area string
)
row format delimited fields terminated by '\t';# 导入city_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table city_info \
--split-by 'city_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/city_info' \
--delete-target-dir \
-m 2# 这里给出hive里的product_info的表结构
hive (d7)> create table product_info(product_id int,product_name string,extend_info string
)
row format delimited fields terminated by '\t';# 导入product_info
[hadoop@hadoop001 ~]$ sqoop import \
--connect "jdbc:mysql://localhost:3306/d7" \
--username root \
--password root \
--table product_info \
--split-by 'product_id' \
--fields-terminated-by '\t' \
--hive-import \
--hive-database d7 \
--target-dir '/user/hive/warehouse/d7.db/product_info' \
--delete-target-dir \
-m 2
复制代码

ps:如果你第一次用sqoop的话,这里肯定会有两个坑。这里暂且不说,下篇文章解答。

user_click加载数据

生产上hive的user_click表肯定是个一直数据增长的表,所以该表肯定是个分区表。但是一般来说清洗好的前一天数据会直接放在user_click表存放hdfs上路径上,比如分区表存放路径为hdfs://hadoop001:9000/user/hive/warehouse/d7.db/user_click,那么生产上会将2016-05-05日志清洗好并在该路径上创建分区路径。这时候你查询分区表不会出现该分区数据,该怎么高效的将数据刷新到分区表呢?请看下方代码

# 先给出user_click表结构
hive (d7)> create table user_click(user_id int,session_id string,action_time string,city_id int,product_id int
)
partitioned by(day string)
row format delimited fields terminated by ',';# 刷新分区表,另一种刷新方式不推荐,过于暴力
hive (d7)> alter table user_click add if not exists partition(day='2016-05-05');
复制代码

三表关联生成临时表

临时表有区域名,产品名,点击量三个字段。

hive (d7)> drop table if exists tmp_product_area_clicks;
hive (d7)> create table tmp_product_area_clicks as> select b.area,c.product_name,count(1) as click_count from user_click a> left join city_info b on a.city_id=b.city_id> left join product_info c on a.product_id=c.product_id > where a.day='2016-05-05'> group by b.area,c.product_name
复制代码

窗口函数得到TopN结果

使用row_number()函数

hive (d7)> drop table if exists result_product_area_clicks_top3;
hive (d7)> create table result_product_area_clicks_top3> row format delimited fields terminated by '\t' as> select * from ( > select > "2016-05-05" day,product_id,product_name,area,click_count, <-- 日期会在脚本中更改> row_number() over(partition by area order by click_count desc) rank> from tmp_product_area_clicks> ) t where t.rank<=3;
复制代码

Hive导出MySQL

# 我们事先在MySQL创建好结果表,下面为表结构
create table result_product_area_clicks_top3(
day varchar(15),
product_id int(11),
product_name varchar(50),
area varchar(10),
click_count int(11),
rank int(10)
)# 为了幂等性,会将MySQL结果表该日期的数据先删掉
# 日期会在脚本中更改
mysql> delete from result_product_area_clicks_top3 where day='2016-05-05'; [hadoop@hadoop001 ~]$ sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3\
--export-dir /user/hive/warehouse/d7_hive.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2
复制代码

shell脚本编写

hive离线是一天一次,是今天某个时间去运行昨天的数据,所以要在shell脚本中获取前一天,该命令为'date --date '1 day ago' +%Y-%m-%d'。下面就是shell脚本代码。

[hadoop@hadoop001 ~]$ vim top3.sh
#!/bin/bash

CURRENT=`date +%Y-%m-%d_%H:%M:%S`
USE_DAY=`date --date '1 day ago' +%Y-%m-%d`
echo '当前使用的日期为:'$USE_DAY''echo ''$CURRENT',开始刷新分区'
HIVE_PARTITION_SQL="alter table d7.user_click add if not exists partition(day='${USE_DAY}');"
hive -e "${HIVE_PARTITION_SQL}"echo ''$CURRENT',开始创建临时表,其中数据为每个区域下每个产品的点击数'
HIVE_TMP_SQL="drop table if exists tmp_product_area_clicks;
create table tmp_product_area_clicks as
select b.area,c.product_name,count(1) as click_count from user_click a
left join city_info b on a.city_id=b.city_id
left join product_info c on a.product_id=c.product_id 
where a.day='${USE_DAY}'
group by b.area,c.product_name;"
hive -e "${HIVE_TMP_SQL}"echo ''$CURRENT',开始创建结果表,其中数据为每个区域下每个产品的前三点击数'
HIVE_RESULT_SQL="drop table if exists result_product_area_clicks_top3;
create table result_product_area_clicks_top3
row format delimited fields terminated by '\t' as
select * from ( 
select '${USE_DAY}' day,product_id,product_name,area,click_count,
row_number() over(partition by area order by click_count desc) rank
from tmp_product_area_clicks
) t where t.rank<=3;"
hive -e "${HIVE_RESULT_SQL}"echo ''$CURRENT',保持幂等性,开始删除MySQL结果表中当前'$USE_DAY'数据'
MySQL_DETELE_SQL="delete from result_product_area_clicks_top3 where day='${USE_DAY}';"
sudo mysql -uroot -proot -e "${MySQL_DETELE_SQL}"echo ''$CURRENT',开始将Hive结果表导入MySQL'
sqoop export \
--connect jdbc:mysql://localhost:3306/d7 \
--password root \
--username root \
--table result_product_area_clicks_top3\
--export-dir /user/hive/warehouse/d7_hive.db/result_product_area_clicks_top3 \
--columns "day,product_id,product_name,area,click_count,rank" \
--fields-terminated-by '\t' \
-m 2
echo ''$CURRENT',整个流程结束,请查看MySQL中数据是否导入'复制代码

定时后台执行

使用crontab来做定时,具体见下方代码

[hadoop@hadoop001 ~]$ crontab -e
* 2 * * * nohup /home/hadoop/top3.sh >> /tmp/top3_logs.log 2>&1 & 
复制代码

转载于:https://juejin.im/post/5d37feb26fb9a07ece681119

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/363470.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最大公因数和最小公倍数

一丶 最大公因数求法&#xff1a;辗转相除法(也称欧几里得算法)原理: 二丶最小公倍数求法&#xff1a;两个整数的最小公倍数等于两整数之积除以最大公约数1 #include <iostream>2 3 using namespace std;4 5 //辗转相除法(欧几里得算法)6 7 int gcd(int a, int b)8 {9…

css实现div内一段文本的两端对齐

在一个固定宽度的div内&#xff0c;使得P标签内的文本两端对齐&#xff1a; text-align: justify;text-justify:inter-ideograph; <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8"><title>justify</title>…

JPA 2.1实体图–第2部分:在运行时定义延迟/急切加载

这是我关于JPA 2.1实体图的第二篇文章。 第一篇文章描述了命名实体图的用法。 这些可用于定义在编译时将使用查找或查询方法获取的实体和/或属性的图形。 动态实体图以相同的方式但以动态方式这样做。 这意味着您可以在运行时使用EntityGraph API定义实体图。 如果您错过了第一…

HDU1166-敌兵布阵

http://acm.hdu.edu.cn/showproblem.php?pid1166 线段树第一题 #include<cstdio> #define lson l,m,rt<<1 #define rson m1,r,rt<<1|1 const int maxn55555; int sum[maxn<<2]; void PushUP(int rt) {sum[rt]sum[rt<<1]sum[rt<<1|1]; } …

js对象序列化为json字符串

网上找了找将js对象序列化为json字符串的方法。结果都不近人意&#xff0c;最后自己写了一个。 注意你得自己为Date增加toString()方法。 function Serialize(obj){switch(obj.constructor){case Object:var str "{";for(var o in obj){str o ":" Seri…

QT学习三 标准对话框 QMessageBox

QMessageBox内置了几种static方法,例如 QMessageBox::question() 返回值:StandardButton 参数:QWidget * 父窗口&#xff0c;标题名&#xff0c;内容&#xff0c;按钮 YES|NO,默认选中按钮) 示例: 1 #include "mainwindow.h"2 #include <QApplication>3 #incl…

react学习笔记2

1.build文件介绍 &#xff08;1&#xff09;react.js 是react的核心库 &#xff08;2&#xff09;react-dom.js 提供与DOM相关功能 &#xff08;3&#xff09;browser.js 是将JSX语法转为javascript语法 2.组件的继续学习 注意&#xff1a;组件的第一个字母必须大写&…

Spring4:没有默认构造函数的基于CGLIB的代理类

在Spring中&#xff0c;如果要代理的目标对象的类未实现任何接口&#xff0c;则将创建基于CGLIB的代理。 在Spring 4之前&#xff0c;基于CGLIB的代理类需要默认的构造函数。 这不是CGLIB库的限制&#xff0c;而是Spring本身。 幸运的是&#xff0c;从Spring 4开始&#xff0c;…

linux里面i386 i686 i486 i586代表什么?是什么意思

URL:http://hi.baidu.com/software_one/blog/item/85c7ccedd70d6925acafd5e0.html 在linux里面&#xff0c;我们经常会遇到i386 i686 i486 I586 这些代码&#xff0c;例如查看内核版本&#xff1a; [rootlocalhost logs]# uname -a Linux localhost.localdomain 2.6.18-164.el5…

汇编语言學習

汇编语言 汇编语言(Assembly Language)是面向机器的程序设计语言.汇编语言是一种功能很强的程序设计语言,也是利用计算机所有硬件特性并能直接控制硬件的语言。汇编语言”作为一门语言&#xff0c;对应于高级语言的编译器&#xff0c;需要一个“汇编器”来把汇编语言原文件汇编…

HOW-TO:带有MySQL的JEE应用程序中具有集群功能的Quartz Scheduler

Quartz Scheduler是Java世界中最流行的调度库之一。 过去&#xff0c;我主要在Spring应用程序中使用Quartz。 最近&#xff0c;我一直在研究将在云中部署的JBoss 7.1.1上运行的JEE 6应用程序中的调度。 我考虑的一种选择是Quartz Scheduler&#xff0c;因为它提供了与数据库的集…

DevStack方式安装queens版openstack

最近在学习openstack,在安装阶段就遇到了很多问题&#xff0c;特把安装过程记录如下&#xff0c;经笔者验证能正确安装openstack。 说明&#xff1a;安装后即为中文版。 2019/01/29: 安装环境&#xff1a; 宿主&#xff1a; Ubuntu 16.04 xenial Hypervisor: kvm 虚拟机&#x…

dev c++ 报错[Error] ld returned 1 exit status 的解决办法

我是个C语言的初学者&#xff0c;在使用dev c 编译器时&#xff0c;遇到一个情况&#xff1a;程序是正确的&#xff0c;能够正常的编译和运行&#xff0c;但是运行一次之后再次运行之时就出现了 报错[Error] ld returned 1 exit status&#xff0c;出现这个问题的原因是&#x…

2008年12月答疑贴

有问题请在此贴跟贴回复&#xff0c;我亦会在此贴回复。 请不要到无关的帖子中跟帖 请尽量描述清楚你的问题和需要&#xff0c;我的理解能力不是很强&#xff0c;呵呵。 请您遵守以下规则&#xff1a; 提问内容中请不要出现 感叹号&#xff0c;跪求等字样。 请尽量不要称呼我为…

androidmanifest.xml权限中文说明

程序执行需要读取到安全敏感项必需在androidmanifest.xml中声明相关权限请求, 完整列表如下: android.permission.ACCESS_CHECKIN_PROPERTIES 允 许读写访问”properties”表在checkin数据库中&#xff0c;改值可以修改上传( Allows read/write access to the “properties” t…

C语言使用scanf()函数时,%c前面和后面分别加上空格后的结果

在使用scanf()读取输入的字符时&#xff0c;当转换说明为%c时&#xff0c;"%c"、" %c"、"%c " 这三种不同的写法&#xff0c;对数据读取的结果有什么影响吗&#xff0c;答案是肯定的&#xff0c;%c 加不加空格&#xff0c;空格在前还是在后&am…

Python -- 自动导入所需要的模块

try: import xlwtexcept ImportError as e:   import os   print(e)   os.system("pip install xlwt")转载于:https://www.cnblogs.com/xlx12138/p/10551894.html

借助Apache Hadoop大规模扩展Apache Solr实时实时索引

播客的第22集是与Patrick Hunt的谈话 我们讨论了Apache Solr&#xff08;上游&#xff09;中的新工作&#xff0c;使它可以在Apache Hadoop上工作。 Solr支持将其索引和事务日志文件写入和读取到HDFS分布式文件系统。 这不使用Hadoop Map-Reduce处理Solr数据&#xff0c;而是仅…

C语言,关于getchar()清空回车符的几点经验

最近被getchar()弄的有点糊涂&#xff0c;现在基本缕清了。 拿程序举个例子&#xff1a; #include<stdio.h> int main(void) {char ch1,ch2;printf("Iam testing *********.\n");printf("So hard! ***********\n");ch1getchar();printf("$$$$$…

面试中关于多线程同步,你必须要思考的问题

ReentrantLock的实现网上有很多文章了&#xff0c;本篇文章会简单介绍下其java层实现&#xff0c;重点放在分析竞争锁失败后如何阻塞线程。因篇幅有限&#xff0c;synchronized的内容将会放到下篇文章。 Java Lock的实现 ReentrantLock是jdk中常用的锁实现&#xff0c;其实现逻…