當spark讀取一個非常大的本地文件時,讀入內存後分區會自動分布到多個節點上嗎?
本地文件指的是在某一個節點的本地文件系統上,不是HDFS上面。如若不然是在執行action的時候再拷貝相應分區到多個worker節點進行並行計算嗎?希望能說一下對應哪塊源碼。我目前還沒找到這一塊。
一·是在執行action的時候再拷貝相應分區到多個worker節點進行並行計算嗎?
不是,這種讀取local file system而不是hdfs的情況,需要同一個文件存在所有的worker node上面,在讀取的時候每個worker node的task會去讀取本文件的一部分。打個比方,比如你有一個file,有一個spark集群(node1是master,node2,node3兩個是slaves),那麼這個file需要在node2,node3上面都存在,這兩個節點的task會各讀一半,不然會出錯。(這裡其實還有一個點注意,你的spark app所運行的節點也需要有這個file,因為需要用到file進行Partition劃分)。二·具體對應哪一段源碼。
1.由讀取文件的方法SparkContext.textFile(path)跟蹤源碼知道它利用了TextInputFormat生成了一個HadoopRDD.def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair =&> pair._2.toString)
}
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ &<: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) =&> FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
2.再來分析HadoopRDD,對於你的疑問來說最重要的是getPartitions方法,也就是如何劃分你輸入的文件成為Partitions:
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i &<- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
case Some(c) =&>
try {
val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
Some(HadoopRDD.convertSplitLocationInfo(infos))
} catch {
case e: Exception =&>
logDebug("Failed to use InputSplitWithLocations.", e)
None
}
case None =&> None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
從這段代碼可以看出來,對於localhost的host,是沒有PreferredLocation的,這個會把對應於該partition的task追加到no_prefs的任務隊列中,進行相應data locality的任務調度。
3.任務調度val taskIdToLocations = try {
stage match {
case s: ShuffleMapStage =&>
partitionsToCompute.map { id =&> (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =&>
val job = s.resultOfJob.get
partitionsToCompute.map { id =&>
val p = job.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
}
由於Spark每個partition的運算都是由一個task進行的,那麼partition的preferlocation會成為task的preferLocation,這是data locality的任務調度,遵循著移動計算比移動數據更加高效的原則。
那麼這樣每個task都有了自己的應該允許的Location,然而對於本地文件系統,這是一個坑爹的存在,因為getPreferredLocs這個方法返回的是Nil,是空的。如果task沒有PreferLocation,那麼它如何被調度呢?答案在TaskSetManager裡面:if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
}
如何沒有preferLocation的話,那麼是會把這個任務追加到pendingTasksWithNoPrefs數組裡面。
該數組裡面的任務是以Round-Robin的方式分發到各個Executor裡面的,到這裡已經能說明問題了,你有一個file,根據FileInputFormat生成了兩個Split,HadoopRDD據此生成了兩個Partition,兩個Partition需要兩個Task,這兩個Task會 Round-Robin 得spread到你的node2,node3上面的executor上面,這些Task要讀取的Split的文件的host都是localhost,大小就是file的一半,到此,你應該可以理解為什麼需要這個file在每個worker node都存在了,因為每個worker node的executor執行的task要讀取的Split的Location信息是localhost,他不會到master上面讀,只會在運行這個task的worker node本地讀。相對應的源碼就是上面的,細節留待你自己去再梳理一遍。PS:
1.這種使用textFile方法讀取本地文件系統的文件的方法,只能用於debug,不用於其他任何用途,因為他會導致file的replication數與node的個數同步增長。
2.上述描述中的分成2份這種是默認值,為了方面說明,你可以自己設置partition個數。錯spark每個worker會讀取本地路徑下的文件的某一段
不存在複製文件內容的過程
你要保證這些文件內容一樣spark programming-guide.html有這樣一句話。
Some notes on reading files with Spark:
- If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
那麼針對這個問題是說在觸發action的時候,是整個文件被copy到work節點,而不是複製文件的某個分區是這樣的嗎?
會自動分,和這個文件大小沒關係
推薦閱讀:
※伺服器集群負載均衡原理?
※為什麼 Erlang 流行不起來?
※Hadoop和Spark解決了哪些並行資料庫沒解決的問題?
※現在主流開源分散式系統架構都有哪些?
※Paxos(Multi-Paxos)在工程實現中需要注意哪些問題?