yarn中的事件驅動模型
為了更好的應對並發yarn採用了基於事件的非同步模型。所謂基於事件的模型其實很好理解,就是通過事件隊列緩存住各種事件,然後通過事件分發器里的常駐線程不斷的從事件隊列里取事件並將該事件交給相應的事件處理handler(EventHandler)進行處理。over。
所以其實事件分發器才是最重要的。
事件分發器介面Dispatcher核心的方法是register方法,該方法是將各種事件對應的事件處理器保存下來,不然誰知道哪個handler對應哪個event呢。在yarn中,Dispatcher介面的實現有三種,不過在絕大多數場景下只使用它的一種實現——AsyncDispatcher。
public void register(Class<? extends Enum> eventType, EventHandler handler) { /* check to see if we have a listener registered */ EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType); LOG.info("Registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null) { eventDispatchers.put(eventType, handler); } else if (!(registeredHandler instanceof MultiListenerHandler)){ /* for multiple listeners of an event add the multiple listener handler */ MultiListenerHandler multiHandler = new MultiListenerHandler(); multiHandler.addHandler(registeredHandler); multiHandler.addHandler(handler); eventDispatchers.put(eventType, multiHandler); } else { /* already a multilistener, just add to it */ MultiListenerHandler multiHandler = (MultiListenerHandler) registeredHandler; multiHandler.addHandler(handler); }}
register操作就是將<Event,EventHandler>放入map,不過有時候一個事件可能會有多個handler對其進行處理,所以這時候是將<Event, List<EventHandler>>放入map。
在AsyncDispatcher中內置了一個EventHandler,所有的事件發送都是通過這個EventHandler。(隨便找個例子)
if (!containersToCleanup.isEmpty()) { dispatcher.getEventHandler().handle( new CMgrCompletedContainersEvent(containersToCleanup, CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));}
這個內置EventHandler的handle方法其實就是將Event放進事件隊列里,不過條件是隊列的大小要超過1000。
前面提到了有個常駐線程會不斷的從事件隊列里取事件進行處理。這個線程就是eventHandlingThread。
while (!stopped && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. if (blockNewEvents) { synchronized (waitForDrained) { if (drained) { waitForDrained.notify(); } } } Event event; try { event = eventQueue.take(); } catch(InterruptedException ie) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); } return; } if (event != null) { dispatch(event); }}
waitForDrained這個對象在Dispatcher執行停止操作時會wait住等事件隊列里的事件被處理,不過也不會等太久(有超時時間)。如果事件隊列為空了就會通知waitForDrained別等了接著往下走吧。
推薦閱讀:
※分散式計算框架 Hadoop 為什麼叫 "Hadoop" ?
※Hadoop可視化分析利器之Hue
※如何進入大數據領域,學習路線是什麼?
※如何評價小米團隊擁有4個hbase committer?
※Big Data: current trends & next big thing 'Apache Kudu' - Strata + Hadoop 2016