Executor
java.util.concurrent
眾所周知, java.util.concurrent已經給開發者提供了非常有用的並發編程工具包; Netty的EventExecutor 正是基於java.util.concurrent中的一些Executor實現的, 在深入研究Netty的EventExecutor前, 先了解下這些Executor是很有必要的。ExecutorExecutor介面提供了一種將任務提交與執行解耦合的Abstraction機制。當開發者使用Executor介面的實現類時, 開發者不需要關心這個實現類是怎麼實現的, 是多線程執行還是單線程執行呢;開發者也不需要關心任務在實現類內部是怎麼調度的。開發者只需要把需要執行的任務提交給Executor介面的實現類。Executor的定義很簡單, 僅提供了一個方法execute來執行Runnable任務,但是這種解耦合的機制帶來的好處需要認真體會。僅有的execute方法並不能滿足任務執行的要求, 因為不能查詢任務執行的狀態, 因此需要一個更強的Executor。
ExecutorServiceExecutorService介面是對Executor介面的擴展。 ExecutorService介面提供了管理executor中止的方法, 比如shutdown, 同時也提供了返回Future的方法submit執行任務, Future可以追蹤任務的進展情況。不難想到, submit方法的實現內部必然需要是調用execute方法的, 可以把submit看作一個加強型的executor方法。
到目前為止, 基於ExecutorService基本可以實現一個強大的Executor類, 可以通過submit提交任務, 同時也可以通過返回的future查詢任務進展的狀態。 java.util.concurrent已經提供了一個這樣的抽象類。
AbstractExecutorService
AbstractExecutorService是實現了ExecutorService介面的抽象類。 AbstractExecutorService抽象類通過由newTaskFor方法返回的RunnableFuture實現了submit, invokeAny, invokeAll方法,RunnableFuture的默認實現類是FutureTask。 AbstractExecutorService的子類可以通過重寫newTaskFor方法返回自己實現的RunnableFuture。
現在實現一個ExecutorService已經變的十分簡單, 繼承AbstractExecutorService並實現execute方法, 比如ThreadPoolExecutor。
當需要執行延時任務怎麼辦呢, 執行延時任務的情況也是必須滿足的。一個可以執行延遲任務的ExecutorService或是一個好的選擇。
ScheduledExecutorServiceScheduledExecutorService介面是對ExecutorService介面的擴展, 可以執行延遲, 周期性的任務。
Netty EventExecutor
EventExecutorGroup
EventExecutorGroup是Netty EventExecutor最為基礎的Executor介面, 通過next方法提供可使用的EventExecutor, 除此之外, 還提供了shutdownGracefully等方法來管理EventExecutor的生命周期。同時, 重寫了submit, schedule方法, 可以返回一個功能更強大的Future, 該Future可以通過addlistener方法註冊listener, 當Future isDone時, 可以通知listener。
EventExecutor
EventExecutor是一個特殊的EventExecutorGroup, next方法返回它自己, 它提供了inEventLoop方法來判斷一個線程是否是event loop的執行線程, 它還提供了newPromise, newSucceededFuture等輔助方法, 可以創建新的Promise, Future等。
OrderedEventExecutor
OrderedEventExecutor其實就是EventExecutor, 沒做任何擴展, 只是強調有序。
AbstractEventExecutor
AbstractEventExecutor是繼承java.util.concurrent中的AbstractExecutorService並實現了Netty EventExecutor介面的抽象類, 它重寫newTaskFor方法返回PromiseTask, 提供safeExecute方法保證task的安全執行。
SimpleThreadEventExecutor
SimpleThreadEventExecutor是以單線程形式執行提交的任務的抽象基礎類, Netty EventExecutor中比較核心的一個實現類。
先看一下inEventLoop方法:
@Overriden public boolean inEventLoop() {n return inEventLoop(Thread.currentThread());n } nn @Overriden public boolean inEventLoop(Thread thread) {n return thread == this.thread;n }n
executor在啟動時, 會綁定一個執行的線程, 也就是this.thread, 不難看出, inEventLoop是判斷當前執行線程是不是executor綁定的線程。
讓我們看個簡單例子:
EventExecutor executor = next.executor();n if (executor.inEventLoop()) {n next.invokeChannelRegistered();n } else {n executor.execute(new Runnable() {n @Overriden public void run() {n next.invokeChannelRegistered();n }n });n }n
如果inEventLoop()是True, 則表明是在線程內執行, 直接調用, 否則, 通過execute方法把需要執行的程序封裝成task, 通過execute提交給executor。
現在我們需要看一下execute的實現了, 將任務提交和任務執行解耦合, 這是Executor提供的核心功能。
@Overriden public void execute(Runnable task) {n if (task == null) {n throw new NullPointerException("task");n }nn boolean inEventLoop = inEventLoop();n if (inEventLoop) {n addTask(task);n } else {n startThread();n addTask(task);n if (isShutdown() && removeTask(task)) {n reject();n }n }nn if (!addTaskWakesUp && wakesUpForTask(task)) {n wakeup(inEventLoop);n }n }n
通過execute方法向executor提交任務, 先檢查是否是executor自己的線程, 如果是, 直接把任務放進隊列, 如果不是, 則startThread, 然後再把任務放進隊列。
private void startThread() {n if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {n if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {n doStartThread();n }n }n }n
這裡用到了cas原子操作, 是非阻塞同步的一種重要方式。
如果executor是ST_NOT_STARTED狀態, 設置為ST_STARTED狀態, 然後調用doStartThread, 啟動線程。
doStartThread把啟動任務提交給executor完成SingleThreadEventExecutor的啟動。這裡可能大多數人有些迷茫了, 這個executor又是啥, 這裡可以這麼理解, SingleThreadEventExecutor只是個皮包公司, 真正幹活的是這個executor。
SingleThreadEventExecutor.this.run();n
這裡是開始執行任務的地方。SingleThreadEventExecutor的子類實現這個方法。
看一下SingleThreadEventExecutor的子類DefaultEventExecutor是怎麼實現的:
@Overriden protected void run() {n for (;;) {n Runnable task = takeTask();n if (task != null) {n task.run();n updateLastExecutionTime();n }nn if (confirmShutdown()) {n break;n }n }n }n
run內部不斷從LinkedBlockingQueue任務隊列中取出task並執行, 如果shutdownGracefully調用過, 中斷該loop, 開始shutdown的後續處理。
protected void wakeup(boolean inEventLoop) {n if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {n // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as theren // is already something in the queue.n taskQueue.offer(WAKEUP_TASK);n }n }n
wakeup方法用來喚醒正在阻塞的executor, 因為takeTask方法調用時, 如果任務隊列里沒有task, 就會阻塞, 直到有task為止才會返回。
推薦閱讀:
※netty的編/解碼器有什麼分類背景(netty-in-action中沒有對此部分詳細陳述)?
※關於應用層解決拆包粘包問題?
※為什麼nio效率會比bio高?