Gradient-Boosted Trees (GBDT): Algorithm Overview and Spark MLlib Usage

Gradient-Boosted Trees

Algorithm overview:

Gradient-boosted trees (GBTs) are an ensemble algorithm built on decision trees. They train decision trees iteratively in order to minimize a loss function. Like single decision trees, GBTs handle categorical features, extend to multiclass settings, and do not require feature scaling. spark.ml implements them on top of the existing decision tree implementation.

GBTs train a sequence of decision trees iteratively. On each iteration, the algorithm uses the current ensemble to predict the label of every training instance and compares the prediction with the true label. The instances are then re-labeled so that those with poor predictions receive more emphasis, and the decision tree trained in the next iteration helps correct the previous mistakes.
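To make the ensemble idea concrete, here is a minimal Scala sketch (purely illustrative, not Spark's internal code) of how a boosted ensemble combines its trees at prediction time: each tree's output is scaled by a weight derived from the learning rate, and the results are summed.

// Illustrative only: prediction with an additive ensemble.
// `trees` stands in for fitted regression trees; any Double => Double will do here.
def ensemblePredict(trees: Seq[Double => Double], treeWeights: Seq[Double], x: Double): Double =
  trees.zip(treeWeights).map { case (tree, weight) => weight * tree(x) }.sum

// Example: two "trees" with weights 1.0 and 0.1.
// ensemblePredict(Seq(v => v * 0.5, v => v + 1.0), Seq(1.0, 0.1), 2.0)  // = 1.0 + 0.3 = 1.3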

The exact mechanism for re-labeling the instances is defined by a loss function: on each iteration, the GBT further reduces the value of this loss function on the training data. spark.ml provides one loss function for classification (log loss) and two for regression (squared error and absolute error).
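Concretely, "re-labeling" means fitting each new tree to the negative gradient of the loss with respect to the current prediction (the pseudo-residuals). The sketch below shows these pseudo-residuals for the losses mentioned above, in standard textbook form; the exact scaling constants Spark uses internally may differ.

// Pseudo-residual (negative gradient of the loss) for one training instance.
// `prediction` is the current ensemble's raw output; for log loss the label is assumed in {-1, +1}.
def squaredErrorResidual(label: Double, prediction: Double): Double =
  2.0 * (label - prediction)                                  // L(y, F) = (y - F)^2

def absoluteErrorResidual(label: Double, prediction: Double): Double =
  math.signum(label - prediction)                             // L(y, F) = |y - F|

def logLossResidual(label: Double, prediction: Double): Double =
  4.0 * label / (1.0 + math.exp(2.0 * label * prediction))    // L(y, F) = 2 ln(1 + exp(-2 y F))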

spark.ml supports GBTs for binary classification and for regression, handling both continuous and categorical features.

*Note: GBTs do not yet support multiclass classification.

Parameters:

checkpointInterval (Int): checkpoint interval (>= 1), or -1 to disable checkpointing.

featuresCol (String): features column name.

impurity (String): criterion used to compute information gain (case-insensitive).

labelCol (String): label column name.

lossType (String): loss function type.

maxBins (Int): maximum number of bins used to discretize continuous features and to choose candidate splits at each node.

maxDepth (Int): maximum depth of a tree (>= 0).

maxIter (Int): number of boosting iterations (>= 0).

minInfoGain (Double): minimum information gain required to split a node.

minInstancesPerNode (Int): minimum number of instances each child node must contain after a split.

predictionCol (String): prediction column name.

rawPredictionCol (String): raw prediction column name.

seed (Long): random seed.

subsamplingRate (Double): fraction of the training data used for learning each decision tree, in the range [0, 1].

stepSize (Double): step size (learning rate) used in each optimization iteration.
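For reference, these parameters map to setters on GBTClassifier (and GBTRegressor). Below is a minimal Scala sketch of configuring them; every value is only illustrative, not a tuned recommendation.

import org.apache.spark.ml.classification.GBTClassifier

// Configure a GBT classifier; the values below only demonstrate the API.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(20)               // number of boosting iterations (trees)
  .setMaxDepth(5)               // maximum depth of each tree
  .setMaxBins(32)               // bins for discretizing continuous features
  .setMinInstancesPerNode(1)    // minimum instances per child after a split
  .setMinInfoGain(0.0)          // minimum information gain for a split
  .setStepSize(0.1)             // learning rate
  .setSubsamplingRate(1.0)      // fraction of data used per tree
  .setLossType("logistic")      // loss for binary classification
  .setSeed(42L)
  .setCheckpointInterval(10)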

Example:

The following example loads data in LibSVM format and splits it into training and test sets. The model is trained on the first part and evaluated on the held-out part. Before training, two feature transformers are applied to index the labels and the categorical features, adding metadata to the DataFrame that the tree-based algorithm can use.

Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println("Learned classification GBT model:\n" + gbtModel.toDebugString)

Java:

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTClassifier gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels());

// Chain indexers and GBT in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());

Python:

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
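A note on the structure shared by all three examples: StringIndexer and VectorIndexer are included as pipeline stages so that exactly the same label and feature indexing learned at training time is re-applied at prediction time, and (in the Scala and Java versions) IndexToString maps the indexed predictions back to the original label strings. The fitted GBT model itself can be pulled out of the pipeline as its third stage (model.stages(2) in Scala, model.stages[2] in Java/Python) for inspection.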

Recommended reading:

Spark MLlib Data Preprocessing: Feature Transformations (Part 2)
Pipeline Explained with Spark MLlib Usage
Implementing SVRG, a Linearly Convergent Stochastic Optimization Algorithm, on Spark
