標籤:

Flink中的broadcast變數

在我們寫程序的時候經常會碰到閉包的概念,其實閉包很簡單,在python中可以說是處處可見。

def func(): var = 10 def wrapper(): print var return wrapperf = func()f()#output:10

這個就是一個簡單的閉包例子。那麼我們想一下,如果在分散式計算裡面需要用到閉包,那麼外部變數應該放在哪??

在Flink自帶的例子——KMeans中,聚類中心點作為外部變數需要參與到迭代過程中的每次map計算,這裡就是broadcast很典型的應用場景,把聚類中心點作為broadcast變數,所謂broadcast變數是指這個變數將存在於集群中的每個節點。

final class SelectNearestCenter extends RichMapFunction[Point, (Int, Point)] { private var centroids: Traversable[Centroid] = null /** Reads the centroid values from a broadcast variable into a collection. */ override def open(parameters: Configuration) { //讀取broadcast變數,centroids centroids = getRuntimeContext.getBroadcastVariable[Centroid]("centroids").asScala }val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) { currentCentroids => val newCentroids = points //將currentCentroids作為broadcast變數centroids .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")

推薦閱讀:

Flink源碼解析-從API到JobGraph
Flink yarn-session啟動流程分析
Apache Storm 1.1.0 中文文檔 | ApacheCN
Flink中的join實現原理

TAG:Flink |