標籤:

深入機器學習系列4-KMeans

轉載請註明出處,該文章的官方來源:

KMeans | Teaching ML

k-means、k-means++以及k-means||演算法分析

本文會介紹一般的k-means演算法、k-means++演算法以及基於k-means++演算法的k-means||演算法。在spark ml,已經實現了k-means演算法以及k-means||演算法。 本文首先會介紹這三個演算法的原理,然後在了解原理的基礎上分析spark中的實現代碼。

1 k-means演算法原理分析

k-means演算法是聚類分析中使用最廣泛的演算法之一。它把n個對象根據它們的屬性分為k個聚類以便使得所獲得的聚類滿足:同一聚類中的對象相似度較高;而不同聚類中的對象相似度較小。

k-means演算法的基本過程如下所示:

  • (1)任意選擇k個初始中心c_{1},c_{2},...,c_{k}
  • (2)計算X中的每個對象與這些中心對象的距離;並根據最小距離重新對相應對象進行劃分;
  • (3)重新計算每個中心對象 C_{i} 的值

  • (4)計算標準測度函數,當滿足一定條件,如函數收斂時,則演算法終止;如果條件不滿足則重複步驟(2),(3)。

1.1 k-means演算法的缺點

k-means演算法雖然簡單快速,但是存在下面的缺點:

  • 聚類中心的個數K需要事先給定,但在實際中K值的選定是非常困難的,很多時候我們並不知道給定的數據集應該分成多少個類別才最合適。
  • k-means演算法需要隨機地確定初始聚類中心,不同的初始聚類中心可能導致完全不同的聚類結果。

第一個缺陷我們很難在k-means演算法以及其改進演算法中解決,但是我們可以通過k-means++演算法來解決第二個缺陷。

2 k-means++演算法原理分析

k-means++演算法選擇初始聚類中心的基本原則是:初始的聚類中心之間的相互距離要儘可能的遠。它選擇初始聚類中心的步驟是:

  • (1)從輸入的數據點集合中隨機選擇一個點作為第一個聚類中心 c_{1}
  • (2)對於數據集中的每一個點x,計算它與最近聚類中心(指已選擇的聚類中心)的距離D(x),並根據概率選擇新的聚類中心 c_{i}
  • (3)重複過程(2)直到找到k個聚類中心。

第(2)步中,依次計算每個數據點與最近的種子點(聚類中心)的距離,依次得到D(1)、D(2)、...、D(n)構成的集合D,其中n表示數據集的大小。在D中,為了避免雜訊,不能直接選取值最大的元素,應該選擇值較大的元素,然後將其對應的數據點作為種子點。 如何選擇值較大的元素呢,下面是spark中實現的思路。

  • 求所有的距離和Sum(D(x))
  • 取一個隨機值,用權重的方式來取計算下一個「種子點」。這個演算法的實現是,先用Sum(D(x))乘以隨機值Random得到值r,然後用currSum += D(x),直到其currSum > r,此時的點就是下一個「種子點」。

為什麼用這樣的方式呢?我們換一種比較好理解的方式來說明。把集合D中的每個元素D(x)想像為一根線L(x),線的長度就是元素的值。將這些線依次按照L(1)、L(2)、...、L(n)的順序連接起來,組成長線L。L(1)、L(2)、…、L(n)稱為L的子線。 根據概率的相關知識,如果我們在L上隨機選擇一個點,那麼這個點所在的子線很有可能是比較長的子線,而這個子線對應的數據點就可以作為種子點。

2.1 k-means++演算法的缺點

雖然k-means++演算法可以確定地初始化聚類中心,但是從可擴展性來看,它存在一個缺點,那就是它內在的有序性特性:下一個中心點的選擇依賴於已經選擇的中心點。 針對這種缺陷,k-means||演算法提供了解決方法。

3 k-means||演算法原理分析

k-means||演算法是在k-means++演算法的基礎上做的改進,和k-means++演算法不同的是,它採用了一個採樣因子l,並且l=A(k),在spark的實現中l=2k,。這個演算法首先如k-means++演算法一樣,隨機選擇一個初始中心, 然後計算選定初始中心確定之後的初始花費 psi (指與最近中心點的距離)。之後處理 log(psi ) 次迭代,在每次迭代中,給定當前中心集,通過概率 lcdot d^{2}(x,C)/phi_{X}(C) 來抽樣x,將選定的x添加到初始化中心集中,並且更新 phi_{X}(C) 。該演算法的步驟如下圖所示:

第1步隨機初始化一個中心點,第2-6步計算出滿足概率條件的多個候選中心點C,候選中心點的個數可能大於k個,所以通過第7-8步來處理。第7步給C中所有點賦予一個權重值 w_{x} ,這個權重值表示距離x點最近的點的個數。 第8步使用本地k-means++演算法聚類出這些候選點的k個聚類中心。在spark的源碼中,迭代次數是人為設定的,默認是5。

