分散式做系統 Elastic-Job-Lite 源碼分析 —— 作業初始化

點擊上方「

芋道源碼

」,選擇「置頂公眾號」

技術文章第一時間送達!

源碼精品專欄

 

  • 精盡 Dubbo 原理與源碼專欄( 已經完成 69+ 篇,預計總共 75+ 篇 )

  • 中文詳細注釋的開源項目

  • Java 並發源碼合集

  • RocketMQ 源碼合集

  • Sharding-JDBC 源碼解析合集

  • Spring MVC 和 Security 源碼合集

  • MyCAT 源碼解析合集


摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-init/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. 作業註冊表

  • 3. 作業調度器

    • 3.1 創建

    • 3.2 初始化

  • 666. 彩蛋


1. 概述

本文主要分享 

Elastic-Job-Lite 作業初始化

涉及到主要類的類圖如下( 打開大圖 ):

你行好事會因為得到讚賞而愉悅 同理,開源項目貢獻者會因為 Star 而更加有動力 為 Elastic-Job 點贊!傳送門

2. 作業註冊表

作業註冊表( JobRegistry ),維護了單個 Elastic-Job-Lite 

進程內

作業相關信息,可以理解成其專屬的 Spring IOC 容器。因此,其本身是一個

單例

public

 

final

 

class

 

JobRegistry

 

{    

/**     * 單例     */

    

private

 

static

 

volatile

 JobRegistry instance;    

/**     * 作業調度控制器集合     * key:作業名稱     */

    

private

 Map schedulerMap = 

new

 ConcurrentHashMap<>();    

/**     * 註冊中心集合     * key:作業名稱     */

    

private

 Map regCenterMap = 

new

 ConcurrentHashMap<>();    

/**     * 作業運行實例集合     * key:作業名稱     */

    

private

 Map jobInstanceMap = 

new

 ConcurrentHashMap<>();    

/**     * 運行中作業集合     * key:作業名字     */

    

private

 Map jobRunningMap = 

new

 ConcurrentHashMap<>();    

/**     * 作業總分片數量集合     * key:作業名字     */

    

private

 Map currentShardingTotalCountMap = 

new

 ConcurrentHashMap<>();    

/**     * 獲取作業註冊表實例.     *      * 

@return

 作業註冊表實例     */

    

public

 

static

 JobRegistry 

getInstance

()

 

{        

if

 (

null

 == instance) {            

synchronized

 (JobRegistry.class) {                

if

 (

null

 == instance) {                    instance = 

new

 JobRegistry();                }            }        }        

return

 instance;    }    

// .... 省略方法

}
  • instance 是一個單例,通過 #getInstance() 方法獲取該單例。該單例的創建方式為

    雙重檢驗鎖模式

  • Map集合屬性

    全部

    作業名稱

    作為 KEY,通過作業名稱,可以獲得作業相關信息。

  • 省略的方法,下文在實際調用時,進行解析。

3. 作業調度器

作業調度器( JobScheduler ),創建並初始化後,進行作業調度。

Elastic-Job-Lite 使用 Quartz 作為調度內核。

3.1 創建

public

 

class

 

JobScheduler

 

{    

/**     * Lite作業配置     */

    

private

 

final

 LiteJobConfiguration liteJobConfig;    

/**     * 註冊中心     */

    

private

 

final

 CoordinatorRegistryCenter regCenter;    

/**     * 調度器門面對象     */

    

@Getter

    

private

 

final

 SchedulerFacade schedulerFacade;    

/**     * 作業門面對象     */

    

private

 

final

 JobFacade jobFacade;    

public

 

JobScheduler

(

final

 CoordinatorRegistryCenter regCenter, 

final

 LiteJobConfiguration liteJobConfig, 

final

 ElasticJobListener... elasticJobListeners)

 

{        

this

(regCenter, liteJobConfig, 

new

 JobEventBus(), elasticJobListeners);    }    

public

 

JobScheduler

(

final

 CoordinatorRegistryCenter regCenter, 

final

 LiteJobConfiguration liteJobConfig, 

final

 JobEventConfiguration jobEventConfig,                         

final

 ElasticJobListener... elasticJobListeners)

 

{        

this

(regCenter, liteJobConfig, 

new

 JobEventBus(jobEventConfig), elasticJobListeners);    }    

private

 

JobScheduler

(

final

 CoordinatorRegistryCenter regCenter, 

final

 LiteJobConfiguration liteJobConfig, 

final

 JobEventBus jobEventBus, 

final

 ElasticJobListener... elasticJobListeners)

 

{        

// 添加 作業運行實例

        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), 

new

 JobInstance());        

