每日top3热点搜索词统计案例

数据格式:

日期,用户,搜索词,平台,版本

需求:

1、筛选出符合条件(城市,平台,版本)的数据

2、统计每天搜索uv排名前三的搜索词

3、按照每天的top3搜索词的uv搜索总次数,倒叙排序

4、将数据保存到hive表中

思路分析

1、针对原始数据(HDFS文件),获取输入RDD

2、使用filter算法,针对输入RDD中的数据,进行数据过滤,过滤出符合条件的数据

21普通的算法:直接在filter算法函数中,使用外部的查询条件(map),但是,这样做的话,是不是查询条件map,会发送到每一个task上一份副本,(性能不好)

2.2优化后的做法,将查询条件,封装为broadCast广播变量,在filter算法中使用broadCast广播变量。

3、将数据转换为(日期_搜索词,用户)格式,对他进行分组。然后再次进行映射,对每天每个搜索词的搜索用户进行去重操作,并统计去重后的数据,即为每天每个搜搜词的uv,最后获得(日期_搜搜词,uv)。

4、将得到的每天每个搜索词的uvRDD,映射为元素类型的RowRDD,转换为DataFrame

5、注册为临时表,使用SparkSQL的开窗函数,统计每天的uv数量排名前三名的搜索词,以及他的搜索nv,最后获得一个DataFrame

6、DataFrame转换为RDD,继续操作,按照每天日期来分组,并进行映射,计算出每天的top3搜索词的uv的总数,然后将uv总数作为key,将每天的top3搜索词以及搜索次数,拼接为一个字符串

7、按照每天的top3搜索总uv,进行排序,倒序排序

8、将排好的数据,再次映射回来,变成 日期_搜索词_uv的格式

9、再次映射为DataFrame,并将数据保存到hive中。

文本:

   2018-10-1:leo:water:beijing:android:1.0
2018-10-1:leo1:water:beijing:android:1.0
2018-10-1:leo2:water:beijing:android:1.0
2018-10-1:jack:water:beijing:android:1.0
2018-10-1:jack1:water:beijing:android:1.0
2018-10-1:leo:seafood:beijing:android:1.0
2018-10-1:leo1:seafood:beijing:android:1.0
2018-10-1:leo2:seafood:beijing:android:1.0
2018-10-1:leo:food:beijing:android:1.0
2018-10-1:leo1:food:beijing:android:1.0
2018-10-1:leo2:meat:beijing:android:1.0
2018-10-2:leo:water:beijing:android:1.0
2018-10-2:leo1:water:beijing:android:1.0
2018-10-2:leo2:water:beijing:android:1.0
2018-10-2:jack:water:beijing:android:1.0
2018-10-2:leo1:seafood:beijing:android:1.0
2018-10-2:leo2:seafood:beijing:android:1.0
2018-10-2:leo3:seafood:beijing:android:1.0
2018-10-2:leo1:food:beijing:android:1.0
2018-10-2:leo2:food:beijing:android:1.0
2018-10-2:leo:meat:beijing:android:1.0


代码:

package com.bynear.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.*;

