
A Detailed Look at the Shuffle Process in Apache Hadoop MapReduce

Note: all code in this article is from Hadoop 1.0.4.

As is well known, a Hadoop MapReduce task goes through the following six phases, defined in TaskStatus (a sketch of the enum follows the list):

  1. STARTING
  2. MAP
  3. SHUFFLE
  4. SORT
  5. REDUCE
  6. CLEANUP
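
These phases are modeled by an enum nested in TaskStatus. A minimal sketch of its shape (paraphrased from the Hadoop 1.x source, not quoted verbatim):

// Sketch: phase enum as nested in org.apache.hadoop.mapred.TaskStatus.
public enum Phase {
  STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP
}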

Of these, the SHUFFLE, SORT, and REDUCE phases all run inside the JVM on the reduce side. So what exactly happens during the reduce-side SHUFFLE phase?

First, the TaskTracker launches a Child JVM process. The main method of the Child class reads command-line arguments supplied by the TaskTracker and instantiates the corresponding ReduceTask. Once the ReduceTask's run() method finishes, its lifecycle ends and the TaskTracker tears down its JVM.

Reading the ReduceTask code, we can see that the main business of the SHUFFLE phase is copying the map-side output.

When a ReduceTask is initialized, its phase is set to SHUFFLE:

{
  getProgress().setStatus("reduce");
  setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}

When the ReduceTask has finished copying data from the map side, its phase is set to SORT and the SHUFFLE phase is over. From ReduceTask#run():

boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!isLocal) {
  reduceCopier = new ReduceCopier(umbilical, job, reporter);
  if (!reduceCopier.fetchOutputs()) {
    if (reduceCopier.mergeThrowable instanceof FSError) {
      throw (FSError)reduceCopier.mergeThrowable;
    }
    throw new IOException("Task: " + getTaskID() +
        " - The reduce copier failed", reduceCopier.mergeThrowable);
  }
}
copyPhase.complete(); // copy is already complete
setPhase(TaskStatus.Phase.SORT);

If ReduceCopier.fetchOutputs() returns without error, the reduce-side Shuffle phase is finished.

During fetchOutputs(), the fetched data is staged either in the reduce JVM's memory or on the local file system, depending on two parameters:

  • mapred.job.reduce.total.mem.bytes

  • mapred.job.shuffle.input.buffer.percent

mapred.job.reduce.total.mem.bytes is the maximum memory, in bytes, allocated to the reduce task.

mapred.job.shuffle.input.buffer.percent is the fraction of that memory that shuffle data may occupy.

In other words, the maximum in-memory cache available to the shuffle is mapred.job.reduce.total.mem.bytes * mapred.job.shuffle.input.buffer.percent. In practice, any single map output kept in memory is further limited by a factor of 0.25f, defined as:

private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;

If a map output file is smaller than this maximum single-segment size, it can be kept directly in memory; otherwise it is written to disk. The sketch below works through the arithmetic.
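
To make this concrete, here is a small self-contained sketch; the memory values are hypothetical, not Hadoop defaults:

public class ShuffleBufferMath {
  public static void main(String[] args) {
    // Hypothetical values for illustration only.
    long totalMemBytes = 640L << 20;      // mapred.job.reduce.total.mem.bytes (640 MB)
    float bufferPercent = 0.70f;          // mapred.job.shuffle.input.buffer.percent
    float singleSegmentFraction = 0.25f;  // MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION

    // Total in-memory shuffle buffer, then the per-segment cap.
    long shuffleBuffer = (long) (totalMemBytes * bufferPercent);            // 448 MB
    long maxSingleSegment = (long) (shuffleBuffer * singleSegmentFraction); // 112 MB

    System.out.println("shuffle buffer     = " + shuffleBuffer);
    System.out.println("max in-mem segment = " + maxSingleSegment);
    // A map output below maxSingleSegment can stay in memory;
    // larger outputs go to the local disk.
  }
}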

So how exactly are the map outputs copied? Let's dig into the details in the code.

The story starts with the MapTask's output. When a job has zero reduce tasks, the MapTask writes the map output directly to HDFS; otherwise it writes the output to the local file system. See this code in MapTask#runNewMapper:

if (job.getNumReduceTasks() == 0) {
  output =
    new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
  output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
    input, output, committer,
    reporter, split);

input.initialize(split, mapperContext);
mapper.run(mapperContext);
input.close();
output.close(mapperContext);

That is:

  • when mapred.reduce.tasks=0, the map output is written directly to HDFS by NewDirectOutputCollector (a map-only job is sketched after this list);
  • otherwise, the map output is written to the local file system by NewOutputCollector.
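
Here is a minimal sketch of such a map-only job using the old mapred API; the job name and paths are placeholders, and IdentityMapper simply passes records through:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class MapOnlyJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MapOnlyJob.class);
    conf.setJobName("map-only-example");
    conf.setMapperClass(IdentityMapper.class);
    // Equivalent to mapred.reduce.tasks=0: no shuffle, the map output
    // is written directly to the output path on HDFS.
    conf.setNumReduceTasks(0);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}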

When a MapTask finishes, it notifies the TaskTracker that the map phase is complete, via the TaskUmbilicalProtocol wrapped in the TaskReporter class.

On the reduce side, the ReduceTask runs a background thread, GetMapEventsThread, which polls the TaskTracker once per second for map-completion events. GetMapEventsThread#run():

public void run() {
  LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
  do {
    try {
      int numNewMaps = getMapCompletionEvents();
      if (LOG.isDebugEnabled()) {
        if (numNewMaps > 0) {
          LOG.debug(reduceTask.getTaskID() + ": " +
              "Got " + numNewMaps + " new map-outputs");
        }
      }
      Thread.sleep(SLEEP_TIME);
    } catch (InterruptedException e) {
      LOG.warn(reduceTask.getTaskID() +
          " GetMapEventsThread returning after an " +
          " interrupted exception");
      return;
    } catch (Throwable t) {
      String msg = reduceTask.getTaskID() +
          " GetMapEventsThread Ignoring exception : " +
          StringUtils.stringifyException(t);
      reportFatalError(getTaskID(), t, msg);
    }
  } while (!exitGetMapEvents);
  LOG.info("GetMapEventsThread exiting");
}

GetMapEventsThread#getMapCompletionEvents() collects the output locations of completed MapTasks:

private int getMapCompletionEvents() throws IOException {

  int numNewMaps = 0;

  MapTaskCompletionEventsUpdate update =
    umbilical.getMapCompletionEvents(reduceTask.getJobID(),
                                     fromEventId.get(),
                                     MAX_EVENTS_TO_FETCH,
                                     reduceTask.getTaskID(), jvmContext);
  TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();

  // Check if the reset is required.
  // Since there is no ordering of the task completion events at the
  // reducer, the only option to sync with the new jobtracker is to reset
  // the events index
  if (update.shouldReset()) {
    fromEventId.set(0);
    obsoleteMapIds.clear(); // clear the obsolete map
    mapLocations.clear();   // clear the map locations mapping
  }

  // Update the last seen event ID
  fromEventId.set(fromEventId.get() + events.length);

  // Process the TaskCompletionEvents:
  // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
  // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
  //    fetching from those maps.
  // 3. Remove TIPFAILED maps from neededOutputs since we dont need their
  //    outputs at all.
  for (TaskCompletionEvent event : events) {
    switch (event.getTaskStatus()) {
      case SUCCEEDED:
      {
        URI u = URI.create(event.getTaskTrackerHttp());
        String host = u.getHost();
        TaskAttemptID taskId = event.getTaskAttemptId();
        URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
                                "/mapOutput?job=" + taskId.getJobID() +
                                "&map=" + taskId +
                                "&reduce=" + getPartition());
        List<MapOutputLocation> loc = mapLocations.get(host);
        if (loc == null) {
          loc = Collections.synchronizedList
            (new LinkedList<MapOutputLocation>());
          mapLocations.put(host, loc);
        }
        loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
        numNewMaps ++;
      }
      break;
      case FAILED:
      case KILLED:
      case OBSOLETE:
      {
        obsoleteMapIds.add(event.getTaskAttemptId());
        LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
                 " map-task: '" + event.getTaskAttemptId() + "'");
      }
      break;
      case TIPFAILED:
      {
        copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
        LOG.info("Ignoring output of failed map TIP: '" +
                 event.getTaskAttemptId() + "'");
      }
      break;
    }
  }
  return numNewMaps;
}

The locations of completed map outputs are stored in the following Map structure, keyed by host:

Map<String, List<MapOutputLocation>> mapLocations =
  new ConcurrentHashMap<String, List<MapOutputLocation>>();
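
The design choice here is worth a note: the outer ConcurrentHashMap is keyed by host, and each value is a synchronized list, since the single GetMapEventsThread appends locations while copier threads consume them. A standalone sketch of the same pattern, with plain Strings standing in for MapOutputLocation:

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerHostLocations {
  private final Map<String, List<String>> mapLocations =
      new ConcurrentHashMap<String, List<String>>();

  // Called only from the single event-polling thread, so the
  // check-then-put below has one writer and is not racy in practice.
  public void add(String host, String location) {
    List<String> perHost = mapLocations.get(host);
    if (perHost == null) {
      perHost = Collections.synchronizedList(new LinkedList<String>());
      mapLocations.put(host, perHost);
    }
    perHost.add(location); // synchronizedList: safe against concurrent readers
  }
}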

Then, until it has the output locations of every map it needs, the ReduceTask keeps looping, moving entries from mapLocations onto the list of copies scheduled to be fetched:

private List<MapOutputLocation> scheduledCopies;

Meanwhile, the ReduceTask also runs several background MapOutputCopier threads that watch scheduledCopies; as soon as an entry appears, a copier takes it and fetches the map output described by that MapOutputLocation. MapOutputCopier#run():

public void run() {
  while (true) {
    try {
      MapOutputLocation loc = null;
      long size = -1;

      synchronized (scheduledCopies) {
        while (scheduledCopies.isEmpty()) {
          scheduledCopies.wait();
        }
        loc = scheduledCopies.remove(0);
      }
      CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
      readError = false;
      try {
        shuffleClientMetrics.threadBusy();
        start(loc);
        size = copyOutput(loc);
        shuffleClientMetrics.successFetch();
        error = CopyOutputErrorType.NO_ERROR;
      } catch (IOException e) {
        LOG.warn(reduceTask.getTaskID() + " copy failed: " +
                 loc.getTaskAttemptId() + " from " + loc.getHost());
        LOG.warn(StringUtils.stringifyException(e));
        shuffleClientMetrics.failedFetch();
        if (readError) {
          error = CopyOutputErrorType.READ_ERROR;
        }
        // Reset
        size = -1;
      } finally {
        shuffleClientMetrics.threadFree();
        finish(size, error);
      }
    } catch (InterruptedException e) {
      break; // ALL DONE
    } catch (FSError e) {
      LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " +
                StringUtils.stringifyException(e));
      try {
        umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
      } catch (IOException io) {
        LOG.error("Could not notify TT of FSError: " +
                  StringUtils.stringifyException(io));
      }
    } catch (Throwable th) {
      String msg = getTaskID() + " : Map output copy failure : " +
                   StringUtils.stringifyException(th);
      reportFatalError(getTaskID(), th, msg);
    }
  }

  if (decompressor != null) {
    CodecPool.returnDecompressor(decompressor);
  }
}
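
The hand-off between the scheduling loop and the copier threads is a classic wait/notify producer-consumer queue. A minimal standalone sketch of the same idiom, stripped of the error handling and metrics above:

import java.util.LinkedList;
import java.util.List;

public class CopyQueue {
  private final List<String> scheduledCopies = new LinkedList<String>();

  // Producer side: the ReduceTask scheduling a fetch.
  public void schedule(String loc) {
    synchronized (scheduledCopies) {
      scheduledCopies.add(loc);
      scheduledCopies.notifyAll(); // wake any waiting copier threads
    }
  }

  // Consumer side: what each MapOutputCopier does at the top of its loop.
  public String take() throws InterruptedException {
    synchronized (scheduledCopies) {
      while (scheduledCopies.isEmpty()) {
        scheduledCopies.wait(); // releases the lock until notified
      }
      return scheduledCopies.remove(0);
    }
  }
}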

By default there are five MapOutputCopier threads; to improve shuffle performance you can tune this with the following parameter:

mapred.reduce.parallel.copies=5
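
For example, to raise the copier count in job setup code (10 is an illustrative value; benchmark before changing it in production):

import org.apache.hadoop.mapred.JobConf;

public class TuneShuffleCopies {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // More copier threads increase fetch parallelism at the cost of
    // memory and network pressure on the reduce side.
    conf.setInt("mapred.reduce.parallel.copies", 10);
  }
}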

Once the expected number of map outputs has been copied, the copy stage ends and the ReduceTask interrupts the MapOutputCopier threads; on receiving the interrupt, they exit their loops and are eventually garbage-collected.

In Hadoop 1.x, the reduce side fetches map outputs over HTTP. Hadoop 2.x makes the shuffle pluggable, so users can substitute a more efficient transport such as RDMA; see hadoop.apache.org/docs/
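
As a flavor of the Hadoop 2.x hook, a job can point the shuffle at a custom plugin. The property name below follows the Hadoop 2.x pluggable-shuffle documentation, and com.example.MyRdmaShuffle is a hypothetical plugin class:

import org.apache.hadoop.conf.Configuration;

public class PluggableShuffleConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical plugin class; it must implement
    // org.apache.hadoop.mapreduce.task.reduce.ShuffleConsumerPlugin.
    conf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class",
             "com.example.MyRdmaShuffle");
  }
}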

Of course, the real process is more involved than the sketch above. It includes error handling for failed remote copies and, after successful copies, merging the fetched data, which brings in two more background merge threads:

  • InMemFSMergeThread
  • LocalFSMerger

Interested readers are encouraged to explore that part of the code on their own.

