標籤:

Executor

java.util.concurrent

眾所周知, java.util.concurrent已經給開發者提供了非常有用的並發編程工具包; Netty的EventExecutor 正是基於java.util.concurrent中的一些Executor實現的, 在深入研究Netty的EventExecutor前, 先了解下這些Executor是很有必要的。

Executor

Executor介面提供了一種將任務提交與執行解耦合的Abstraction機制。當開發者使用Executor介面的實現類時, 開發者不需要關心這個實現類是怎麼實現的, 是多線程執行還是單線程執行呢;開發者也不需要關心任務在實現類內部是怎麼調度的。開發者只需要把需要執行的任務提交給Executor介面的實現類。Executor的定義很簡單, 僅提供了一個方法execute來執行Runnable任務,但是這種解耦合的機制帶來的好處需要認真體會。僅有的execute方法並不能滿足任務執行的要求, 因為不能查詢任務執行的狀態, 因此需要一個更強的Executor。

ExecutorService

ExecutorService介面是對Executor介面的擴展。 ExecutorService介面提供了管理executor中止的方法, 比如shutdown, 同時也提供了返回Future的方法submit執行任務, Future可以追蹤任務的進展情況。不難想到, submit方法的實現內部必然需要是調用execute方法的, 可以把submit看作一個加強型的executor方法。

到目前為止, 基於ExecutorService基本可以實現一個強大的Executor類, 可以通過submit提交任務, 同時也可以通過返回的future查詢任務進展的狀態。 java.util.concurrent已經提供了一個這樣的抽象類。

AbstractExecutorService

AbstractExecutorService是實現了ExecutorService介面的抽象類。 AbstractExecutorService抽象類通過由newTaskFor方法返回的RunnableFuture實現了submit, invokeAny, invokeAll方法,RunnableFuture的默認實現類是FutureTask。 AbstractExecutorService的子類可以通過重寫newTaskFor方法返回自己實現的RunnableFuture。

現在實現一個ExecutorService已經變的十分簡單, 繼承AbstractExecutorService並實現execute方法, 比如ThreadPoolExecutor。

當需要執行延時任務怎麼辦呢, 執行延時任務的情況也是必須滿足的。一個可以執行延遲任務的ExecutorService或是一個好的選擇。

ScheduledExecutorService

ScheduledExecutorService介面是對ExecutorService介面的擴展, 可以執行延遲, 周期性的任務。

Netty EventExecutor

EventExecutorGroup

EventExecutorGroup是Netty EventExecutor最為基礎的Executor介面, 通過next方法提供可使用的EventExecutor, 除此之外, 還提供了shutdownGracefully等方法來管理EventExecutor的生命周期。同時, 重寫了submit, schedule方法, 可以返回一個功能更強大的Future, 該Future可以通過addlistener方法註冊listener, 當Future isDone時, 可以通知listener。

EventExecutor

EventExecutor是一個特殊的EventExecutorGroup, next方法返回它自己, 它提供了inEventLoop方法來判斷一個線程是否是event loop的執行線程, 它還提供了newPromise, newSucceededFuture等輔助方法, 可以創建新的Promise, Future等。

OrderedEventExecutor

OrderedEventExecutor其實就是EventExecutor, 沒做任何擴展, 只是強調有序。

AbstractEventExecutor

AbstractEventExecutor是繼承java.util.concurrent中的AbstractExecutorService並實現了Netty EventExecutor介面的抽象類, 它重寫newTaskFor方法返回PromiseTask, 提供safeExecute方法保證task的安全執行。

SimpleThreadEventExecutor

SimpleThreadEventExecutor是以單線程形式執行提交的任務的抽象基礎類, Netty EventExecutor中比較核心的一個實現類。

先看一下inEventLoop方法:

@Overriden public boolean inEventLoop() {n return inEventLoop(Thread.currentThread());n } nn @Overriden public boolean inEventLoop(Thread thread) {n return thread == this.thread;n }n

executor在啟動時, 會綁定一個執行的線程, 也就是this.thread, 不難看出, inEventLoop是判斷當前執行線程是不是executor綁定的線程。

讓我們看個簡單例子:

EventExecutor executor = next.executor();n if (executor.inEventLoop()) {n next.invokeChannelRegistered();n } else {n executor.execute(new Runnable() {n @Overriden public void run() {n next.invokeChannelRegistered();n }n });n }n

如果inEventLoop()是True, 則表明是在線程內執行, 直接調用, 否則, 通過execute方法把需要執行的程序封裝成task, 通過execute提交給executor。

現在我們需要看一下execute的實現了, 將任務提交和任務執行解耦合, 這是Executor提供的核心功能。

@Overriden public void execute(Runnable task) {n if (task == null) {n throw new NullPointerException("task");n }nn boolean inEventLoop = inEventLoop();n if (inEventLoop) {n addTask(task);n } else {n startThread();n addTask(task);n if (isShutdown() && removeTask(task)) {n reject();n }n }nn if (!addTaskWakesUp && wakesUpForTask(task)) {n wakeup(inEventLoop);n }n }n

通過execute方法向executor提交任務, 先檢查是否是executor自己的線程, 如果是, 直接把任務放進隊列, 如果不是, 則startThread, 然後再把任務放進隊列。

private void startThread() {n if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {n if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {n doStartThread();n }n }n }n

這裡用到了cas原子操作, 是非阻塞同步的一種重要方式。

如果executor是ST_NOT_STARTED狀態, 設置為ST_STARTED狀態, 然後調用doStartThread, 啟動線程。

doStartThread把啟動任務提交給executor完成SingleThreadEventExecutor的啟動。這裡可能大多數人有些迷茫了, 這個executor又是啥, 這裡可以這麼理解, SingleThreadEventExecutor只是個皮包公司, 真正幹活的是這個executor。

SingleThreadEventExecutor.this.run();n

這裡是開始執行任務的地方。SingleThreadEventExecutor的子類實現這個方法。

看一下SingleThreadEventExecutor的子類DefaultEventExecutor是怎麼實現的:

@Overriden protected void run() {n for (;;) {n Runnable task = takeTask();n if (task != null) {n task.run();n updateLastExecutionTime();n }nn if (confirmShutdown()) {n break;n }n }n }n

run內部不斷從LinkedBlockingQueue任務隊列中取出task並執行, 如果shutdownGracefully調用過, 中斷該loop, 開始shutdown的後續處理。

protected void wakeup(boolean inEventLoop) {n if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {n // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as theren // is already something in the queue.n taskQueue.offer(WAKEUP_TASK);n }n }n

wakeup方法用來喚醒正在阻塞的executor, 因為takeTask方法調用時, 如果任務隊列里沒有task, 就會阻塞, 直到有task為止才會返回。


推薦閱讀:

netty的編/解碼器有什麼分類背景(netty-in-action中沒有對此部分詳細陳述)?
關於應用層解決拆包粘包問題?
為什麼nio效率會比bio高?

TAG:Netty | 并发 | NIO |