public class DailyTop3Keyword {public static void main(String[] args) {SparkConf conf = new SparkConf();
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new HiveContext(jsc.sc());
//        伪造数据(这些数据可以来自mysql数据库)
        final HashMap<String, List<String>> queryParaMap = new HashMap<String, List<String>>();
        queryParaMap.put("city", Arrays.asList("beijing"));
        queryParaMap.put("platform", Arrays.asList("android"));
        queryParaMap.put("version", Arrays.asList("1.0", "1.2", "2.0", "1.5"));
//      将数据进行广播
        final Broadcast<HashMap<String, List<String>>> queryParamMapBroadcast = jsc.broadcast(queryParaMap);
//      读取文本
        JavaRDD<String> rowRDD = jsc.textFile("hdfs://Spark01:9000/zjs/daily.txt");
//      filter算子进行过滤
        JavaRDD<String> filterRDD = rowRDD.filter(new Function<String, Boolean>() {@Override
            public Boolean call(String log) throws Exception {String[] logSplit = log.split(":");
                String city = logSplit[3];
                String platform = logSplit[4];
                String version = logSplit[5];
                HashMap<String, List<String>> queryParamMap = queryParamMapBroadcast.value();
                List<String> cities = queryParamMap.get("city");
                if (!cities.contains(city) && cities.size() > 0) {return false;
                }List<String> platforms = queryParamMap.get("platform");
                if (!platforms.contains(platform)) {return false;
                }List<String> versions = queryParamMap.get("version");
                if (!versions.contains(version)) {return false;
                }return true;
            }});
//        过滤出来的原始日志,映射为(日期_搜索词,用户)格式
        JavaPairRDD<String, String> dateKeyWordUserRDD = filterRDD.mapToPair(new PairFunction<String, String, String>() {@Override
            public Tuple2<String, String> call(String log) throws Exception {String[] logSplit = log.split(":");
                String date = logSplit[0];
                String user = logSplit[1];
                String keyword = logSplit[2];
                return new Tuple2<String, String>(date + "_" + keyword, user);
            }});
//        进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)
        JavaPairRDD<String, Iterable<String>> dateKeywordUsersRDD = dateKeyWordUserRDD.groupByKey();
        List<Tuple2<String, Iterable<String>>> collect1 = dateKeywordUsersRDD.collect();
        for (Tuple2<String, Iterable<String>> tuple2 : collect1) {System.out.println("进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)" + tuple2._2);
            System.out.println(tuple2);
        }//        对每天每个搜索词的搜索用户  去重操作  获得前uv
        JavaPairRDD<String, Long> dateKeywordUvRDD = dateKeywordUsersRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, String, Long>() {@Override
                    public Tuple2<String, Long> call(Tuple2<String, Iterable<String>> dataKeywordUsers) throws Exception {String dateKeyword = dataKeywordUsers._1;
                        Iterator<String> users = dataKeywordUsers._2.iterator();
//                去重   并统计去重后的数量
                        List<String> distinctUsers = new ArrayList<String>();
                        while (users.hasNext()) {String user = users.next();
                            if (!distinctUsers.contains(user)) {distinctUsers.add(user);
                            }}
//              获取uv
                        long uv = distinctUsers.size();
//                日期_搜索词,用户个数
                        return new Tuple2<String, Long>(dateKeyword, uv);
                    }});
        List<Tuple2<String, Long>> collect2 = dateKeywordUvRDD.collect();
        for (Tuple2<String, Long> stringLongTuple2 : collect2) {System.out.println("对每天每个搜索词的搜索用户  去重操作  获得前uv");
            System.out.println(stringLongTuple2);
        }//        将每天每个搜索词的uv数据,转换成DataFrame
        JavaRDD<Row> dateKeywordUvRowRDD = dateKeywordUvRDD.map(new Function<Tuple2<String, Long>, Row>() {@Override
            public Row call(Tuple2<String, Long> dateKeywordUv) throws Exception {String date = dateKeywordUv._1.split("_")[0];
                String keyword = dateKeywordUv._1.split("_")[1];
                long uv = dateKeywordUv._2;
                return RowFactory.create(date, keyword, uv);
            }});
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("date", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("keyword", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("uv", DataTypes.LongType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame dateKeywordUvDF = sqlContext.createDataFrame(dateKeywordUvRowRDD, structType);
        dateKeywordUvDF.registerTempTable("sales");
//        使用开窗函数,统计每天搜索uv排名前三的热点搜索词
//        日期  搜索词   人数个数  前三名
        final DataFrame dailyTop3KeyWordDF = sqlContext.sql("select date,keyword,uv from (select date, keyword, uv, row_number() over (partition by date order by uv DESC ) rank from sales ) tmp_sales where rank <=3");
//        DataFrame转换为RDD, 映射,
        JavaRDD<Row> dailyTop3KeyWordRDD = dailyTop3KeyWordDF.javaRDD();

        JavaPairRDD<String, String> dailyTop3KeywordRDD = dailyTop3KeyWordRDD.mapToPair(new PairFunction<Row, String, String>() {@Override
            public Tuple2<String, String> call(Row row) throws Exception {String date = String.valueOf(row.get(0));
                String keyword = String.valueOf(row.get(1));
                String uv = String.valueOf(row.get(2));
//                映射为  日期  搜索词_总个数
                return new Tuple2<String, String>(date, keyword + "_" + uv);
            }});

        List<Tuple2<String, String>> collect = dailyTop3KeywordRDD.collect();
        for (Tuple2<String, String> stringStringTuple2 : collect) {System.out.println("开窗函数操作");
            System.out.println(stringStringTuple2);
        }//          根据 日期分组
        JavaPairRDD<String, Iterable<String>> top3DateKeywordsRDD = dailyTop3KeywordRDD.groupByKey();
//        进行映射
        JavaPairRDD<Long, String> uvDateKeywordsRDD = top3DateKeywordsRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<String>>, Long, String>() {@Override
            public Tuple2<Long, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {String date = tuple._1;
//                搜索词_总个数  集合
                Iterator<String> KeyWordUviterator = tuple._2.iterator();
                long totalUv = 0L;
                String dateKeyword = date;
                while (KeyWordUviterator.hasNext()) {
//                    搜索词_个数
                    String keywoarUv = KeyWordUviterator.next();
                    Long uv = Long.valueOf(keywoarUv.split("_")[1]);
                    totalUv += uv;
                    dateKeyword = dateKeyword + "," + keywoarUv;
                }return new Tuple2<Long, String>(totalUv, dateKeyword);
            }});
        JavaPairRDD<Long, String> sortedUvDateKeywordsRDD = uvDateKeywordsRDD.sortByKey(false);
        List<Tuple2<Long, String>> rows = sortedUvDateKeywordsRDD.collect();
        for (Tuple2<Long, String> row : rows) {System.out.println(row._2 + "    " + row._1);
        }//        映射
        JavaRDD<Row> resultRDD = sortedUvDateKeywordsRDD.flatMap(new FlatMapFunction<Tuple2<Long, String>, Row>() {@Override
            public Iterable<Row> call(Tuple2<Long, String> tuple) throws Exception {String dateKeywords = tuple._2;
                String[] dateKeywordsSplit = dateKeywords.split(",");
                String date = dateKeywordsSplit[0];
                ArrayList<Row> rows = new ArrayList<Row>();
                rows.add(RowFactory.create(date, dateKeywordsSplit[1].split("_")[0],
                        Long.valueOf(dateKeywordsSplit[1].split("_")[1])));

                rows.add(RowFactory.create(date, dateKeywordsSplit[2].split("_")[0],
                        Long.valueOf(dateKeywordsSplit[2].split("_")[1])));

                rows.add(RowFactory.create(date, dateKeywordsSplit[3].split("_")[0],
                        Long.valueOf(dateKeywordsSplit[3].split("_")[1])));

                return rows;
            }});
        DataFrame finalDF = sqlContext.createDataFrame(resultRDD, structType);
        List<Row> rows1 = finalDF.javaRDD().collect();
        for (Row row : rows1) {System.out.println(row);
        }jsc.stop();

    }
}

注意点:

1、如果文本案例使用的是txt编辑,将文本保存ANSI格式,否则在groupByKey的时候,第一行默认会出现一个空格,分组失败。最开始使用的是UTF-8格式

2、文本的最后禁止出现空行,否则在split的时候会报错,出现数组越界的错误。

3、使用到窗口函数的时候,必须使用到HiveContext方法,HiveContext使用到的是SparkContext,使用使用jsc.sc()

运行结果:

进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo]
(2018-10-2_meat,[leo])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2, jack]
(2018-10-2_water,[leo, leo1, leo2, jack])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1]
(2018-10-1_food,[leo, leo1])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo1, leo2, leo3]
(2018-10-2_seafood,[leo1, leo2, leo3])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2, jack, jack1]
(2018-10-1_water,[leo, leo1, leo2, jack, jack1])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo2]
(2018-10-1_meat,[leo2])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo, leo1, leo2]
(2018-10-1_seafood,[leo, leo1, leo2])
进行分组,获取每天每个搜索词,有哪些用户搜索了(没有去重)[leo1, leo2]

(2018-10-2_food,[leo1, leo2])

对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-2_meat,1)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-2_water,4)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_food,2)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-2_seafood,3)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_water,5)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_meat,1)
对每天每个搜索词的搜索用户  去重操作  获得前uv
(2018-10-1_seafood,3)
对每天每个搜索词的搜索用户  去重操作  获得前uv

(2018-10-2_food,2)

窗函数操作
(2018-10-1,water_5)
开窗函数操作
(2018-10-1,seafood_3)
开窗函数操作
(2018-10-1,food_2)
开窗函数操作
(2018-10-2,water_4)
开窗函数操作
(2018-10-2,seafood_3)
开窗函数操作

(2018-10-2,food_2)

最终结果

[2018-10-1,water,5]
[2018-10-1,seafood,3]

[2018-10-1,food,2]


[2018-10-2,water,4]
[2018-10-2,seafood,3]
[2018-10-2,food,2]










本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/326034.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2019蓝桥杯省赛---java---A---4(迷宫)

题目描述 【问题描述】 下图给出了一个迷宫的平面图&#xff0c;其中标记为 1 的为障碍&#xff0c;标记为 0 的为可以通行的地方。 010000 000100 001001 110000 迷宫的入口为左上角&#xff0c;出口为右下角&#xff0c;在迷宫中&#xff0c;只能从一个位置走到这个它的上、…

