標籤:

python任務調度之schedule

在工作中多少都會涉及到一些定時任務,比如定時郵件提醒等. 本文通過開源項目schedule來學習定時任務調度是如何工作的,以及基於此實現一個web版本的提醒工具.

schedule簡介

既然schedule說是給人類使用的作業調度器, 先來看看作者給提供的例子:

import scheduleimport timedef job(): print("Im working...")schedule.every(10).minutes.do(job)schedule.every().hour.do(job)schedule.every().day.at("10:30").do(job)schedule.every().monday.do(job)schedule.every().wednesday.at("13:15").do(job)while True: schedule.run_pending() time.sleep(1)

上面的意思就是:

  • 每隔10分鐘執行一次任務
  • 每隔一小時執行一次任務
  • 每天10:30執行一次任務
  • 每周一的這個時候執行一次任務
  • 每周三13:15執行一次任務

代碼確實簡潔明了,一讀就懂. 那麼接下來就來學習一下源碼.

schedule源碼學習

首先看一下有哪些類,如圖(使用pycharm導出的):

可以看到只有三個類,源碼分析就圍繞這三個類:

CancelJob

class CancelJob(object): pass

可以看到就是一個空類, 這個類的作用就是當你的job執行函數返回一個CancelJob類型的對象,那麼執行完後就會被Scheduler移除. 簡單說就是只會執行一次.

Scheduler

為了使代碼緊湊,這裡刪除了注釋,剩下也就34行代碼.

class Scheduler(object): def __init__(self): self.jobs = [] def run_pending(self): runnable_jobs = (job for job in self.jobs if job.should_run) for job in sorted(runnable_jobs): self._run_job(job) def run_all(self, delay_seconds=0): for job in self.jobs: self._run_job(job) time.sleep(delay_seconds) def clear(self): del self.jobs[:] def cancel_job(self, job): try: self.jobs.remove(job) except ValueError: pass def every(self, interval=1): job = Job(interval) self.jobs.append(job) return job def _run_job(self, job): ret = job.run() if isinstance(ret, CancelJob) or ret is CancelJob: self.cancel_job(job) @property def next_run(self): if not self.jobs: return None return min(self.jobs).next_run @property def idle_seconds(self): return (self.next_run - datetime.datetime.now()).total_seconds()

Scheduler作用就是在job可以執行的時候執行它. 這裡的函數也都比較簡單:

  • run_pending:運行所有可以運行的任務
  • run_all:運行所有任務,不管是否應該運行
  • clear:刪除所有調度的任務
  • cancel_job:刪除一個任務
  • every: 創建一個調度任務, 返回的是一個job
  • _run_job:運行一個job
  • next_run:獲取下一個要運行任務的時間, 這裡使用的是min去得到最近將執行的job, 之所以這樣使用,是Job重載了__lt_方法,這樣寫起來確實很簡潔.
  • idle_seconds:還有多少秒即將開始運行任務.

Job

Job是整個定時任務的核心. 主要功能就是根據創建Job時的參數,得到下一次運行的時間. 代碼如下,稍微有點長(會省略部分代碼,可以看源碼):

class Job(object): def __init__(self, interval): self.interval = interval # pause interval * unit between runs self.job_func = None # the job job_func to run self.unit = None # time units, e.g. minutes, hours, ... self.at_time = None # optional time at which this job runs self.last_run = None # datetime of the last run self.next_run = None # datetime of the next run self.period = None # timedelta between runs, only valid for self.start_day = None # Specific day of the week to start on def __lt__(self, other): return self.next_run < other.next_run def minute(self): assert self.interval == 1, Use minutes instead of minute return self.minutes @property def minutes(self): self.unit = minutes return self @property def hour(self): assert self.interval == 1, Use hours instead of hour return self.hours @property def hours(self): self.unit = hours return self @property def day(self): assert self.interval == 1, Use days instead of day return self.days @property def days(self): self.unit = days return self @property def week(self): assert self.interval == 1, Use weeks instead of week return self.weeks @property def weeks(self): self.unit = weeks return self @property def monday(self): assert self.interval == 1, Use mondays instead of monday self.start_day = monday return self.weeks def at(self, time_str): assert self.unit in (days, hours) or self.start_day hour, minute = time_str.split(:) minute = int(minute) if self.unit == days or self.start_day: hour = int(hour) assert 0 <= hour <= 23 elif self.unit == hours: hour = 0 assert 0 <= minute <= 59 self.at_time = datetime.time(hour, minute) return self def do(self, job_func, *args, **kwargs): self.job_func = functools.partial(job_func, *args, **kwargs) try: functools.update_wrapper(self.job_func, job_func) except AttributeError: # job_funcs already wrapped by functools.partial wont have # __name__, __module__ or __doc__ and the update_wrapper() # call will fail. pass self._schedule_next_run() return self @property def should_run(self): return datetime.datetime.now() >= self.next_run def run(self): logger.info(Running job %s, self) ret = self.job_func() self.last_run = datetime.datetime.now() self._schedule_next_run() return ret def _schedule_next_run(self): assert self.unit in (seconds, minutes, hours, days, weeks) self.period = datetime.timedelta(**{self.unit: self.interval}) self.next_run = datetime.datetime.now() + self.period # 太長,後面講一下邏輯或者看源碼.

