用户画像之Spark ML实现

                          用户画像之Spark ML实现

1 Spark ML简单介绍

Spark ML是面向DataFrame编程的。Spark的核心开发是基于RDD(弹性分布式数据集),但是RDD,但是RDD的处理并不是非常灵活,如果要做一些结构化的处理,将RDD转换成DataFrame,DataFrame实际上就是行对象的RDD+schema,类似于原本的文本数据,加上schema,做一下结构的转换就变成数据库里面的表,表是有元数据的,有字段有类型。所以DataFrame处理起来更加灵活。

要进行机器学习是有一系列的流程,通常离线的处理现有一组数据集,然后进行预处理特征工程,完成之后分成训练集合测试集,基于训练集训练模型,然后选择算法,进行评估..这是可以形成一个管道的,整体是一个DAG有向无环图。

其实整个进行模型算法训练的过程就是一个管道,管道中就会有各种各样的组件,这些组件总体来说可以分成两类,①第一个是Transformers:transform()用于转换,把一个DataFrame转换为另一个DataFrame,如把原本的数据集拆分成测试集,那就是DataFrame的转换,像分词,抽样,模型的测试都是非常常见的转换操作,②第二种类型就是Estimators:fit()应用在DF上生成一个转换器算法,Estimators评估器,用到的函数是fit(),Estimators是为了生成一个转换器,在机器学习中会用到一些算法,需要去建模,根据训练集得到模型,模型本质上就是转换器,进行预测是用的这个模型进行预测,所以转换是基于这个模型进行预测,所以转换就是基于这个模型的转换器转换时他的实例来进行转换。

2 Spark ML的工作流程

首先进行预处理,包括模型训练的整个过程是一个管道pipline,这个pipline的目的是为了得到一个Estimator,即得到一个模型,假如说用逻辑回归,输入的数据是普通的文本,首先进行Toknizer分词,分完次后计算他的词频,这两个本质上否是transform的操作,接下来就要创建一个逻辑回归的实例,本质上就是一个Estimator,得到一个转换器。

模型有了接下来就要做预测,不管是训练集还是测试集,都是要进行分词,计算词频的,这个piplineModel整个都是transform操作,这个模型逻辑回归就是上一步通过训练的到的模型。

参数是所有转换器和评估器共享的一个公共api,参数名Param是一个参数,可以通过setter单独定义;也可以通过ParamMap定义一个参数的集合(parameter,value),传递参数的两种方式:①通过setter为实例设置参数②传递ParamMap给fit或者transform方法

3 Estimator,Transformer,Param使用案例

(1)准备带标签和特征的数据

(2)创建逻辑回归的评估器

(3)使用setter方法设置参数

(4)使用存储在lr中的参数来训练一个模型

(5)使用ParamMap选择指定的参数

(6)准备测试数据

(7)预测结果

代码具体实现

(1)准备带标签和特征的数据

任何应用首先要把需要的类通过import引入,性别预测是分类问题,选择逻辑回归

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row

定义一个初始的DataFrame,通过sqlContext创建,用Seq序列的方式创建一个集合,第一个参数是标签即目标值,后面的为特征,

val sqlContext=new org.apache.spark.sql.SQLContext(sc)
val training = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(1.0,2.1,1.1)),(0.0, Vectors.dense(3.0,2.0,-2.0)),(0.0, Vectors.dense(3.0,0.3,1.0)),(1.0, Vectors.dense(1.0,1.2,-1.5))
)).toDF("label","features")

(2)创建逻辑回归的评估器,设置参数

val lr = new LogisticRegression()
//评估器会带一些默认的参数,通过explainParams()查看
println(lr.explainParams())
//通过set方式修改迭代次数和正则化参数
lr.setMaxIter(10).setRegParam(0.01)//定义模型,
val model1 = lr.fit(training)
//查看模型的参数
model1.parent.extractParamMap//通过ParamMap设置参数
val paramMap = ParamMap(lr.maxIter -> 20).
put(lr.maxIter,30).
put(lr.regParam -> 0.1, lr.threshold -> 0.55)val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
//将两个ParamMap对象合并
val paramMapCombined = paramMap ++ paramMap2//根据ParamMap设置的参数定义模型,
val model2 = lr.fit(training, paramMapCombined)
model2.parent.extractParamMap

(3)准备测试数据

val test = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(-1.2,1.8,1.3)),(0.0, Vectors.dense(4.0,1.8,-0.1)),(1.0, Vectors.dense(0.0,1.9,-1.5))
)).toDF("label","features")

(4)预测结果

//调用模型1
model1.transform(test).select("label","features","probability","prediction").collect().foreach{case Row(label: Double, features: Vector, probability: Vector, prediction: Double) => println(s"($features, $label) -> probability=$probability, prediction=$prediction")}

4 构建Pipline和保存Pipline

步骤:

(1)准备训练的文档

(2)配置ML管道,包含三个stage:Tokenizer,HashingTF和LR

(3)安装管道到数据上

(4)保存管道到磁盘,包括安装好的和未安装好的

(5)加载管道

(6)准备测试文档

(7)预测结果

代码实现:

(1)引入需要的类

//用的数逻辑回归
import org.apache.spark.ml.classification.LogisticRegression
//因为特征工程处理的是特征向量,所以需要Vector,输入输出会用到
import org.apache.spark.ml.linalg.Vector
//行对象,为了输出美化
import org.apache.spark.sql.Row
//需要分词需要Tokenizer,需要转换计算词频需要HashingTF
import org.apache.spark.ml.feature.{Tokenizer,HashingTF}
//需要Pipeline将多个Transformers和Estimators连接起来以确定一个ML工作流程
import org.apache.spark.ml.{Pipeline,PipelineModel}

(2)准备数据集

//含Sprak的为一类
val training = sqlContext.createDataFrame(Seq((0L, "Spark Write applications quickly in Java, Scala, Python, R, and SQL.", 1.0),(1L, "Live and learn", 0.0),(2L, "Spark Run workloads 100x faster.", 1.0),(3L, "study hard and make progress every day", 0.0)
)).toDF("id","text","label")

(3)定义管道中的Tokenizer,HashingTF,LR这三个组件

//创建tokenizer分词器
//setInputCol指明输入DataFrame中的哪一列是被处理的,输入参数是Dataframe中存在的列名
//setOutputCol设置新增加列的名字,及对输入的列变换后会产生一个新列,该方法设置增加新列的列名
val tokenizer = new Tokenizer().
setInputCol("text").
setOutputCol("words")//创建hashingTF词频统计,他的inputcolumn是tokenizerget出来的
//setNumFeatures设置特征值的数量
val hashingTF = new HashingTF().
setNumFeatures(1000).
setInputCol(tokenizer.getOutputCol).
setOutputCol("features")//创建逻辑回归对象,setMaxIter设置逻辑回归的迭代次数,setRegParam设置正则化
val lr = new LogisticRegression().
setMaxIter(10).setRegParam(0.01)

(4)定义管道

//创建管道,setStages将各个计算阶段按照tokenizer,hashingTF,lr顺序,pipeline是没有安装好的管道
val pipeline = new Pipeline().
setStages(Array(tokenizer,hashingTF,lr))//使用pipeline构建模型,model是安装好的管道
val model = pipeline.fit(training)

(5)保存管道到磁盘

pipeline.save("/portrait/sparkML-LRpipeline")
model.save("/portrait/sparkML-LRmodel")

(6)加载模型

//加载保存到磁盘中模型
val model2 = PipelineModel.load("/portrait/sparkML-LRmodel")

(7)准备测试文档,通过回归预测,没有测试集

val test = sqlContext.createDataFrame(Seq((4L, "learn Spark"),(5L, "hadoop hive"),(6L, "bigdata hdfs a"),(7L, "apache Spark")
)).toDF("id","text")

(8)预测结果

model.transform(test).select("id","text","probability","prediction").collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) => println(s"($id, $text) -> probability=$probability, prediction=$prediction")}

5 通过网格参数和交叉验证进行模型调优

所谓的调优就是怎样根据数据选择好的模型,或者为整个模型整个管道选择好的参数,这里是关注参数的调优,模型就选择逻辑回归。参数调优就是给一组参数而不是一个参数,让模型自己选择。调优是基于管道整体进行调优。

(1)准备训练的文档

(2)配置ML管道,包含三个stage:Tokenizer,HashingTF和LR

(3)使用ParamGridBuilder构建一个参数网格

(4)使用CrossValidator来选择模型和参数,CrossValidator需要一个estimator,一个评估器参数集合,和一个evaluator

(5)运行交叉验证,选择最好的参数集

(6)准备测试数据

(7)预测结果

代码实现过程:

(1)引入需要的包

//用的数逻辑回归
import org.apache.spark.ml.classification.LogisticRegression
//因为特征工程处理的是特征向量,所以需要Vector,输入输出会用到
import org.apache.spark.ml.linalg.Vector
//行对象,为了输出美化
import org.apache.spark.sql.Row
//需要分词需要Tokenizer,需要转换计算词频需要HashingTF
import org.apache.spark.ml.feature.{Tokenizer,HashingTF}
//需要Pipeline将多个Transformers和Estimators连接起来以确定一个ML工作流程
import org.apache.spark.ml.{Pipeline,PipelineModel}
//因为是二元的,所以用BinaryClassificationEvaluator评估器
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
//使用交叉校验CrossValidator,把所有参数排列组合,交叉进行校验。ParamGridBuilder参数网格
import org.apache.spark.ml.tuning.{CrossValidator,ParamGridBuilder}
//需要引入SQLContext
import org.apache.spark.sql.SQLContext

(2)准备数据

val sqlContext=new SQLContext(sc)
val training = sqlContext.createDataFrame(Seq((0L, "Spark Write applications quickly in Java, Scala, Python, R, and SQL.", 1.0),(1L, "Live and learn", 0.0),(2L, "Spark Run workloads 100x faster.", 1.0),(3L, "study hard and make progress every day", 0.0),(4L, "Rdd Spark who", 1.0),(5L, "good good study", 0.0),(6L, "Spark faster", 1.0),(7L, "day day up", 0.0),(8L, "Spark program", 1.0),(9L, "hello world", 0.0),(10L, "hello Spark", 1.0),(11L, "hi how are you", 0.0)
)).toDF("id","text","label")

(3)构建管道

//创建tokenizer分词器
//setInputCol指明输入DataFrame中的哪一列是被处理的,输入参数是Dataframe中存在的列名
//setOutputCol设置新增加列的名字,及对输入的列变换后会产生一个新列,该方法设置增加新列的列名
val tokenizer = new Tokenizer().
setInputCol("text").
setOutputCol("words")
//创建hashingTF词频统计,他的inputcolumn是tokenizerget出来的
//特征值的数量网格调优
val hashingTF = new HashingTF().
setInputCol(tokenizer.getOutputCol).
setOutputCol("features")
//创建逻辑回归对象,setMaxIter设置,正则化参数网格调优
val lr = new LogisticRegression().
setMaxIter(10)
//创建管道,setStages将各个计算阶段按照tokenizer,hashingTF,lr顺序,pipeline是没有安装好的管道
val pipeline = new Pipeline().
setStages(Array(tokenizer,hashingTF,lr))

(4)构建网格参数

//构建网格参数,addGrid添加网格,hashingTF.numFeatures设置hashingTF特征数量,
val paramGrid = new ParamGridBuilder().
addGrid(hashingTF.numFeatures, Array(10,100,1000)).
addGrid(lr.regParam, Array(0.1,0.01)).
build()

(5)创建交叉验证CrossValidator对象

//创建CrossValidator交叉验证对象,setEstimator设置评估器,setEstimatorParamMaps设置参数集,setEvaluator设置评估器,setNumFolds创建交叉验证器,他会把训练集分成NumFolds份,实际生产要比2大
val cv = new CrossValidator().
setEstimator(pipeline).
setEstimatorParamMaps(paramGrid).
setEvaluator(new BinaryClassificationEvaluator()).
setNumFolds(2)

(6)根据最优参数构建模型

//构借助参数网格,交叉验证,选择最优的参数构建模型
val cvModel = cv.fit(training)

(7)添加测试数据

//添加测试集
val test = sqlContext.createDataFrame(Seq((12L, "learn Spark"),(13L, "hadoop hive"),(14L, "bigdata hdfs a"),(15L, "apache Spark")
)).toDF("id","text")

(8)预测结果

cvModel.transform(test).select("id","text","probability","prediction").collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) => println(s"($id, $text) -> probability=$probability, prediction=$prediction")}

 

6 通过训练校验分类来调优模型

前面交叉验证是把数据分成多份,每一份把所有参数组合计算一次。而校验分类只需要把每一组参数计算一次,把数据自动分成训练集合校验集,这种方式依赖于比较大的数据量,如果数量不够生成的结果是不可信任的。不像校验验证数据集小没关系会交叉验证多次,所以即使数据量少但是计算多次,多次的结果足够评估选出最优的参数。所以TrainValidationSplit需要的数据量就要大,只会计算一次。这个例子采用线性回归。

与CrossValidator不同,TrainValidationSplit创建一个(训练,测试)数据集对。 它使用trainRatio参数将数据集分成这两个部分。 例如,trainRatio = 0.75,TrainValidationSplit将生成训练和测试数据集对,其中75%的数据用于训练,25%用于验证。

步骤:

(1)准备训练和测试数据

