pagerank的概念和spark實現

概念層面:

pagerank把從A頁面到B頁面的鏈接解釋為A頁面給B頁面投票

一個有較多鏈入的頁面(即被引用次數高)會有較高的等級和PR值,相反如果一個頁面沒有任何鏈入頁面,那麼它沒有等級

同時,投票是帶有權重的,A頁面到B頁面的鏈接產生的「票數」不是1,而是A頁面當前的PR值/A頁面所有引用鏈數量之和,換句話說,鏈出總數平分一個頁面的PR

那麼每一個頁面的PageRank可由其他頁面的PageRank計算得到。通過迭代更新每個頁面的PageRank。如果給每個頁面一個隨機PageRank值(非0),迭代約10次後,這些頁面的PR值會趨向於穩定

在這裡,為了防止迭代中一部分頁面的PR值由於沒有被投票而變為0,無法再繼續為其他頁面產生貢獻值了。故pagerank為一個頁面的PR值計算添加了一個最小值

這裡的d是一個手動選擇的參數,在原版論文中,最小值是1-d,而不是(1-d)/N,下例中將d選擇為了0.85,即最小值設為0.15

spark實現層面:

首先,演算法需要維護兩個RDD:一個由(pageID, linkList) 的元素組成,包含每個頁面所「引用」頁面的列表;另一個由(pageID, rank) 元素組成,包含每個頁面的當前PR值。

演算法實現的三個步驟:

(1) 將每個頁面的PR值初始化為1.0。

(2) 在每次迭代中,對頁面p,向其「引用」的頁面發送一個值為rank(p)/numNeighbors(p) 的貢獻值。

(3) 將每個頁面的排序值設為0.15 + 0.85 * contributionsReceived。

後兩步經過大約10次迭代計算會趨於收斂

Scala實現

// 假設相鄰頁面列表以Spark objectFile的形式存儲,將讀取的linksRDD進行哈希分區,分區有利於後續的join等操作,然後persist進行持久化val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()// 將每個頁面的排序值初始化為1.0;由於使用mapValues,生成的RDD的分區方式會和"links"的一樣var ranks = links.mapValues(v => 1.0)// 運行10輪PageRank迭代,for(i <- 0 until 10) {val contributions = links.join(ranks).flatMap {case (pageId, (links, rank)) =>links.map(dest => (dest, rank / links.size))}ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)}// 寫出最終排名ranks.saveAsTextFile("ranks")

Python實現

from pyspark import SparkConf,SparkContextdef f(x): list1=[] s=len(x[1][0]) for y in x[1][0]: list1.append(tuple((y,x[1][1]/s))) return list1def p(x): print xif __name__=="__main__": list=[(A,(D,)),(B,(A,)),(C,(A,B)),(D,(A,C))] conf=SparkConf().setMaster("local").setAppName("pagerank") sc=SparkContext(conf=conf) pages=sc.parallelize(list).map(lambda x:(x[0],tuple(x[1]))).partitionBy(2).cache() links=sc.parallelize([A,B,C,D]).map(lambda x:(x,1.0))

必須轉換成key-values

持久化提高效率,這裡cache() 與使用默認存儲級別調用persist() 是一樣的

partitionBy將相同key的元素哈希到相同的機器上,省去了後續join操作shuffle開銷

其實這裡應該mapvalues然後links和pages有同KEY分區

for i in range(1,10): rank=pages.join(links).flatMap(f)

join後的pairRDD形如 (key,((linklist),PR))

s=len(x[1][0])得到 linklist長度

對於上面得到的每個pairRDD的元組,f(x) 將linklist中的每一個link都計算了當前key的投票值即PR/s,獲得的新的RDD--rank,若是使用map函數,則是以形如(link,keyPR/key_s)的元組集合成的不等長列表構成的RDD,flatmap負責得到一個由各列表中的元素組成的RDD,而不是一個由列表組成的RDD。這裡rank可以理解為flatmap過後得到一個平坦的「唱票集合」,RDD單元的形式是(誰,獲得了多少投票量)

links=rank.reduceByKey(lambda x, y:x+y)

然後經過reduceByKey將所有頁面按照key累加其獲得的得票

links=links.mapValues(lambda x:0.15+0.85*x) links.foreach(p) links.saveAsSequenceFile("/pagerank")

再將得票量與常量按比例集成為PR值

foreach(func) 對RDD 中的每個元素使用給定的函數.foreach無返回值(準確說返回void)單純遍歷,map返回集合對象,映射轉換


推薦閱讀:

TAG:Spark | PageRank | Python |