Spark MLlib數據類型介紹

MLlib支持幾種數據類型:本地向量(local vectors),和存儲在本地或者基於RDD的分散式矩陣(matrices)。底層的線性代數轉換操作是基於Breeze和jblas實現的。在MLlib中有監督學習演算法使用的訓練樣本數據類型被稱為「帶標籤的點(labeled point)」。

一、本地向量(Local Vector)

   一個本地向量是由從0開始的整型下標和double型數值組成的,存儲在單機節點上。MLlib支持兩種類型的本地向量:密集(dense)的和稀疏(sparse)的。密集向量用一個double數組來存儲值。而稀疏向量由兩個並列的數組,下標和值組成。例如,一個向量(1.0, 0.0, 3.0)可以由密集的數組[1.0, 0.0, 3.0]表示,或者可以由(3, [0, 2], [1.0, 3.0])表示,其中3是指向量大小。接下來第0個元素是1.0,第2個元素是3.0。由此可看出,0.0在稀疏向量中可省略不寫。

  本地向量的基本類型是Vector,並且有兩個子類型:DenseVector, SparseVector。建議使用Vectors中的工廠方法來創建本地向量。

import org.apache.spark.mllib.linalg.{Vector, Vectors}//創建一個密集向量(1.0, 0.0, 3.0)val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)//通過設置非零元素的下標和值來創建一個稀疏向量(1.0, 0.0, 3.0)val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))//通過另一種方式創建稀疏向量,採用下標,值對形式val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0),(2, 3.0))

注意:Scala默認imports scala.collection.immutable.Vector,所以一定要記得手動引入MLlib的Vector類。

二、帶標籤的點(Labeled point)

  帶標籤的點實質上由本地向量(既可以是密集向量也可以是稀疏向量) 和類標籤(label/response)組成。在MLlib中,Labeled point一般用在有監督學習演算法中。label的存儲類型為double,所以Labeled point既可用在回歸分析演算法又可用在分類演算法(regression and classification)中。對於二分型分類演算法,label應該用0和1表示。對多分型分類演算法,標籤可以用從0開始的下標表示:0,1,2……

  MLlib中Labeled point用LabeledPoint類表示

import org.apache.spark.mllib.linglg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // 創建一個標籤為1,數據為密集型向量構成的帶標籤點 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // 創建一個標籤為0,數據為稀疏型向量構成的帶標籤點 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

1、稀疏數據(sparse data)

  使用稀疏數據是很普遍的現象。MLlib支持讀取以LIBSVM格式存儲的訓練數據,LIBSVM是LIBSVM和LIBLINEAR默認的格式。它是一種text格式的數據,裡面的每一行代表一個含類標籤的稀疏向量,格式如下:

label index1:value1 index2:value2 .....

文件格式1.1如下所示:

0 1:0.052893 2:1.000000 3:0.750000 4:1.000000 5:0.066225 11:0.047634 12:1.000000 13:0.740506 14:1.000000 15:0.058539 16:0.003995 17:0.500000 18:0.400000 19:0.400000 20:0.004121 21:1.000000 22:1.000000 23:0.974510 24:1.000000 25:0.929240 26:1.000000 27:1.000000 28:0.829951 29:1.000000 30:1.000000 31:0.768123 32:1.000000 33:1.000000 34:1.000000 35:1.000000 36:1.000000 37:1.000000 38:1.000000 39:0.998377 40:1.000000 41:0.333333 42:0.434783 44:0.396910 45:0.447368 46:0.9666671 1:0.026446 2:0.750000 3:0.750000 4:0.500000 5:0.036424 11:0.024116 12:0.716563 13:0.702528 14:0.521932 15:0.031683 16:0.007919 17:1.000000 18:0.500000 19:0.200000 20:0.008170 21:0.780898 22:0.659859 23:0.722056 24:0.666725 25:0.428280 29:0.876585 30:0.939544 31:0.664658 32:0.953860 33:0.548049 37:0.843365 38:0.727569 39:0.786101 40:0.725966 42:0.608696 44:1.000000 45:1.000000 46:0.266667  

