A Case Study: Iris Classification with Logistic Regression
Background:
This case study builds a logistic regression model that classifies Iris flowers, using IDEA + Spark 3.4.1 + sbt 1.9.3 + Spark MLlib. It is a classification example, and working through it is a quick way to learn how Spark MLlib classification models are used.
Dependencies
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.11"

lazy val root = (project in file("."))
  .settings(
    name := "SparkLearning",
    idePackagePrefix := Some("cn.lh.spark"),
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",
    libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1",
    libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
  )
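For this case study alone, only the MLlib artifact is strictly required: spark-mllib transitively pulls in spark-core and spark-sql, while the Hadoop, Kafka streaming, and MySQL dependencies above belong to other examples in the SparkLearning project. A trimmed build.sbt might look like the sketch below (the idePackagePrefix setting is omitted here, since it comes from an IDE-integration sbt plugin rather than from sbt itself):

ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.11"

lazy val root = (project in file("."))
  .settings(
    name := "SparkLearning",
    // spark-mllib transitively depends on spark-core and spark-sql
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1"
  )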
The code is as follows:
package cn.lh.spark

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, StringIndexerModel, VectorIndexer, VectorIndexerModel}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)

/**
 * Binomial logistic regression applied to a binary classification problem
 * (Iris-setosa is filtered out, leaving two classes).
 */
object MLlibLogisticRegression {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[2]")
      .appName("Spark MLlib Demo List").getOrCreate()

    // Load the Iris data and map each line to an Iris(features, label) record.
    val irisRDD: RDD[Iris] = spark.sparkContext
      .textFile("F:\\niit\\2023\\2023_2\\Spark\\codes\\data\\iris.txt")
      .map(_.split(","))
      .map(p => Iris(Vectors.dense(p(0).toDouble, p(1).toDouble,
        p(2).toDouble, p(3).toDouble), p(4)))

    import spark.implicits._
    val data: DataFrame = irisRDD.toDF()
    data.show()

    // Keep only two classes so the problem is binary: drop Iris-setosa,
    // leaving Iris-versicolor and Iris-virginica.
    data.createOrReplaceTempView("iris")
    val df: DataFrame = spark.sql("select * from iris where label != 'Iris-setosa'")
    // Print label:features for a quick sanity check.
    df.map(t => s"${t(1)}:${t(0)}").collect().foreach(println)

    // Build the ML pipeline stages.
    // StringIndexer: encode the string label as a numeric index.
    val labelIndexer: StringIndexerModel = new StringIndexer().setInputCol("label")
      .setOutputCol("indexedLabel").fit(df)
    // VectorIndexer: detect categorical features and index them.
    val featureIndexer: VectorIndexerModel = new VectorIndexer().setInputCol("features")
      .setOutputCol("indexedFeatures").fit(df)

    // Split the data into training (70%) and test (30%) sets.
    val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))

    // Configure the logistic regression estimator.
    val lr: LogisticRegression = new LogisticRegression().setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures").setMaxIter(10)
      .setRegParam(0.3).setElasticNetParam(0.8)

    // IndexToString: convert numeric predictions back to string labels.
    val labelConverter: IndexToString = new IndexToString().setInputCol("prediction")
      .setOutputCol("predictedLabel").setLabels(labelIndexer.labelsArray(0))

    // Assemble the pipeline, then call fit() to train the model.
    val lrPipeline: Pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, lr, labelConverter))
    val lrModel: PipelineModel = lrPipeline.fit(trainingData)

    // Score the test set and print each prediction.
    val lrPredictions: DataFrame = lrModel.transform(testData)
    lrPredictions.select("predictedLabel", "label", "features", "probability")
      .collect().foreach {
        case Row(predictedLabel: String, label: String, features: Vector, prob: Vector) =>
          println(s"($label, $features) --> prob=$prob, predicted label=$predictedLabel")
      }

    // Evaluate the model. MulticlassClassificationEvaluator defaults to F1,
    // so set the metric explicitly to get accuracy.
    val evaluator: MulticlassClassificationEvaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel").setPredictionCol("prediction")
      .setMetricName("accuracy")
    val lrAccuracy: Double = evaluator.evaluate(lrPredictions)
    println("Test Error = " + (1.0 - lrAccuracy))

    // Extract the trained LogisticRegressionModel (stage 2 of the pipeline)
    // to inspect its parameters.
    val lrModelStage: LogisticRegressionModel =
      lrModel.stages(2).asInstanceOf[LogisticRegressionModel]
    println(s"Coefficients: ${lrModelStage.coefficients} " +
      s"Intercept: ${lrModelStage.intercept} " +
      s"numClasses: ${lrModelStage.numClasses} " +
      s"numFeatures: ${lrModelStage.numFeatures}")

    spark.stop()
  }
}
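A natural follow-up, not part of the original post: once fit() has produced a PipelineModel, it can be saved to disk and reloaded later to classify new measurements. The sketch below would go just before spark.stop() in the main method above; the model path is a placeholder, and the label value on the new sample is a dummy, required only because the pipeline's StringIndexer stage expects a label column containing a value it saw during fitting.

    // Sketch: persist the fitted pipeline and score a single unseen flower.
    // (Assumed addition; the save path below is a placeholder.)
    lrModel.write.overwrite().save("F:\\models\\irisLrPipeline")

    // Reload the model, e.g. in a separate application.
    val reloaded: PipelineModel = PipelineModel.load("F:\\models\\irisLrPipeline")

    // The dummy label must be a class the StringIndexer stage already knows.
    val newSample: DataFrame =
      Seq(Iris(Vectors.dense(6.3, 2.8, 5.1, 1.5), "Iris-versicolor")).toDF()
    reloaded.transform(newSample).select("features", "predictedLabel").show()

Because the entire pipeline is saved, the reloaded model carries the indexers and the label converter with it, so raw feature vectors can be fed in directly and the prediction comes back as a string label.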
The run results are as follows: