spark ML ,1概述:評估器,轉換器和管道
- Main concepts in Pipelines (管道中的主要概念)
- DataFrame
- Pipeline components (管道組件)
- Transformers (轉換器)
- Estimators (評估器)
- Properties of pipeline components (管道組件的屬性)
- Pipeline (管道)
- How it works (怎樣工作)
- Details (明細)
- Parameters (參數)
- Saving and Loading Pipelines (保存和載入管道)
- Code examples (實例代碼)
- Example: Estimator, Transformer, and Param (例:Estimator, Transformer, and Param )
- Example: Pipeline(例:管道)
- Example: model selection via cross-validation (例:通過交叉校驗選擇模型)
- Example: model selection via train validation split(例:通過訓練檢驗分離選擇模型)
DataFrame:spark ML從spark sql中獲取DataFrame作為學習的數據集。可以持有多種數據類型 ,例如:有個df可以包含文本,特徵向量,true標籤和預測結果的多個不同的列。
轉換器: 轉換器是把一個DF轉成另外一個DF的演算法,例:一個ML模型是一個轉換器 ,轉換帶有特徵的DataFrame成為帶有預測結果的DF.
評估器: 評估器是應用(fit)在一個DF上生成一個轉換器的演算法。 例:學習演算法是一個評估器,在DF上訓練並且生成一個模型。
管道: 一個管道連接多個轉換器和評估器在一起,作為一個工作流。
參數: 對於指定的參數來說,所有的轉換器和評估器 共享一個公共的api.
DataFrame
機器學習可以應用於很多的數據類型上,如:向量,文本,圖片,結構數據。ML採用DF就是為了支持多種數據類型。df支持很多基本和結構化的類型;支持spark sql里的類型,還有向量。DF可以顯性或隱性的從常規的RDD創建。 詳見代碼
df中的列已經命名了。下面的實例代碼中使用的名稱 如 「text,」 「features,」 和「label.」管道組件轉換器轉換器是包含特徵轉換器和學習模型的抽象。技術上,一個轉換器執行一個transform()方法轉換DF.一般是增加一個或多個列。例:- 一個特徵轉換器拿到一個DF,讀取一列映射到一個新列上。 然後輸出一個包含映射列的新的DF.
- 一個學習模型拿到一個DF.讀取包含特徵向量的列,為每個特徵向量預測標籤, 然後把預測標籤作為一個新列放到DF中輸出。
Transformer.transform()s and Estimator.fit()s 都是無狀態的, 未來,有狀態演算法可以通過替代概念來支持。
每一個Transformer or Estimator的實例都有一個唯一id, 在指定參數時很有用(下面討論)。管道在機器學習中, 通常會運行一系列的演算法去處理和學習數據。例: 一個簡單的文本文檔處理工作流可以包含一些stage:- 把文檔中的文本分成單詞。
- 轉換文檔中的每個單詞成為數字化的特徵向量
- 使用特徵向量和標籤學習預測模型。
一個管道被指定為一系列的階段,每一階段要麼是Transformer 或是Estimator。這些stage有序運行,輸入的DF被它通過的每個stage轉換。對於Transformer 階段, transform() 方法在df調用,對於Estimator 階段,fit()方法被調用產生一個新的Transformer( 轉換器變成PipelineModel的一部分,或合適的管道),然後轉換器調用transform() 方法應用在df上。
我們通過一個簡單文本文檔工作流來闡明, 下面的圖是關於管道的訓練time(時代)的用法。上面,頂行代表包含三個階段的管道。前兩個藍色的 (Tokenizer and HashingTF) 是轉換器,第三個LogisticRegression是評估器。底部的行代表通過管道的數據流,圓筒是DF. Pipeline.fit() 方法在初始的df上調用,它是行文本和標籤。Tokenizer.transform() 方法把行文本分成單詞,增加一個包含單詞的新列到DF上。HashingTF.transform()方法轉換單詞列成特徵向量,增加一個包含向量的新列到DF上。現在,LogisticRegression是評估器,管道首先調用 LogisticRegression.fit()生成LogisticRegressionModel。如果管道有多個階段,在傳遞df給下一階段之前它調用LogisticRegressionModel的 transform()方法。一個管道是一個評估器,因此,管道的fit()方法運行之後,產生一個PipelineModel ,它是一個轉換器。這個PipelineModel使用在校驗階段(test time); 下圖說明了PipelineModel的用法。在上圖裡, PipelineModel 與原管道的stage的數量相同。但是在原始管道中所有的評估器都變成轉換器。當在測試數據集上調用 PipelineModel』s transform() 時,數據有序的通過合適的管道。每一個stage的transform()更新數據集,然後傳給下一個stage。Pipeline 和 PipelineModel 有助於確保訓練集和測試集得到相同的特徵處理步驟。
明細DAG管道: 一個管道里的多個stage被指定為一個有序的數組,上面的例子給的是線性管道的例子,管道里的stage使用的數據是它上一個stage產生的。也可以創建非線性的管道,只要數據流圖是一個DAG圖。DAG圖可以基於每個stage 輸入和輸出的列名隱式的指定。如果管道是DAG的形式,然後stage的拓撲圖順序必須被指定。運行時檢測:由於管道可以操作多類型的DataFrame,所以不能執行編譯時類型檢測。Pipelines 和PipelineModel在運行時檢測。使用DataFrame的schema來做檢測。獨一無二的管道 stage:管道的stage應該是獨一無二實例 ,舉例:同一個myHashing實例不能插入管道兩次,因為管道必須有唯一id。然而,實例兩個實例(myHashingTF1 and myHashingTF2)可以放到同一個管道中, 因為不同的實例產生不同的id。參數ML的 Estimator和Transformer 使用相同的api來指定參數。參數名Param是一個參數, ParamMap是一個參數的集合 (parameter, value) 給演算法傳參的有兩種主要的方法:1.為實例設置參數,例:如果lr是LogisticRegression的實例。設置 lr.setMaxIter(10),可以讓 lr.fit()最多十次迭代。這個API整合了spark.mllib包里的api。
2.傳遞ParamMap給fit() or transform()方法, ParamMap里的參數將覆蓋前面通過setter方法設定的參數。參數屬於指定的Estimators and Transformer實例 ,例:如果有兩個邏輯回歸lr1 and lr2,然後可以創建包含兩個maxIter參數的ParamMap:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。如果在一個管道中兩個演算法都有maxIter參數時,這很有用。保存和載入管道保存一個模型和一個管道到磁碟,為了下次使用是很有價值的。 在spark 1.6里。一個模型導入、導出的功能被加到管道的API里。大多數基本的transformer被支持,同樣,一些基本的學習模型也被支持。請參考演算法api文檔看看是否支持保存和載入。事例代碼:這部分給一些代碼來展示上面討論的功能。更多信息請看API. 一些spark ML演算法是對spark.mllib演算法的包裝。具體明細請看MLlib programming guide。Example: Estimator, Transformer, and Param
這部分涵蓋 Estimator,Transformer,和Param
import org.apache.spark.ml.classification.LogisticRegressionimport org.apache.spark.ml.param.ParamMapimport org.apache.spark.mllib.linalg.{Vector, Vectors}import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.//準備帶標籤和特徵的數據val training = sqlContext.createDataFrame(Seq( (1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)), (0.0, Vectors.dense(2.0, 1.3, 1.0)), (1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF("label", "features")// Create a LogisticRegression instance. This instance is an Estimator.//創建一個邏輯回歸事例,這個實例是評估器val lr = new LogisticRegression()// Print out the parameters, documentation, and any default values.//輸出參數等默認值println("LogisticRegression parameters:
" + lr.explainParams() + "
")// We may set parameters using setter methods.//使用setter方法設置參數lr.setMaxIter(10) .setRegParam(0.01)// Learn a LogisticRegression model. This uses the parameters stored in lr.//使用存儲在lr中的參數來,學習一個模型,val model1 = lr.fit(training)// Since model1 is a Model (i.e., a Transformer produced by an Estimator),// we can view the parameters it used during fit().// This prints the parameter (name: value) pairs, where names are unique IDs for this// LogisticRegression instance.//由於model1是一個模型,(也就是,一個評估器產生一個轉換器),// 我們可以看lr在fit()上使用的參數。//輸出這些參數對,參數里的names是邏輯回歸實例的唯一idprintln("Model 1 was fit using parameters: " + model1.parent.extractParamMap)// We may alternatively specify parameters using a ParamMap,// which supports several methods for specifying parameters.//我們可以使用paramMap選擇指定的參數,並且提供了很多方法來設置參數val paramMap = ParamMap(lr.maxIter -> 20) .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter. 指定一個參數。 .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params. 指定多個參數// One can also combine ParamMaps.val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name 改變輸出列的名稱val paramMapCombined = paramMap ++ paramMap2// Now learn a new model using the paramMapCombined parameters.// paramMapCombined overrides all parameters set earlier via lr.set* methods.//使用新的參數學習模型。val model2 = lr.fit(training, paramMapCombined)println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)// Prepare test data.//準備測試數據val test = sqlContext.createDataFrame(Seq( (1.0, Vectors.dense(-1.0, 1.5, 1.3)), (0.0, Vectors.dense(3.0, 2.0, -0.1)), (1.0, Vectors.dense(0.0, 2.2, -1.5)))).toDF("label", "features")// Make predictions on test data using the Transformer.transform() method.// LogisticRegression.transform will only use the features column.// Note that model2.transform() outputs a myProbability column instead of the usual// probability column since we renamed the lr.probabilityCol parameter previously.//使用轉換器的transform()方法在測試數據上作出預測.// 邏輯回歸的transform方法只使用「特徵」列.// 注意model2.transform()方法輸出的是myProbability列而不是probability列,因為在上面重命名了lr.probabilityCol 參數。model2.transform(test) .select("features", "label", "myProbability", "prediction") .collect() .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) => println(s"($features, $label) -> prob=$prob, prediction=$prediction")}
個人認為: 在這個訓練過程中,最終要的步驟就是設置參數,來讓演算法工作的更好。yes
Example: Pipeline
這個例子是上面圖片中展示的文本文檔管道import org.apache.spark.ml.{Pipeline, PipelineModel}import org.apache.spark.ml.classification.LogisticRegressionimport org.apache.spark.ml.feature.{HashingTF, Tokenizer}import org.apache.spark.mllib.linalg.Vectorimport org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.//準備訓練文檔,(id,內容,標籤)val training = sqlContext.createDataFrame(Seq( (0L, "a b c d e spark", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0))).toDF("id", "text", "label")// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.//配置ML管道,由三個stage組成,tokenizer, hashingTF, and lr ,val tokenizer = new Tokenizer() .setInputCol("text") .setOutputCol("words")val hashingTF = new HashingTF() .setNumFeatures(1000) .setInputCol(tokenizer.getOutputCol) .setOutputCol("features")val lr = new LogisticRegression() .setMaxIter(10) .setRegParam(0.01)val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr))// Fit the pipeline to training documents.//安裝管道到數據上val model = pipeline.fit(training)// now we can optionally save the fitted pipeline to disk//現在可以保存安裝好的管道到磁碟上model.save("/tmp/spark-logistic-regression-model")// we can also save this unfit pipeline to disk//也可以保存未安裝的管道到磁碟上pipeline.save("/tmp/unfit-lr-model")// and load it back in during production//載入管道val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")// Prepare test documents, which are unlabeled (id, text) tuples.//準備測試文檔,不包含標籤val test = sqlContext.createDataFrame(Seq( (4L, "spark i j k"), (5L, "l m n"), (6L, "mapreduce spark"), (7L, "apache hadoop"))).toDF("id", "text")// Make predictions on test documents.//在測試文檔上做出預測model.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction")}
Example: model selection via cross-validation 通過交叉驗證選擇模型
在機器學習中一個重要的任務是模型選擇,或使用數據發現最好的模型或給任務設置參數,這叫做調優。通過調優整個管道去促進管道選擇模型會變的容易, 而不是分開的調優管道內的每一個元素。當前,spark.ml支持使用交叉驗證器CrossValidator類選擇模型,這個類接收一個Estimator,一個參數集,一個Evaluator,CrossValidator 開始拆分數據集到一個fold集中,這個fold集被用來作為分開測試和訓練的數據集; 例:帶有3個fold的CrossValidator 將產生3組(訓練,測試)數據集,每一個數據集中2/3作為訓練數據,1/3作為測試數據. CrossValidator 通過參數集進行迭代計算。為每一個ParamMap,訓練給定的Estimator 並且使用給予的Evaluator來評估。RegressionEvaluator評估器Evaluator來評估回歸問題,BinaryClassificationEvaluator 來評估二元數據,MultiClassClassificationEvaluator 評估多元分類問題。
用於選擇最佳paraMap參數的默認度量可以被Evaluator 的setMetric方法覆蓋。產生最好評估度量的paramMap被選擇作為最好的模型。CrossValidator 最終使用最好的paramMap和整個數據集fit 評估器,(意思就是執行評估器的fit方法)下面的例子就是CrossValidator 從一個網格參數做選擇。只用ParamGridBuilder 工具構造參數網格。注意在一個網格參數上做交叉校驗是非常昂貴的。例,下面的例子中,hashingTF.numFeatures有3個值和lr.regParam有2個值的參數網路,並且CrossValidator 的fold是2個。這個相乘的輸出是 (3×2)×2=12 不同的明細需要訓練,在真實的設置中,參數會被設置的更大並且有更多的fold(一般是 3或者10)。換句話說。使用CorssValidator是非常昂貴的。然而,用來選擇參數它也是一個行之有效的方法。import org.apache.spark.ml.Pipelineimport org.apache.spark.ml.classification.LogisticRegressionimport org.apache.spark.ml.evaluation.BinaryClassificationEvaluatorimport org.apache.spark.ml.feature.{HashingTF, Tokenizer}import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}import org.apache.spark.mllib.linalg.Vectorimport org.apache.spark.sql.Row
// Prepare training data from a list of (id, text, label) tuples. //準備訓練數據,id 內容,標籤 val training = sqlContext.createDataFrame(Seq( (0L, "a b c d e spark", 1.0), (1L, "b d", 0.0), (2L, "spark f g h", 1.0), (3L, "hadoop mapreduce", 0.0), (4L, "b spark who", 1.0), (5L, "g d a y", 0.0), (6L, "spark fly", 1.0), (7L, "was mapreduce", 0.0), (8L, "e spark program", 1.0), (9L, "a e c l", 0.0), (10L, "spark compile", 1.0), (11L, "hadoop software", 0.0) )).toDF("id", "text", "label") // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.// 配置機器學習管道,由tokenizer, hashingTF, lr評估器 組成 val tokenizer = new Tokenizer() .setInputCol("text") .setOutputCol("words") val hashingTF = new HashingTF() .setInputCol(tokenizer.getOutputCol) .setOutputCol("features") val lr = new LogisticRegression() .setMaxIter(10) val pipeline = new Pipeline() .setStages(Array(tokenizer, hashingTF, lr)) // We use a ParamGridBuilder to construct a grid of parameters to search over. // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam, // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from. //使用ParamGridBuilder 構造一個參數網格, //hashingTF.numFeatures有3個值,lr.regParam有2個值, // 這個網格有6個參數給CrossValidator來選擇 val paramGrid = new ParamGridBuilder() .addGrid(hashingTF.numFeatures, Array(10, 100, 1000)) .addGrid(lr.regParam, Array(0.1, 0.01)) .build() // We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. // This will allow us to jointly choose parameters for all Pipeline stages. // A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. // Note that the evaluator here is a BinaryClassificationEvaluator and its default metric // is areaUnderROC. //現在我們把管道看做成一個Estimator,把它包裝到CrossValidator實例中。 //這可以讓我們連帶的為管道的所有stage選擇參數。 //CrossValidator需要一個Estimator,一個評估器參數集合,和一個Evaluator。 //注意這裡的evaluator 是二元分類的BinaryClassificationEvaluator,它默認的度量是areaUnderROC. val cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(new BinaryClassificationEvaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) // Use 3+ in practice // 在實戰中使用3+ // Run cross-validation, and choose the best set of parameters. //運行交叉校驗,選擇最好的參數集 val cvModel = cv.fit(training) // Prepare test documents, which are unlabeled (id, text) tuples. //準備測試數據 val test = sqlContext.createDataFrame(Seq( (4L, "spark i j k"), (5L, "l m n"), (6L, "mapreduce spark"), (7L, "apache hadoop") )).toDF("id", "text") // Make predictions on test documents. cvModel uses the best model found (lrModel). //在測試文檔上做預測,cvModel是選擇出來的最好的模型 cvModel.transform(test) .select("id", "text", "probability", "prediction") .collect() .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction") }
Example: model selection via train validation split 例:通過訓練校驗分離來模型選擇
除了CrossValidator 以外spark還提供TrainValidationSplit 來進行超-參數調優。 TrainValidationSplit 只評估每一種參數組合一次。而不是像CrossValidator評估k次,TrainValidationSplit 只有一次。因此不是很昂貴,但是如果訓練數據集不夠大就不能產生能信賴的結果。
TrainValidationSplit 需要傳入一個Estimator,一個包含estimatorParamMaps 參數的paraMap的集和一個Evaluator。它一開始使用trainRatio 參數值把數據集分成訓練數據和測試數據兩個部分。例如: 使用trainRatio=0.75 (默認值),TrainValidationSplit 就產生75%數據用於訓練,25%的數據用於測試。與CrossValidator相似的是,TrainValidationSplit 也是通過迭代參數集paramMap。對於每一種參數組合,使用給定的Estimator 訓練,在給定 Evaluator上評估。產生最好的評估度量的paramMap作為最好的選擇。TrainValidationSplit 最終會使用最好的參數和整個數據集條用Estimator的fit方法。import org.apache.spark.ml.evaluation.RegressionEvaluatorimport org.apache.spark.ml.regression.LinearRegressionimport org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
// Prepare training and test data.//準備訓練數據和測試數據val data = sqlContext.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)val lr = new LinearRegression()// We use a ParamGridBuilder to construct a grid of parameters to search over.// TrainValidationSplit will try all combinations of values and determine best model using// the evaluator.//ParamGridBuilder構建一組參數//TrainValidationSplit將嘗試從這些所有值的組合中使用evaluator選出最好的模型val paramGrid = new ParamGridBuilder() .addGrid(lr.regParam, Array(0.1, 0.01)) .addGrid(lr.fitIntercept) .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0)) .build()// In this case the estimator is simply the linear regression.// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.//在這裡estimator是簡單的線性回歸//TrainValidationSplit 需要一個Estimator , 一個Estimator ParamMaps集,一個Evaluatorval trainValidationSplit = new TrainValidationSplit() .setEstimator(lr) .setEvaluator(new RegressionEvaluator) .setEstimatorParamMaps(paramGrid) // 80% of the data will be used for training and the remaining 20% for validation. //80%數據作為訓練,剩下的20%作為驗證 .setTrainRatio(0.8)// Run train validation split, and choose the best set of parameters.//運行訓練校驗分離,選擇最好的參數。val model = trainValidationSplit.fit(training)// Make predictions on test data. model is the model with combination of parameters// that performed best.//在測試數據上做預測,模型是參數組合中執行最好的一個model.transform(test) .select("features", "label", "prediction") .show()
推薦閱讀:
※hadoop 和spark如何系統的學習?
※Spark Streaming有沒有計劃引入Flink 那種真正的流式處理?
※大數據帶你看穿航班晚點的套路
※如何基於 Spark Streaming 構建實時計算平台
※Spark 2017歐洲技術峰會摘要(Spark 生態體系分類)
TAG:Spark |