Pipelines Explained and Using Spark MLlib
In this article we introduce the concept of ML Pipelines. ML Pipelines provide a set of high-level, DataFrame-based APIs that help users build and tune practical machine learning pipelines.
Main Concepts in Pipelines
MLlib provides a standard API for combining multiple algorithms into a single pipeline, or workflow. The Pipeline concept is inspired by the scikit-learn project.
1. DataFrame: The ML API uses DataFrames from Spark SQL as its datasets, which can hold a variety of data types. For example, a DataFrame can have different columns storing text, feature vectors, true labels, and predictions.
2. Transformer: A Transformer is an algorithm that transforms one DataFrame into another. For example, an ML model is a Transformer that turns a DataFrame with features into a DataFrame with predictions.
3. Estimator: An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. For example, a learning algorithm is an Estimator that trains on a DataFrame and produces a model.
4. Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
5. Parameter: All Transformers and Estimators share a common API for specifying parameters.
DataFrame
Machine learning algorithms can be applied to many types of data, such as vectors, text, images, and structured data. The Pipeline API adopts the DataFrame from Spark SQL in order to support all of these data types.
See the Spark SQL datatype reference for the basic and structured types that DataFrames support. In addition to the types listed in the Spark SQL guide, a DataFrame can also use the ML Vector type.
A DataFrame can be created explicitly, or implicitly from a regular RDD; the code below gives examples.
Columns in a DataFrame are named. The code examples below use names such as "text", "features", and "label".
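As a minimal sketch (assuming the SparkSession `spark` that the Spark shell provides; the column names and sample rows are illustrative), a DataFrame with named columns can be built explicitly from a local collection or implicitly from a regular RDD:

// Minimal sketch; assumes the SparkSession `spark` available in the Spark shell.
import spark.implicits._

// Explicit creation from a local collection, naming the columns "id", "text", and "label".
val df = spark.createDataFrame(Seq(
  (0L, "spark mllib pipelines", 1.0),
  (1L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Implicit creation from a regular RDD of tuples via toDF().
val rdd = spark.sparkContext.parallelize(Seq((2L, "apache spark", 1.0)))
val dfFromRdd = rdd.toDF("id", "text", "label")

df.printSchema()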
Pipeline Components
Transformers
A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer implements a transform() method, which converts one DataFrame into another, generally by appending one or more columns. For example:
1. A feature transformer might take a DataFrame, read a text column, and map it into a new feature-vector column, outputting a new DataFrame with that column appended (a sketch follows this list).
2. A learning model might take a DataFrame, read the column containing feature vectors, predict a label for each feature vector, and output a new DataFrame with the predicted labels appended as a column.
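A minimal sketch of point 1 above, using Tokenizer and assuming the DataFrame `df` with a "text" column from the earlier sketch:

import org.apache.spark.ml.feature.Tokenizer

// A feature Transformer: reads the "text" column and appends a "words" column.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

// transform() returns a new DataFrame with the "words" column appended.
val tokenized = tokenizer.transform(df)
tokenized.select("text", "words").show(false)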
Estimators
An Estimator abstracts the concept of a learning algorithm, or any algorithm that fits or trains on data. Technically, an Estimator implements a fit() method, which accepts a DataFrame and produces a model. For example, LogisticRegression is an Estimator; calling fit() trains a LogisticRegressionModel.
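A minimal sketch of the Estimator side, assuming a DataFrame `training` with "label" and "features" columns (such a DataFrame is built in the full examples later in this article):

import org.apache.spark.ml.classification.LogisticRegression

// LogisticRegression is an Estimator; fit() trains on the DataFrame
// and produces a LogisticRegressionModel, which is a Transformer.
val lr = new LogisticRegression().setMaxIter(10)
val lrModel = lr.fit(training)

// The returned model can now transform DataFrames that have a "features" column.
val predictions = lrModel.transform(training)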
Properties of Pipeline Components
Both a Transformer's transform() method and an Estimator's fit() method are stateless. In the future, stateful algorithms may be supported via alternative concepts.
Each instance of a Transformer or Estimator has a unique ID, which is useful when specifying parameters (discussed below).
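For example, each instance exposes its unique ID through `uid` (a sketch; the ID strings are generated at runtime, so the values shown in the comment are only illustrative):

import org.apache.spark.ml.feature.HashingTF

val hashingTF1 = new HashingTF()
val hashingTF2 = new HashingTF()

// Two instances of the same Transformer class still get distinct IDs,
// e.g. "hashingTF_4c2d..." and "hashingTF_9a1f..." (values vary per run).
println(hashingTF1.uid)
println(hashingTF2.uid)
println(hashingTF1.uid == hashingTF2.uid) // false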
Pipeline
In machine learning, it is common to run a sequence of algorithms to process and learn from data. For example, a simple document-processing workflow might include the following stages:
1. Split each document's text into words.
2. Convert each document's words into a numerical feature vector.
3. Learn a prediction model using the feature vectors and labels.
MLlib represents such a workflow as a Pipeline, which consists of a sequence of pipeline stages (Transformers and Estimators) to be run in a specific order. We will use this document-processing workflow as a running example in this section.
How it works
A Pipeline is specified as a sequence of stages, each of which is either a Transformer or an Estimator. The stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, the transform() method is called on the DataFrame. For Estimator stages, the fit() method is called to produce a Transformer, and that Transformer's transform() method is then called on the DataFrame.
The figure below illustrates this for the simple document-processing workflow.
In the figure above, the top row represents a Pipeline with three stages. The first two blue stages (the tokenizer and the hashing step) are Transformers, and the third red stage, logistic regression, is an Estimator. The bottom row represents data flowing through the pipeline, where the cylinders are DataFrames. The Pipeline's fit() method is called on the original DataFrame, which contains raw documents and labels. The tokenizer's transform() method splits the raw documents into words, adding a new words column to the DataFrame. The hashing step's transform() method converts the words column into feature vectors, adding a new vector column to the DataFrame. Then, since logistic regression is an Estimator, the Pipeline first calls its fit() method to produce a logistic regression model. If the Pipeline had more stages, it would call the model's transform() method on the DataFrame before passing it to the next stage.
A Pipeline as a whole is an Estimator. Thus, after the Pipeline's fit() method runs, it produces a PipelineModel, which is a Transformer. The PipelineModel is used at test time; the figure below illustrates this usage.
In the figure above, the PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers. When the PipelineModel's transform() method is called on a test dataset, the data passes through the stages in order. Each stage's transform() method updates the dataset and passes it to the next stage. Pipelines and PipelineModels help ensure that training and test data go through identical feature-processing steps.
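Condensed to code, the flow in the two figures looks roughly like the sketch below. Here `tokenizer`, `hashingTF`, `lr`, `trainingDocuments`, and `testDocuments` are assumed to be defined as in the running example; the complete, runnable version appears in the code examples section.

import org.apache.spark.ml.{Pipeline, PipelineModel}

// Pipeline (an Estimator): fit() runs tokenizer.transform(), hashingTF.transform(),
// then lr.fit(), and returns a PipelineModel.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val pipelineModel: PipelineModel = pipeline.fit(trainingDocuments)

// PipelineModel (a Transformer): transform() pushes the test data through the
// same stages, so test data gets exactly the same feature processing.
val predicted = pipelineModel.transform(testDocuments)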
Details
DAG Pipelines: A Pipeline's stages are specified as an ordered sequence. The examples given here are all linear Pipelines, i.e., Pipelines in which each stage uses data produced by the previous stage. It is also possible to create non-linear Pipelines, as long as the data flow forms a directed acyclic graph (DAG). Such a graph is typically specified by explicitly naming the input and output columns of each stage (generally as parameters). If the Pipeline forms a DAG, the stages must be specified in topological order.
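A sketch of a small DAG-shaped pipeline, purely for illustration (the "clicks" column and the branch structure are hypothetical): two feature branches are produced from different input columns and merged by a VectorAssembler, with the stages still listed in topological order.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, VectorAssembler}

// Branch 1: "text" -> "words" -> "textFeatures"
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("textFeatures")

// Branch 2: the numeric column "clicks" is used as-is; both branches
// are merged into a single "features" vector by the assembler.
val assembler = new VectorAssembler()
  .setInputCols(Array("textFeatures", "clicks"))
  .setOutputCol("features")

val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")

// Stages must be given in topological order of the column dependencies.
val dagPipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, assembler, lr))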
Runtime checking: Since Pipelines can operate on DataFrames with varied types, they cannot use compile-time type checking. Pipelines and PipelineModels instead perform runtime checking before actually running the Pipeline. This checking is done using the DataFrame schema, which describes the types of the columns in the DataFrame.
Unique Pipeline stages: A Pipeline's stages should be unique instances. For example, the same HashingTF instance cannot be inserted into a Pipeline twice, since every stage must have a unique ID. However, two different instances, say hashingTF1 and hashingTF2 (both of type HashingTF), can be put into the same Pipeline, since they are created with different IDs.
Parameters
MLlib Estimators and Transformers use a uniform API for specifying parameters.
A Param is a named parameter with self-contained documentation. A ParamMap is a set of (parameter, value) pairs.
There are two main ways to pass parameters to an algorithm:
1. Set parameters on an instance. For example, if lr is an instance of LogisticRegression, calling lr.setMaxIter(10) makes lr use at most 10 iterations when fitting. This API resembles the one used in the spark.mllib package.
2. Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap override parameters previously specified via setter methods.
Parameters belong to specific instances of Estimators and Transformers. Therefore, if we have two LogisticRegression instances lr1 and lr2, we can build a single ParamMap that specifies the maximum-iteration parameter for both: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20). This is useful when a Pipeline contains two algorithms that both have a maxIter parameter.
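For instance, a sketch of both approaches in Scala (lr1, lr2, training1, and training2 are hypothetical names used only for illustration):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

val lr1 = new LogisticRegression()
val lr2 = new LogisticRegression()

// Approach 1: setter methods on the instance itself.
lr1.setMaxIter(10)

// Approach 2: a ParamMap passed to fit(); these values override any set* calls.
// A single ParamMap can carry parameters for several instances.
val paramMap = ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
  .put(lr1.regParam -> 0.1) // additional parameters can be added with put()

// Hypothetical training DataFrames; each fit() picks up only its own parameters.
// val model1 = lr1.fit(training1, paramMap)
// val model2 = lr2.fit(training2, paramMap)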
Saving and Loading Pipelines
It is often worthwhile to save a pipeline to disk for later use. As of Spark 1.6, model import/export functionality was added to the Pipeline API, and most Transformers are supported. Refer to an algorithm's API documentation to see whether saving and loading are supported.
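A minimal sketch of saving and reloading (here `fittedModel`, `pipeline`, and the path are illustrative assumptions; the full examples below show the same calls in context):

import org.apache.spark.ml.{Pipeline, PipelineModel}

// `fittedModel` is an assumed PipelineModel produced by pipeline.fit(training).
// Save it to disk (the path is only an example), overwriting any previous copy.
fittedModel.write.overwrite().save("/tmp/spark-logistic-regression-model")

// An unfitted Pipeline can be saved the same way:
// pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// Load the fitted model back, e.g. in a serving job, and use it as a Transformer.
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")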
Code Examples
The following code examples illustrate the functionality discussed above.
Example: Estimator, Transformer, and Param:
Scala:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the features column.
// Note that model2.transform() outputs a myProbability column instead of the usual
// probability column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Prepare training data.
List<Row> dataTraining = Arrays.asList(
  RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);

// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
  .put(lr.maxIter().w(20))  // Specify 1 Param.
  .put(lr.maxIter(), 30)  // This overwrites the original maxIter.
  .put(lr.regParam().w(0.1), lr.threshold().w(0.55));  // Specify multiple Params.

// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
  .put(lr.probabilityCol().w("myProbability"));  // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

// Prepare test documents.
List<Row> dataTest = Arrays.asList(
  RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
  RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the features column.
// Note that model2.transform() outputs a myProbability column instead of the usual
// probability column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r : rows.collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
Python:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the features column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# probability column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
selected = prediction.select("features", "label", "myProbability", "prediction")
for row in selected.collect():
    print(row)
Example: Pipeline:
Scala:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
Java:
import java.util.Arrays;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  new JavaLabeledDocument(1L, "b d", 0.0),
  new JavaLabeledDocument(2L, "spark f g h", 1.0),
  new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);

// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  new JavaDocument(4L, "spark i j k"),
  new JavaDocument(5L, "l m n"),
  new JavaDocument(6L, "mapreduce spark"),
  new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
Python:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)