Flink yarn-session Startup Flow Analysis

With the yarn-session.sh script in Flink's bin directory we can start a Flink application that runs on YARN; here Flink itself is launched as a YARN application. One conclusion up front: Flink 1.4 on YARN cannot dynamically allocate resources to Flink jobs through YARN; in this version, YARN is only used to allocate resources for the TaskManagers.

First, let's look at which processes yarn-session.sh starts. On the master node, the following processes are started:

On a slave node, the following processes are started:

As everyone knows, every YARN application needs to implement its own ApplicationMaster; for example, the MapReduce framework in Hadoop 2.x has MRAppMaster as its ApplicationMaster. As the name suggests, the process Flink runs as its ApplicationMaster is YarnApplicationMasterRunner. Note, however, that strictly speaking YarnApplicationMasterRunner is not itself the ApplicationMaster: Flink's ApplicationMaster actually consists of two components, the YarnFlinkResourceManager, which requests resources from YARN for the TaskManagers, and the JobManager. YarnApplicationMasterRunner can be seen as a wrapper around these two components.

Now that we have a rough picture, let's dig into the source code for the concrete steps.

First, what is the entry class in yarn-session.sh?

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

OK, that is exactly the FlinkYarnSessionCli process we saw on the master node. The run method of this class is our focus.

AbstractYarnClusterDescriptor yarnDescriptor;
try {
    yarnDescriptor = createDescriptor(configuration, configurationDirectory, null, cmd);
} catch (Exception e) {
    System.err.println("Error while starting the YARN Client: " + e.getMessage());
    e.printStackTrace(System.err);
    return 1;
}

final ClusterSpecification clusterSpecification = createClusterSpecification(yarnDescriptor.getFlinkConfiguration(), cmd);

try {
    yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
} catch (Exception e) {
    System.err.println("Error while deploying YARN cluster: " + e.getMessage());
    e.printStackTrace(System.err);
    return 1;
}

There is a lot of information here, chiefly the call to createDescriptor and the call to deploySessionCluster. createDescriptor packages up the configuration files and jars Flink depends on, while deploySessionCluster actually launches the Flink application.
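
That packaging ultimately means uploading files such as flink-conf.yaml and the flink-dist jar to HDFS and registering them as YARN LocalResources, so that each container can localize them before launch. Below is a minimal sketch of that pattern using the plain Hadoop/YARN client API; it illustrates the mechanism only and is not Flink's actual code, which handles many more details inside the cluster descriptor.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class LocalResourceSketch {

    // describe one HDFS file so that YARN downloads it into the
    // container's working directory before the container starts
    public static LocalResource toLocalResource(FileSystem fs, Path remotePath) throws IOException {
        FileStatus status = fs.getFileStatus(remotePath);
        LocalResource resource = Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(remotePath));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        resource.setVisibility(LocalResourceVisibility.APPLICATION);
        return resource;
    }
}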

protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
    if (flip6) {
        return new YarnClusterDescriptorV2(configuration, configurationDirectory);
    } else {
        return new YarnClusterDescriptor(configuration, configurationDirectory);
    }
}

When createDescriptor builds the clusterDescriptor, it calls this getClusterDescriptor method. Since we are not using FLIP-6 here, yarnDescriptor is an instance of YarnClusterDescriptor. So next, let's look at what deploySessionCluster in YarnClusterDescriptor does.

public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) {
    try {
        return deployInternal(
            clusterSpecification,
            getYarnSessionClusterEntrypoint(),
            null);
    } catch (Exception e) {
        throw new RuntimeException("Couldn't deploy Yarn session cluster", e);
    }
}

Note the call to getYarnSessionClusterEntrypoint here: its return value is Flink's ApplicationMaster class, which will be packaged into the launch context and submitted to YARN.

protected String getYarnSessionClusterEntrypoint() {
    return YarnApplicationMasterRunner.class.getName();
}

As we can see, the return value is the YarnApplicationMasterRunner class.
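
To make the role of that class name concrete, here is a stripped-down sketch of how a YARN client submits an ApplicationMaster in general, with the entrypoint baked into the AM launch command. The application name and memory numbers are arbitrary placeholders; Flink's real startAppMaster additionally wires up classpaths, local resources, and environment variables.

import java.util.Collections;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class AmSubmitSketch {

    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("Flink session cluster");

        // the AM container is just a java process whose main class is the entrypoint
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList(
            "$JAVA_HOME/bin/java org.apache.flink.yarn.YarnApplicationMasterRunner"
                + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
                + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err"));
        appContext.setAMContainerSpec(amContainer);

        // resources for the AM container itself
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(1024);
        capability.setVirtualCores(1);
        appContext.setResource(capability);

        yarnClient.submitApplication(appContext);
    }
}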

The deployInternal method blocks until YarnApplicationMasterRunner is up. The method is quite long, so we will only pick out the important parts.

ApplicationReport report = startAppMaster(
    yarnClusterEntrypoint,
    jobGraph,
    yarnClient,
    yarnApplication,
    clusterSpecification);

Here the Flink framework itself is submitted to YARN as a YARN application, so that YARN allocates a container in which YarnApplicationMasterRunner can run. The YARN client returns immediately after submitting an application, and its return value is an ApplicationReport. Flink then has to keep polling YARN with this report: how is it going, has my container been allocated yet? Is my YarnApplicationMasterRunner running?
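
That waiting boils down to repeatedly fetching a fresh report and checking its state. A simplified version of the loop might look like the following (the method name is mine; the real code also handles timeouts and richer diagnostics):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class AmWaitSketch {

    // block until the AM is reported as RUNNING, or fail on a terminal state
    public static ApplicationReport waitForAppMaster(YarnClient yarnClient, ApplicationId appId)
            throws Exception {
        while (true) {
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = report.getYarnApplicationState();
            if (state == YarnApplicationState.RUNNING) {
                return report;
            }
            if (state == YarnApplicationState.FAILED
                    || state == YarnApplicationState.KILLED
                    || state == YarnApplicationState.FINISHED) {
                throw new RuntimeException(
                    "AM ended in state " + state + ": " + report.getDiagnostics());
            }
            Thread.sleep(250); // ask the ResourceManager again shortly
        }
    }
}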

return createYarnClusterClient(
    this,
    clusterSpecification.getNumberTaskManagers(),
    clusterSpecification.getSlotsPerTaskManager(),
    yarnClient,
    report,
    flinkConfiguration,
    true);

In createYarnClusterClient we can see the report being passed in as a parameter; in fact, while YarnClusterClient is being initialized, the thread that polls with this report is also started.

public YarnClusterClient(
--------------------------------------------
    this.pollingRunner = new PollingThread(yarnClient, appId);
    this.pollingRunner.setDaemon(true);
    this.pollingRunner.start();

    Runtime.getRuntime().addShutdownHook(clientShutdownHook);
}
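
Two small design points here: the poller is marked as a daemon thread, so it never keeps the client JVM alive on its own, and the registered shutdown hook gives the client a chance to attempt an orderly shutdown of the session when the CLI process is terminated.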

OK, now suppose resources are sufficient and YarnApplicationMasterRunner starts up successfully. What happens next? This part of the code is concentrated in the runApplicationMaster method.

try {
    taskManagerContainerMemory = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_MEMORY));
} catch (NumberFormatException e) {
    throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_MEMORY + " : " + e.getMessage());
}
try {
    numInitialTaskManagers = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_COUNT));
} catch (NumberFormatException e) {
    throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_COUNT + " : " + e.getMessage());
}
try {
    slotsPerTaskManager = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_SLOTS));
} catch (NumberFormatException e) {
    throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_SLOTS + " : " + e.getMessage());
}

This first reads how much memory each TaskManager needs and how many TaskManagers to start in total. That count is the -n parameter given when yarn-session.sh is first run (for example, ./bin/yarn-session.sh -n 4 -s 2 -tm 1024 asks for four TaskManagers with two slots and 1024 MB each), and the number of TaskManagers started per node is roughly that count divided by the total number of nodes. Finally it reads how many slots each TaskManager will hold; we will come back to slots later.

Next comes starting the JobManager and the TaskManagers.

ActorRef jobManager = JobManager.startJobManagerActors(
    config,
    actorSystem,
    futureExecutor,
    ioExecutor,
    highAvailabilityServices,
    metricRegistry,
    webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()),
    new Some<>(JobMaster.JOB_MANAGER_NAME),
    Option.<String>empty(),
    getJobManagerClass(),
    getArchivistClass())._1();

-----------------------------------------------------------------------------------

Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
    getResourceManagerClass(),
    config,
    yarnConfig,
    highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    appMasterHostname,
    webMonitorURL,
    taskManagerParameters,
    taskManagerContext,
    numInitialTaskManagers,
    LOG);

There is not much to say about starting the JobManager; the interesting part is starting the TaskManagers. Before the TaskManagers can be started, YarnFlinkResourceManager must be started first; it then asks the YARN ResourceManager for the containers needed to launch the TaskManagers. Note that the taskmanager here is not the TaskManager class but the YarnTaskManager class.

When createActorProps is used to create the actor, the initialize method in YarnFlinkResourceManager is called first.

resourceManagerCallbackHandler.initialize(self());

resourceManagerClient.init(yarnConfig);
resourceManagerClient.start();

// create the client to communicate with the node managers
nodeManagerClient.init(yarnConfig);
nodeManagerClient.start();
nodeManagerClient.cleanupRunningContainersOnStop(true);

The three objects here, resourceManagerCallbackHandler, resourceManagerClient, and nodeManagerClient, are crucial. The first handles the container-allocation messages the YARN ResourceManager sends back to the Flink AM (note: this specifically means YarnFlinkResourceManager, not its outer shell YarnApplicationMasterRunner). The second is the Flink AM's client for talking to the YARN ResourceManager; it is through this client that Flink requests containers. The third is the Flink AM's client for talking to the YARN NodeManagers; through it the Flink AM can launch the containers allocated to it.
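
For context, these two YARN-side clients follow the standard AMRMClientAsync/NMClient pattern. The sketch below shows the generic shape of that pattern: a callback handler receiving allocations, and container requests being filed. The Hadoop API calls are real, but the AllocationHandler class and requestContainers helper are hypothetical names of mine, not Flink's actual implementation.

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class RmClientSketch {

    // the RM pushes allocation results here, the role resourceManagerCallbackHandler
    // plays for YarnFlinkResourceManager
    static class AllocationHandler implements AMRMClientAsync.CallbackHandler {
        @Override
        public void onContainersAllocated(List<Container> containers) {
            // hand the containers over for launching, cf. containersAllocated() below
        }
        @Override
        public void onContainersCompleted(List<ContainerStatus> statuses) {}
        @Override
        public void onShutdownRequest() {}
        @Override
        public void onNodesUpdated(List<NodeReport> updatedNodes) {}
        @Override
        public void onError(Throwable e) {}
        @Override
        public float getProgress() { return 0f; }
    }

    static void requestContainers(YarnConfiguration yarnConfig, int count, int memoryMb) {
        AMRMClientAsync<AMRMClient.ContainerRequest> rmClient =
            AMRMClientAsync.createAMRMClientAsync(1000, new AllocationHandler());
        rmClient.init(yarnConfig);
        rmClient.start();

        // one ContainerRequest per desired TaskManager
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(memoryMb);
        capability.setVirtualCores(1);
        for (int i = 0; i < count; i++) {
            rmClient.addContainerRequest(
                new AMRMClient.ContainerRequest(capability, null, null, Priority.newInstance(0)));
        }
    }
}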

Once the containers have been allocated, the TaskManagers can be started, and with that the whole Flink cluster is fully up. The containersAllocated callback below shows how each newly allocated container is handled.

private void containersAllocated(List<Container> containers) {
    final int numRequired = getDesignatedWorkerPoolSize();
    final int numRegistered = getNumberOfStartedTaskManagers();

    for (Container container : containers) {
        numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
        LOG.info("Received new container: {} - Remaining pending container requests: {}",
            container.getId(), numPendingContainerRequests);

        // decide whether to return the container, or whether to start a TaskManager
        if (numRegistered + containersInLaunch.size() < numRequired) {
            // start a TaskManager
            final YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(container);
            final ResourceID resourceID = containerInLaunch.getResourceID();
            containersInLaunch.put(resourceID, containerInLaunch);

            String message = "Launching TaskManager in container " + containerInLaunch
                + " on host " + container.getNodeId().getHost();
            LOG.info(message);
            sendInfoMessage(message);

            try {
                // set a special environment variable to uniquely identify this container
                taskManagerLaunchContext.getEnvironment()
                    .put(ENV_FLINK_CONTAINER_ID, resourceID.getResourceIdString());
                nodeManagerClient.startContainer(container, taskManagerLaunchContext);
            } catch (Throwable t) {
                // failed to launch the container
                containersInLaunch.remove(resourceID);

                // return container, a new one will be requested eventually
                LOG.error("Could not start TaskManager in container " + containerInLaunch, t);
                containersBeingReturned.put(container.getId(), container);
                resourceManagerClient.releaseAssignedContainer(container.getId());
            }
        } else {
            // return excessive container
            LOG.info("Returning excess container {}", container.getId());
            containersBeingReturned.put(container.getId(), container);
            resourceManagerClient.releaseAssignedContainer(container.getId());
        }
    }

    updateProgress();

    // if we are waiting for no further containers, we can go to the
    // regular heartbeat interval
    if (numPendingContainerRequests <= 0) {
        resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    }

    // make sure we re-check the status of workers / containers one more time at least,
    // in case some containers did not come up properly
    triggerCheckWorkers();
}
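
Two details in this method are worth calling out. YARN may hand over more containers than are still needed, so any surplus is returned right away via releaseAssignedContainer instead of sitting idle. And when startContainer fails, the container is likewise released rather than retried in place; the final triggerCheckWorkers call makes sure the shortfall is noticed so a replacement container gets requested eventually.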
