SparkSQL项目实战

1 准备数据

我们这次Spark-sql操作所有的数据均来自Hive,首先在Hive中创建表,并导入数据。一共有3张表:1张用户行为表,1张城市表,1张产品表。

1)将city_info.txt、product_info.txt、user_visit_action.txt上传到/opt/module/data

[atguigu@hadoop102 module]$ mkdir data

2)将创建对应的三张表

hive (default)>
CREATE TABLE `user_visit_action`(
  `date` string,
  `user_id` bigint,
  `session_id` string,
  `page_id` bigint,
  `action_time` string,
  `search_keyword` string,
  `click_category_id` bigint,
  `click_product_id` bigint, --点击商品id,没有商品用-1表示。
  `order_category_ids` string,
  `order_product_ids` string,
  `pay_category_ids` string,
  `pay_product_ids` string,
  `city_id` bigint --城市id
)
row format delimited fields terminated by '\t';CREATE TABLE `city_info`(
  `city_id` bigint, --城市id
  `city_name` string, --城市名称
  `area` string --区域名称
)
row format delimited fields terminated by '\t';CREATE TABLE `product_info`(
  `product_id` bigint, -- 商品id
  `product_name` string, --商品名称
  `extend_info` string
)
row format delimited fields terminated by '\t';

3)并加载数据

hive (default)>
load data local inpath '/opt/module/data/user_visit_action.txt' into table user_visit_action;
load data local inpath '/opt/module/data/product_info.txt' into table product_info;
load data local inpath '/opt/module/data/city_info.txt' into table city_info;

4)测试一下三张表数据是否正常

hive (default)>
select * from user_visit_action limit 5;
select * from product_info limit 5;
select * from city_info limit 5;

2 需求:各区域热门商品Top3

2.1 需求简介

这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

例如:

地区

商品名称

点击次数

城市备注

华北

商品A

100000

北京21.2%,天津13.2%,其他65.6%

华北

商品P

80200

北京63.0%,太原10%,其他27.0%

华北

商品M

40000

北京63.0%,太原10%,其他27.0%

东北

商品J

92000

大连28%,辽宁17.0%,其他 55.0%

2.2 思路分析

CREATE TABLE `user_visit_action`(
  `date` string,
  `user_id` bigint,
  `session_id` string,
  `page_id` bigint,
  `action_time` string,
  `search_keyword` string,
  `click_category_id` bigint,
  `click_product_id` bigint, --点击商品id,没有商品用-1表示。
  `order_category_ids` string,
  `order_product_ids` string,
  `pay_category_ids` string,
  `pay_product_ids` string,
  `city_id` bigint --城市id
)
CREATE TABLE `city_info`(
  `city_id` bigint, --城市id
  `city_name` string, --城市名称
  `area` string --区域名称
)
CREATE TABLE `product_info`(
  `product_id` bigint, -- 商品id
  `product_name` string, --商品名称
  `extend_info` string
)
city_remark
IN: 城市名称 String
BUFF: totalcnt总点击量,Map[(cityName, 点击数量)]
OUT:城市备注 String
select
   c.area, --地区
   c.city_name, -- 城市
   p.product_name, -- 商品名称
   v.click_product_id -- 点击商品id
from user_visit_action v
join city_info c
on v.city_id = c.city_id
join product_info p
on v.click_product_id = p.product_id
where click_product_id > -1select
    t1.area, --地区
    t1.product_name, -- 商品名称count(*) click_count, -- 商品点击次数city_remark(t1.city_name) --城市备注
from t1
group by t1.area, t1.product_nameselect*,rank() over(partition by t2.area order by t2.click_count desc) rank -- 每个区域内按照点击量,倒序排行
from t2select*
from t3
where rank <= 3

使用Spark-SQL来完成复杂的需求,可以使用UDF或UDAF

(1)查询出来所有的点击记录,并与city_info表连接,得到每个城市所在的地区,与 Product_info表连接得到商品名称

(2)按照地区和商品名称分组,统计出每个商品在每个地区的总点击次数

(3)每个地区内按照点击次数降序排列

(4)只取前三名,并把结果保存在数据库中

(5)城市备注需要自定义UDAF函数

2.3 代码实现

package com.atguigu.sparksql.demo;import lombok.Data;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import static org.apache.spark.sql.functions.udaf;public class Test01_Top3 {public static void main(String[] args) {// 1. 创建sparkConf配置对象SparkConf conf = new SparkConf().setAppName("sql").setMaster("local[*]");// 2. 创建sparkSession连接对象SparkSession spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate();// 3. 编写代码// 将3个表格数据join在一起Dataset<Row> t1DS = spark.sql("select \n" +"\tc.area,\n" +"\tc.city_name,\n" +"\tp.product_name\n" +"from\n" +"\tuser_visit_action u\n" +"join\n" +"\tcity_info c\n" +"on\n" +"\tu.city_id=c.city_id\n" +"join\n" +"\tproduct_info p\n" +"on\n" +"\tu.click_product_id=p.product_id");        t1DS.createOrReplaceTempView("t1");        spark.udf().register("cityMark",udaf(new CityMark(),Encoders.STRING()));// 将区域内的产品点击次数统计出来Dataset<Row> t2ds = spark.sql("select \n" +"\tarea,\n" +"\tproduct_name,\n" +"\tcityMark(city_name) mark,\n" +"\tcount(*) counts\n" +"from\t\n" +"\tt1\n" +"group by\n" +"\tarea,product_name");//        t2ds.show(false);
        t2ds.createOrReplaceTempView("t2");// 对区域内产品点击的次数进行排序  找出区域内的top3
        spark.sql("select\n" +"\tarea,\n" +"\tproduct_name,\n" +"\tmark,\n" +"\trank() over (partition by area order by counts desc) rk\n" +"from \n" +"\tt2").createOrReplaceTempView("t3");// 使用过滤  取出区域内的top3
        spark.sql("select\n" +"\tarea,\n" +"\tproduct_name,\n" +"\tmark \n" +"from\n" +"\tt3\n" +"where \n" +"\trk < 4").show(50,false);// 4. 关闭sparkSession
        spark.close();}    @Datapublic static class Buffer implements Serializable {private Long totalCount;private HashMap<String,Long> map;public Buffer() {}public Buffer(Long totalCount, HashMap<String, Long> map) {
            this.totalCount = totalCount;
            this.map = map;}}public static class CityMark extends Aggregator<String, Buffer, String> {public static class CityCount {public String name;public Long count;public CityCount(String name, Long count) {
                this.name = name;
                this.count = count;}public CityCount() {}}public static class CompareCityCount implements Comparator<CityCount> {/**
             * 默认倒序
             * @param o1
             * @param o2
             * @return
             */
            @Overridepublic int compare(CityCount o1, CityCount o2) {if (o1.count > o2.count) {return -1;} else return o1.count.equals(o2.count) ? 0 : 1;}}        @Overridepublic Buffer zero() {return new Buffer(0L, new HashMap<String, Long>());}/**
         * 分区内的预聚合
         *
         * @param b map(城市,sum)
         * @param a 当前行表示的城市
         * @return
         */
        @Overridepublic Buffer reduce(Buffer b, String a) {HashMap<String, Long> hashMap = b.getMap();// 如果map中已经有当前城市  次数+1// 如果map中没有当前城市    0+1
            hashMap.put(a, hashMap.getOrDefault(a, 0L) + 1);            b.setTotalCount(b.getTotalCount() + 1L);return b;}/**
         * 合并多个分区间的数据
         *
         * @param b1 (北京,100),(上海,200)
         * @param b2 (天津,100),(上海,200)
         * @return
         */
        @Overridepublic Buffer merge(Buffer b1, Buffer b2) {
            b1.setTotalCount(b1.getTotalCount() + b2.getTotalCount());HashMap<String, Long> map1 = b1.getMap();HashMap<String, Long> map2 = b2.getMap();// 将map2中的数据放入合并到map1
            map2.forEach(new BiConsumer<String, Long>() {
                @Overridepublic void accept(String s, Long aLong) {
                    map1.put(s, aLong + map1.getOrDefault(s, 0L));}});return b1;}/**
         * map => {(上海,200),(北京,100),(天津,300)}
         *
         * @param reduction
         * @return
         */
        @Overridepublic String finish(Buffer reduction) {Long totalCount = reduction.getTotalCount();HashMap<String, Long> map = reduction.getMap();// 需要对map中的value次数进行排序ArrayList<CityCount> cityCounts = new ArrayList<>();// 将map中的数据放入到treeMap中 进行排序map.forEach(new BiConsumer<String, Long>() {
                @Overridepublic void accept(String s, Long aLong) {
                    cityCounts.add(new CityCount(s, aLong));}});            cityCounts.sort(new CompareCityCount());ArrayList<String> resultMark = new ArrayList<>();Double sum = 0.0;// 当前没有更多的城市数据  或者  已经找到两个城市数据了  停止循环while (!(cityCounts.size() == 0) && resultMark.size() < 2) {CityCount cityCount = cityCounts.get(0);
                resultMark.add(cityCount.name + String.format("%.2f",cityCount.count.doubleValue() / totalCount * 100) + "%");
                cityCounts.remove(0);}// 拼接其他城市if (cityCounts.size() > 0) {
                resultMark.add("其他" + String.format("%.2f", 100 - sum) + "%");}StringBuilder cityMark = new StringBuilder();for (String s : resultMark) {
                cityMark.append(s).append(",");}return cityMark.substring(0, cityMark.length() - 1);}        @Overridepublic Encoder<Buffer> bufferEncoder() {return Encoders.javaSerialization(Buffer.class);}        @Overridepublic Encoder<String> outputEncoder() {return Encoders.STRING();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/143811.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

51单片机应用从零开始(三)

51单片机应用从零开始&#xff08;一&#xff09;-CSDN博客 51单片机应用从零开始&#xff08;二&#xff09;-CSDN博客 详解 KEIL C51 软件的使用建立工程-CSDN博客 详解 KEIL C51 软件的使用设置工程编绎与连接程序-CSDN博客 目录 1. 用单片机控制第一个灯亮 2. 认识单片…

mask: rle, polygon

RLE 编码 RLE&#xff08;Run-Length Encoding&#xff09;是一种简单而有效的无损数据压缩和编码方法。它的基本思想是将连续相同的数据值序列用一个值和其连续出现的次数来表示&#xff0c;从而减少数据的存储或传输量。 在图像分割领域&#xff08;如 COCO 数据集中&#…

leetcode:476. 数字的补数

一、题目 476. 数字的补数 - 力扣&#xff08;LeetCode&#xff09; 函数原型&#xff1a; int findComplement(int num) 二、思路 将num的每一位取出来&#xff0c;取反后&#xff0c;乘以2的位次方&#xff0c;最终所有结果相加即可得到结果。 如何取出num的每一位&#xff1…

<MySQL> 查询数据进阶操作 -- 联合查询

目录 一、什么是笛卡尔积&#xff1f; 二、什么是联合查询&#xff1f; 三、内连接 3.1 简介 3.2 语法 3.3 更多的表 3.4 操作演示 四、外连接 4.1 简介 4.2 语法 4.3 操作演示 五、自连接 5.1 简介 5.2 自连接非必要不使用 六、子查询(嵌套查询) 6.1 简介 6.…

电源电压范 围宽、功耗小、抗干扰能力强的国产芯片GS069适用于电动工具等产品中,采用SOP8的封装形式封装

GS069电动工具直流调速电路是CMOS专用集成电路&#xff0c;具有电源电压范 围宽、功耗小、抗干扰能力强等特点。通过外接电阻网络&#xff0c;改变与之相接 的VMOS 管的输出&#xff0c;达到控制电动工具转速的作用。该电路输出幅值宽&#xff0c; 频率变化小&#xff0c;占空比…

自己动手实现一个深度学习算法——六、与学习相关的技巧

文章目录 1.参数的更新1&#xff09;SGD2&#xff09;Momentum3&#xff09;AdaGrad4&#xff09;Adam5&#xff09;最优化方法的比较6&#xff09;基于MNIST数据集的更新方法的比较 2.权重的初始值1&#xff09;权重初始值不能为02&#xff09;隐藏层的激活值的分布3&#xff…

04-学成在线之系统管理服务模块之查询数据字典表中的内容,前后端联调测试

前后端联调 配置前端环境 实际开发中先由后端工程师将接口设计好并编写接口文档并交给前端工程师&#xff0c;前后端的工程师就开始并行开发 前端开发人员先自己mock数据即使用假数据进行开发,当后端代码完成后前端工程师尝试请求后端接口获取数据然后渲染到页面 第一步: 首…

计算机网络之物理层

物理层 1. 物理层的基本概念 2.物理层下面的传输媒体 传输媒体可分为两类&#xff0c;一类是导引型传输媒体&#xff0c;另一类是非导引型传输媒体。 3.传输方式 3.1 串行传输和并行传输 串行传输&#xff1a;串行传输是指数据是一个比特依次发送的&#xff0c;因此在发送端…

MATLAB算法实战应用案例精讲-【目标检测】单目3D目标检测

目录 几个高频面试题目 3D目标检测中点云的稀疏性问题及解决方案 1. 点云稀疏性的定义

C/C++轻量级并发TCP服务器框架Zinx-框架开发002: 定义通道抽象类

文章目录 2 类图设计3 时序图数据输入处理&#xff1a;输出数据处理总流程 4 主要实现的功能4.1 kernel类&#xff1a;基于epoll调度所有通道4.2 通道抽象类&#xff1a;4.3 标准输入通道子类4.4 标准输出通道子类4.5 kernel和通道类的调用 5 代码设计5.1 框架头文件5.2 框架实…

wx.canvasToTempFilePath生成图片保存到相册

微信小程序保存当前画布指定区域的内容导出生成指定大小的图片&#xff0c;记录一下 api&#xff1a;wx.canvasToTempFilePath 效果&#xff1a; 代码&#xff1a;wxml <canvas style"width: {{screenWidth}}px; height: {{canvasHeight}}px;" canvas-id"my…

2023.11.15 每日一题(AI自生成应用)【C++】【Python】【Java】【Go】 动态路径分析

目录 一、题目 二、解决方法 三、改进 一、题目 背景&#xff1a; 在一个城市中&#xff0c;有数个交通节点&#xff0c;每个节点间有双向道路相连。每条道路具有一个初始权重&#xff0c;代表通行该路段的成本&#xff08;例如时间、费用等&#xff09;。随着时间的变化&am…

CentOS修改root用户密码

一、适用场景 1、太久没有登录CentOS系统&#xff0c;忘记管理密码。 2、曾经备份的虚拟化OVA或OVF模板&#xff0c;使用模板部署新系统后&#xff0c;忘记root密码。 3、被恶意攻击修改root密码后的紧急修复。 二、实验环境 1、VMware虚拟化的ESXI6.7下&#xff0c;通过曾经…

PCL 曲率 结构体

曲率结构体&#xff1a; struct PrincipalCurvatures{union{float principal_curvature[3];struct{float principal_curvature_x;float principal_curvature_y;float principal_curvature_z;};};float pc1 0.f;float pc2 0.f;inline constexpr PrincipalCurvatures (): Princ…

javaweb---maventomcat使用教程

文章目录 今日内容0 复习昨日1 Maven1.0 引言1.1 介绍1.2 下载1.3 安装1.3.1 解压1.3.2 配置环境变量1.3.3 测试 1.4 仓库1.5 Maven配置1.5.1 修改仓库位置1.5.2 设置镜像 2 IDEA - MAVEN2.1 idea关联maven2.2 创建java项目2.3 java项目结构2.4 pom2.5 导入依赖2.5.1 查找依赖2…

如何计算掩膜图中多个封闭图形的面积

import cv2def calMaskArea(image,idx):mask cv2.inRange(image, idx, idx)contours, hierarchy cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)for contour in contours:area cv2.contourArea(contour)print("图形的面积为", area) image是…

