Flink Fault Tolerance: The Snapshot (Checkpoint) Mechanism
Fault tolerance is a fundamental concern in distributed systems. By fault tolerance we mean that when a job crashes and is re-executed, it can be restored to the state it was in before the crash. To make that recovery possible, the system has to persist snapshots of its current state. But because the system is distributed, there is no single global clock, so the individual nodes cannot all take their snapshot at literally the same instant. Chandy and Lamport solved this with a snapshot algorithm designed specifically for distributed systems, the Chandy-Lamport algorithm, and Flink's snapshot mechanism is based on it. Below we walk through Flink's code to see how this snapshot mechanism actually works (in Flink a snapshot is essentially the same thing as a checkpoint, so from here on I will just say checkpoint).
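Before diving into Flink, it helps to see the algorithm's core idea in isolation. Below is a minimal sketch of the Chandy-Lamport marker rule at a single process; all names here are invented for illustration, and this is not Flink code:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Minimal, simplified sketch of the Chandy-Lamport marker rule at one
// process. All names are invented for illustration; NOT Flink code.
class SnapshotProcess {
    private final List<String> inputChannels;
    private final List<String> outputChannels;

    private Object localState = new Object();   // whatever this node computes
    private Object recordedState;               // state captured for the snapshot
    private boolean recording = false;
    // channels whose in-flight messages still belong to the snapshot
    private final Map<String, Queue<Object>> openChannels = new HashMap<>();

    SnapshotProcess(List<String> inputChannels, List<String> outputChannels) {
        this.inputChannels = inputChannels;
        this.outputChannels = outputChannels;
    }

    // A marker (Flink calls it a barrier) arrived on an input channel.
    void onMarker(String channel) {
        if (!recording) {
            // First marker seen: record local state right away and forward
            // the marker on every outgoing channel.
            recordedState = localState;
            recording = true;
            for (String in : inputChannels) {
                openChannels.put(in, new ArrayDeque<>());
            }
            for (String out : outputChannels) {
                send(out, "MARKER");
            }
        }
        // The channel the marker arrived on is done: everything that was in
        // flight on it before the marker has already been recorded.
        openChannels.remove(channel);
        // When openChannels is empty, this process's snapshot is complete.
    }

    // A normal message arrived; while recording, messages on channels that
    // have not delivered their marker yet are part of the channel's state.
    void onMessage(String channel, Object msg) {
        Queue<Object> pending = openChannels.get(channel);
        if (recording && pending != null) {
            pending.add(msg);
        }
        applyToLocalState(msg);
    }

    private void applyToLocalState(Object msg) { /* normal processing */ }
    private void send(String channel, Object msg) { /* network send */ }
}
```

One difference worth noting up front: classic Chandy-Lamport keeps processing records while recording in-flight channel messages, whereas Flink's exactly-once mode instead aligns on the markers (it calls them barriers) and temporarily holds back the channels that have already delivered one, as we will see later in this article.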
When Flink builds the ExecutionGraph, it registers a listener for job status changes.
```java
// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (interval != Long.MAX_VALUE) {
    // the periodic checkpoint scheduler is activated and deactivated as a result of
    // job status changes (running -> on, all other states -> off)
    registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}
```
jobStatusChanges is the core method of that status listener; it is what actually runs when the job status changes. As the code shows, when the job switches to the RUNNING state, the checkpoint scheduling thread is started.
The checkpoint trigger is then scheduled periodically, with a period taken from the user's checkpoint configuration.
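As a quick reminder of where that period comes from, here is how checkpointing is typically enabled on the DataStream API; the interval and the optional tuning values below are just example numbers:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds; this interval is what the
        // coordinator uses as baseInterval in startCheckpointScheduler below.
        env.enableCheckpointing(60_000L);

        // Optional tuning (the values here are only examples):
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
        env.getCheckpointConfig().setCheckpointTimeout(600_000L);
    }
}
```

The scheduler that consumes this interval looks like this: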
```java
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(), baseInterval, baseInterval, TimeUnit.MILLISECONDS);
    }
}
```
The ScheduledTrigger scheduled here is the thread that actually fires checkpoints; let's step into it.
```java
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        } catch (Exception e) {
            LOG.error("Exception while triggering checkpoint.", e);
        }
    }
}
```
Nothing much here; it just calls triggerCheckpoint.
```java
public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
    return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
}
```
Hmm, one more layer of wrapping, so let's keep going. The method that really triggers the checkpoint opens with a long series of precondition checks. We will not worry about those for now and instead follow the main thread of the logic.
```java
final PendingCheckpoint checkpoint = new PendingCheckpoint(
    job,
    checkpointID,
    timestamp,
    ackTasks,
    props,
    targetDirectory,
    executor);
```
After the precondition checks pass, a PendingCheckpoint instance is constructed and registered among the pending checkpoints. Only when the JobManager has received the acknowledgements that the checkpoint was saved, ultimately including those from the sink tasks, does the PendingCheckpoint turn into a CompletedCheckpoint, and only that marks one checkpoint operation as truly complete.
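Conceptually, a PendingCheckpoint is little more than acknowledgement bookkeeping. Here is a simplified sketch of that idea with invented names; the real class in flink-runtime tracks considerably more (per-subtask state handles, timeouts, disposal):

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of the PendingCheckpoint idea: track which tasks still
// have to acknowledge. Names are invented; the real logic lives in Flink's
// PendingCheckpoint and CheckpointCoordinator classes.
class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    // Called when an ack for this checkpoint arrives from a task.
    synchronized boolean acknowledgeTask(String taskId) {
        return notYetAcknowledged.remove(taskId);
    }

    // Once every task has acked, the coordinator turns this pending
    // checkpoint (identified by checkpointId) into a CompletedCheckpoint.
    synchronized boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }
}
```

Back in triggerCheckpoint: once the PendingCheckpoint exists, the coordinator sends out the trigger messages.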
```java
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
```
This is the really crucial part: the coordinator sends a "save checkpoint" message to the source tasks, telling their TaskManagers to perform a local checkpoint. Why do I say the messages go to the source tasks? Because everything stored in executions is a source task. And why is that? This takes slightly more explanation.
```java
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee != null && ee.getState() == ExecutionState.RUNNING) {
        executions[i] = ee;
    } else {
        LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                tasksToTrigger[i].getTaskNameWithSubtaskIndex());
        return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}
```
This code runs before the PendingCheckpoint is constructed, and you can see that the elements of executions all come from tasksToTrigger. So where do the elements of tasksToTrigger come from? The call chain there is somewhat involved, so I will skip the details and just give the conclusion.
```java
// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
List<JobVertexID> triggerVertices = new ArrayList<>();

// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
The elements of tasksToTrigger all come from triggerVertices, which is populated while the JobGraph is being built, roughly as sketched below.
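The following condensed fragment shows roughly what that JobGraph construction step does (simplified from the streaming job graph generator of this Flink era; treat the exact shape as an approximation): a vertex without inputs is a source, so only sources become trigger vertices, while every vertex has to acknowledge.

```java
// Roughly what JobGraph construction does when wiring checkpoint settings
// (simplified; the real code lives in StreamingJobGraphGenerator):
for (JobVertex vertex : jobVertices.values()) {
    if (vertex.isInputVertex()) {
        // no inputs -> this vertex is a source -> it gets triggered
        triggerVertices.add(vertex.getID());
    }
    // every vertex must acknowledge the checkpoint
    ackVertices.add(vertex.getID());
    commitVertices.add(vertex.getID());
}
```

That is exactly why the coordinator's trigger messages only ever go to source tasks.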
At this point the JobManager's share of the work is partly done; next we look at what happens on the TaskManager.
```scala
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
  actorMessage match {
    case message: TriggerCheckpoint =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp
      val checkpointOptions = message.getCheckpointOptions

      log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
      } else {
        log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
      }
```
When the TaskManager actor receives the TriggerCheckpoint message, it runs the handleCheckpointingMessage method above, whose core is the call to task.triggerCheckpointBarrier. triggerCheckpointBarrier in turn spawns a new thread that asynchronously runs the StreamTask's triggerCheckpoint method, which then calls performCheckpoint, and that is the method that matters most.
```java
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // Since both state checkpointing and downstream barrier emission occurs in this
            // lock scope, they are an atomic operation regardless of the order in which they occur.
            // Given this, we immediately emit the checkpoint barriers, so the downstream operators
            // can start their checkpoint work as soon as possible
            operatorChain.broadcastCheckpointBarrier(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        else {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the operator chain, because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;

            for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
                try {
                    output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(
                        new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                        exception);
                }
            }

            if (exception != null) {
                throw exception;
            }

            return false;
        }
    }
}
```
It first checks whether the task is still in the RUNNING state. If it is not, it has to tell the downstream tasks to stop waiting for its barrier; if it is, it immediately broadcasts a barrier to all downstream outputs connected to this node. This is a good point to explain what a barrier actually is.
A barrier is a kind of stake planted in the data stream; each checkpoint covers exactly the data between two consecutive stakes. Here is how the official documentation explains barriers.
One of the core concepts of Flink's distributed snapshots is the barrier. Barriers are injected into the data stream and flow downstream with it, as part of the stream. A barrier never overtakes records, so the flow stays strictly ordered. Each barrier splits the stream into two parts: the records that go into the current snapshot and the records that go into the next one. Every barrier carries a snapshot ID, and all records before the barrier belong to that snapshot. Barriers do not interrupt the flow of records, so they are very lightweight. Barriers from several different snapshots can be in flight at the same time, which means several snapshots may be in progress concurrently.
Barriers are injected at the sources. When the barrier for snapshot n is injected, the system records the position Sn that the snapshot covers up to. In Apache Kafka, for example, this is the offset of the last record consumed in a partition. This position Sn is reported to a module called the checkpoint coordinator (Flink's JobManager).
The barrier then flows downstream. When an operator has received the barrier for snapshot n on all of its input streams, it emits a barrier for snapshot n on all of its output streams. When a sink operator (the end of the DAG) has received barrier n on all of its inputs, it acknowledges snapshot n to the checkpoint coordinator. Once every sink has acknowledged it, the snapshot is considered complete.
Operators that receive more than one input stream need to align the inputs on the barriers (the official documentation illustrates this with a figure of the alignment phases):

- As soon as the operator receives barrier n from one input stream, it cannot process any further records from that stream until it has also received barrier n from its other inputs; otherwise it would mix records belonging to snapshot n with records belonging to snapshot n+1.
- Streams that have already delivered barrier n are set aside for the moment: records arriving on them are not processed but placed into an input buffer.
- When the last input stream delivers barrier n, the operator emits all of its pending output records and then emits its own barrier for snapshot n.
- After that, the operator resumes processing records from all input streams, serving the records in the input buffers before the records from the streams themselves.
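To make those alignment steps concrete, here is a minimal sketch in the same spirit; the names are invented, and Flink's real implementation of this logic is its barrier handler (the BarrierBuffer in this code base):

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Simplified multi-input alignment: once barrier n arrives on one channel,
// records from that channel are buffered instead of processed, until the
// barrier has arrived on every channel. Names are invented for illustration.
class BarrierAligner {
    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final Queue<Object>[] buffered;

    @SuppressWarnings("unchecked")
    BarrierAligner(int numChannels) {
        this.numChannels = numChannels;
        this.buffered = new Queue[numChannels];
        for (int i = 0; i < numChannels; i++) {
            buffered[i] = new ArrayDeque<>();
        }
    }

    void onRecord(int channel, Object record) {
        if (blockedChannels.contains(channel)) {
            buffered[channel].add(record);   // belongs to snapshot n+1, hold it back
        } else {
            process(record);                 // still part of snapshot n
        }
    }

    void onBarrier(int channel) {
        blockedChannels.add(channel);
        if (blockedChannels.size() == numChannels) {
            // All barriers for snapshot n have arrived: snapshot the state,
            // forward the barrier, then release the buffered records.
            snapshotAndForwardBarrier();
            for (Queue<Object> q : buffered) {
                while (!q.isEmpty()) {
                    process(q.poll());
                }
            }
            blockedChannels.clear();
        }
    }

    private void process(Object record) { /* normal record processing */ }
    private void snapshotAndForwardBarrier() { /* checkpoint state + broadcast barrier */ }
}
```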
Only after broadcasting the barrier does the task save its local state, that is, the state of every operator running on this node.
```java
for (StreamOperator<?> op : allOperators) {
    checkpointStreamOperator(op);
}
```

What gets called next is the operator's snapshotState method, which also runs asynchronously.

```java
if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}

if (null != keyedStateBackend) {
    snapshotInProgress.setKeyedStateManagedFuture(
        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
```
operatorStateBackend is the medium the state is written to. Flink ships three different storage media: one keeps state on the Java heap, one stores it in files, typically on HDFS, and one stores it in RocksDB. Each of them has to implement the snapshot method, which handles the serialization and storage details of the operator state; we won't dig into those details here.
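Which backend gets used is again up to the user's configuration. A minimal sketch of the three choices follows; the HDFS paths are placeholders, and the RocksDB backend additionally requires the flink-statebackend-rocksdb dependency:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1) JVM heap (the default): state lives in TaskManager memory,
        //    snapshots go to the JobManager's memory.
        env.setStateBackend(new MemoryStateBackend());

        // 2) File system, e.g. HDFS: state lives on the heap, snapshots are
        //    written out as files (the path below is a placeholder).
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));

        // 3) RocksDB: state lives in an embedded RocksDB instance on local
        //    disk, snapshots are written to the given file system path.
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
    }
}
```

Back in Flink's checkpointing code: after the synchronous part of the snapshot finishes, the asynchronous part is kicked off and timed.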
```java
for (StreamOperator<?> op : allOperators) {
    checkpointStreamOperator(op);
}

if (LOG.isDebugEnabled()) {
    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
        checkpointMetaData.getCheckpointId(), owner.getName());
}

startAsyncPartNano = System.nanoTime();

checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

// at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
runAsyncCheckpointingAndAcknowledge();
```
Note the call to runAsyncCheckpointingAndAcknowledge here: that is the method which actually carries out the state-saving work. The checkpointStreamOperator calls above only build the snapshot futures that runAsyncCheckpointingAndAcknowledge's worker thread will execute. Inside this method you can see that once the state has been saved, an ack is sent back to the JobManager.
```java
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
        CheckpointingOperation.AsynCheckpointState.COMPLETED)) {

    TaskStateSnapshot acknowledgedState = hasState
        ? taskOperatorSubtaskStates
        : null;

    // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
    // to stateless tasks on restore. This enables simple job modifications that only concern
    // stateless without the need to assign them uids to match their (always empty) states.
    owner.getEnvironment().acknowledgeCheckpoint(
        checkpointMetaData.getCheckpointId(),
        checkpointMetrics,
        acknowledgedState);
```
Once the JobManager has received the acks, the PendingCheckpoint becomes a CompletedCheckpoint.
```java
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
    case SUCCESS:
        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
            checkpointId, message.getTaskExecutionId(), message.getJob());

        if (checkpoint.isFullyAcknowledged()) {
            completePendingCheckpoint(checkpoint);
        }
        break;
```