// 設置 Lite作業配置

        

this

.liteJobConfig = liteJobConfig;        

this

.regCenter = regCenter;        

// 設置 作業監聽器

        List elasticJobListenerList = Arrays.asList(elasticJobListeners);        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);        

// 設置 調度器門面對象

        schedulerFacade = 

new

 SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);        

// 設置 作業門面對象

        jobFacade = 

new

 LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);    }}
  • 調用 #JobRegistry#addJobInstance() 方法添

    加作業運行實例( JobInstance )

    // JobRegistry.java

    /*** 作業運行實例集合* key:作業名稱*/

    private

     Map jobInstanceMap = 

    new

     ConcurrentHashMap<>();

    /*** 添加作業實例.** 

    @param

     jobName 作業名稱* 

    @param

     jobInstance 作業實例*/

    public

     

    void

     

    addJobInstance

    (

    final

     String jobName, 

    final

     JobInstance jobInstance)

     

    {   jobInstanceMap.put(jobName, jobInstance);}

    // JobInstance.java

    public

     

    final

     

    class

     

    JobInstance

     

    {

    private

     

    static

     

    final

     String DELIMITER = 

    "@-@"

    ;

    /** * 作業實例主鍵. */

    private

     

    final

     String jobInstanceId;

    public

     

    JobInstance

    ()

     

    {    jobInstanceId = IpUtils.getIp()            + DELIMITER            + ManagementFactory.getRuntimeMXBean().getName().split(

    "@"

    )[

    0

    ]; 

    // PID

    }

    }

    • jobInstanceId 格式:${IP}@-@${PID}。其中 PID 為進程編號。同一個 Elastic-Job-Lite 實例,

      不同

      的作業使用

      相同

      的作業實例主鍵。

  • 設置作業監聽器,在《Elastic-Job-Lite 源碼解析 —— 作業監聽器》詳細分享。

  • SchedulerFacade,為

    調度器

    提供內部服務的門面類。

    public

     

    final

     

    class

     

    SchedulerFacade

     

    {

    /** * 作業名稱 */

    private

     

    final

     String jobName;

    /** * 作業配置服務 */

    private

     

    final

     ConfigurationService configService;

    /** * 作業分片服務 */

    private

     

    final

     ShardingService shardingService;

    /** * 主節點服務 */

    private

     

    final

     LeaderService leaderService;

    /** * 作業伺服器服務 */

    private

     

    final

     ServerService serverService;

    /** * 作業運行實例服務 */

    private

     

    final

     InstanceService instanceService;

    /** * 執行作業服務 */

    private

     

    final

     ExecutionService executionService;

    /** * 作業監控服務 */

    private

     

    final

     MonitorService monitorService;

    /** * 調解作業不一致狀態服務 */

    private

     

    final

     ReconcileService reconcileService;

    /** * 作業註冊中心的監聽器管理者 */

    private

     ListenerManager listenerManager;

    public

     

    SchedulerFacade

    (

    final

     CoordinatorRegistryCenter regCenter, 

    final

     String jobName, 

    final

     List elasticJobListeners)

     

    {    

    this

    .jobName = jobName;    

    // .... 省略 new XXXXX() 對象

    }

  • LiteJobFacade,為

    作業

    提供內部服務的門面類。

    public

     

    final

     

    class

     

    LiteJobFacade

     

    implements

     

    JobFacade

     

    {    

    /**     * 作業配置服務     */

        

    private

     

    final

     ConfigurationService configService;    

    /**     * 作業分片服務     */

        

    private

     

    final

     ShardingService shardingService;    

    /**     * 執行作業服務     */

        

    private

     

    final

     ExecutionService executionService;    

    /**     * 作業運行時上下文服務     */

        

    private

     

    final

     ExecutionContextService executionContextService;    

    /**     * 作業失效轉移服務     */

        

    private

     

    final

     FailoverService failoverService;    

    /**     * 作業監聽器數組     */

        

    private

     

    final

     List elasticJobListeners;    

    /**     * 作業事件匯流排     */

        

    private

     

    final

     JobEventBus jobEventBus;

    public

     

    LiteJobFacade

    (

    final

     CoordinatorRegistryCenter regCenter, 

    final

     String jobName, 

    final

     List elasticJobListeners, 

    final

     JobEventBus jobEventBus)

     

    {    

    // .... 省略 new XXXXX() 對象

        failoverService = 

    new

     FailoverService(regCenter, jobName);    

    this

    .elasticJobListeners = elasticJobListeners;    

    this

    .jobEventBus = jobEventBus;}

    }

SchedulerFacade 和 LiteJobFacade,看起來很相近,實際差別很大。它們分別為調度器、作業提供需要的方法。下文也會體現這一特點。

3.2 初始化

作業調度器創建後,調用 #init() 方法初始化,作業方

開始

調度。

/*** 初始化作業.*/

public

 

void

 

init

()

 

{   

// 更新 作業配置

   LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);   

// 設置 當前作業分片總數

   JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());   