面试官最爱问的并发问题

转载自 面试官最爱问的并发问题 在Java相关的岗位面试中&#xff0c;很多面试官都喜欢考察面试者对Java并发的了解程度&#xff0c;而以volatile关键字作为一个小的切入点&#xff0c;往往可以一问到底&#xff0c;把Java内存模型&#xff08;JMM&#xff09;&#xff0c;Jav…

西瓜显示服务器错误,西瓜云服务器

西瓜云服务器 内容精选换一换云服务器组是对云服务器的一种逻辑划分&#xff0c;云服务器组中的弹性云服务器遵从同一策略。当前仅支持反亲和性&#xff0c;即同一云服务器组中的弹性云服务器分散地创建在不同的主机上&#xff0c;提高业务的可靠性。您可以使用云服务器组将业务…

第四章使用jQuery操作DOM元素

一、DOM的分类&#xff1a; 1.DOM core 2.HTML-DOM 3.CSS-DOM 二、css操作 语法&#xff1a; $("#div1").css(“color”,“red”); $("#div1").css({“color”:“red”,“margin”:“5px”}); //添加样式 $("#div1").addClass(“div1_style”); …

30分钟掌握 C#6

1. 只读自动属性&#xff08;Read-only auto-properties&#xff09; C# 6之前我们构建只读自动属性&#xff1a; public string FirstName { get; private set; } public string LastName { get; private set; } 原理解析&#xff1a;就是编译器在生成set访问器时&#xff0c…

2019蓝桥杯省赛---java---A---6(完全二叉树的权值)

