Spark MLlib 文本特徵提取(TF-IDF/Word2Vec/CountVectorizer)
Spark MLlib 提供三種文本特徵提取方法,分別為TF-IDF、Word2Vec以及CountVectorizer,其原理與調用代碼整理如下:
TF-IDF
演算法介紹:
詞頻-逆向文件頻率(TF-IDF)是一種在文本挖掘中廣泛使用的特徵向量化方法,它可以體現一個文檔中詞語在語料庫中的重要程度。
詞語由t表示,文檔由d表示,語料庫由D表示。詞頻TF(t,,d)是詞語t在文檔d中出現的次數。文件頻率DF(t,D)是包含詞語的文檔的個數。如果我們只使用詞頻來衡量重要性,很容易過度強調在文檔中經常出現而並沒有包含太多與文檔有關的信息的詞語,比如「a」,「the」以及「of」。如果一個詞語經常出現在語料庫中,它意味著它並沒有攜帶特定的文檔的特殊信息。逆向文檔頻率數值化衡量詞語提供多少信息:
其中,|D|是語料庫中的文檔總數。由於採用了對數,如果一個詞出現在所有的文件,其IDF值變為0。
調用:
在下面的代碼段中,我們以一組句子開始。首先使用分解器Tokenizer把句子劃分為單個詞語。對每一個句子(詞袋),我們使用HashingTF將句子轉換為特徵向量,最後使用IDF重新調整特徵向量。這種轉換通常可以提高使用文本特徵的性能。然後,我們的特徵向量可以在演算法學習中。
Scala:
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}val sentenceData = spark.createDataFrame(Seq( (0, "Hi I heard about Spark"), (0, "I wish Java could use case classes"), (1, "Logistic regression models are neat"))).toDF("label", "sentence")val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")val wordsData = tokenizer.transform(sentenceData)val hashingTF = new HashingTF() .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)val featurizedData = hashingTF.transform(wordsData)// alternatively, CountVectorizer can also be used to get term frequency vectorsval idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")val idfModel = idf.fit(featurizedData)val rescaledData = idfModel.transform(featurizedData)rescaledData.select("features", "label").take(3).foreach(println)
Java:
importjava.util.Arrays;importjava.util.List;importorg.apache.spark.ml.feature.HashingTF;importorg.apache.spark.ml.feature.IDF;importorg.apache.spark.ml.feature.IDFModel;importorg.apache.spark.ml.feature.Tokenizer;importorg.apache.spark.ml.linalg.Vector;importorg.apache.spark.sql.Dataset;importorg.apache.spark.sql.Row;importorg.apache.spark.sql.RowFactory;importorg.apache.spark.sql.SparkSession;importorg.apache.spark.sql.types.DataTypes;importorg.apache.spark.sql.types.Metadata;importorg.apache.spark.sql.types.StructField;importorg.apache.spark.sql.types.StructType;List<Row> data = Arrays.asList( RowFactory.create(0.0, "Hi I heard about Spark"), RowFactory.create(0.0, "I wish Java could use case classes"), RowFactory.create(1.0, "Logistic regression models are neat"));StructType schema = new StructType(new StructField[]{new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),new StructField("sentence", DataTypes.StringType, false, Metadata.empty())});Dataset<Row> sentenceData = spark.createDataFrame(data, schema);Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");Dataset<Row> wordsData = tokenizer.transform(sentenceData);int numFeatures = 20;HashingTF hashingTF = new HashingTF() .setInputCol("words") .setOutputCol("rawFeatures") .setNumFeatures(numFeatures);Dataset<Row> featurizedData = hashingTF.transform(wordsData);// alternatively, CountVectorizer can also be used to get term frequency vectorsIDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");IDFModel idfModel = idf.fit(featurizedData);Dataset<Row> rescaledData = idfModel.transform(featurizedData);for (Row r : rescaledData.select("features", "label").takeAsList(3)) { Vector features = r.getAs(0); Double label = r.getDouble(1); System.out.println(features); System.out.println(label);}
from pyspark.ml.feature import HashingTF, IDF, TokenizersentenceData = spark.createDataFrame([ (0, "Hi I heard about Spark"), (0, "I wish Java could use case classes"), (1, "Logistic regression models are neat")], ["label", "sentence"])tokenizer = Tokenizer(inputCol="sentence", outputCol="words")wordsData = tokenizer.transform(sentenceData)hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)featurizedData = hashingTF.transform(wordsData)# alternatively, CountVectorizer can also be used to get term frequency vectorsidf = IDF(inputCol="rawFeatures", outputCol="features")idfModel = idf.fit(featurizedData)rescaledData = idfModel.transform(featurizedData)for features_label in rescaledData.select("features", "label").take(3): print(features_label)
Word2Vec
演算法介紹:
Word2vec是一個Estimator,它採用一系列代表文檔的詞語來訓練word2vecmodel。該模型將每個詞語映射到一個固定大小的向量。word2vecmodel使用文檔中每個詞語的平均數來將文檔轉換為向量,然後這個向量可以作為預測的特徵,來計算文檔相似度計算等等。
在下面的代碼段中,我們首先用一組文檔,其中每一個文檔代表一個詞語序列。對於每一個文檔,我們將其轉換為一個特徵向量。此特徵向量可以被傳遞到一個學習演算法。
調用:
Scala:
import org.apache.spark.ml.feature.Word2Vec// Input data: Each row is a bag of words from a sentence or document.val documentDF = spark.createDataFrame(Seq( "Hi I heard about Spark".split(" "), "I wish Java could use case classes".split(" "), "Logistic regression models are neat".split(" ")).map(Tuple1.apply)).toDF("text")// Learn a mapping from words to Vectors.val word2Vec = new Word2Vec() .setInputCol("text") .setOutputCol("result") .setVectorSize(3) .setMinCount(0)val model = word2Vec.fit(documentDF)val result = model.transform(documentDF)result.select("result").take(3).foreach(println)
Java:
import java.util.Arrays;import java.util.List;import org.apache.spark.ml.feature.Word2Vec;import org.apache.spark.ml.feature.Word2VecModel;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.*;// Input data: Each row is a bag of words from a sentence or document.List<Row> data = Arrays.asList( RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))), RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))), RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" "))));StructType schema = new StructType(new StructField[]{ new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())});Dataset<Row> documentDF = spark.createDataFrame(data, schema);// Learn a mapping from words to Vectors.Word2Vec word2Vec = new Word2Vec() .setInputCol("text") .setOutputCol("result") .setVectorSize(3) .setMinCount(0);Word2VecModel model = word2Vec.fit(documentDF);Dataset<Row> result = model.transform(documentDF);for (Row r : result.select("result").takeAsList(3)) { System.out.println(r);}
Python:
from pyspark.ml.feature import Word2Vec# Input data: Each row is a bag of words from a sentence or document.documentDF = spark.createDataFrame([ ("Hi I heard about Spark".split(" "), ), ("I wish Java could use case classes".split(" "), ), ("Logistic regression models are neat".split(" "), )], ["text"])# Learn a mapping from words to Vectors.word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")model = word2Vec.fit(documentDF)result = model.transform(documentDF)for feature in result.select("result").take(3): print(feature)
Countvectorizer
演算法介紹:
Countvectorizer和Countvectorizermodel旨在通過計數來將一個文檔轉換為向量。當不存在先驗字典時,Countvectorizer可作為Estimator來提取辭彙,並生成一個Countvectorizermodel。該模型產生文檔關於詞語的稀疏表示,其表示可以傳遞給其他演算法如LDA。
在fitting過程中,countvectorizer將根據語料庫中的詞頻排序選出前vocabsize個詞。一個可選的參數minDF也影響fitting過程中,它指定辭彙表中的詞語在文檔中最少出現的次數。另一個可選的二值參數控制輸出向量,如果設置為真那麼所有非零的計數為1。這對於二值型離散概率模型非常有用。
示例:
假設我們有如下的DataFrame包含id和texts兩列:
id | texts
----|----------
0 |Array("a", "b", "c")
1 |Array("a", "b", "b", "c","a")
文本中的每一行都是一個文檔類型的數組(字元串)。調用的CountVectorizer產生辭彙(a,b,c)的CountVectorizerModel,轉換後的輸出向量如下:
id | texts | vector
----|---------------------------------|---------------
0 |Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 |Array("a", "b", "b", "c","a") |(3,[0,1,2],[2.0,2.0,1.0])
每個向量代表文檔的辭彙表中每個詞語出現的次數。
調用:
Scala:
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}val df = spark.createDataFrame(Seq( (0, Array("a", "b", "c")), (1, Array("a", "b", "b", "c", "a")))).toDF("id", "words")// fit a CountVectorizerModel from the corpusval cvModel: CountVectorizerModel = new CountVectorizer() .setInputCol("words") .setOutputCol("features") .setVocabSize(3) .setMinDF(2) .fit(df)// alternatively, define CountVectorizerModel with a-priori vocabularyval cvm = new CountVectorizerModel(Array("a", "b", "c")) .setInputCol("words") .setOutputCol("features")cvModel.transform(df).select("features").show()
Java:
import java.util.Arrays;import java.util.List;import org.apache.spark.ml.feature.CountVectorizer;import org.apache.spark.ml.feature.CountVectorizerModel;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.*;// Input data: Each row is a bag of words from a sentence or document.List<Row> data = Arrays.asList( RowFactory.create(Arrays.asList("a", "b", "c")), RowFactory.create(Arrays.asList("a", "b", "b", "c", "a")));StructType schema = new StructType(new StructField [] { new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())});Dataset<Row> df = spark.createDataFrame(data, schema);// fit a CountVectorizerModel from the corpusCountVectorizerModel cvModel = new CountVectorizer() .setInputCol("text") .setOutputCol("feature") .setVocabSize(3) .setMinDF(2) .fit(df);// alternatively, define CountVectorizerModel with a-priori vocabularyCountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"}) .setInputCol("text") .setOutputCol("feature");cvModel.transform(df).show();
Python:
from pyspark.ml.feature import CountVectorizer# Input data: Each row is a bag of words with a ID.df = spark.createDataFrame([ (0, "a b c".split(" ")), (1, "a b b c a".split(" "))], ["id", "words"])# fit a CountVectorizerModel from the corpus.cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)model = cv.fit(df)result = model.transform(df)result.show()
推薦閱讀:
※Spark MLlib 數據預處理-特徵變換(二)
※三種線性方法的優化
※在Spark上實現線性收斂的隨機優化演算法SVRG
※Pipeline詳解及Spark MLlib使用