(2)使用ParamGridBuilder构建一个参数网格

(3)使用TrainValidationSplit来选择模型和参数,CrossValidator需要一个estimator,一个评估器参数集合,和一个evaluator

(4)运行校验分类选择最好的参数

(5)在测试数据上做测试,模型是参数组合中执行最好的一个


//使用线性回归求解
import org.apache.spark.ml.regression.LinearRegression
因为是回归问题,所以用RegressionEvaluator回归评估器
import org.apache.spark.ml.evaluation.RegressionEvaluator
//使用ParamGridBuilder参数网格和,TrainValidationSplit
import org.apache.spark.ml.tuning.{TrainValidationSplit,ParamGridBuilder}
//需要引入SQLContext
import org.apache.spark.sql.SQLContextval = sqlContext = new SQLContext(sc)
val data = sqlContext.read.format("libsvm").load("file:/data/sample_linear_regression_data.txt")//randomSplits随机拆分,seed随机种子
val Array(training, test) = data.randomSplit(Array(0.75, 0.25), seed=12345)//创建线性回归
val lr = new LinearRegression()//elasticNetParam是Elastic net (回归)参数,取值介于0和1之间。
//fitIntercept是否允许阶段,默认是true。regParam参数定义规范化项的权重
val paramGrid = new ParamGridBuilder().
addGrid(lr.elasticNetParam, Array(0.0,0.5,1.0)).
addGrid(lr.fitIntercept).
addGrid(lr.regParam, Array(0.1,0.01)).
build()//训练校验的比例setTrainRatio
val trainValidationSplit = new TrainValidationSplit().
setEstimator(lr).
setEstimatorParamMaps(paramGrid).
setEvaluator(new RegressionEvaluator).
setTrainRatio(0.8)val model = trainValidationSplit.fit(training)model.transform(test).select("features","label","prediction").show()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473772.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Kaggle] Digit Recognizer 手写数字识别(神经网络)

文章目录1. baseline2. 改进2.1 增加训练时间2.2 更改网络结构Digit Recognizer 练习地址 相关博文: [Hands On ML] 3. 分类(MNIST手写数字预测) [Kaggle] Digit Recognizer 手写数字识别 1. baseline 导入包 import tensorflow as tf fr…

逻辑回归原理

逻辑回归原理 1 逻辑回归简介 logistic回归(LR),是一种广义的线性回归分析模型,常用于数据挖掘,疾病预测,经济预测等方面。 优点:计算代价低,思路清晰易于理解和实现;…

LeetCode 956. 最高的广告牌(DP)

文章目录1. 题目2. 解题1. 题目 你正在安装一个广告牌,并希望它高度最大。 这块广告牌将有两个钢制支架,两边各一个。每个钢支架的高度必须相等。 你有一堆可以焊接在一起的钢筋 rods。 举个例子,如果钢筋的长度为 1、2 和 3,则…

Tensorflow线程队列与IO操作

目录 Tensorflow线程队列与IO操作 1 线程和队列 1.1 前言 1.2 队列 1.3 队列管理器 1.4 线程协调器 2 文件读取 2.1 流程 2.2 文件读取API: 3 图像读取 3.1 图像读取基本知识 3.2 图像基本操作 3.3 图像读取API 3.4 图片批处理流程 3.5 读取图片案例 …

LeetCode 1298. 你能从盒子里获得的最大糖果数(BFS)

文章目录1. 题目2. 解题1. 题目 给你 n 个盒子,每个盒子的格式为 [status, candies, keys, containedBoxes] ,其中: - 状态字 status[i]:整数,如果 box[i] 是开的,那么是 1 ,否则是 0 。 - 糖…

给javascript初学者的24条最佳实践

1.使用 代替 JavaScript 使用2种不同的等值运算符:|! 和 |!,在比较操作中使用前者是最佳实践。 “如果两边的操作数具有相同的类型和值,返回true,!返回false。”——JavaScript:语言精粹 然而,当使用和&a…

LeetCode 1614. 括号的最大嵌套深度

文章目录1. 题目2. 解题1. 题目 如果字符串满足一下条件之一,则可以称之为 有效括号字符串(valid parentheses string,可以简写为 VPS): 字符串是一个空字符串 "",或者是一个不为 "("…

[AngularJS]Chapter 1 AnjularJS简介

创建一个完美的Web应用程序是很令人激动的,但是构建这样应用的复杂度也是不可思议的。我们Angular团队的目标就是去减轻构建这样AJAX应用的复杂度。在谷歌我们经历过各种复杂的应用创建工作比如:GMail、Map和日历。我们认为我们有必要把这些经验总结下来…

LeetCode 1615. 最大网络秩(出入度)

文章目录1. 题目2. 解题1. 题目 n 座城市和一些连接这些城市的道路 roads 共同组成一个基础设施网络。 每个 roads[i] [ai, bi] 都表示在城市 ai 和 bi 之间有一条双向道路。 两座不同城市构成的 城市对 的 网络秩 定义为:与这两座城市 直接 相连的道路总数。如果…

使用JSLint提高JS代码质量

随着富 Web 前端应用的出现,开发人员不得不重新审视并重视 JavaScript 语言的能力和使用,抛弃过去那种只靠“复制 / 粘贴”常用脚本完成简单前端任务的模式。JavaScript 语言本身是一种弱类型脚本语言,具有相对于 C 或 Java 语言更为松散的限…

Django工具:Git简介与基本操作

1.Git简介: 1.Git是目前世界上最先进的分布式版本控制系统 网址:http://github.com 2.总结git的两大特点: 版本控制:可以解决多人同时开发的代码问题,也可以解决找回历史代码的问题 分布式:Git是分布式…

LeetCode 1616. 分割两个字符串得到回文串

文章目录1. 题目2. 解题1. 题目 给你两个字符串 a 和 b ,它们长度相同。 请你选择一个下标,将两个字符串都在 相同的下标 分割开。 由 a 可以得到两个字符串: aprefix 和 asuffix ,满足 a aprefix asuffix ,同理&am…

Kafka基础

Kafka基础 1 消息队列 1.1 什么是消息队列 消息队列(MQ):消息队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。 1.2 为什么要有消息队列 当网站面对教大的流量…

系统总结学习 Python 的 14 张思维导图

本文主要涵盖了 Python 编程的核心知识(暂不包括标准库及第三方库)。 首先,按顺序依次展示了以下内容的一系列思维导图:基础知识,数据类型(数字,字符串,列表,元组&#x…

LeetCode 1617. 统计子树中城市之间最大距离(枚举所有可能+图的最大直径)

文章目录1. 题目2. 解题1. 题目 给你 n 个城市,编号为从 1 到 n 。同时给你一个大小为 n-1 的数组 edges ,其中 edges[i] [ui, vi] 表示城市 ui 和 vi 之间有一条双向边。 题目保证任意城市之间只有唯一的一条路径。换句话说,所有城市形成了…

MYSQL电脑客户端免安装教程以及出现问题解决方案

准备工作:window 7 64位旗舰版 MySQL 5.6.35免安装。 1. 下载MySQL 1.1 进入MySQL官网下载(https://www.mysql.com/)MySQL的安装包。 1.2. 根据自己电脑的位数(32位/64位)来下载响应的MySQL 、 2. 部署MySQL 2.1 解压压缩包到自己的某个盘…

[Kaggle] Digit Recognizer 手写数字识别(卷积神经网络)

文章目录1. 使用 LeNet 预测1.1 导入包1.2 建立 LeNet 模型1.3 读入数据1.4 定义模型1.5 训练1.6 绘制训练曲线1.7 预测提交2. 使用 VGG16 迁移学习2.1 导入包2.2 定义模型2.3 数据处理2.4 配置模型、训练2.5 预测提交Digit Recognizer 练习地址 相关博文: [Hands …

SparkCore基础

目录 Spark简介 1 什么是Spark 2 Spark特点 3 Spark分布式环境安装 3.1 Spark HA的环境安装 3.2 动态增删一个worker节点到集群 4 Spark核心概念 5 Spark案例 5.2 Master URL 5.3 spark日志的管理 5.4 WordCount案例程序的执行过程 6 Spark作业运行架构图&#xff…

LeetCode 1320. 二指输入的的最小距离(动态规划)

文章目录1. 题目2. 解题1. 题目 二指输入法定制键盘在 XY 平面上的布局如上图所示,其中每个大写英文字母都位于某个坐标处, 例如字母 A 位于坐标 (0,0),字母 B 位于坐标 (0,1),字母 P 位于坐标 (2,3) 且字母 Z 位于坐标 (4,1)。 …

SparkStreaming基础

目录 SparkStreaming基础 1 流式计算 1.1 常见的离线和流式计算框架 2 SparkStreaming简介 2.1 核心概念DStream 2.2 工作原理 2.3 Storm,SparkStreaming和Flink的对比 2.4 如何选择流式处理框架 3 SparkStreaming实时案例 3.1 StreamingContext和Receiver…