該演算法與k-means++演算法不同的地方是它每次迭代都會抽樣出多個中心點而不是一個中心點,且每次迭代不互相依賴,這樣我們可以並行的處理這個迭代過程。由於該過程產生出來的中心點的數量遠遠小於輸入數據點的數量, 所以第8步可以通過本地k-means++演算法很快的找出k個初始化中心點。何為本地k-means++演算法?就是運行在單個機器節點上的k-means++。

下面我們詳細分析上述三個演算法的代碼實現。

4 源代碼分析

在spark中,org.apache.spark.mllib.clustering.KMeans文件實現了k-means演算法以及k-means||演算法,org.apache.spark.mllib.clustering.LocalKMeans文件實現了k-means++演算法。 在分步驟分析spark中的源碼之前我們先來了解KMeans類中參數的含義。

class KMeans private ( private var k: Int,//聚類個數 private var maxIterations: Int,//迭代次數 private var runs: Int,//運行kmeans演算法的次數 private var initializationMode: String,//初始化模式 private var initializationSteps: Int,//初始化步數 private var epsilon: Double,//判斷kmeans演算法是否收斂的閾值 private var seed: Long)

在上面的定義中,k表示聚類的個數,maxIterations表示最大的迭代次數,runs表示運行KMeans演算法的次數,在spark 2.0。0開始,該參數已經不起作用了。為了更清楚的理解演算法我們可以認為它為1。 initializationMode表示初始化模式,有兩種選擇:隨機初始化和通過k-means||初始化,默認是通過k-means||初始化。initializationSteps表示通過k-means||初始化時的迭代步驟,默認是5,這是spark實現與第三章的演算法步驟不一樣的地方,這裡迭代次數人為指定, 而第三章的演算法是根據距離得到的迭代次數,為log(phi)。epsilon是判斷演算法是否已經收斂的閾值。

下面將分步驟分析k-means演算法、k-means||演算法的實現過程。

4.1 處理數據,轉換為VectorWithNorm集。

//求向量的二範式,返回double值val norms = data.map(Vectors.norm(_, 2.0))norms.persist()val zippedData = data.zip(norms).map { case (v, norm) => new VectorWithNorm(v, norm)}

4.2 初始化中心點。

初始化中心點根據initializationMode的值來判斷,如果initializationMode等於KMeans.RANDOM,那麼隨機初始化k個中心點,否則使用k-means||初始化k個中心點。

val centers = initialModel match { case Some(kMeansCenters) => { Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s))) } case None => { if (initializationMode == KMeans.RANDOM) { initRandom(data) } else { initKMeansParallel(data) } } }

  • (1)隨機初始化中心點。

隨機初始化k個中心點很簡單,具體代碼如下:

private def initRandom(data: RDD[VectorWithNorm]) : Array[Array[VectorWithNorm]] = { //採樣固定大小為k的子集 //這裡run表示我們運行的KMeans演算法的次數,默認為1,以後將停止提供該參數 val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq //選取k個初始化中心點 Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm) }.toArray) }

  • (2)通過k-means||初始化中心點。

相比於隨機初始化中心點,通過k-means||初始化k個中心點會麻煩很多,它需要依賴第三章的原理來實現。它的實現方法是initKMeansParallel。 下面按照第三章的實現步驟來分析。

  • 第一步,我們要隨機初始化第一個中心點。

//初始化第一個中心點val seed = new XORShiftRandom(this.seed).nextInt()val sample = data.takeSample(true, runs, seed).toSeqval newCenters = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense))

  • 第二步,通過已知的中心點,循環迭代求得其它的中心點。

var step = 0while (step < initializationSteps) { val bcNewCenters = data.context.broadcast(newCenters) val preCosts = costs //每個點距離最近中心的代價 costs = data.zip(preCosts).map { case (point, cost) => Array.tabulate(runs) { r => //pointCost獲得與最近中心點的距離 //並與前一次迭代的距離對比取更小的那個 math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r)) } }.persist(StorageLevel.MEMORY_AND_DISK) //所有點的代價和 val sumCosts = costs.aggregate(new Array[Double](runs))( //分區內迭代 seqOp = (s, v) => { // s += v var r = 0 while (r < runs) { s(r) += v(r) r += 1 } s }, //分區間合併 combOp = (s0, s1) => { // s0 += s1 var r = 0 while (r < runs) { s0(r) += s1(r) r += 1 } s0 } ) //選擇滿足概率條件的點 val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) pointsWithCosts.flatMap { case (p, c) => val rs = (0 until runs).filter { r => //此處設置l=2k rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r) } if (rs.length > 0) Some(p, rs) else None } }.collect() mergeNewCenters() chosen.foreach { case (p, rs) => rs.foreach(newCenters(_) += p.toDense) } step += 1}

在這段代碼中,我們並沒有選擇使用log(pha)的大小作為迭代的次數,而是直接使用了人為確定的initializationSteps,這是與論文中不一致的地方。 在迭代內部我們使用概率公式

來計算滿足要求的點,其中,l=2k。公式的實現如代碼rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)。sumCosts表示所有點距離它所屬類別的中心點的歐式距離之和。 上述代碼通過aggregate方法並行計算獲得該值。

  • 第三步,求最終的k個點。

通過以上步驟求得的候選中心點的個數可能會多於k個,這樣怎麼辦呢?我們給每個中心點賦一個權重,權重值是數據集中屬於該中心點所在類別的數據點的個數。 然後我們使用本地k-means++來得到這k個初始化點。具體的實現代碼如下:

val bcCenters = data.context.broadcast(centers) //計算權重值,即各中心點所在類別的個數 val weightMap = data.flatMap { p => Iterator.tabulate(runs) { r => ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0) } }.reduceByKey(_ + _).collectAsMap() //最終的初始化中心 val finalCenters = (0 until runs).par.map { r => val myCenters = centers(r).toArray val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30) }

上述代碼的關鍵點時通過本地k-means++演算法求最終的初始化點。它是通過LocalKMeans.kMeansPlusPlus來實現的。它使用k-means++來處理。

// 初始化一個中心點centers(0) = pickWeighted(rand, points, weights).toDense//for (i <- 1 until k) { // 根據概率比例選擇下一個中心點 val curCenters = centers.view.take(i) //每個點的權重與距離的乘積和 val sum = points.view.zip(weights).map { case (p, w) => w * KMeans.pointCost(curCenters, p) }.sum //取隨機值 val r = rand.nextDouble() * sum var cumulativeScore = 0.0 var j = 0 //尋找概率最大的點 while (j < points.length && cumulativeScore < r) { cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j)) j += 1 } if (j == 0) { centers(i) = points(0).toDense } else { centers(i) = points(j - 1).toDense }}

上述代碼中,points指的是候選的中心點,weights指這些點相應地權重。尋找概率最大的點的方式就是第二章提到的方式。初始化k個中心點後, 就可以通過一般的k-means流程來求最終的k個中心點了。具體的過程4.3會講到。

4.3 確定數據點所屬類別

找到中心點後,我們就需要根據距離確定數據點的聚類,即數據點和哪個中心點最近。具體代碼如下:

// 找到每個聚類中包含的點距離中心點的距離和以及這些點的個數 val totalContribs = data.mapPartitions { points => val thisActiveCenters = bcActiveCenters.value val runs = thisActiveCenters.length val k = thisActiveCenters(0).length val dims = thisActiveCenters(0)(0).vector.size val sums = Array.fill(runs, k)(Vectors.zeros(dims)) val counts = Array.fill(runs, k)(0L) points.foreach { point => (0 until runs).foreach { i => //找到離給定點最近的中心以及相應的歐幾里得距離 val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point) costAccums(i) += cost //距離和 val sum = sums(i)(bestCenter) //y += a * x axpy(1.0, point.vector, sum) //點數量 counts(i)(bestCenter) += 1 } } val contribs = for (i <- 0 until runs; j <- 0 until k) yield { ((i, j), (sums(i)(j), counts(i)(j))) } contribs.iterator }.reduceByKey(mergeContribs).collectAsMap()

4.4 重新確定中心點

找到類別中包含的數據點以及它們距離中心點的距離,我們可以重新計算中心點。代碼如下:

//更新中心點for ((run, i) <- activeRuns.zipWithIndex) { var changed = false var j = 0 while (j < k) { val (sum, count) = totalContribs((i, j)) if (count != 0) { //x = a * x,求平均距離即sum/count scal(1.0 / count, sum) val newCenter = new VectorWithNorm(sum) //如果新舊兩個中心點的歐式距離大於閾值 if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) { changed = true } centers(run)(j) = newCenter } j += 1 } if (!changed) { active(run) = false logInfo("Run " + run + " finished in " + (iteration + 1) + " iterations") } costs(run) = costAccums(i).value}

5 參考文獻

  • 【1】Bahman Bahmani,Benjamin Moseley,Andrea Vattani.Scalable K-Means++
  • 【2】David Arthur and Sergei Vassilvitskii.k-means++: The Advantages of Careful Seeding

推薦閱讀:

[貝葉斯三]之決策函數和決策面
崛起中的機器文明
Python預測分析核心演算法——Day4、5、6
機器學習基礎與實踐(二)----數據轉換

TAG:機器學習 |