這個方法也不是很多,有很多邏輯是一樣的. 簡單介紹一下:

首先看一下幾個參數的含義:

  • interval:間隔多久,每interval秒或分等.
  • job_func:job執行函數
  • unit : 間隔單元,比如minutes, hours
  • at_time :job具體執行時間點,比如10:30等
  • last_run:job上一次執行時間
  • next_run :job下一次即將運行時間
  • period: 距離下次運行間隔時間
  • start_day: 周的特殊天,也就是monday等的含義

再來看一下各個方法:

  • __lt__: 比較哪個job最先即將執行, Scheduler中next_run方法里使用min會用到, 有時合適的使用python這些特殊方法可以簡化代碼,看起來更pythonic.

  • second、seconds的區別就是second時默認interval ==1,即schedule.every().second和schedule.every(1).seconds是等價的,作用就是設置unit為seconds. minute和minutes、hour和hours、day和days、week和weeks也類似.

  • monday: 設置start_day 為monday, unit 為weeks,interval為1. 含義就是每周一執行job. 類似 tuesday、wednesday、thursday、friday、saturday、sunday一樣.

  • at: 表示某天的某個時間點,所以不適合minutes、weeks且start_day 為空(即單純的周)這些unit. 對於unit為hours時,time_str中小時部分為0.

  • do: 設置job對應的函數以及參數, 這裡使用functools.update_wrapper去更新函數名等信息.主要是functools.partial返回的函數和原函數名稱不一樣.具體可以看看官網文檔. 然後調用_schedule_next_run去計算job下一次執行時間.

  • should_run: 判斷job是否可以運行了.依據是當前時間點大於等於job的next_run

  • _schedule_next_run: 這是整個job的定時的邏輯部分是計算job下次運行的時間點的.描述一下流程:

    • 計算下一次執行時間:

      這裡根據unit和interval計算出下一次運行時間. 舉個例子,比如schedule.every().hour.do(job, message=things)下一次運行時間就是當前時間加上一小時的間隔.
    • 但是當start_day不為空時,即表示某個星期. 這時period就不能直接加在當前時間了. 看代碼:

      其中days_ahead表示job表示的星期幾與當表示的星期幾差幾天. 比如今天是星期三,job表示的是星期五,那麼days_ahead就為2,最終self.next_run效果就是在now基礎上加了2天.
    • 當at_time不為空時, 需要更新執行的時間點,具體就是計算時、分、秒然後調用replace進行更新. 這裡對unit為days或hours進行特殊處理:

      當已經過了執行時間的話的話,unit為days的話減去一天, unit為hours的話減去一小時. 這樣可以保證任務今天運行.
    • 後面還有一句代碼:

      這句的含義時對於像monday這些定時任務特殊情況的處理. 舉個例子, 今天是星期四12:00,創建的job是星期四13:00, days_ahead <=7 這個條件滿足,最終next_run實際加了7,這樣的話這個任務就不會運行了. 所以這一步實際就是把7減掉. 看上去有點繞, 實際只要把days_ahead <= 0改為days_ahead < 0這句代碼就不用了.

學習總結

通過學習schedule,可以看到實現一個基礎的任務定時調度就是根據job的配置計算執行時間執行job. 代碼里我認為比較好的地方有:

  • __lt__的使用,這樣min函數直接應用在job上.
  • @property是代碼更簡潔
  • 返回self支持連綴操作,像schedule.every(10).minutes.do(job)看起來很直接.
  • 時間部分完全是根據datetime實現的,有很多很好用的函數.

web版提醒工具

待續...


推薦閱讀:

在jupyter notebook中美觀顯示矩陣
Python語法速覽與機器學習開發環境搭建
python-IO
Python語言有多流行
使用python爬蟲結合mysql進行大數據存儲

TAG:Python |