上面第一行0就是label,1:0.052893就是index1:value1,以此類推。

LIBSVM中index是從1開始的,在載入完成後索引被轉換成從0開始。

用MLUtins.loadLibSVMFile從LIBSVM格式中讀取訓練數據,

import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD val examples:RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, data/mllib/sample_libsvm_data.txt")

$SPARK_HOME/data/mllib/sample_libSVM_data.txt文件就是上面文件示例格式1.1所示。

三、本地矩陣(Local matrix)

  本地矩陣由下標和double型的數值組成,其中下標是由整型數據表示行和列,保存在單個機器中。MLlib支持稀疏矩陣和密集矩陣,密集矩陣的值存在一個double數組中,一列列的進行存儲。如下所示為一個密集矩陣

  表示一個size為(3,2)的密集矩陣,其值存在一個一維數組中,數組值為[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]。稀疏矩陣的非零元素值都保存在壓縮的稀疏列(Compressed Sparse Column,CSC)中,按列優先的順序存儲。

  本地矩陣基本類型為Matrix,有兩個子類型:DenseMatrix和SparseMatrix。推薦使用Matrices的工廠方法來生成本地矩陣。必須牢記,MLlib中的本地矩陣數值是按列的順序進行存儲的。

import org.apache.spark.mllib.linalg.{Matrix,Matrices} // 創建一個密集矩陣((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm:Matrix = Matrices.dense(3,2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) // 創建一個稀疏矩陣((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) val sm:Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

  註:稀疏矩陣解釋,首先指定矩陣是3行2列,Array(0, 1, 3)是指,第0個非零元素在第一列,第一第二個非零元素在第二列。Array(0, 2, 1)是指,第一個非零元素在第0行,第二個非零元素在第2行,第三個非零元素在第1行。此處設計比較好,假設100個元素分兩列,不需要把每個元素所在列都標出來,只需要記錄3個數字即可。Array(9, 6, 8)表示按順序存儲非零元素.

四、分散式矩陣(Distributed Matrix)

  一個分散式矩陣也是由下標和double型的值組成,不過分散式矩陣的下標不是Int型,而是long型,數據分散式保存在一個或多個RDD中。選擇正確的格式來保存海量和分散式的矩陣是非常重要的。將分散式矩陣轉換成不同的格式需要一個全局的shuffle(global shuffle),而全局shuffle的代價會非常高。到目前為止,Spark MLlib中已經實現了三種分散式矩陣。

  最基本的分散式矩陣是RowMatrix,它是一個行式的分散式矩陣,沒有行索引。比如一系列特徵向量的集合。RowMatrix由一個RDD代表所有的行,每一行是一個本地向量。假設一個RowMatrix的列數不是特別巨大,那麼一個簡單的本地向量能夠與driver進行聯繫,並且數據可以在單個節點上保存或使用。IndexedRowMatrix與RowMatrix類似但是有行索引,行索引可以用來區分行並且進行連接等操作。CoordinateMatrix是一個以協同列表(coordinate list)格式存儲數據的分散式矩陣,數據以RDD形式存儲。

  注意:因為我們需要緩存矩陣的大小,所以分散式矩陣的RDDs格式是需要確定的,使用非確定RDDs的話會報錯。

1、RowMatrix

RowMatrix它是一個行式的分散式矩陣,沒有行索引。比如一系列特徵向量的集合。RowMatrix由一個RDD代表所有的行,每一行是一個本地向量。因為每一行代表一個本地向量,所以它的列數被限制在Integer.max的範圍內,在實際應用中不會太大。

  一個RowMatrix可以由一個RDD[Vector]的實例創建。因此我們可以計算統計信息或者進行分解。QR分解(QR decomposition)是A=QR,其中Q是一個矩陣,R是一個上三角矩陣。對sigular value decomposition(SVD和principal component analysis(PCA),可以去參考降維的部分。

import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows:RDD[Vector] = .... //一個本地向量的RDD // 從RDD[Vector]創建一個RowMatrix val mat: RowMatrix = new RowMatrix(rows) // 獲取RowMatrix的維度 val m = mat.numRows() val n = mat.numCols() // QR降維 val qrResult = mat.takkSkinnyQR(true)

2、IndexedRowMatrix

IndexedRowMatrix與RowMatrix類似,但是它有行索引。由一個行索引RDD表示,索引每一行由一個long型行索引和一個本地向量組成。

  一個IndexedRowMatrix可以由RDD[IndexedRow]的實例來生成,IndexedRow是一個(Long, Vector)的封裝。去掉行索引,IndexedRowMatrix能夠轉換成RowMatrix。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix,RowMatrix} val rows: RDD[IndexedRow] = ... //一個indexed rows的RDD // 從RDD[IndexedRow]創建一個IndexedRowMatrix val mat:IndexedMatrix = new IndexedRowMatrix(rows) // 獲取維度 val m = mat.numRows() val n = mat.numCols() // 去掉行索引,轉換成RowMatrix val rowMat:RowMatrix = mat.toRowMatrix()

3、CoordinateMatrix

CoordinateMatrix是一個分散式矩陣,其實體集合是一個RDD,每一個是一個三元組(i:Long, j:Long, value:Double)。其中i是行索引,j是列索引,value是實體的值。當矩陣的維度很大並且是稀疏矩陣時,才使用CoordinateMatrix。

  一個CoordinateMatrix可以通過一個RDD[MatrixEntry]的實例來創建,MatrixEntry是一個(Long, Long, Double)的封裝。CoordinateMatrix可以通過調用toIndexedRowMatrix轉換成一個IndexedRowMatrix。CoordinateMatrix的其他降維方法暫時還不支持(Spark-1.6.2)。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry} val entries:RDD[MatrixEntry] = ... //一個matrix entries的RDD // 由RDD[MatrixEntry]創建一個CoordinateMatrix val mat:CoordinateMatrix = new CoordinateMatrix(entries) // 獲取矩陣的維度 va l m = mat.numRows() val n = mat.numCols() // 調用toIndexedRowMatrix轉換成IndexedRowMatrix,它的行都是稀疏向量 vavl indexedRowMatrix = mat.toIndexedRowMatrix()

4、BlockMatrix

  一個BlockMatrix是一個分散式的矩陣,由一個MatrixBlocks的RDD組成。MatrixBlock是一個三元組((Int, Int), Matrix),其中(Int, Int)是block的索引,Matrix是一個在指定位置上的維度為rowsPerBlock * colsPerBlock的子矩陣。BlockMatrix支持與另一個BlockMatrix對象的add和multiply操作。BlockMatrix提供了一個幫助方法validate,這個方法可以用於檢測該`BlockMatrix·是否正確。

  可以通過IndexedRowMatrix或者CoordinateMatrix調用toBlockMatrix快速得到BlockMatrix對象。默認情況下toBlockMatrix方法會得到一個1024 x 1024的BlockMatrix。使用時可以通過手動傳遞維度值來設置維度,toBlockMatrix(rowsPerBlock, colsPerBlock)。

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries// 從RDD[MatrixEntry]生成一個CoordinateMatrixval coordMat: CoordinateMatrix = new CoordinateMatrix(entries)// 將CoordinateMatrix轉換成BlockMatrixval matA: BlockMatrix = coordMat.toBlockMatrix().cache()// 檢測BlockMatrix格式是否正確,錯誤的話會拋出異常,正確的話無其他影響matA.validate()// 計算A^T * A.val ata = matA.transpose.multiply(matA)

推薦閱讀:

求通俗解釋下bandit老虎機到底是個什麼東西?
LR與SVM的區別與聯繫
李宏毅機器學習2016 第十六講 生成對抗網路 GAN
以AlphaGo為例,如何理解神經網路的存儲容量(storage capacity)?

TAG:Spark | 机器学习 |