Spark MLlib Data Preprocessing: Feature Transformation (Part 1)
Tokenizer
Algorithm overview:
Tokenization is the process of splitting text into individual terms (usually words).
RegexTokenizer allows more advanced tokenization based on regular expressions. By default, the parameter "pattern" is the regex used as the delimiter to split the input text. Alternatively, the "gaps" parameter can be set to false to indicate that the regex "pattern" describes the tokens themselves rather than the gaps between them, in which case all matching occurrences become the tokenization result.
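The difference between the two modes can be illustrated with plain Python regular expressions (a sketch of the semantics only, not Spark code): splitting on a gap pattern versus matching token patterns directly.

```python
import re

sentence = "Logistic,regression,models,are,neat"

# gaps=True (the default): the pattern describes the delimiters, so we split on it
split_tokens = [t for t in re.split(r"\W", sentence) if t]

# gaps=False: the pattern describes the tokens themselves, so we collect all matches
match_tokens = re.findall(r"\w+", sentence)

print(split_tokens)
print(match_tokens)  # both approaches yield the same tokens for this input
```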
Usage:
Scala:
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

Dataset<Row> wordsDataFrame = tokenizer.transform(sentenceDataFrame);
for (Row r : wordsDataFrame.select("words", "label").takeAsList(3)) {
  java.util.List<String> words = r.getList(0);
  for (String word : words) System.out.print(word + " ");
  System.out.println();
}

RegexTokenizer regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false)
Python:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
    print(words_label)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps=False
StopWordsRemover
Algorithm overview:
Stop words are words that occur frequently in a document but carry little meaning; they are usually excluded from the algorithm's input.
StopWordsRemover takes a sequence of strings as input (e.g. the output of a Tokenizer) and drops all stop words from it. The list of stop words is specified by the stopWords parameter; default stop-word lists for several languages are available via StopWordsRemover.loadDefaultStopWords(language). The boolean parameter caseSensitive controls whether matching is case sensitive (false by default).
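The filtering semantics can be sketched in plain Python (a sketch only; the stop-word list below is a hypothetical stand-in for what Spark would load via loadDefaultStopWords):

```python
# Hypothetical stop-word list; Spark's actual defaults come from
# StopWordsRemover.loadDefaultStopWords("english").
stop_words = {"i", "the", "had", "a"}

def remove_stop_words(tokens, case_sensitive=False):
    # With caseSensitive=False (the default), matching ignores case,
    # but the surviving tokens keep their original form.
    if case_sensitive:
        return [t for t in tokens if t not in stop_words]
    lowered = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in lowered]

print(remove_stop_words(["I", "saw", "the", "red", "baloon"]))
# ['saw', 'red', 'baloon']
```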
Example:
Suppose we have the following DataFrame with columns id and raw:

id  | raw
----|-----------------------------
 0  | [I, saw, the, red, baloon]
 1  | [Mary, had, a, little, lamb]

Applying StopWordsRemover with raw as the input column and filtered as the output column, we obtain:

id  | raw                          | filtered
----|------------------------------|---------------------
 0  | [I, saw, the, red, baloon]   | [saw, red, baloon]
 1  | [Mary, had, a, little, lamb] | [Mary, little, lamb]

Here, "I", "the", "had", and "a" have been removed.
Usage:
Scala:
import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "baloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show()
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "baloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show();
Python:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "baloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["label", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
n-gram
Algorithm overview:
An n-gram is a sequence of n tokens (typically words) for some integer n. The NGram transformer converts input features into n-grams.
NGram takes a sequence of strings as input (e.g. the output of a Tokenizer). The parameter n determines the number of terms in each n-gram. The output is a sequence of n-grams, where each n-gram is a space-delimited string of n consecutive words. If the input sequence contains fewer than n strings, no output is produced.
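The construction described above can be sketched in plain Python (illustrative only, not Spark code):

```python
def ngrams(tokens, n=2):
    # Each n-gram is a space-delimited string of n consecutive input tokens;
    # fewer than n input tokens yields an empty result.
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

print(ngrams(["Hi", "I", "heard", "about", "Spark"]))
# ['Hi I', 'I heard', 'heard about', 'about Spark']
print(ngrams(["Hi"], n=2))  # [] -- fewer than n tokens
```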
Usage:
Scala:
import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("label", "words")

val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1.0, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2.0, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setInputCol("words").setOutputCol("ngrams");

Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);

for (Row r : ngramDataFrame.select("ngrams", "label").takeAsList(3)) {
  java.util.List<String> ngrams = r.getList(0);
  for (String ngram : ngrams) System.out.print(ngram + " --- ");
  System.out.println();
}
Python:
from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["label", "words"])
ngram = NGram(inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
for ngrams_label in ngramDataFrame.select("ngrams", "label").take(3):
    print(ngrams_label)
Binarizer
Algorithm overview:
Binarization is the process of thresholding continuous numerical features into binary (0/1) features.
Binarizer takes the parameters inputCol, outputCol, and threshold. Feature values greater than the threshold are mapped to 1.0; values less than or equal to the threshold are mapped to 0.0.
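The thresholding rule is simple enough to state in one line of plain Python (a sketch of the semantics, not Spark code):

```python
def binarize(value, threshold=0.5):
    # Strictly greater than the threshold maps to 1.0; less than or equal maps to 0.0.
    return 1.0 if value > threshold else 0.0

print([binarize(v) for v in (0.1, 0.8, 0.2)])  # [0.0, 1.0, 0.0]
```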
Usage:
Scala:
import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("label", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeatures = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);
Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);
Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
Dataset<Row> binarizedFeatures = binarizedDataFrame.select("binarized_feature");
for (Row r : binarizedFeatures.collectAsList()) {
  Double binarized_value = r.getDouble(0);
  System.out.println(binarized_value);
}
Python:
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["label", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedFeatures = binarizedDataFrame.select("binarized_feature")
for binarized_feature, in binarizedFeatures.collect():
    print(binarized_feature)
PCA
Algorithm overview:
Principal component analysis (PCA) is a statistical procedure that uses an orthogonal transformation to convert a set of possibly correlated variables into a set of linearly uncorrelated variables called principal components. PCA can thus be used to reduce the dimensionality of a feature set. The example below shows how to project 5-dimensional feature vectors onto 3-dimensional principal components.
Usage:
Scala:
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show();
Python:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
PolynomialExpansion
Algorithm overview:
Polynomial expansion maps the original features into a polynomial space formed by the degree-n combinations of the input dimensions. The example below shows how to expand a feature set into a degree-3 polynomial space.
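The expansion can be sketched in plain Python as all products of the input features up to the given degree (the ordering below is illustrative and may differ from Spark's internal output ordering):

```python
from itertools import combinations_with_replacement
from math import prod

def poly_expand(features, degree=2):
    # For each degree d = 1..degree, emit the product of every
    # multiset of d input features (combinations with replacement).
    out = []
    for d in range(1, degree + 1):
        for combo in combinations_with_replacement(features, d):
            out.append(prod(combo))
    return out

# (x, y) at degree 2 expands to x, y, x^2, xy, y^2:
print(poly_expand([2.0, 3.0], degree=2))  # [2.0, 3.0, 4.0, 6.0, 9.0]
```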
Usage:
Scala:
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(-2.0, 2.3),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(0.6, -1.1)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)
val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(-2.0, 2.3)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(0.6, -1.1))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);

List<Row> rows = polyDF.select("polyFeatures").takeAsList(3);
for (Row r : rows) {
  System.out.println(r.get(0));
}
Python:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [(Vectors.dense([-2.0, 2.3]),),
     (Vectors.dense([0.0, 0.0]),),
     (Vectors.dense([0.6, -1.1]),)],
    ["features"])
px = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)
for expanded in polyDF.select("polyFeatures").take(3):
    print(expanded)
Discrete Cosine Transform (DCT)
Algorithm overview:
The discrete cosine transform (DCT) is a Fourier-related transform: it is similar to the discrete Fourier transform but uses only real numbers. A DCT is equivalent to a discrete Fourier transform of roughly twice the length operating on a real-even function (since the Fourier transform of a real-even function is itself real and even). DCTs are widely used in signal and image processing, for example in lossy compression of audio, still images, and video.
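The variant Spark applies is the DCT-II with unitary (orthonormal) scaling, so the transform preserves the vector's Euclidean norm. A standard-library sketch of that formula (illustrative only, not Spark code):

```python
import math

def dct2(xs):
    # Orthonormal DCT-II: y_k = s_k * sum_i x_i * cos(pi * (i + 1/2) * k / n),
    # with s_0 = sqrt(1/n) and s_k = sqrt(2/n) for k > 0, making the transform unitary.
    n = len(xs)
    out = []
    for k in range(n):
        s = sum(x * math.cos(math.pi * (i + 0.5) * k / n) for i, x in enumerate(xs))
        scale = math.sqrt(1.0 / n) if k == 0 else math.sqrt(2.0 / n)
        out.append(scale * s)
    return out

print([round(v, 4) for v in dct2([0.0, 1.0, -2.0, 3.0])])
```

Because the scaling is unitary, the sum of squares of the output equals that of the input, which is a handy sanity check.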
Usage:
Scala:
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(3)
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);
Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(3);
Python:
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

for dcts in dctDf.select("featuresDCT").take(3):
    print(dcts)
StringIndexer
Algorithm overview:
StringIndexer encodes a string column of labels into a column of label indices. The indices lie in [0, numLabels), ordered by label frequency, so the most frequent label gets index 0. If the input column is numeric, it is first cast to string and then indexed. When downstream pipeline components need to use the string-indexed label, you must set their input column to the string-indexed column name.
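The frequency-ordered index assignment can be sketched in plain Python (a sketch of the semantics only; the alphabetical tie-breaking below is an illustrative choice, not necessarily Spark's):

```python
from collections import Counter

def fit_string_indexer(labels):
    # Most frequent label gets index 0; ties broken here alphabetically
    # (an illustrative choice, not necessarily Spark's tie-breaking rule).
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

mapping = fit_string_indexer(["a", "b", "c", "a", "a", "c"])
print(mapping)  # {'a': 0.0, 'c': 1.0, 'b': 2.0}
```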
Example:
Suppose we have the following DataFrame with columns id and category:

id  | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category is a string column with three distinct values. Applying StringIndexer produces the following output:

id  | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0
Additionally, if StringIndexer encounters a label during transform that was not seen while fitting, it will by default throw an exception, or it can be configured to skip the rows containing the unseen label.
Usage:
Scala:
import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");
Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
Python:
from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
IndexToString
Algorithm overview:
Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column of the original string labels. A common use case is to produce indices from labels with StringIndexer, train a model with those indices, and then retrieve the original label strings from the prediction column with IndexToString.
Example:
Suppose we have the following DataFrame with columns id and categoryIndex:

id  | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

Applying IndexToString with categoryIndex as the input column and originalCategory as the output column, we can retrieve the original labels:

id  | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c
Usage:
Scala:
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
converted.select("id", "originalCategory").show();
Python:
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

converted.select("id", "originalCategory").show()
OneHotEncoder
Algorithm overview:
One-hot encoding maps a column of label indices to a column of binary vectors, each with at most a single one-value. This encoding allows algorithms that expect continuous features, such as logistic regression, to use categorical features.
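The mapping can be sketched in plain Python. Note that Spark's OneHotEncoder drops the last category by default (dropLast=True), so the last category index maps to the all-zeros vector; the sketch below illustrates that behavior (semantics only, not Spark code):

```python
def one_hot(index, num_labels, drop_last=True):
    # With drop_last=True (Spark's default), the last category maps to the
    # all-zeros vector, so the output has num_labels - 1 slots.
    size = num_labels - 1 if drop_last else num_labels
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

print(one_hot(0, 3))  # [1.0, 0.0]
print(one_hot(2, 3))  # [0.0, 0.0] -- last category dropped
```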
Usage:
Scala:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
Java:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec");
Dataset<Row> encoded = encoder.transform(indexed);
encoded.select("id", "categoryVec").show();
Python:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()