概览
本篇主要介绍数据来源、数据加载进数据库过程
1 数据获取
使用Scrapy爬取豆瓣电影数据,然后利用movielens数据集来造一份rating数据。
1.1 数据集获取
- 数据集获取:选取movielens 数据集:movielens官网
- 数据集包括:movies,ratings,tags文件
1.2 数据爬取
- 使用scrapy+xpath爬取豆瓣电影数据,最后存入csv,命名为movie.csv
- 对于爬取的数据进行预处理:包括字段选取、相关字符处理
1.3 数据转换
- 由于缺少rating数据,因此我们使用movielens的rating文件来造一下评分数据。
- movielens数据集movie文件一共2791条电影数据,因此我们直接截取爬取的
movie.csv
前2791条数据。 - 直接用movielens的电影ID替换爬取
movie.csv
的电影ID,因此最后我们得到的电影数据也就有了对应的评分数据。 - 最后,我们需要的只是两个文件:movie.csv,rating.csv
(1)Movie数据
数据表格式为:
mid,title,desc,minute,year,year,language,geners,actors,director
(2)Rating数据
userID,mid,score,timestamp
2 将数据加载进MongoDB数据库
我们选择MongoDB数据库的原因如下:
- 千万级别的文档对象,近10G的数据,对有索引的ID的查询不会比mysql慢,而对非索引字段的查询,则是全面胜出
- 可以进行深度查询
接下来,我们在云服务器部署MongoDB,主机远程连接数据库,将文件加载进数据库中。
2.1 MongoDB安装
- 安装教程:linux安装MongoDB
2.2 maven依赖
maven相关依赖版本如下
注意:Spark版本和Spark集群的版本需要一致
scala
:2.11.8
Spark
:2.3.0
<properties><scala.version>2.11.8</scala.version>
</properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.0</version></dependency>
</dependencies>
2.3 数据加载的程序
// 加载数据主程序
object DataLoader {val MONGODB_MOVIE_COLLECTION = "Movie"val MONGODB_RATING_COLLECTION = "Rating"val mgo_host = "root"val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://root:123456@服务器公网IP:27017/recommender","mongo.db" -> "recommender")// 文件位置val MOVIE_DATA_PATH = "F:\\1-project\\offline\\src\\main\\resources\\file\\movie.csv"val RATING_DATA_PATH = "F:\\1-project\\offline\\src\\main\\resources\\file\\ratings.csv"def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")val spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH) // 加载数据// 转为dfval movieDF = movieRDD.map(item => {val attr = item.split(",")Movie(attr(0).toInt, attr(1).trim, attr(2).trim, attr(3).trim, attr(4).trim, attr(5).trim, attr(6).trim, attr(7).trim, attr(8).trim)}).toDF()val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)val ratingDF = ratingRDD.map(item => {val attr = item.split(",")Rating(attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)}).toDF()implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))// 将数据保存到MongoDBstoreDataInMongoDB(movieDF, ratingDF)spark.stop()}}
def storeDataInMongoDB(movieDF: DataFrame, ratingDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit ={// 新建一个mongodb的连接val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))// 将DF数据写入对应的mongodb表中movieDF.write.option("uri", mongoConfig.uri).option("collection", MONGODB_MOVIE_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()ratingDF.write.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).mode("overwrite").format("com.mongodb.spark.sql").save()//对数据表建索引mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))mongoClient.close()}
2.4 查看数据
- 使用软件Mongo Management studio查看是否成功