Table of Contents
- 1. DenseVector and SparseVector
- 2. DenseMatrix
- 3. SparseMatrix
- 4. Vector Operations
- 5. Matrix Operations
- 6. RowMatrix
- 7. IndexedRowMatrix
- 8. CoordinateMatrix
- 9. BlockMatrix
- Complete Code
- pom.xml
Based on the book: Spark机器学习实战
https://book.douban.com/subject/35280412/
Environment: Windows 10 + Java 1.8.0_281 + Scala 2.11.11 + Hadoop 2.7.7 + Spark 2.4.7
1. DenseVector and SparseVector
```scala
// Create a DenseVector from an array
val CustomerFeatures1: Array[Double] = Array(1, 3, 5, 7, 9, 1, 3, 2, 4, 5, 6, 1, 2, 5, 3, 7, 4, 3, 4, 1)
val x = Vectors.dense(CustomerFeatures1)
println(x) // [1.0,3.0,5.0,7.0,9.0,1.0,3.0,2.0,4.0,5.0,6.0,1.0,2.0,5.0,3.0,7.0,4.0,3.0,4.0,1.0]

// Create a DenseVector from strings converted to Double
val y = Vectors.dense("24".toDouble, "8".toDouble, "001".toDouble)
println(y) // [24.0,8.0,1.0]

// Create a SparseVector and compare the two vector types
val denseVec1 = Vectors.dense(5, 0, 3, 0, 0, 0, 7, 8)
println(denseVec1.size)        // 8
println(denseVec1.numActives)  // 8
println(denseVec1.numNonzeros) // 4
println(denseVec1)             // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]

val sparseVec1 = Vectors.sparse(8, Array(0, 2, 6, 7), Array(5, 3, 7, 8))
println(sparseVec1.size)        // 8
println(sparseVec1.numActives)  // 4
println(sparseVec1.numNonzeros) // 4
println(sparseVec1)             // (8,[0,2,6,7],[5.0,3.0,7.0,8.0]) -- size, indices, values
// A sparse vector stores only the positions and values of its non-zero elements

// Converting between the two forms
val ConvertedDenseVector = sparseVec1.toDense
println(ConvertedDenseVector)  // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]
val ConvertedSparseVector = denseVec1.toSparse
println(ConvertedSparseVector) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])

// Note: DenseVector and SparseVector are both local vectors, not distributed ones
```
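When the non-zero elements are already on hand as (index, value) pairs, MLlib's `Vectors.sparse(size, Seq[(Int, Double)])` overload builds the parallel index/value arrays for you. A minimal sketch, reusing the same example values as above:

```scala
import org.apache.spark.mllib.linalg.Vectors

// Same sparse vector as sparseVec1, built from (index, value) pairs
val pairs = Seq((0, 5.0), (2, 3.0), (6, 7.0), (7, 8.0))
val sparseVec2 = Vectors.sparse(8, pairs)
println(sparseVec2)         // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])
println(sparseVec2.toDense) // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]
```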
2. DenseMatrix
```scala
// A local DenseMatrix
val MyArr1 = Array(10, 11, 20, 30.3, 24, 8)
val denseMat1 = Matrices.dense(2, 3, MyArr1) // rows, columns, values
println(denseMat1)
// 10.0  20.0  24.0
// 11.0  30.3  8.0

// The values can also be supplied inline
val denseMat2 = Matrices.dense(3, 2, Array(1, 2, 3, 4, 5, 6))
println(denseMat2)
// 1.0  4.0
// 2.0  5.0
// 3.0  6.0

// Build a Matrix inline from several vectors
val v1 = Vectors.dense(1, 2, 3, 4)
val v2 = Vectors.dense(5, 6, 7, 8)
val v3 = Vectors.dense(9, 10, 11, 12)
val Mat1 = Matrices.dense(4, 3, v1.toArray ++ v2.toArray ++ v3.toArray)
println(Mat1)
// 1.0  5.0   9.0
// 2.0  6.0  10.0
// 3.0  7.0  11.0
// 4.0  8.0  12.0

// Remember: Spark stores matrices column-major internally, which performs better
```
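To make the column-major layout concrete, here is a small sketch of how a flat array index maps to a (row, column) position; `m(i, j)` reads an element, and the index arithmetic is the standard column-major formula:

```scala
import org.apache.spark.mllib.linalg.Matrices

// Column-major: flat index k holds the element at (row = k % numRows, col = k / numRows)
val m = Matrices.dense(2, 3, Array(10, 11, 20, 30.3, 24, 8))
println(m(0, 1)) // 20.0 -- flat index 2 = 1 * numRows + 0
println(m(1, 2)) // 8.0  -- flat index 5 = 2 * numRows + 1
```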
3. SparseMatrix
```scala
// A local SparseMatrix (CSC format)
val sparseMat1 = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(11, 22, 33))
// Argument order: rows, cols, column pointers (prefix sums of the number of entries
// per column; here column 0 has 1 - 0 = 1 entry and column 1 has 3 - 1 = 2),
// row indices, values
// A diagram helps: https://www.cnblogs.com/zhangbojiangfeng/p/7456961.html
println(sparseMat1.numRows)     // 3
println(sparseMat1.numCols)     // 2
println(sparseMat1.numActives)  // 3
println(sparseMat1.numNonzeros) // 3
println(sparseMat1)
// 3 x 2 CSCMatrix
// (0,0) 11.0
// (1,1) 22.0
// (2,1) 33.0
```
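Hand-computing the column-pointer prefix sums is error-prone; as a sketch of an alternative, MLlib's `SparseMatrix.fromCOO` factory accepts plain (row, col, value) triples and derives the CSC arrays itself:

```scala
import org.apache.spark.mllib.linalg.SparseMatrix

// The same 3 x 2 matrix from (row, col, value) triples; Spark computes the column pointers
val coo = Seq((0, 0, 11.0), (1, 1, 22.0), (2, 1, 33.0))
val sparseMat1b = SparseMatrix.fromCOO(3, 2, coo)
println(sparseMat1b)
// 3 x 2 CSCMatrix
// (0,0) 11.0
// (1,1) 22.0
// (2,1) 33.0
```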
4. Vector Operations
```scala
// Vector arithmetic: Spark 2.x lacks built-in vector arithmetic, so convert to Breeze vectors first
val w1 = Vectors.dense(1, 2, 3)
val w2 = Vectors.dense(4, -5, 6)
// Converting a Spark Vector to a Breeze vector unlocks Breeze's rich API
val w3 = new BreezeVector(w1.toArray)
val w4 = new BreezeVector(w2.toArray)
println(w3 + w4)    // DenseVector(5.0, -3.0, 9.0)
println(w3 - w4)    // DenseVector(-3.0, 7.0, -3.0)
println(w3.dot(w4)) // 12.0

val sv1 = Vectors.sparse(10, Array(0, 2, 9), Array(5, 3, 13))
val sv2 = Vectors.dense(1, 0, 1, 1, 0, 0, 1, 0, 0, 13)
println(sv1)
println(sv2)
println(new BreezeVector(sv1.toArray).dot(new BreezeVector(sv2.toArray))) // 5 + 3 + 169 = 177.0
```
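That said, a few operations are available on Spark vectors without the Breeze round-trip; `Vectors.norm` and `Vectors.sqdist` ship with MLlib. A small sketch:

```scala
import org.apache.spark.mllib.linalg.Vectors

val a = Vectors.dense(1, 2, 3)
val b = Vectors.dense(4, -5, 6)
println(Vectors.norm(a, 2.0)) // sqrt(1 + 4 + 9) = 3.7416573867739413
println(Vectors.sqdist(a, b)) // (1-4)^2 + (2+5)^2 + (3-6)^2 = 67.0
```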
5. Matrix Operations
```scala
// Spark supports SparseMatrix and DenseMatrix arithmetic directly, with no Breeze conversion needed
// Create a Matrix and multiply it by a vector
val sparseMat2 = Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1, 2, 3, 4, 5, 6))
val denseFeatureVector = Vectors.dense(1, 2, 1)
val ans0 = sparseMat2.multiply(denseFeatureVector)
println(ans0) // [5.0,11.0,8.0]; sparse and dense matrices are also interconvertible

val denseVec3 = Vectors.dense(5, 3, 0)
val denseMat3 = Matrices.dense(3, 3, Array(1, 0, 0, 0, 1, 0, 0, 0, 1))
println(denseMat3)
println(denseVec3)
println(denseMat3.multiply(denseVec3)) // [5.0,3.0,0.0]

// Matrix transpose
println(sparseMat2)
// 3 x 3 CSCMatrix
// (0,0) 1.0
// (2,0) 2.0
// (1,1) 3.0
// (0,2) 4.0
// (1,2) 5.0
// (2,2) 6.0
val transposeSparseMat2 = sparseMat2.transpose
println(transposeSparseMat2)
// 3 x 3 CSCMatrix
// (0,0) 1.0
// (2,0) 4.0
// (1,1) 3.0
// (2,1) 5.0
// (0,2) 2.0
// (2,2) 6.0

// Matrix multiplication
val dMat1 = new DenseMatrix(2, 2, Array(1, 3, 2, 4))
val dMat2 = new DenseMatrix(2, 2, Array(2, 1, 0, 2))
println(dMat1)
// 1.0  2.0
// 3.0  4.0
println(dMat2)
// 2.0  0.0
// 1.0  2.0
println(dMat1.multiply(dMat2))
// 4.0   4.0
// 10.0  8.0
println(dMat2.multiply(dMat1))
// 2.0  4.0
// 7.0  10.0
```
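As the comment above notes, sparse and dense matrices are interconvertible; a sketch using the concrete `SparseMatrix` and `DenseMatrix` classes, whose `toDense`/`toSparse` methods do the conversion:

```scala
import org.apache.spark.mllib.linalg.{DenseMatrix, SparseMatrix}

// Same contents as sparseMat2, built via the concrete class so toDense is available
val sm = new SparseMatrix(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1, 2, 3, 4, 5, 6))
val dm: DenseMatrix = sm.toDense // fills in the zeros
println(dm)
// 1.0  0.0  4.0
// 0.0  3.0  5.0
// 2.0  0.0  6.0
println(dm.toSparse) // back to CSC form, dropping the zeros
```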
6. RowMatrix
- A row-oriented distributed matrix whose rows are local Vectors; its drawback is that there are no row indices for tracking individual rows
```scala
// RowMatrix
val dataVectors = Seq(
  Vectors.dense(0, 1, 0),
  Vectors.dense(3, 1, 5),
  Vectors.dense(0, 7, 0)
)
val identityVectors = Seq(
  Vectors.dense(1, 0, 0),
  Vectors.dense(0, 1, 0),
  Vectors.dense(0, 0, 1)
)

// Parallelize the original sequence into an RDD
val distMat3 = new RowMatrix(spark.sparkContext.parallelize(dataVectors))
println(distMat3) // org.apache.spark.mllib.linalg.distributed.RowMatrix@352ed70d
println(distMat3.computeColumnSummaryStatistics().count)    // 3
println(distMat3.computeColumnSummaryStatistics().mean)     // column means [1.0,3.0,1.6666666666666665]
println(distMat3.computeColumnSummaryStatistics().variance) // column variances [3.0,12.0,8.333333333333334]
println(distMat3.computeCovariance()) // covariance matrix
// 3.0   -3.0                5.0
// -3.0  12.0                -4.999999999999999
// 5.0   -4.999999999999999  8.333333333333334

val dd = identityVectors.flatMap(x => x.toArray).toArray
dd.foreach(println)
val dmIdentity = Matrices.dense(3, 3, dd)
println(dmIdentity) // a local matrix
// 1.0  0.0  0.0
// 0.0  1.0  0.0
// 0.0  0.0  1.0

// Distributed matrix * local matrix = a new distributed matrix
// Use case: multiplying by a tall, skinny matrix reduces data volume and the result's dimensionality
val distMat4 = distMat3.multiply(dmIdentity)
println(distMat4) // org.apache.spark.mllib.linalg.distributed.RowMatrix@205df5dc
println(distMat4.computeColumnSummaryStatistics().count) // 3
println(distMat4.computeColumnSummaryStatistics().mean)
println(distMat4.computeColumnSummaryStatistics().variance)
println(distMat4.computeCovariance())
```
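One detail worth pinning down: `variance` above is the unbiased sample variance (dividing by n - 1, not n). A quick hand check in plain Scala for column 0, whose values are (0, 3, 0):

```scala
// Hand-checking the unbiased (n - 1) sample variance against variance(0) above
val col0 = Seq(0.0, 3.0, 0.0)
val mean0 = col0.sum / col0.size // 1.0
val var0 = col0.map(v => math.pow(v - mean0, 2)).sum / (col0.size - 1)
println(var0) // (1 + 4 + 1) / 2 = 3.0, matching variance(0)
```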
7. IndexedRowMatrix
- Pairs each data row in the RDD with an index, enabling random access and locating specific rows
```scala
// IndexedRowMatrix
val distIdxMat1 = spark.sparkContext.parallelize(
  List(IndexedRow(0L, dataVectors.head), IndexedRow(1L, dataVectors(1)), IndexedRow(1L, dataVectors(2))))
distIdxMat1.foreach(println)
// IndexedRow(0,[0.0,1.0,0.0]) -- index, row data
// IndexedRow(1,[3.0,1.0,5.0])
// IndexedRow(1,[0.0,7.0,0.0])
println("distinct elements=", distIdxMat1.distinct().count()) // 3
```
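The snippet above only builds the RDD of `IndexedRow`s; wrapping it in an actual `IndexedRowMatrix` and filtering on the index illustrates the random-access idea. A sketch reusing `distIdxMat1`:

```scala
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

val idxMat = new IndexedRowMatrix(distIdxMat1)
println(idxMat.numRows, idxMat.numCols) // (2,3) -- max index + 1, vector size
// Locate the rows carrying index 1
idxMat.rows.filter(_.index == 1L).collect().foreach(println)
// IndexedRow(1,[3.0,1.0,5.0])
// IndexedRow(1,[0.0,7.0,0.0])
```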
8. CoordinateMatrix
- This matrix form is very useful when dealing with large amounts of 3D coordinate-style data, i.e. (row, column, value) triples
```scala
// CoordinateMatrix
val CoordinateEntries = Seq(
  MatrixEntry(1, 6, 300.0), // a coordinate entry: row and column indices must be Long, the value a Double
  MatrixEntry(3, 1, 5),
  MatrixEntry(1, 7, 10)
)
val distCordMat1 = new CoordinateMatrix(spark.sparkContext.parallelize(CoordinateEntries.toList))
println("First Row (MatrixEntry) =", distCordMat1.entries.first())
// (First Row (MatrixEntry) =,MatrixEntry(1,6,300.0))
```
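A `CoordinateMatrix` also converts readily to the other distributed forms, and `transpose()` just swaps each entry's row and column indices. A sketch reusing `distCordMat1`:

```scala
println(distCordMat1.numRows, distCordMat1.numCols) // (4,8) -- max indices + 1
val transposed = distCordMat1.transpose()           // swaps row/column of every entry
println(transposed.numRows, transposed.numCols)     // (8,4)
// Convert to an IndexedRowMatrix, one sparse row per distinct row index
val asIndexedRows = distCordMat1.toIndexedRowMatrix()
asIndexedRows.rows.foreach(println)
```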
9. BlockMatrix
```scala
// BlockMatrix
val distBlkMat1 = distCordMat1.toBlockMatrix().cache()
distBlkMat1.validate()
// Validates the block matrix info against the matrix data (blocks)
// and throws an exception if any error is found.
println("Is block empty =", distBlkMat1.blocks.isEmpty())
// (Is block empty =,false)
```
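Unlike the other distributed forms, `BlockMatrix` supports distributed matrix arithmetic directly; `add` requires matching dimensions and block sizes, which trivially holds when adding a matrix to itself. A sketch reusing `distBlkMat1` (entry order in the output may vary):

```scala
// Adding the matrix to itself doubles every stored entry
val doubled = distBlkMat1.add(distBlkMat1)
doubled.toCoordinateMatrix().entries.foreach(println)
// MatrixEntry(1,6,600.0)
// MatrixEntry(3,1,10.0)
// MatrixEntry(1,7,20.0)
```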
Complete Code
```scala
package spark.ml.cookbook.chapter2

import breeze.linalg.{DenseVector => BreezeVector}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vectors}
import org.apache.spark.sql.SparkSession

object MyVectorMatrix {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("MyVectorMatrix")
      .config("spark.sql.warehouse.dir", ".")
      .config("spark.io.compression.codec", "snappy")
      .getOrCreate()

    // Create a DenseVector from an array
    val CustomerFeatures1: Array[Double] = Array(1, 3, 5, 7, 9, 1, 3, 2, 4, 5, 6, 1, 2, 5, 3, 7, 4, 3, 4, 1)
    val x = Vectors.dense(CustomerFeatures1)
    println(x) // [1.0,3.0,5.0,7.0,9.0,1.0,3.0,2.0,4.0,5.0,6.0,1.0,2.0,5.0,3.0,7.0,4.0,3.0,4.0,1.0]

    // Create a DenseVector from strings converted to Double
    val y = Vectors.dense("24".toDouble, "8".toDouble, "001".toDouble)
    println(y) // [24.0,8.0,1.0]

    // Create a SparseVector and compare the two vector types
    val denseVec1 = Vectors.dense(5, 0, 3, 0, 0, 0, 7, 8)
    println(denseVec1.size)        // 8
    println(denseVec1.numActives)  // 8
    println(denseVec1.numNonzeros) // 4
    println(denseVec1)             // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]

    val sparseVec1 = Vectors.sparse(8, Array(0, 2, 6, 7), Array(5, 3, 7, 8))
    println(sparseVec1.size)        // 8
    println(sparseVec1.numActives)  // 4
    println(sparseVec1.numNonzeros) // 4
    println(sparseVec1)             // (8,[0,2,6,7],[5.0,3.0,7.0,8.0]) -- size, indices, values
    // A sparse vector stores only the positions and values of its non-zero elements

    // Converting between the two forms
    val ConvertedDenseVector = sparseVec1.toDense
    println(ConvertedDenseVector)  // [5.0,0.0,3.0,0.0,0.0,0.0,7.0,8.0]
    val ConvertedSparseVector = denseVec1.toSparse
    println(ConvertedSparseVector) // (8,[0,2,6,7],[5.0,3.0,7.0,8.0])
    // Note: DenseVector and SparseVector are both local vectors, not distributed ones

    // A local DenseMatrix
    val MyArr1 = Array(10, 11, 20, 30.3, 24, 8)
    val denseMat1 = Matrices.dense(2, 3, MyArr1) // rows, columns, values
    println(denseMat1)
    // 10.0  20.0  24.0
    // 11.0  30.3  8.0

    // The values can also be supplied inline
    val denseMat2 = Matrices.dense(3, 2, Array(1, 2, 3, 4, 5, 6))
    println(denseMat2)
    // 1.0  4.0
    // 2.0  5.0
    // 3.0  6.0

    // Build a Matrix inline from several vectors
    val v1 = Vectors.dense(1, 2, 3, 4)
    val v2 = Vectors.dense(5, 6, 7, 8)
    val v3 = Vectors.dense(9, 10, 11, 12)
    val Mat1 = Matrices.dense(4, 3, v1.toArray ++ v2.toArray ++ v3.toArray)
    println(Mat1)
    // 1.0  5.0   9.0
    // 2.0  6.0  10.0
    // 3.0  7.0  11.0
    // 4.0  8.0  12.0
    // Remember: Spark stores matrices column-major internally, which performs better

    // A local SparseMatrix (CSC format)
    val sparseMat1 = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 1, 2), Array(11, 22, 33))
    // Argument order: rows, cols, column pointers (prefix sums of entries per column;
    // here column 0 has 1 - 0 = 1 entry and column 1 has 3 - 1 = 2), row indices, values
    // A diagram helps: https://www.cnblogs.com/zhangbojiangfeng/p/7456961.html
    println(sparseMat1.numRows)     // 3
    println(sparseMat1.numCols)     // 2
    println(sparseMat1.numActives)  // 3
    println(sparseMat1.numNonzeros) // 3
    println(sparseMat1)
    // 3 x 2 CSCMatrix
    // (0,0) 11.0
    // (1,1) 22.0
    // (2,1) 33.0

    // Vector arithmetic: Spark 2.x lacks built-in vector arithmetic, so convert to Breeze vectors first
    val w1 = Vectors.dense(1, 2, 3)
    val w2 = Vectors.dense(4, -5, 6)
    // Converting a Spark Vector to a Breeze vector unlocks Breeze's rich API
    val w3 = new BreezeVector(w1.toArray)
    val w4 = new BreezeVector(w2.toArray)
    println(w3 + w4)    // DenseVector(5.0, -3.0, 9.0)
    println(w3 - w4)    // DenseVector(-3.0, 7.0, -3.0)
    println(w3.dot(w4)) // 12.0

    val sv1 = Vectors.sparse(10, Array(0, 2, 9), Array(5, 3, 13))
    val sv2 = Vectors.dense(1, 0, 1, 1, 0, 0, 1, 0, 0, 13)
    println(sv1)
    println(sv2)
    println(new BreezeVector(sv1.toArray).dot(new BreezeVector(sv2.toArray))) // 177.0

    // Spark supports SparseMatrix and DenseMatrix arithmetic directly, no Breeze conversion needed
    // Create a Matrix and multiply it by a vector
    val sparseMat2 = Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1, 2, 3, 4, 5, 6))
    val denseFeatureVector = Vectors.dense(1, 2, 1)
    val ans0 = sparseMat2.multiply(denseFeatureVector)
    println(ans0) // [5.0,11.0,8.0]; sparse and dense matrices are also interconvertible

    val denseVec3 = Vectors.dense(5, 3, 0)
    val denseMat3 = Matrices.dense(3, 3, Array(1, 0, 0, 0, 1, 0, 0, 0, 1))
    println(denseMat3)
    println(denseVec3)
    println(denseMat3.multiply(denseVec3)) // [5.0,3.0,0.0]

    // Matrix transpose
    println(sparseMat2)
    // 3 x 3 CSCMatrix
    // (0,0) 1.0
    // (2,0) 2.0
    // (1,1) 3.0
    // (0,2) 4.0
    // (1,2) 5.0
    // (2,2) 6.0
    val transposeSparseMat2 = sparseMat2.transpose
    println(transposeSparseMat2)
    // 3 x 3 CSCMatrix
    // (0,0) 1.0
    // (2,0) 4.0
    // (1,1) 3.0
    // (2,1) 5.0
    // (0,2) 2.0
    // (2,2) 6.0

    // Matrix multiplication
    val dMat1 = new DenseMatrix(2, 2, Array(1, 3, 2, 4))
    val dMat2 = new DenseMatrix(2, 2, Array(2, 1, 0, 2))
    println(dMat1)
    // 1.0  2.0
    // 3.0  4.0
    println(dMat2)
    // 2.0  0.0
    // 1.0  2.0
    println(dMat1.multiply(dMat2))
    // 4.0   4.0
    // 10.0  8.0
    println(dMat2.multiply(dMat1))
    // 2.0  4.0
    // 7.0  10.0

    // RowMatrix
    val dataVectors = Seq(
      Vectors.dense(0, 1, 0),
      Vectors.dense(3, 1, 5),
      Vectors.dense(0, 7, 0)
    )
    val identityVectors = Seq(
      Vectors.dense(1, 0, 0),
      Vectors.dense(0, 1, 0),
      Vectors.dense(0, 0, 1)
    )

    // Parallelize the original sequence into an RDD
    val distMat3 = new RowMatrix(spark.sparkContext.parallelize(dataVectors))
    println(distMat3) // org.apache.spark.mllib.linalg.distributed.RowMatrix@352ed70d
    println(distMat3.computeColumnSummaryStatistics().count)    // 3
    println(distMat3.computeColumnSummaryStatistics().mean)     // column means [1.0,3.0,1.6666666666666665]
    println(distMat3.computeColumnSummaryStatistics().variance) // column variances [3.0,12.0,8.333333333333334]
    println(distMat3.computeCovariance()) // covariance matrix
    // 3.0   -3.0                5.0
    // -3.0  12.0                -4.999999999999999
    // 5.0   -4.999999999999999  8.333333333333334

    val dd = identityVectors.flatMap(x => x.toArray).toArray
    dd.foreach(println)
    val dmIdentity = Matrices.dense(3, 3, dd)
    println(dmIdentity) // a local matrix
    // 1.0  0.0  0.0
    // 0.0  1.0  0.0
    // 0.0  0.0  1.0

    // Distributed matrix * local matrix = a new distributed matrix
    // Use case: multiplying by a tall, skinny matrix reduces data volume and the result's dimensionality
    val distMat4 = distMat3.multiply(dmIdentity)
    println(distMat4) // org.apache.spark.mllib.linalg.distributed.RowMatrix@205df5dc
    println(distMat4.computeColumnSummaryStatistics().count) // 3
    println(distMat4.computeColumnSummaryStatistics().mean)
    println(distMat4.computeColumnSummaryStatistics().variance)
    println(distMat4.computeCovariance())

    // IndexedRowMatrix
    val distIdxMat1 = spark.sparkContext.parallelize(
      List(IndexedRow(0L, dataVectors.head), IndexedRow(1L, dataVectors(1)), IndexedRow(1L, dataVectors(2))))
    distIdxMat1.foreach(println)
    // IndexedRow(0,[0.0,1.0,0.0]) -- index, row data
    // IndexedRow(1,[3.0,1.0,5.0])
    // IndexedRow(1,[0.0,7.0,0.0])
    println("distinct elements=", distIdxMat1.distinct().count()) // 3

    // CoordinateMatrix
    val CoordinateEntries = Seq(
      MatrixEntry(1, 6, 300.0), // a coordinate entry: row and column indices must be Long, the value a Double
      MatrixEntry(3, 1, 5),
      MatrixEntry(1, 7, 10)
    )
    val distCordMat1 = new CoordinateMatrix(spark.sparkContext.parallelize(CoordinateEntries.toList))
    println("First Row (MatrixEntry) =", distCordMat1.entries.first())
    // (First Row (MatrixEntry) =,MatrixEntry(1,6,300.0))

    // BlockMatrix
    val distBlkMat1 = distCordMat1.toBlockMatrix().cache()
    distBlkMat1.validate()
    // Validates the block matrix info against the matrix data (blocks)
    // and throws an exception if any error is found.
    println("Is block empty =", distBlkMat1.blocks.isEmpty())
    // (Is block empty =,false)

    spark.stop()
  }
}
```
pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>examples</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <!-- Assumed addition: the Spark dependencies needed to compile the example,
             with versions matching the stated environment (Scala 2.11, Spark 2.4.7) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.4.7</version>
        </dependency>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
</project>
```