// 創建 作業調度控制器

   JobScheduleController jobScheduleController = 

new

 JobScheduleController(           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());   

// 添加 作業調度控制器

   JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);   

// 註冊 作業啟動信息

   schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());   

// 調度作業

   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}

3.2.1 更新作業配置

// SchedulerFacade.java

/*** 更新作業配置.** 

@param

 liteJobConfig 作業配置* 

@return

 更新後的作業配置*/

public

 LiteJobConfiguration 

updateJobConfiguration

(

final

 LiteJobConfiguration liteJobConfig)

 

{   

// 更新 作業配置

   configService.persist(liteJobConfig);   

// 讀取 作業配置

   

return

 configService.load(

false

);}
  • 從《Elastic-Job 源碼分析 —— 作業配置》的「3.2 持久化作業配置」,調用 ConfigService#persist(…) 方法也不一定會更新作業配置,因此調用 ConfigService#load(…) 方法返回的可能是本地的作業配置,也可能是

    註冊中心

    存儲的作業配置。

3.2.2 設置當前作業分片總數

// JobRegistry.java

private

 Map currentShardingTotalCountMap = 

new

 ConcurrentHashMap<>();

/*** 設置當前分片總數.** 

@param

 jobName 作業名稱* 

@param

 currentShardingTotalCount 當前分片總數*/

public

 

void

 

setCurrentShardingTotalCount

(

final

 String jobName, 

final

 

int

 currentShardingTotalCount)

 

{   currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);}

3.2.3 創建作業調度控制器

public

 

void

 

init

()

 

