優秀開源項目kombu源碼分析之registry和entrypoint
我曾經在一些公眾場合說過心中的優秀Python開發者。Flask和Requests的作者就不說了,21世紀最缺的就是idea,他們不僅有而且還都用非常優美的方式做出來了。另外我還提到了Celery作者Ask Solem,並不是因為Celery很有名它的主要作者就優秀了,我對ask的欣賞,完全是看Celery及其相關依賴的源代碼的時候產生的。
有多年後台開發的工程師想必清楚,Celery本身涉及到的技術點其實在業界應用是很廣泛的。Celery能這麼流行,我們先排除沒有進行技術深入下的盲從,和它誕生的非常早以外,我認為這和項目的內部設計的非常好也是有關的。
接下來的幾篇文章我將分析Celery使用的Kombu庫中的一些設計實現讓大家對這個優秀項目更了解,並從中學習可擴展開發的實踐。
Kombu是什麼?
當一個項目變得越來越複雜,就要考慮只保留核心,並把其他部分分拆到不同的項目中以便減少未來的維護和開發的成本。Flask、IPython都是這樣做的。
Kombu是一個把消息傳遞封裝成統一介面的庫。 Celery一開始先支持的RabbitMQ,也就是使用AMQ協議。由於要支持越來越多的消息代理,但是這些消息代理是不支持AMQ協議的,需要一個東西把所有的消息代理的處理方式統一起來,甚至可以理解為把它們「偽裝成支持AMQ協議」。Kombu的最初的實現叫做carrot, 後來經過重構才成了Kombu。
registry
registry也就是「註冊」,有按需加入的意思,在Python標準庫和一些優秀開源項目中都有應用。我們先看個django的場景,為了減少篇幅我沒有列出CheckRegistry類中其他方法:
### source code startnfrom itertools import chainnnnclass CheckRegistry:nn def __init__(self):n self.registered_checks = []n self.deployment_checks = []nn def register(self, check=None, *tags, **kwargs):n kwargs.setdefault(deploy, False)nn def inner(check):n check.tags = tagsn if kwargs[deploy]:n if check not in self.deployment_checks:n self.deployment_checks.append(check)n elif check not in self.registered_checks:n self.registered_checks.append(check)n return checknn if callable(check):n return inner(check)n else:n if check:n tags += (check, )n return innernn def tag_exists(self, tag, include_deployment_checks=False):n return tag in self.tags_available(include_deployment_checks)nn def tags_available(self, deployment_checks=False):n return set(chain(*[check.tags for check in self.get_checks(deployment_checks) if hasattr(check, tags)]))nn def get_checks(self, include_deployment_checks=False):n checks = list(self.registered_checks)n if include_deployment_checks:n checks.extend(self.deployment_checks)n return checksnnnregistry = CheckRegistry()nregister = registry.registerntag_exists = registry.tag_existsnn### source code endn@register(mytag, another_tag)ndef my_check(apps, **kwargs):n passnnnprint tag_exists(another_tag)nprint tag_exists(not_exists_tag)n
可以看到每次用registry.register都能動態的添加新的tag,最後還用 register= registry.register這樣的方式列了個別名。執行結果如下:
? python django_example.pynTruenFalsen
kombu庫包含對消息的序列化和反序列化工作的實現,可以同時支持多種序列化方案,如pickle、json、yaml和msgpack。假如你從前沒有寫過這樣可擴展的項目,可能想的是每種的方案的loads和dumps都封裝一遍,然後用一個大的if/elif/else來控制最後的序列化如何執行。
那麼在kombu裡面是怎麼用的呢?我簡化下它的實現:
import codecsnfrom collections import namedtuplenncodec = namedtuple(codec, (content_type, content_encoding, encoder))nnclass SerializerNotInstalled(Exception):n passnnnclass SerializerRegistry(object):n def __init__(self):n self._encoders = {}n self._decoders = {}n self._default_encode = Nonen self._default_content_type = Nonen self._default_content_encoding = Nonenn def register(self, name, encoder, decoder, content_type,n content_encoding=utf-8):n if encoder:n self._encoders[name] = codec(n content_type, content_encoding, encoder,n )n if decoder:n self._decoders[content_type] = decodernn def _set_default_serializer(self, name):n try:n (self._default_content_type, self._default_content_encoding,n self._default_encode) = self._encoders[name]n except KeyError:n raise SerializerNotInstalled(n No encoder installed for {0}.format(name))nn def dumps(self, data, serializer=None):n if serializer and not self._encoders.get(serializer):n raise SerializerNotInstalled(n No encoder installed for {0}.format(serializer))nn if not serializer and isinstance(data, unicode):n payload = data.encode(utf-8)n return text/plain, utf-8, payloadnn if serializer:n content_type, content_encoding, encoder = n self._encoders[serializer]n else:n encoder = self._default_encoden content_type = self._default_content_typen content_encoding = self._default_content_encodingnn payload = encoder(data)n return content_type, content_encoding, payloadnn def loads(self, data, content_type, content_encoding):n content_type = (content_type if content_typen else application/data)n content_encoding = (content_encoding or utf-8).lower()nn if data:n decode = self._decoders.get(content_type)n if decode:n return decode(data)n return datannnregistry = SerializerRegistry()ndumps = registry.dumpsnloads = registry.loadsnregister = registry.registern
其實kombu還實現了unregister限於篇幅我就不展開了。現在我們想添加yaml的支持,只需要加這樣一個函數:
def register_yaml():n try:n import yamln registry.register(yaml, yaml.safe_dump, yaml.safe_load,n content_type=application/x-yaml,n content_encoding=utf-8)n except ImportError:nn def not_available(*args, **kwargs):n """Raise SerializerNotInstalled.n Used in case a client receives a yaml message, but yamln isnt installed.n """n raise SerializerNotInstalled(n No decoder installed for YAML. Install the PyYAML library)n registry.register(yaml, None, not_available, application/x-yaml)nnnregister_yaml()n
這樣就支持yaml了。如果希望默認使用yaml來序列化,可以執行:
registry._set_default_serializer(yaml)n
是不是非常好擴展,如果哪天我希望去掉對pickle(安全問題),就可以直接注釋對應的函數就好了。寫個小例子試驗下:
yaml_data = """nfloat: 3.1415926500000002nint: 10nlist: [george, jerry, elaine, cosmo]nstring: The quick brown fox jumps over the lazy dognunicode: "ThxE9 quick brown fox jumps over thxE9 lazy dog"n"""nncontent_type, content_encoding, payload = dumps(yaml_data, serializer=yaml)nprint content_type, content_encodingnnassert loads(payload, content_type=content_type, content_encoding=content_encoding) == yaml_datan
運行的結果就是:
? python kombu_example.pynapplication/x-yaml utf-8n
entrypoint
在我的書裡面介紹過如果使用標準庫自帶的pkg_resources.iter_entry_points實現一個簡單的插件系統。這在kombu上面也有應用,在序列化實現模塊的最後加了這麼幾句:
from pkg_resources import iter_entry_pointsnnfor ep in iter_entry_points(kombu.serializers):n args = ep.load()n register(ep.name, *args)n
這是什麼東西呢?pkg_resources是一個用於包發現和資源訪問的模塊,我們可以實現不同的kombu擴展,如果在這個擴展項目的setup.py裡面設置對應的entry_points,在安裝之後,運行上述代碼的時候就會自動找到這些擴展,並註冊進來。這就是一個擴展系統。Flake8就是最好的這個擴展玩法的範例。
kombu的擴展不多,我選擇kombu-fernet-serializers來進行介紹。首先看一下它的setup.py文件:
...n entry_points={n kombu.serializers: [n fernet_json = kombu_fernet.serializers.json:register_args,n fernet_yaml = kombu_fernet.serializers.yaml:register_args,n fernet_pickle = kombu_fernet.serializers.pickle:register_args,n fernet_msgpack = kombu_fernet.serializers.msgpack:register_args,n ]n }n...n
注意到了吧,這個entry點就是kombu.serializers,安裝之後就多了4個序列化方案,我們看一下fernet_json的實現:
import anyjson as _jsonnnfrom . import fernet_encode, fernet_decodennMIMETYPE = application/x-fernet-jsonnnregister_args = (n fernet_encode(_json.dumps),n fernet_decode(_json.loads),n MIMETYPE,n utf-8,n)n
而fernet_yaml也用了被放進了模塊的方式,其實和在函數內殊途同歸:
from kombu.exceptions import SerializerNotInstallednnfrom . import fernet_encode, fernet_decodenntry:n import yamlnexcept ImportError:n def not_available(*args, **kwargs):n """In case a client receives a yaml message, but yamln isnt installed."""n raise SerializerNotInstalled(n No decoder installed for YAML. Install the PyYAML library)nn yaml_encoder = not_availablen yaml_decoder = Nonenelse:n yaml_encoder = yaml.safe_dumpn yaml_decoder = yaml.safe_loadnnMIMETYPE = application/x-fernet-yamlnnregister_args = (n fernet_encode(yaml_encoder),n fernet_decode(yaml_decoder) if yaml_decoder else None,n MIMETYPE,n utf-8,n)n
事實上,我們並不需要了解fernet_encode和fernet_decode是如何對消息做對稱加密的,只是感受下這樣添加擴展的方式是不是很優雅呢?
PS:本文全部代碼可以在微信公眾號文章代碼庫項目中找到。
無恥的廣告:《Python Web開發實戰》上市了!
歡迎關注本人的微信公眾號獲取更多Python相關的內容(也可以直接搜索「Python之美」):
推薦閱讀:
※Python從零開始系列連載(14)——Python程序的基本控制流程(中)
※你真的了解Python中的日期時間處理嗎?
※Beautiful Soup實踐
※[11] Python條件判斷語句(二)
TAG:Python |