分散式做系統 Elastic-Job-Lite 源碼分析 —— 作業初始化
點擊上方「
芋道源碼
」,選擇「置頂公眾號」
技術文章第一時間送達!
源碼精品專欄
精盡 Dubbo 原理與源碼專欄( 已經完成 69+ 篇,預計總共 75+ 篇 )
中文詳細注釋的開源項目
Java 並發源碼合集
RocketMQ 源碼合集
Sharding-JDBC 源碼解析合集
Spring MVC 和 Security 源碼合集
MyCAT 源碼解析合集
摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-init/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!
本文基於 Elastic-Job V2.1.5 版本分享
1. 概述
2. 作業註冊表
3. 作業調度器
3.1 創建
3.2 初始化
666. 彩蛋
1. 概述
本文主要分享
Elastic-Job-Lite 作業初始化
。涉及到主要類的類圖如下( 打開大圖 ):
你行好事會因為得到讚賞而愉悅 同理,開源項目貢獻者會因為 Star 而更加有動力 為 Elastic-Job 點贊!傳送門
2. 作業註冊表
作業註冊表( JobRegistry ),維護了單個 Elastic-Job-Lite
進程內
作業相關信息,可以理解成其專屬的 Spring IOC 容器。因此,其本身是一個單例
。public final class JobRegistry
{
/** * 單例 */
private
static
volatile
JobRegistry instance;/** * 作業調度控制器集合 * key:作業名稱 */
private
Mapnew
ConcurrentHashMap<>();/** * 註冊中心集合 * key:作業名稱 */
private
Mapnew
ConcurrentHashMap<>();
/** * 作業運行實例集合 * key:作業名稱 */
private
Mapnew
ConcurrentHashMap<>();/** * 運行中作業集合 * key:作業名字 */
private
Mapnew
ConcurrentHashMap<>();/** * 作業總分片數量集合 * key:作業名字 */
private
Mapnew
ConcurrentHashMap<>();/** * 獲取作業註冊表實例. * *
@return
作業註冊表實例 */
public
static
JobRegistrygetInstance
()
{if
(null
== instance) {synchronized
(JobRegistry.class) {if
(
null
== instance) { instance =new
JobRegistry(); } } }return
instance; }// .... 省略方法
}instance
是一個單例,通過#getInstance()
方法獲取該單例。該單例的創建方式為雙重檢驗鎖模式
。Map集合屬性
全部
以作業名稱
作為 KEY,通過作業名稱,可以獲得作業相關信息。省略的方法,下文在實際調用時,進行解析。
3. 作業調度器
作業調度器( JobScheduler ),創建並初始化後,進行作業調度。
Elastic-Job-Lite 使用 Quartz 作為調度內核。
3.1 創建
public class JobScheduler
/** * Lite作業配置 */
private
final
LiteJobConfiguration liteJobConfig;/** * 註冊中心 */
private
final
CoordinatorRegistryCenter regCenter;/** * 調度器門面對象 */
@Getter
private
final
SchedulerFacade schedulerFacade;/** * 作業門面對象 */
private
final
JobFacade jobFacade;public
JobScheduler
(
final
CoordinatorRegistryCenter regCenter,final
LiteJobConfiguration liteJobConfig,final
ElasticJobListener... elasticJobListeners) {this
(regCenter, liteJobConfig,new
JobEventBus(), elasticJobListeners); }public
JobScheduler
(
final
CoordinatorRegistryCenter regCenter,final
LiteJobConfiguration liteJobConfig,final
JobEventConfiguration jobEventConfig,final
ElasticJobListener... elasticJobListeners) {this
(regCenter, liteJobConfig,new
JobEventBus(jobEventConfig), elasticJobListeners); }private
JobScheduler
(
final
CoordinatorRegistryCenter regCenter,final
LiteJobConfiguration liteJobConfig,final
JobEventBus jobEventBus,final
ElasticJobListener... elasticJobListeners) {// 添加 作業運行實例
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(),new
JobInstance());// 設置 Lite作業配置
this
.liteJobConfig = liteJobConfig;this
.regCenter = regCenter;// 設置 作業監聽器
List// 設置 調度器門面對象
schedulerFacade =new
SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);// 設置 作業門面對象
jobFacade =new
LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus); }}調用
#JobRegistry#addJobInstance()
方法添加作業運行實例( JobInstance )
。// JobRegistry.java
/*** 作業運行實例集合* key:作業名稱*/
private
MapjobInstanceMap = new
ConcurrentHashMap<>();/*** 添加作業實例.**
@param
jobName 作業名稱*@param
jobInstance 作業實例*/public
void
addJobInstance
(
final
String jobName,final
JobInstance jobInstance) { jobInstanceMap.put(jobName, jobInstance);}// JobInstance.java
public
final
class
JobInstance
{
{ jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split(private
static
final
String DELIMITER ="@-@"
;/** * 作業實例主鍵. */
private
final
String jobInstanceId;public
JobInstance
()
"@"
)[0
];// PID
}}
jobInstanceId
格式:${IP}@-@${PID}
。其中PID
為進程編號。同一個 Elastic-Job-Lite 實例,不同
的作業使用相同
的作業實例主鍵。設置作業監聽器,在《Elastic-Job-Lite 源碼解析 —— 作業監聽器》詳細分享。
SchedulerFacade,為
調度器
提供內部服務的門面類。
{public
final
class
SchedulerFacade
{/** * 作業名稱 */
private
final
String jobName;/** * 作業配置服務 */
private
final
ConfigurationService configService;/** * 作業分片服務 */
private
final
ShardingService shardingService;/** * 主節點服務 */
private
final
LeaderService leaderService;/** * 作業伺服器服務 */
private
final
ServerService serverService;/** * 作業運行實例服務 */
private
final
InstanceService instanceService;/** * 執行作業服務 */
private
final
ExecutionService executionService;/** * 作業監控服務 */
private
final
MonitorService monitorService;/** * 調解作業不一致狀態服務 */
private
final
ReconcileService reconcileService;/** * 作業註冊中心的監聽器管理者 */
private
ListenerManager listenerManager;public
SchedulerFacade
(
final
CoordinatorRegistryCenter regCenter,final
String jobName,final
ListelasticJobListeners) this
.jobName = jobName;// .... 省略 new XXXXX() 對象
}LiteJobFacade,為
作業
提供內部服務的門面類。
{public
final
class
LiteJobFacade
implements
JobFacade
/** * 作業配置服務 */
private
final
ConfigurationService configService;/** * 作業分片服務 */
private
final
ShardingService shardingService;/** * 執行作業服務 */
private
final
ExecutionService executionService;/** * 作業運行時上下文服務 */
private
final
ExecutionContextService executionContextService;/** * 作業失效轉移服務 */
private
final
FailoverService failoverService;/** * 作業監聽器數組 */
private
final
ListelasticJobListeners; /** * 作業事件匯流排 */
private
final
JobEventBus jobEventBus;
{public
LiteJobFacade
(
final
CoordinatorRegistryCenter regCenter,final
String jobName,final
ListelasticJobListeners, final
JobEventBus jobEventBus)// .... 省略 new XXXXX() 對象
failoverService =new
FailoverService(regCenter, jobName);this
.elasticJobListeners = elasticJobListeners;this
.jobEventBus = jobEventBus;}}
SchedulerFacade 和 LiteJobFacade,看起來很相近,實際差別很大。它們分別為調度器、作業提供需要的方法。下文也會體現這一特點。
3.2 初始化
作業調度器創建後,調用 #init()
方法初始化,作業方
開始
調度。/*** 初始化作業.*/ public void init ()
// 更新 作業配置
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);// 設置 當前作業分片總數
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());// 創建 作業調度控制器
JobScheduleController jobScheduleController =new
JobScheduleController( createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());// 添加 作業調度控制器
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);// 註冊 作業啟動信息
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());// 調度作業
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}3.2.1 更新作業配置
// SchedulerFacade.java /*** 更新作業配置.** @param @return
public
LiteJobConfigurationupdateJobConfiguration
(
final
LiteJobConfiguration liteJobConfig) {// 更新 作業配置
configService.persist(liteJobConfig);// 讀取 作業配置
return
configService.load(false
);}從《Elastic-Job 源碼分析 —— 作業配置》的「3.2 持久化作業配置」,調用
ConfigService#persist(…)
方法也不一定會更新作業配置,因此調用ConfigService#load(…)
方法返回的可能是本地的作業配置,也可能是註冊中心
存儲的作業配置。
3.2.2 設置當前作業分片總數
// JobRegistry.java private new /*** 設置當前分片總數.** @param @param
public
void
setCurrentShardingTotalCount
(
final
String jobName,final
int
currentShardingTotalCount) { currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);}3.2.3 創建作業調度控制器
public void init ()
// .... 省略
// 創建 作業調度控制器
JobScheduleController jobScheduleController =new
JobScheduleController( createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());// .... 省略
}JobScheduleController,作業調度控制器,提供對 Quartz 方法的封裝:
{public
final
class
JobScheduleController
{}/** * Quartz 調度器 */
private
final
Scheduler scheduler;/** * 作業信息 */
private
final
JobDetail jobDetail;/** * 觸發器編號 * 目前使用工作名字( jobName ) */
private
final
String triggerIdentity;public
void
scheduleJob
(
final
String cron)// 調度作業
public
synchronized
void
rescheduleJob
(
final
String cron) {}// 重新調度作業
private
CronTriggercreateTrigger
(
final
String cron) {}// 創建觸發器
public
synchronized
boolean
isPaused
()
{}// 判斷作業是否暫停
public
synchronized
void
pauseJob
()
{}// 暫停作業
public
synchronized
void
resumeJob
()
{}// 恢復作業
public
synchronized
void
triggerJob
()
{}// 立刻啟動作業
public
synchronized
void
shutdown
()
{}// 關閉調度器
}
調用
#createScheduler()
方法創建 Quartz 調度器:
{ Scheduler result;// JobScheduler.java
private
SchedulercreateScheduler
()
try
{ StdSchedulerFactory factory =new
StdSchedulerFactory(); factory.initialize(getBaseQuartzProperties()); result = factory.getScheduler(); result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener()); }catch
(final
SchedulerException ex) {throw
new
JobSystemException(ex); }return
result;}private
PropertiesgetBaseQuartzProperties
()
{ Properties result =new
Properties(); result.put("org.quartz.threadPool.class"
, org.quartz.simpl.SimpleThreadPool.class.getName()); result.put("org.quartz.threadPool.threadCount"
,"1"
);// Quartz 線程數:1
result.put("org.quartz.scheduler.instanceName"
, liteJobConfig.getJobName()); result.put("org.quartz.jobStore.misfireThreshold"
,"1"
); result.put("org.quartz.plugin.shutdownhook.class"
, JobShutdownHookPlugin.class.getName());// 作業關閉鉤子
result.put("org.quartz.plugin.shutdownhook.cleanShutdown"
, Boolean.TRUE.toString());// 關閉時,清理所有資源
return
result;}org.quartz.threadPool.threadCount = 1
,即 Quartz 執行作業線程數量為 1。原因:一個作業( ElasticJob )
的調度,需要配置獨有
的一個作業調度器( JobScheduler )
,兩者是1 : 1
的關係。org.quartz.plugin.shutdownhook.class
設置作業優雅關閉
鉤子:JobShutdownHookPlugin。觸發器監聽器( TriggerListener ),在《Elastic-Job-Lite 源碼解析 —— 作業執行》詳細分享。
調用
#createJobDetail()
方法創建 Quartz 作業:
{// JobScheduler.java
private
JobDetailcreateJobDetail
(
final
String jobClass)// 創建 Quartz 作業
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();//
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);// 創建 Elastic-Job 對象
OptionalelasticJobInstance = createElasticJobInstance(); if
(elasticJobInstance.isPresent()) { result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get()); }else
if
(!jobClass.equals(ScriptJob.class.getCanonicalName())) {try
{ result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance()); }catch
(final
ReflectiveOperationException ex) {throw
new
JobConfigurationException("Elastic-Job: Job class "%s" can not initialize."
, jobClass); } }return
result;}protected
OptionalcreateElasticJobInstance
()
return
Optional.absent();}// SpringJobScheduler.java
@Override
protected
OptionalcreateElasticJobInstance
()
return
Optional.fromNullable(elasticJob);}創建 Quartz 作業設置了 LiteJob 類,這樣 Quartz 觸發作業執行時,LiteJob 會去調用 Elastic-Job 作業對象。在《Elastic-Job-Lite 源碼解析 —— 作業執行》詳細分享。
在 Spring 里,Elastic-Job 如果已經創建好
注入
到 SpringJobScheduler,無需進行創建。Jodetail.jobDataMap
屬性里添加了作業門面對象( LiteJobFacade )、Elastic-Job 對象,Quartz 觸發作業時,會設置到 LiteJob 對象里。
3.2.4 註冊作業啟動信息
/*** 註冊作業啟動信息.* * @param
public
void
registerStartUpInfo
(
final
boolean
enabled) {// 開啟 所有監聽器
listenerManager.startAllListeners();// 選舉 主節點
leaderService.electLeader();// 持久化 作業伺服器上線信息
serverService.persistOnline(enabled);// 持久化 作業運行實例上線相關信息
instanceService.persistOnline();// 設置 需要重新分片的標記
shardingService.setReshardingFlag();// 初始化 作業監聽服務
monitorService.listen();// 初始化 調解作業不一致狀態服務
if
(!reconcileService.isRunning()) { reconcileService.startAsync(); }}開啟所有監聽器。每個功能模塊都有其相應的監聽器,在模塊對應「文章」詳細分享。
選舉主節點,在《Elastic-Job-Lite 源碼解析 —— 主節點選舉》詳細分享。
調用
ServerService#persistOnline()
方法,持久化作業伺服器上線信息。
{public
final
class
ServerService
/** * 持久化作業伺服器上線信息. * *
@param
enabled 作業是否啟用 */public
void
persistOnline
(
final
boolean
enabled) {if
(!JobRegistry.getInstance().isShutdown(jobName)) { jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ?""
: ServerStatus.DISABLED.name()); } }}當作業配置設置作業
禁用
時(LiteJobConfiguration.disabled = true
),作業調度但調度作業分片為空
。不太好理解?《Elastic-Job-Lite 源碼解析 —— 作業分片》詳細分享。調用
InstanceService#persistOnline()
方法,持久化作業運行實例上線相關信息:
{public
final
class
InstanceService
/** * 持久化作業運行實例上線相關信息. */
public
void
persistOnline
()
{ jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(),""
); }}設置需要重新分片的標記,在《Elastic-Job-Lite 源碼解析 —— 作業分片》詳細分享。
初始化作業監聽服務,在《Elastic-Job-Lite 源碼解析 —— 作業監控服務》詳細分享。
初始化調解作業不一致狀態服務,在《Elastic-Job-Lite 源碼解析 —— 自診斷修復》詳細分享。
3.2.5 調度作業
// JobScheduler.java public void init ()
// .... 省略部分代碼
// 調度作業
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());}// JobScheduleController.java
/*** 調度作業.**
@param
cron CRON表達式*/public
void
scheduleJob
(
final
String cron) {try
{if
(!scheduler.checkExists(jobDetail.getKey())) { scheduler.scheduleJob(jobDetail, createTrigger(cron)); } scheduler.start(); }catch
(final
SchedulerException ex) {throw
new
JobSystemException(ex); }}調用
#scheduleJob()
方法後,該 Elastic-Job 作業開始
被調度。
666. 彩蛋
作業初始化,如果你對 Quartz 不是特別了解,可以再看 Quartz 再重新理解。
下一篇,《Elastic-Job-Lite 源碼解析 —— 作業執行》 起航!
道友,分享一波
微信朋友圈
支持支持支持,可好?如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。
目前在知識星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源碼解析如下:01. 調試環境搭建02. 項目結構一覽03. 配置 Configuration04. 核心流程一覽
05. 拓展機制 SPI
06. 線程池
07. 服務暴露 Export
08. 服務引用 Refer
09. 註冊中心 Registry
10. 動態編譯 Compile
11. 動態代理 Proxy
12. 服務調用 Invoke
13. 調用特性
14. 過濾器 Filter
15. NIO 伺服器
16. P2P 伺服器
17. HTTP 伺服器
18. 序列化 Serialization
19. 集群容錯 Cluster
20. 優雅停機
21. 日誌適配
22. 狀態檢查
23. 監控中心 Monitor
24. 管理中心 Admin
25. 運維命令 QOS
26. 鏈路追蹤 Tracing
...一共 60 篇++
源碼不易
↓
↓
↓↓↓點贊
支持老艿艿
↓↓
推薦閱讀:
※使用Apache Zookeeper分散式部署PHP應用程序
※【轉】基於內存資料庫的分散式資料庫架構
※揭秘:去中心化分散式系統的好與壞