{   

// .... 省略

   

// 創建 作業調度控制器

   JobScheduleController jobScheduleController = 

new

 JobScheduleController(           createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());   

// .... 省略

}
  • JobScheduleController,作業調度控制器,提供對 Quartz 方法的封裝:

    public

     

    final

     

    class

     

    JobScheduleController

     

    {

    /** * Quartz 調度器 */

    private

     

    final

     Scheduler scheduler;

    /** * 作業信息 */

    private

     

    final

     JobDetail jobDetail;

    /** * 觸發器編號 * 目前使用工作名字( jobName ) */

    private

     

    final

     String triggerIdentity;

    public

     

    void

     

    scheduleJob

    (

    final

     String cron)

     

    {} 

    // 調度作業

    public

     

    synchronized

     

    void

     

    rescheduleJob

    (

    final

     String cron)

     

    {} 

    // 重新調度作業

    private

     CronTrigger 

    createTrigger

    (

    final

     String cron)

     

    {} 

    // 創建觸發器

    public

     

    synchronized

     

    boolean

     

    isPaused

    ()

     

    {} 

    // 判斷作業是否暫停

    public

     

    synchronized

     

    void

     

    pauseJob

    ()

     

    {} 

    // 暫停作業

    public

     

    synchronized

     

    void

     

    resumeJob

    ()

     

    {} 

    // 恢復作業

    public

     

    synchronized

     

    void

     

    triggerJob

    ()

     

    {} 

    // 立刻啟動作業

    public

     

    synchronized

     

    void

     

    shutdown

    ()

     

    {} 

    // 關閉調度器

    }

  • 調用 #createScheduler() 方法創建 Quartz 調度器:

    // JobScheduler.java

    private

     Scheduler 

    createScheduler

    ()

     

    {   Scheduler result;   

    try

     {       StdSchedulerFactory factory = 

    new

     StdSchedulerFactory();       factory.initialize(getBaseQuartzProperties());       result = factory.getScheduler();       result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());   } 

    catch

     (

    final

     SchedulerException ex) {       

    throw

     

    new

     JobSystemException(ex);   }   

    return

     result;}

    private

     Properties 

    getBaseQuartzProperties

    ()

     

    {   Properties result = 

    new

     Properties();   result.put(

    "org.quartz.threadPool.class"

    , org.quartz.simpl.SimpleThreadPool.class.getName());   result.put(

    "org.quartz.threadPool.threadCount"

    "1"

    ); 

    // Quartz 線程數:1

       result.put(

    "org.quartz.scheduler.instanceName"

    , liteJobConfig.getJobName());   result.put(

    "org.quartz.jobStore.misfireThreshold"

    "1"

    );   result.put(

    "org.quartz.plugin.shutdownhook.class"

    , JobShutdownHookPlugin.class.getName()); 

    // 作業關閉鉤子

       result.put(

    "org.quartz.plugin.shutdownhook.cleanShutdown"

    , Boolean.TRUE.toString()); 

    // 關閉時,清理所有資源

       

    return

     result;}
    • org.quartz.threadPool.threadCount = 1,即 Quartz 執行作業線程數量為 1。原因:一個

      作業( ElasticJob )

      的調度,需要配置

      獨有

      的一個

      作業調度器( JobScheduler )

      ,兩者是 1 : 1 的關係。

    • org.quartz.plugin.shutdownhook.class 設置作業

      優雅關閉

      鉤子:JobShutdownHookPlugin。

    • 觸發器監聽器( TriggerListener ),在《Elastic-Job-Lite 源碼解析 —— 作業執行》詳細分享。

  • 調用 #createJobDetail() 方法創建 Quartz 作業:

    // JobScheduler.java

    private

     JobDetail 

    createJobDetail

    (

    final

     String jobClass)

     

    {   

    // 創建 Quartz 作業

       JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();   

    //

       result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);   

    // 創建 Elastic-Job 對象

       Optional elasticJobInstance = createElasticJobInstance();   

    if

     (elasticJobInstance.isPresent()) {       result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());   } 

    else

     

    if

     (!jobClass.equals(ScriptJob.class.getCanonicalName())) {       

    try

     {           result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());       } 

    catch

     (

    final

     ReflectiveOperationException ex) {           

    throw

     

    new

     JobConfigurationException(

    "Elastic-Job: Job class "%s" can not initialize."

    , jobClass);       }   }   

    return

     result;}

    protected

     Optional 

    createElasticJobInstance

    ()

     

    {   

    return

     Optional.absent();}

    // SpringJobScheduler.java

    @Override

    protected

     Optional 

    createElasticJobInstance

    ()

     

    {   

    return

     Optional.fromNullable(elasticJob);}
    • 創建 Quartz 作業設置了 LiteJob 類,這樣 Quartz 觸發作業執行時,LiteJob 會去調用 Elastic-Job 作業對象。在《Elastic-Job-Lite 源碼解析 —— 作業執行》詳細分享。

    • 在 Spring 里,Elastic-Job 如果已經創建好

      注入

      到 SpringJobScheduler,無需進行創建。

    • Jodetail.jobDataMap 屬性里添加了作業門面對象( LiteJobFacade )、Elastic-Job 對象,Quartz 觸發作業時,會設置到 LiteJob 對象里。

3.2.4 註冊作業啟動信息

/*** 註冊作業啟動信息.* * 

@param

 enabled 作業是否啟用*/

public

 

void

 

registerStartUpInfo

(

final

 

boolean

 enabled)

 

