第十章 Executor框架
來自專欄 Java並發編程
線程的工作單位是Runnable和Callable,執行機制由Executor框架提供。
10.1Executor框架簡介
10.1.1Executor框架的兩級調度
在上層,Java多線程程序通常把應用分解成為若干任務,然後使用用戶級的調度器(Executor框架)將這些任務映射成為固定數量的線程;在底層,操作系統內核將這些線程映射到硬體處理器上。
應用程序通過Executor框架控制上層的調度;而下層的調度由操作系統內核控制,下層的調度不受應用程序的控制。
10.1.2Executor框架的結構和成員
1.Executor框架的結構
? 主要分為3大部分:
? 1.任務:被執行任務需要實現的介面:Runnable介面和Callable介面
? 2.任務的執行:任務執行機制的核心介面Executor,以及繼承自Executor的Executor的ExecutorService介面。Executor框架有兩個關鍵類實現了ExecutorService介面(ThreadPoolExecutor和ScheduThreadPoolExecutor)。
? 3.非同步計算結果:包含介面Future和實現Future介面的FutureTask類。
主線程首先要創建實現Runnable或者Callable介面的任務對象。
1.Executors可以把一個Runnable對象封裝成為一個Callable對象
2.然後將Runnable對象或者Callable對象直接交給ExecutorService執行
3.ExecutorService將返回一個實現Future介面的對象。由於FutureTask實現Runnable,程序員也可以創建FutureTask,然後直接交給ExecutorService執行。
4.主線程可以執行FutureTask.get()方法等待任務執行完成。
2.Executor框架的成員
? Executor框架的主要成員:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future介面、Runnable介面、Callable介面和Executors
(1)ThreadPoolExecutor
? 3種類型:SingleThreadExecutor、FixedThreadPool、CachedThreadPool。
? FixedThreadPool:創建使用固定線程數,使用當前線程數量的應用場景,比較適應負載比較重的伺服器
? SingleThreadExecutor: 創建單個線程的SingleThreadExector,適用於保證順序第執行各個任務
? CachedThreadPool:創建一個根據需要創建新線程,大小無界的線程池,適用於執行很多的短期非同步任務的小程序。
(2)ScheduledThreadPoolExecutor
? 通常使用工廠類Executors創建。2種類型的ScheduThreadPoolExecutor:
? ScheduledThreadPoolExecutor:包含若干線程,適用於需要多個後台線程執行周期任務,同時為了滿足資源管理的需求而限制後台線程的數量的應用場景。
? SingleThreadScheduledExecutor:包含一個線程,適用於需要單個後台線程執行周期任務,同時需要保證順序的執行各個任務的應用場景。
3.Future介面
? Future介面和實現Future介面的FutureTask類用來表示非同步計算的結果。
4.Runnable介面和Callable介面
? 這個介面的實現類可以被ThreadPoolExecutor或SchedledThreadPoolExecutor執行。他們之間的區別是Runnable不返回結果,Callable可以返回結果。
10.2ThreadPoolExecutor詳解
Executor核心類是ThreadPoolExecutor,線程池的額主要實現類:
1.coolPool:核心線程池大小
2.maximumpool:最大線程池的大小
3.BlockingQueue:用來暫時保存任務的工作隊列
4.RejectedExecutionHandler:當ThreadPoolExcutor已經關閉或ThreadPoolExecutor已經飽和時,executor方法調用的Handler
可以創建3類型:
FixedThreadPool
SingleThreadExecutor
CachedThreadPool
10.2.1FixedThreadPool
? 可重用固定線程數的線程池。
? 核心線程數和最大線程數都是制定參數
? 當線程數大於corePoolSize時,KeepAliveTime為多餘的空閑線程等待任務的最長時間,超過這個時間後多餘的線程被終止。這裡的設置為0L
? !image-20180522100112003
? 流程如下:
1.當前運行的線程數少於corePoolSize,除創建線程執行任務
2.運行線程數等於corePoolSize,將任務加入LinkedBlockingQueue
3.反覆沖隊列獲取任務,使用無界對類,最大仁良偉整型最大值
10.2.2SIngleThreadExecutor詳解
? 使用單個worker線程的Executor
? 核心線程數和最大線程數設置為1,使用無界隊列作為組塞隊列
10.2.3CachedThreadPool詳解
? 根據需求創建新線程的線程池。
? 核心線程池數量為0,最大為整型最大值,KeepAliveTime為60秒,使用SynchronousQueue作為線程池的工作隊列
?
? !image-20180522101009737
10.3ScheduledThreadPoolExecutor詳解
? 繼承自ThreadPoolExecutor,主要用於給定的延遲之後運行,或者定期執行任務。
10.3.1ScheduledThreadPoolExecutor的運行機制
? 使用DelayQueue無界隊列,核心線程數和最大線程數沒有什麼意義
? 執行分為兩部分:
? 1.當調用SchedThreadPoolExecutor的scheduledAtFixedRate()或者scheduleWithFixedDelay()方法是,會想SchedThreadPoolExecutor()的DelayQueue添加一個實現了RunnableScheduledFuture介面的ScheduledFutureTask。
? 2.線程池中的線程從DelayQueue中獲取ScheduledFutureTask然後執行任務。
10.3.2ScheduledThreadPoolExecutor的實現
3個成員變數
? long型成員time,表示這個任務將要被執行的具體時間
? long型成員sequenceNumber,表示這個任務被添加ScheduThreadPoolExecutor中的序號
? long型成員變數preiod,表示任務執行的間隔周期
DelayQueue封裝優先隊列,根據time進行升序排序,time小的排在前面,如果time相同,就比較sequenceNumber,小的排在前面。
? 執行流程:
1.線程獲取到期任務,任務指的是time大於等於當前時間
2.線程執行這個ScheduFutrueTask
3.線程修改ScheduledFutreTask的time變數為下次要修改的執行時間
4.修改後放入隊列
DelayQueue.take()方法的源代碼
分為三個步驟
1.獲取lock
2.獲取周期任務
如果隊列為空,當前線程Condition中等待,否則執行y下一步
如果隊列頭元素的time時間比當前時間大,到Condition中等待到time時間,否則下一步
獲取隊列都元素,如果隊列不為空,喚醒Condition中的所有線程
3釋放鎖
下面是隊列的add執行示意圖
分為3個步驟
1.獲取Lock
2.添加任務
向隊列添加任務,喚醒Condition中等待的所有線程
3.釋放鎖
10.4FutureTask詳解
Future介面和實現Future介面的FutureTask類,代表非同步計算的結果。
10.4.1FutureTask簡介
FutureTask.run() 方法執行的時機;FutureTask處於下面3種狀態
1.未啟動。沒有執行run方法之前,這個FutureTask處於未啟動狀態
2.已啟動。run方法被執行的過程,處於已啟動狀態
3.已完成,執行完成
FutureTask的get方法和cancel方法執行示意圖
10.4.2FutureTask的使用
? 當一個線程等待另一個線程吧某個任務執行完成後他才能繼續執行,此時可以使用FutureTask。
? 當多個線程試圖同時執行同一個任務時,只允許一個線程執行任務,其他線程需要等待這個任務執行完成在能繼續執行。
10.4.3FutureTask的實現
? FutureTask的實現基於AQS。
下面是FutureTask的示意圖
Sync是FutureTask的內部私有類,繼承自AQS。創建FutureTask時會創建內部私有的成員對象Sync,FutureTask所有的公有方法都直接委託給內部私有的Sync。
線程級聯喚醒
推薦閱讀: