
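Fault tolerance is a fundamental concern in distributed systems. Here it means that when a job fails and is restarted, it can recover the state it had before the failure. To make that recovery possible, the system has to save snapshots of its current state. Because the system is distributed, though, the nodes share no global clock, so they cannot all take a snapshot at exactly the same instant. To address this, Chandy and Lamport proposed a snapshot algorithm designed for distributed systems, the Chandy-Lamport algorithm, and Flink's snapshot mechanism is based on it. Below we walk through the relevant Flink code to see how this snapshot mechanism actually works. (In Flink a snapshot and a checkpoint are essentially the same thing, so from here on we will just say checkpoint.)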


// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (interval != Long.MAX_VALUE) {
    // the periodic checkpoint scheduler is activated and deactivated as a result of
    // job status changes (running -> on, all other states -> off)
    registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}



public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
            new ScheduledTrigger(),
            baseInterval, baseInterval, TimeUnit.MILLISECONDS);
    }
}


private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        } catch (Exception e) {
            LOG.error("Exception while triggering checkpoint.", e);
        }
    }
}
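The start/stop scheduling pattern in these excerpts can be tried in isolation. The following is only a minimal sketch under stated assumptions: `PeriodicTriggerSketch` and its members are hypothetical names, not Flink classes, and the `Runnable` passed in stands in for the real checkpoint trigger. It uses the standard `ScheduledExecutorService` and, like `ScheduledTrigger`, catches exceptions inside the task, because `scheduleAtFixedRate` suppresses all later runs once one run throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the coordinator's scheduling logic; not Flink's actual class.
final class PeriodicTriggerSketch {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Runnable trigger;
    private final long baseIntervalMillis;
    private ScheduledFuture<?> currentPeriodicTrigger; // guarded by lock

    PeriodicTriggerSketch(Runnable trigger, long baseIntervalMillis) {
        this.trigger = trigger;
        this.baseIntervalMillis = baseIntervalMillis;
    }

    // Would be called when the job switches to RUNNING.
    void start() {
        synchronized (lock) {
            stop(); // make sure a prior timer is cancelled first
            Runnable guarded = () -> {
                try {
                    trigger.run();
                } catch (RuntimeException e) {
                    // Swallow and log, otherwise the periodic task would stop for good.
                    System.err.println("Exception while triggering checkpoint: " + e);
                }
            };
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                guarded, baseIntervalMillis, baseIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Would be called for every other job status.
    void stop() {
        synchronized (lock) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
        }
    }

    void shutdown() {
        stop();
        timer.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PeriodicTriggerSketch scheduler = new PeriodicTriggerSketch(
            () -> System.out.println("trigger at " + System.currentTimeMillis()), 100L);
        scheduler.start();
        Thread.sleep(350L);
        scheduler.shutdown();
    }
}
```

Calling `stop()` at the top of `start()` mirrors the "make sure all prior timers are cancelled" step, so starting twice never leaves two timers running.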


public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
    return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
}


final PendingCheckpoint checkpoint = new PendingCheckpoint(
    job,
    checkpointID,
    timestamp,
    ackTasks,
    props,
    targetDirectory,
    executor);


// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}


Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee != null && ee.getState() == ExecutionState.RUNNING) {
        executions[i] = ee;
    } else {
        LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
            tasksToTrigger[i].getTaskNameWithSubtaskIndex());
        return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
}


// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
List<JobVertexID> triggerVertices = new ArrayList<>();

// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
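The comments in this excerpt state the rule: only the sources receive the trigger message, while every vertex has to acknowledge. As a rough illustration, and assuming a toy job graph given as a map from vertex name to its input vertices (the class `CheckpointVertexSelection` and the string vertex names are made up for this sketch, not Flink types), the two lists could be derived like this:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; Flink works on JobVertex objects instead.
final class CheckpointVertexSelection {

    // Vertices without inputs are sources: they receive the "trigger checkpoint" message.
    static List<String> triggerVertices(Map<String, List<String>> inputsByVertex) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputsByVertex.entrySet()) {
            if (entry.getValue().isEmpty()) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Every vertex has to acknowledge the checkpoint.
    static List<String> ackVertices(Map<String, List<String>> inputsByVertex) {
        return new ArrayList<>(inputsByVertex.keySet());
    }

    public static void main(String[] args) {
        Map<String, List<String>> dag = new LinkedHashMap<>();
        dag.put("source-a", List.of());
        dag.put("source-b", List.of());
        dag.put("join", List.of("source-a", "source-b"));
        dag.put("sink", List.of("join"));

        System.out.println(triggerVertices(dag)); // prints [source-a, source-b]
        System.out.println(ackVertices(dag));     // prints [source-a, source-b, join, sink]
    }
}
```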



private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
  actorMessage match {
    case message: TriggerCheckpoint =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp
      val checkpointOptions = message.getCheckpointOptions

      log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
      } else {
        log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
      }
    // ... (the excerpt ends here; the remaining cases are not shown)

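After the TaskManager actor receives the TriggerCheckpoint message, it runs the handleCheckpointingMessage method shown above. The core of that method is the call to task.triggerCheckpointBarrier. triggerCheckpointBarrier actually starts a new thread that asynchronously executes the triggerCheckpoint method of StreamTask, and triggerCheckpoint in turn calls performCheckpoint. That last method is the one that really matters.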

private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // Since both state checkpointing and downstream barrier emission occurs in this
            // lock scope, they are an atomic operation regardless of the order in which they occur.
            // Given this, we immediately emit the checkpoint barriers, so the downstream operators
            // can start their checkpoint work as soon as possible
            operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        } else {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the operator chain, because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;

            for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
                try {
                    output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(
                        new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                        exception);
                }
            }

            if (exception != null) {
                throw exception;
            }

            return false;
        }
    }
}



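One of the core concepts of Flink's distributed snapshots is the stream barrier. Barriers are injected into the data stream and flow downstream together with the records, as part of the stream. Barriers never overtake records, so the stream stays strictly in order. A barrier splits the stream into two parts: the records that go into the current snapshot and the records that go into the next one. Each barrier carries the ID of the snapshot that the records in front of it belong to. Barriers do not interrupt stream processing, so they are very lightweight. Barriers from several different snapshots can be in the stream at the same time, which means several snapshots may be in progress concurrently.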

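Barriers are injected at the stream sources. When the barrier for snapshot n is injected, the system records the current position in the source stream for that snapshot (written Sn). In Apache Kafka, for example, this position is the offset of the last record in a partition. The position Sn is reported to a component called the checkpoint coordinator (that is, Flink's JobManager).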
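To make the role of Sn concrete, here is a small sketch of a source that remembers its read position whenever it injects a barrier. It is an illustration under assumptions, not Flink or Kafka code: `OffsetTrackingSource` is a hypothetical class, an in-memory list stands in for one Kafka partition, and the map `reportedPositions` stands in for what would be sent to the checkpoint coordinator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical source for illustration only.
final class OffsetTrackingSource {
    private final List<String> partition;                  // stand-in for one Kafka partition
    private final List<String> stream = new ArrayList<>(); // records and barriers sent downstream
    private final Map<Long, Long> reportedPositions = new HashMap<>(); // snapshot id -> Sn
    private long nextOffset = 0;

    OffsetTrackingSource(List<String> partition) {
        this.partition = partition;
    }

    // Emits the next record, or returns false when the partition is exhausted.
    boolean emitNext() {
        if (nextOffset >= partition.size()) {
            return false;
        }
        stream.add(partition.get((int) nextOffset));
        nextOffset++;
        return true;
    }

    // Injects the barrier for snapshot n and records Sn, the offset of the last
    // record emitted before the barrier (-1 if nothing has been emitted yet).
    void injectBarrier(long snapshotId) {
        reportedPositions.put(snapshotId, nextOffset - 1);
        stream.add("barrier-" + snapshotId);
    }

    long positionOf(long snapshotId) {
        Long position = reportedPositions.get(snapshotId);
        if (position == null) {
            throw new IllegalArgumentException("Unknown snapshot " + snapshotId);
        }
        return position;
    }

    // On recovery, reading resumes right after the recorded position.
    void restore(long snapshotId) {
        nextOffset = positionOf(snapshotId) + 1;
    }

    List<String> stream() {
        return stream;
    }

    public static void main(String[] args) {
        OffsetTrackingSource source = new OffsetTrackingSource(List.of("r0", "r1", "r2", "r3"));
        source.emitNext();
        source.emitNext();
        source.injectBarrier(1L);
        source.emitNext();
        System.out.println(source.stream());       // prints [r0, r1, barrier-1, r2]
        System.out.println(source.positionOf(1L)); // prints 1
    }
}
```

After a failure, restoring snapshot 1 in this sketch makes the source re-read from offset 2, so the records behind the barrier are replayed rather than lost.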

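The barriers then continue to flow downstream. When an operator has received a barrier for snapshot n from all of its input streams, it emits a barrier for snapshot n into all of its output streams. When a sink operator (the end of the streaming DAG) has received barrier n from all of its input streams, it acknowledges snapshot n to the checkpoint coordinator. Once all sinks have acknowledged the snapshot, it is marked as completed.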

An operator that receives more than one input stream has to align its inputs on the barriers (see the figure above):

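As soon as the operator receives barrier n from one input stream, it cannot process any further records from that stream until it has also received barrier n from the other inputs. Otherwise it would mix records that belong to snapshot n with records that belong to snapshot n+1.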

Streams that have already delivered barrier n are set aside for the moment; records received from them are not processed but placed in an input buffer.

Once barrier n arrives from the last stream, the operator emits all pending outgoing records and then emits the barrier for snapshot n itself.

After these steps, the operator resumes processing records from all input streams, handling the records in the input buffer first.
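The alignment steps above can be sketched as a small state machine. This is a simplified model under assumptions and not Flink's implementation: `BarrierAligner` is a hypothetical class, records are plain strings, the operator simply forwards records unchanged, and only one checkpoint is in flight at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of barrier alignment.
final class BarrierAligner {
    private final boolean[] blocked;                           // channels that already delivered the barrier
    private final Deque<String> buffered = new ArrayDeque<>(); // records held back during alignment
    private final List<String> output = new ArrayList<>();     // what the operator emits downstream
    private int barriersSeen = 0;

    BarrierAligner(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    void onRecord(int channel, String record) {
        if (blocked[channel]) {
            buffered.add(record); // belongs to snapshot n+1, so hold it back
        } else {
            output.add(record);   // still belongs to snapshot n, process normally
        }
    }

    void onBarrier(int channel, long checkpointId) {
        if (blocked[channel]) {
            throw new IllegalStateException("This sketch handles one checkpoint at a time");
        }
        blocked[channel] = true;
        barriersSeen++;
        if (barriersSeen == blocked.length) {
            // All inputs are aligned: this is where the operator state would be
            // snapshotted. Then the barrier is forwarded and the inputs unblocked.
            output.add("barrier-" + checkpointId);
            Arrays.fill(blocked, false);
            barriersSeen = 0;
            // Buffered records are processed before any new input.
            while (!buffered.isEmpty()) {
                output.add(buffered.poll());
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BarrierAligner aligner = new BarrierAligner(2);
        aligner.onRecord(0, "a1");
        aligner.onBarrier(0, 1L);
        aligner.onRecord(0, "a2"); // buffered: channel 0 is blocked
        aligner.onRecord(1, "b1"); // processed: channel 1 has not delivered its barrier yet
        aligner.onBarrier(1, 1L);
        aligner.onRecord(0, "a3");
        System.out.println(aligner.output()); // prints [a1, b1, barrier-1, a2, a3]
    }
}
```

In the example run, a2 arrives before b1 but is emitted after the barrier, which is exactly what keeps snapshot 1 from containing any record of snapshot 2.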


for (StreamOperator<?> op : allOperators) {
    checkpointStreamOperator(op);
}

Next, the snapshotState method of each operator is called. This method also runs asynchronously:

if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}

if (null != keyedStateBackend) {
    snapshotInProgress.setKeyedStateManagedFuture(
        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}


for (StreamOperator<?> op : allOperators) {
    checkpointStreamOperator(op);
}

if (LOG.isDebugEnabled()) {
    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
        checkpointMetaData.getCheckpointId(), owner.getName());
}

startAsyncPartNano = System.nanoTime();

checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

// at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
runAsyncCheckpointingAndAcknowledge();


if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
        CheckpointingOperation.AsynCheckpointState.COMPLETED)) {

    TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null;

    // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
    // to stateless tasks on restore. This enables simple job modifications that only concern
    // stateless without the need to assign them uids to match their (always empty) states.
    owner.getEnvironment().acknowledgeCheckpoint(
        checkpointMetaData.getCheckpointId(),
        checkpointMetrics,
        acknowledgedState);
    // ... (the excerpt ends here)


switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
    case SUCCESS:
        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
            checkpointId, message.getTaskExecutionId(), message.getJob());

        if (checkpoint.isFullyAcknowledged()) {
            completePendingCheckpoint(checkpoint);
        }
        break;
    // ... (the remaining cases are not shown in this excerpt)
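The bookkeeping behind acknowledgeTask and isFullyAcknowledged can be pictured as a set of tasks that still owe an acknowledgement. The sketch below is an assumption-laden simplification, not Flink's PendingCheckpoint: `PendingCheckpointSketch`, its `AckResult` enum, and the string task IDs are all invented for illustration, and no state handles or metrics are stored.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, stripped-down model of a pending checkpoint.
final class PendingCheckpointSketch {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAcknowledge) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
    }

    // Records one acknowledgement and reports how it was classified.
    AckResult acknowledgeTask(String taskId) {
        if (notYetAcknowledged.remove(taskId)) {
            acknowledged.add(taskId);
            return AckResult.SUCCESS;
        }
        // Either the task already acknowledged, or it was never part of this checkpoint.
        return acknowledged.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    // The checkpoint can be completed once no task is left waiting.
    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    long checkpointId() {
        return checkpointId;
    }

    public static void main(String[] args) {
        PendingCheckpointSketch pending =
            new PendingCheckpointSketch(7L, Set.of("source-0", "map-0", "sink-0"));
        for (String task : new String[] {"source-0", "map-0", "sink-0"}) {
            AckResult result = pending.acknowledgeTask(task);
            System.out.println(task + " -> " + result
                + ", fully acknowledged: " + pending.isFullyAcknowledged());
        }
    }
}
```

Only the acknowledgement that empties the waiting set makes `isFullyAcknowledged()` return true, which is the point at which the coordinator in the excerpt calls completePendingCheckpoint.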