C语言从入门到精通之【char类型】

char类型用于储存字符&#xff08;如&#xff0c;字母或标点符号&#xff09;&#xff0c;但是从技术层面看&#xff0c;char是整数类型。因为char类型实际上储存的是整数而不是字符。计算机使用数字编码来处理字符&#xff0c;即用特定的整数表示特定的字符。 char类型占1个字…

2023年09月 Python(五级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python等级考试(1~6级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 阅读以下代码,程序输出结果正确的选项是?( ) def process_keywords(keywords_list):unique_keywords = list(set(keywords_list))

基于STM32的无线通信系统设计与实现

【引言】 随着物联网的迅速发展&#xff0c;无线通信技术逐渐成为现代通信领域的关键技术之一。STM32作为一款广受欢迎的微控制器&#xff0c;具有丰富的外设资源和强大的计算能力&#xff0c;在无线通信系统设计中具有广泛的应用。本文将介绍如何基于STM32实现一个简单的无线通…

浅尝:iOS的CoreGraphics和Flutter的Canvas

iOS的CoreGraphic 基本就是创建一个自定义的UIView&#xff0c;然后重写drawRect方法&#xff0c;在此方法里使用UIGraphicsGetCurrentContext()来绘制目标图形和样式 #import <UIKit/UIKit.h>interface MyGraphicView : UIView endimplementation MyGraphicView// Onl…