使用Celery
註:本專欄文章未經允許請勿轉載。
Celery是一個專註於實時處理和任務調度的分散式任務隊列。所謂任務就是消息,消息中的有效載荷中包含要執行任務需要的全部數據。
使用Celery的常見場景如下:
1. Web應用。當用戶觸發的一個操作需要較長時間才能執行完成時,可以把它作為任務交給Celery去非同步執行,執行完再返回給用戶。這段時間用戶不需要等待,提高了網站的整體吞吐量和響應時間。
2. 定時任務。生產環境經常會跑一些定時任務。假如你有上千台的伺服器、上千種任務,定時任務的管理很困難,Celery可以幫助我們快速在不同的機器設定不同種任務。
3. 同步完成的附加工作都可以非同步完成。比如發送簡訊/郵件、推送消息、清理/設置緩存等。
Celery還提供了如下的特性:
1. 方便地查看定時任務的執行情況,比如執行是否成功、當前狀態、執行任務花費的時間等。
2. 可以使用功能齊備的管理後台或者命令行添加、更新、刪除任務。
3. 方便把任務和配置管理相關聯。
4. 可選多進程、Eventlet和Gevent三種模式並發執行。
5. 提供錯誤處理機制。
- 提供多種任務原語,方便實現任務分組、拆分和調用鏈。
- 支持多種消息代理和存儲後端。
Celery的架構
Celery包含如下組件:
1. Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。
2. Celery Worker:執行任務的消費者,通常會在多台伺服器運行多個消費者來提高執行效率。
3. Broker:消息代理,或者叫作消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者資料庫)。
4. Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
5. Result Backend:任務處理完後保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
Celery的架構圖如圖所示:
選擇消息代理
Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為消息代理,但適用於生產環境的只有RabbitMQ和Redis,至於其他的方式,一是支持有限,二是可能得不到更好的技術支持。
Celery官方推薦的是RabbitMQ,Celery的作者Ask Solem Hoel最初在VMware就是為RabbitMQ工作的,Celery最初的設計就是基於RabbitMQ,所以使用RabbitMQ會非常穩定,成功案例很多。如果使用Redis,則需要能接受發生突然斷電之類的問題造成Redis突然終止後的數據丟失等後果。
Celery序列化
在客戶端和消費者之間傳輸數據需要序列化和反序列化,Celery支持如表9.2所示的序列化方案:
為了提供更高的性能,我們選擇如下方案:1. 選擇RabbitMQ作為消息代理。
2. RabbitMQ的Python客戶端選擇librabbitmq這個C庫。
3. 選擇Msgpack做序列化。
4. 選擇Redis做結果存儲。
下面先安裝它們。Celery提供bundles的方式,也就是安裝Celery的同時可以一起安裝多種依賴:
? pip install "celery[librabbitmq,redis,msgpack]"n
註:bundles的原理是在setup.py的setup函數中添加extras_require。
從一個簡單的例子開始
先演示一個簡單的項目讓Celery運行起來。項目的目錄結構如下:
? tree chapter9/section3/projnn├── celeryconfig.pynn├── celery.pynn├── __init__.pynn└── tasks.pyn
先看一下主程序celery.py:
from __future__ import absolute_importnnfrom celery import Celerynnapp = Celery(proj, include=[proj.tasks])nnapp.config_from_object(proj.celeryconfig)nnnif __name__ == __main__:nn app.start()n
解析一下這個程序:
1. "from __future__ import absolute_import"是拒絕隱式引入,因為celery.py的名字和celery的包名衝突,需要使用這條語句讓程序正確地運行。
2. app是Celery類的實例,創建的時候添加了proj.tasks這個模塊,也就是包含了proj/tasks.py這個文件。
3. 把Celery配置存放進proj/celeryconfig.py文件,使用app.config_from_object載入配置。
看一下存放任務函數的文件tasks.py:
from __future__ import absolute_importnnfrom proj.celery import appnnn@app.tasknndef add(x, y):nn return x + yn
tasks.py只有一個任務函數add,讓它生效的最直接的方法就是添加app.task這個裝飾器。
看一下我們的配置文件celeryconfig.py:
BROKER_URL = amqp://dongwm:123456@localhost:5672/web_develop # 使用RabbitMQ作為消息代理nnCELERY_RESULT_BACKEND = redis://localhost:6379/0 # 把任務結果存在了RedisnnCELERY_TASK_SERIALIZER = msgpack # 任務序列化和反序列化使用msgpack方案nnCELERY_RESULT_SERIALIZER = json # 讀取任務結果一般性能要求不高,所以使用了可讀性更好的JSONnnCELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間,不建議直接寫86400,應該讓這樣的magic數字表述更明顯nnCELERY_ACCEPT_CONTENT = [json, msgpack] # 指定接受的內容類型n
這個例子中沒有任務調度相關的內容, 所以只需要啟動消費者:
? cd ~/web_develop/chapter9/section3nn? celery -A proj worker -l infon
-A參數默認會尋找proj.celery這個模塊,其實使用celery作為模塊文件名字不怎麼合理。可以使用其他名字。舉個例子,假如是proj/app.py,可以使用如下命令啟動:
? celery -A proj.app worker -l infon
上述信息提供了一些有幫助的內容,如消息代理和存儲結果的地址、並發數量、任務列表、交換類型等。在對Celery不熟悉的時候可以通過如上信息判斷設置和修改是否已生效。
現在開啟另外一個終端,用IPython調用add函數:
In : from proj.tasks import addnnIn : r = add.delay(1, 3)nnIn : rnnOut: <AsyncResult: 93288a00-94ee-4727-b815-53dc3474cf3f>nnIn : r.resultnnOut: 4nnIn : r.statusnnOut: uSUCCESSnIn : r.successful()nnOut: TruennIn : r.backendnnOut: <celery.backends.redis.RedisBackend at 0x7fb2529500d0> # 保存在Redis中n
可以看到worker的終端上顯示執行了任務:
[2016-06-03 13:34:40,749: INFO/MainProcess] Received task: proj.tasks.add[93288a00-94ee-4727-b815-53dc3474cf3f]nn[2016-06-03 13:34:40,755: INFO/MainProcess] Task proj.tasks.add[93288a00-94ee-4727-b815-53dc3474cf3f] succeeded in 0.00511166098295s: 4n
通過IPython觸發的任務就完成了。任務的結果都需要根據上面提到的task_id獲得,我們還可以用如下兩種方式隨時找到這個結果:
task_id = 93288a00-94ee-4727-b815-53dc3474cf3fnnIn : add.AsyncResult(task_id).get()nnOut: 4n
或者:
In : from celery.result import AsyncResultnnIn : AsyncResult(task_id).get()nnOut: 4n
指定隊列
Celery非常容易設置和運行,通常它會使用默認的名為celery的隊列(可以通過CELERY_DEFAULT_QUEUE修改)用來存放任務。我們可以使用優先順序不同的隊列來確保高優先順序的任務不需要等待就得到響應。
基於proj目錄下的源碼,我們創建一個projq目錄,並對projq/celeryconfig.py添加如下配置:
from kombu import QueuennCELERY_QUEUES = ( # 定義任務隊列nnQueue(default, routing_key=task.#), # 路由鍵以「task.」開頭的消息都進default隊列nnQueue(web_tasks, routing_key=web.#), # 路由鍵以「web.」開頭的消息都進web_tasks隊列nn)nnCELERY_DEFAULT_EXCHANGE = tasks # 默認的交換機名字為tasksnnCELERY_DEFAULT_EXCHANGE_TYPE = topic # 默認的交換類型是topicnnCELERY_DEFAULT_ROUTING_KEY = task.default # 默認的路由鍵是task.default,這個路由鍵符合上面的default隊列nnCELERY_ROUTES = {nn projq.tasks.add: { # tasks.add的消息會進入web_tasks隊列nn queue: web_tasks,nn routing_key: web.add,nn }nn}n
現在用指定隊列的方式啟動消費者進程:
? celery -A projq worker -Q web_tasks -l infon
上述worker只會執行web_tasks中的任務,我們可以合理安排消費者數量,讓web_tasks中任務的優先順序更高。
使用任務調度
之前的例子都是由發布者觸發的,本節展示一下使用Celery的Beat進程自動生成任務。基於proj目錄下的源碼,創建一個projb目錄,對projb/celeryconfig.py添加如下配置:
CELERYBEAT_SCHEDULE = {nn add: {nn task: projb.tasks.add,nn schedule: timedelta(seconds=10),nn args: (16, 16)nn }n}n
CELERYBEAT_SCHEDULE中指定了tasks.add這個任務每10秒跑一次,執行的時候的參數是16和16。
啟動Beat程序:
? celery beat -A projbn
然後啟動Worker進程:
? celery -A projb worker -l infon
之後可以看到每10秒都會自動執行一次tasks.add。
註:Beat和Worker進程可以一併啟動:
? celery -B -A projb worker -l infon
使用Django可以通過django-celery實現在管理後台創建、刪除、更新任務,是因為它使用了自定義的調度類djcelery.schedulers.DatabaseScheduler,我們可以參考它實現Flask或者其他Web框架的管理後台來完成同樣的功能。使用自定義調度類還可以實現動態添加任務。
任務綁定、記錄日誌和重試
任務綁定、記錄日誌和重試是Celery常用的3個高級屬性。現在修改proj/tasks.py文件,添加div函數用於演示:
from celery.utils.log import get_task_logger n nlogger = get_task_logger(__name__) n n n@app.task(bind=True) ndef div(self, x, y): n logger.info((Executing task id {0.id}, args: {0.args!r} n kwargs: {0.kwargs!r}).format(self.request)) n try: n result = x / y n except ZeroDivisionError as e: n raise self.retry(exc=e, countdown=5, max_retries=3) n return result n
當使用bind = True後,函數的參數發生變化,多出了參數self(第一個參數),相當於把div變成了一個已綁定的方法,通過self可以獲得任務的上下文。
在IPython中調用div:
In : from proj.tasks import divnnIn : r = div.delay(2, 1)n
可以看到如下執行信息:
[2016-06-03 15:50:31,853: INFO/Worker-1] proj.tasks.div[1da82fb8-20de-4d5a-9b48-045da6db0cda]: Executing task id 1da82fb8-20de-4d5a-9b48-045da6db0cda, args: [2, 1] kwargs: {}n
換成能造成異常的參數:
In : r = div.delay(2, 0)n
可以發現每5秒就會重試一次,一共重試3次(默認重複3次),然後拋出異常。
本節是《Python Web開發實戰》書中的第9章第三節的內容。但由於專欄不支持Markdown,格式和書中的不完全一樣。
無恥的廣告:《Python Web開發實戰》上市了!
歡迎關注本人的微信公眾號獲取更多Python相關的內容(也可以直接搜索「Python之美」):
推薦閱讀:
※Python小工具: 發個周報郵件給老闆
※如何真正零基礎入門Python?(第一節)
※VisPy 中文文檔:簡介與安裝
※[2] Python變數
※如何用100行Python代碼做出魔性聲控遊戲「八分音符醬」