{   

// 開啟 所有監聽器

   listenerManager.startAllListeners();   

// 選舉 主節點

   leaderService.electLeader();   

// 持久化 作業伺服器上線信息

   serverService.persistOnline(enabled);   

// 持久化 作業運行實例上線相關信息

   instanceService.persistOnline();   

// 設置 需要重新分片的標記

   shardingService.setReshardingFlag();   

// 初始化 作業監聽服務

   monitorService.listen();   

// 初始化 調解作業不一致狀態服務

   

if

 (!reconcileService.isRunning()) {       reconcileService.startAsync();   }}
  • 開啟所有監聽器。每個功能模塊都有其相應的監聽器,在模塊對應「文章」詳細分享。

  • 選舉主節點,在《Elastic-Job-Lite 源碼解析 —— 主節點選舉》詳細分享。

  • 調用 ServerService#persistOnline() 方法,持久化作業伺服器上線信息。

    public

     

    final

     

    class

     

    ServerService

     

    {    

    /**     * 持久化作業伺服器上線信息.     *      * 

    @param

     enabled 作業是否啟用     */

        

    public

     

    void

     

    persistOnline

    (

    final

     

    boolean

     enabled)

     

    {        

    if

     (!JobRegistry.getInstance().isShutdown(jobName)) {            jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? 

    ""

     : ServerStatus.DISABLED.name());        }    }}
    • 當作業配置設置作業

      禁用

      時( LiteJobConfiguration.disabled = true ),作業調度但

      調度作業分片為空

      。不太好理解?《Elastic-Job-Lite 源碼解析 —— 作業分片》詳細分享。

  • 調用 InstanceService#persistOnline() 方法,持久化作業運行實例上線相關信息:

    public

     

    final

     

    class

     

    InstanceService

     

    {    

    /**     * 持久化作業運行實例上線相關信息.     */

        

    public

     

    void

     

    persistOnline

    ()

     

    {        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), 

    ""

    );    }}
  • 設置需要重新分片的標記,在《Elastic-Job-Lite 源碼解析 —— 作業分片》詳細分享。

  • 初始化作業監聽服務,在《Elastic-Job-Lite 源碼解析 —— 作業監控服務》詳細分享。

  • 初始化調解作業不一致狀態服務,在《Elastic-Job-Lite 源碼解析 —— 自診斷修復》詳細分享。

3.2.5 調度作業

// JobScheduler.java

public

 

void

 

init

()

 

{   

// .... 省略部分代碼

   

// 調度作業

   jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}

// JobScheduleController.java

/*** 調度作業.** 

@param

 cron CRON表達式*/

public

 

void

 

scheduleJob

(

final

 String cron)

 

{   

try

 {       

if

 (!scheduler.checkExists(jobDetail.getKey())) {           scheduler.scheduleJob(jobDetail, createTrigger(cron));       }       scheduler.start();   } 

catch

 (

final

 SchedulerException ex) {       

throw

 

new

 JobSystemException(ex);   }}
  • 調用 #scheduleJob() 方法後,該 Elastic-Job 作業

    開始

    被調度。

666. 彩蛋

作業初始化,如果你對 Quartz 不是特別了解,可以再看 Quartz 再重新理解。

下一篇,《Elastic-Job-Lite 源碼解析 —— 作業執行》 起航!

道友,分享一波

微信朋友圈

支持支持支持,可好?



如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。

知識星球

目前在知識星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源碼解析如下:01. 調試環境搭建02. 項目結構一覽03. 配置 Configuration04. 核心流程一覽

05. 拓展機制 SPI

06. 線程池

07. 服務暴露 Export

08. 服務引用 Refer

09. 註冊中心 Registry

10. 動態編譯 Compile

11. 動態代理 Proxy

12. 服務調用 Invoke

13. 調用特性 

14. 過濾器 Filter

15. NIO 伺服器

16. P2P 伺服器

17. HTTP 伺服器

18. 序列化 Serialization

19. 集群容錯 Cluster

20. 優雅停機

21. 日誌適配

22. 狀態檢查

23. 監控中心 Monitor

24. 管理中心 Admin

25. 運維命令 QOS

26. 鏈路追蹤 Tracing

...一共 60 篇++

源碼不易

↓↓↓

點贊

支持老艿艿

↓↓


推薦閱讀:

使用Apache Zookeeper分散式部署PHP應用程序
【轉】基於內存資料庫的分散式資料庫架構
揭秘:去中心化分散式系統的好與壞

TAG:分散式 | 系統 | 作業 | 源碼 | 分析 |