题目描述 思路分析 等差数列 代码实现 package lanqiao;import java.util.*; public class Main {public static void main(final String[] args) {Scanner sc new Scanner(System.in);int nsc.nextInt();int[]numsnew int[n1];long maxSum0;int minDeepInteger.MAX_VALUE;…

如何让面试官认可你的简历

转载自 如何让面试官认可你的简历 在之前的博客里写了很多面试技巧&#xff0c;这是有个前提&#xff1a; 至少候选人被面试了&#xff01; 在这个前提下&#xff0c;候选人哪怕失败了&#xff0c;至少也能用实战来检验和校对面试准备的结果&#xff0c;用句比较时髦的话来…

Linux中安装nc(netcat)常见问题

Linux中安装nc&#xff08;netcat&#xff09;常见问题 Spark Streaming编写wordCount程序时&#xff0c;在Linux集群中需要安装nc&#xff0c;来对程序中使用到的端口进行开放。在安装nc的时候&#xff0c;常出现的问题有以下几点!1、不建议使用 yum staill nc 进行安装&…

服务器c盘有个inetpub文件夹,老司机为你示范win10系统C盘inetpub文件夹删不掉的图文教程...

也许大家在用电脑工作的使用中会遇到win10系统C盘inetpub文件夹删不掉的情况&#xff0c;目前就有很多朋友向我资讯关于win10系统C盘inetpub文件夹删不掉的具体处理步骤。小编把总结的关于win10系统C盘inetpub文件夹删不掉的方法了教给大家&#xff0c;只用你按照  1、右键点…

体验VS2017的Live Unit Testing

相对于传统的Unit Test,VS2017 带来了一个新的功能,叫Live Unit Testing,从字面意思理解就是实时单元测试,在实际的使用中,这个功能就是可以在编写代码的时候进行实时的background的单元测试. 在体验之前,有几点注意事项是需要了解的: 1.目前 live unit tesing仅仅支持 C#和V…

sqlserver复习总结

--已知有一个表&#xff1a;该表的字段有&#xff1a;id,name,date,gradeid,email --1.插入一条记录&#xff1a; insert into table_name values (1,刘世豪,2017-10-21,1,666qq.com) --2.修改 update table_name set name 张云飞 where id 1 --3.删除 delete from table_name…

Map的两种遍历方法

COPY/*** HashMap的使用* 存储结构&#xff1a;哈希表&#xff08;数组链表红黑树&#xff09;*/public class Demo2 {public static void main(String[] args) {HashMap<Student, String> hashMapnew HashMap<Student, String>();Student s1new Student("tan…

服务器windows系统如何登陆,如何登陆windows云服务器

如何登陆windows云服务器 内容精选换一换远程登录的账号和密码是多少&#xff1f;登录云服务器的用户名和密码&#xff1a;Windows操作系统用户名&#xff1a;AdministratorLinux操作系统用户名&#xff1a;root登录云服务器的用户名和密码&#xff1a;Windows操作系统用户名&a…

Java集合面试问题

转载自 Java集合面试问题 Java集合框架为Java编程语言的基础&#xff0c;也是Java面试中很重要的一个知识点。在本文中&#xff0c;列出了一些关于Java集合的重要问题和答案。 1.Java集合框架是什么&#xff1f;说出一些集合框架的优点&#xff1f; 每种编程语言中都有集合…

深入.net平台的分层开发

一、搭建三层的步骤&#xff1a; 1.创建一个窗体应用程序&#xff1a;新建–》新建项目–》选择窗体应用程序–》给项目命名–》选择存放的路径。 2.创建数据访问层&#xff08;DLL层&#xff09;&#xff1a;文件–》添加–》新建项目–》选择类库–》起个名字&#xff08;项目…

2018蓝桥杯省赛---java---B---8(日志统计)

题目描述 小明维护着一个程序员论坛。现在他收集了一份"点赞"日志&#xff0c;日志共有N行。其中每一行的格式是&#xff1a; ts id 表示在ts时刻编号id的帖子收到一个"赞"。 现在小明想统计有哪些帖子曾经是"热帖"。如果一个帖子曾在任意一个长…

法兰克服务器电源维修,发那科FANUC系统控制电源简介

摘要发那科系统电源 推荐的外部24VDC电源(稳压电源)(电源电压必须满足UL1950的要求)。该电源不能与机床强电柜内I/O点或伺服电动机的抱闸共用。如果在控制单元的电源关掉后断开电池&#xff0c;则控制单元中存储的绝对脉冲编码器的当前位置&#xff0c;系统参数&#xff0c;程序…

这些代码优化的方法,你都用过吗

转载自 这些代码优化的方法&#xff0c;你都用过吗 代码优化的最重要的作用应该是&#xff1a; 避免未知的错误 在代码上线运行的过程中&#xff0c;往往会出现很多我们意想不到的错误&#xff0c;因为线上环境和开发环境是非常不同的&#xff0c;错误定位到最后往往是一个…

myeclipse如何换一个漂亮的主题

熟悉hbuilder的童鞋们都知道&#xff0c;在开发的时候主题有好几种可以随意切换&#xff0c;但是在开发java时用到的myeclipse的主题是否可以随意切换呢&#xff1f;如果可以在哪里切换&#xff0c;今天我们就来看看如何修改myeclipse的主题&#xff01; 一、首先我们要切换个漂…

基于C#.NET的高端智能化网络爬虫

前两天朋友发给我了一篇文章&#xff0c;是携程网反爬虫组的技术经理写的&#xff0c;大概讲的是如何用他的超高智商通过&#xff08;挑衅、怜悯、嘲讽、猥琐&#xff09;的方式来完美碾压爬虫开发者。今天我就先带大家开发一个最简单低端的爬虫&#xff0c;突破携程网超高智商…