標籤:

使用codecs自定義編/解碼方案

相信很多使用Python的同學熟悉編解碼,比如:

In : print uU0001F3F4.encode(utf-8)n??nIn : 哈哈.decode(utf8)nnOut: u哈哈n

這樣可以在字元串和unicode之前轉換,不過細心的同學可能發現了,我使用了「utf-8」和「utf8」,這2個詞長得很像。事實上都能正常使用是由於他們都是「utf_8」的別名,這些別名的對應關係可以用如下方法找到:

In : import encodingsnnIn : encodings.aliases.aliases[utf8]nOut: utf_8nnIn : 哈哈.decode(u8)nOut: u哈哈nnIn : 哈哈.decode(utf)nOut: u哈哈nnIn : 哈哈.decode(utf8_ucs2)nOut: u哈哈nnIn : 哈哈.decode(utf8_ucs4)nOut: u哈哈n

encodings是標準庫中自帶的編碼庫,其中包含了上百個標準的編碼轉換方案。但是通常並不需要使用encodings,而是使用codecs。

codecs包含了編碼解碼器的註冊和其他基本的類,開發者還可以通過codecs提供的介面自定義編/解碼方案,也就是可以創造一個新的編解碼轉換方案,使用encode(XX)和decode(XX)的方式使用。今天我給大家演示直接進行Fernet對稱加密的例子。

互聯網安全的重要性不必在複述了,大家都應該接觸過一些加密技術,可能聽過M2Crypto、PyCrypto、Cryptography之類的庫。在這裡歪個樓,現在的主流是使用Cryptography,它的出現就是為了替代之前的那些庫,具體的可以看官方文檔。我們使用Cryptography提供的Fernet類來實現,首先實現一個Codec類:

import codecsnnfrom cryptography.fernet import Fernetnfrom cryptography.fernet import InvalidTokennnKEY = Fernet.generate_key()nf = Fernet(KEY)nnnclass FernetCodec(codecs.Codec):n def encode(self, input, errors=fernet.strict):n return f.encrypt(input), len(input)nn def decode(self, input, errors=fernet.strict):n try:n return f.decrypt(input), len(input)n except InvalidToken:n error = codecs.lookup_error(errors)n return error(UnicodeDecodeError(fernet, input, 0, len(input) + 1,n Invalid Token))n

當然也不必在類中實現encode和decode方法,單獨的2個函數也可以。我這裡是為了給之後的演示到的類復用。

如果你看過內置的字元串encode的方法,它的文檔說還接收第二個參數,讓你告訴它當出現出錯的時候如何去處理,默認是strict,直接就會拋出來錯誤。

其餘可選的還有ignore、replace、xmlcharrefreplace:

In [35]: x80abc.decode(utf-8, strict)n---------------------------------------------------------------------------nUnicodeDecodeError Traceback (most recent call last)n<ipython-input-35-974ebc908d50> in <module>()n----> 1 x80abc.decode(utf-8, strict)nn/Users/dongweiming/dae/venv/lib/python2.7/encodings/utf_8.pyc in decode(input, errors)n 14n 15 def decode(input, errors=strict):n---> 16 return codecs.utf_8_decode(input, errors, True)n 17n 18 class IncrementalEncoder(codecs.IncrementalEncoder):nnUnicodeDecodeError: utf8 codec cant decode byte 0x80 in position 0: invalid start bytennIn [36]: x80abc.decode(utf-8, replace)nOut[36]: u?cnnIn [37]: x80abc.decode(utf-8, ignore)nOut[37]: uabcn

事實上Python還內置了其他的選項,如backslashreplace、namereplace、surrogatepass、surrogateescape等,有興趣的可以看源碼實現

我也會定義2種錯誤函數,因為在解密(執行decrypt)的時候可能會報InvalidToken錯誤,但是InvalidToken不包含任何參數,而對錯誤處理的時候需要知道起始和結束的位置,所以我就直接拋一個UnicodeDecodeError錯誤了。

接著我們定義遞增式和流式的編碼類:

class IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):n passnnnclass IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):n passnnnclass StreamReader(FernetCodec, codecs.StreamReader):n passnnnclass StreamWriter(FernetCodec, codecs.StreamWriter):n pass n

由於這個要加密的字元串不會很長,沒有必要實現這些類,我就簡單的繼承然後pass了。接著開放入口:

def getregentry():n return codecs.CodecInfo(n name=fernet,n encode=FernetCodec().encode,n decode=FernetCodec().decode,n incrementalencoder=IncrementalEncoder,n incrementaldecoder=IncrementalDecoder,n streamwriter=StreamWriter,n streamreader=StreamReader,n )n

其實incrementalencoder、streamwriter這些參數不傳遞也是可以的,默認是None,今天只是為了讓大家知道是有這部分介面的。然後是註冊這個入口,為了提供更好的性能,我創建一個函數加上緩存功能:

_cache = {}n_unknown = --unknown--nndef search_function(encoding):n import encodingsn encoding = encodings.normalize_encoding(encoding)n entry = _cache.get(encoding, _unknown)n if entry is not _unknown:n return entrynn if encoding == fernet:n entry = getregentry()n _cache[encoding] = entryn return entryn n ncodecs.register(search_function) n

這裡簡單介紹下normalize_encoding的意義,之前我說到了別名,我們再感受下:

In : u哈哈.encode(utf-8)nOut: xe5x93x88xe5x93x88nnIn : u哈哈.encode(utf_8)nOut: xe5x93x88xe5x93x88n

這種中劃線和下劃線最後無差別對待,就是通過這個函數標準化的。

codecs模塊底層維護了一個搜索函數的列表,通過調用codecs.register方法就把上述函數append進去了。最後我們註冊2個錯誤處理的函數:

def strict_errors(exc):n if isinstance(exc, UnicodeDecodeError):n raise TypeError(Invalid Token)nnndef fallback_errors(exc):n s = []n for c in exc.object[exc.start:exc.end]: # 只是為了演示提供的介面,實施上就是返回原來的inputn s.append(c)n return .join(s), exc.endnncodecs.register_error(fernet.strict, strict_errors)ncodecs.register_error(fernet.fallback, fallback_errors)n

默認使用的是fernet.strict這種處理方案,也可以使用fallback模式。ok,我們現在感受一下:

In [1]: import fernetnnIn [2]: input = hahnnIn [3]: output = input.encode(fernet)nnIn [4]: output # 已經是加密後的結果了nOut[4]: gAAAAABZCFa6Znp2a_e9O0VqP6qToO6T3xRbF7O-adtpFC4QYO7jvVc6Yrcwbo6YGQfL8g5HCXcsaan_THWNhjZAorPTwlQQTA==nnIn [5]: output.decode(fernet) # 解密後還原成原來的字元串nOut[5]: hahnnIn [6]: input.decode(fernet)nfernet codec cant decode bytes in position 0-3: Invalid Tokenn---------------------------------------------------------------------------nTypeError Traceback (most recent call last)n<ipython-input-6-1c20dc246c52> in <module>()n----> 1 input.decode(fernet)nn/Users/dongweiming/test/tmp/fernet.pyc in decode(self, input, errors)n 20 error = codecs.lookup_error(errors)n 21 return error(UnicodeDecodeError(fernet, input, 0, len(input) + 1,n---> 22 Invalid Token))n 23n 24nn/Users/dongweiming/test/tmp/fernet.pyc in strict_errors(exc)n 68 print excn 69 if isinstance(exc, UnicodeDecodeError):n---> 70 raise TypeError(Invalid Token)n 71n 72nnTypeError: Invalid Token # 拋了個自定義錯誤nnIn [7]: input.decode(fernet, fernet.fallback)nOut[7]: hah # 解密不成功,返回原來的字元串n

好了,自定義的加解密方案完成了,整理全部的代碼如下:

import codecsnnfrom cryptography.fernet import Fernetnfrom cryptography.fernet import InvalidTokennnKEY = Fernet.generate_key()nf = Fernet(KEY)n_cache = {}n_unknown = --unknown--nnnclass FernetCodec(codecs.Codec):n def encode(self, input, errors=fernet.strict):n return f.encrypt(input), len(input)nn def decode(self, input, errors=fernet.strict):n try:n return f.decrypt(input), len(input)n except InvalidToken:n error = codecs.lookup_error(errors)n return error(UnicodeDecodeError(fernet, input, 0, len(input) + 1,n Invalid Token))nnnclass IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):n passnnnclass IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):n passnnnclass StreamReader(FernetCodec, codecs.StreamReader):n passnnnclass StreamWriter(FernetCodec, codecs.StreamWriter):n passnnnndef getregentry():n return codecs.CodecInfo(n name=fernet,n encode=FernetCodec().encode,n decode=FernetCodec().decode,n incrementalencoder=IncrementalEncoder,n incrementaldecoder=IncrementalDecoder,n streamwriter=StreamWriter,n streamreader=StreamReader,n )nnndef search_function(encoding):n import encodingsn encoding = encodings.normalize_encoding(encoding)n entry = _cache.get(encoding, _unknown)n if entry is not _unknown:n return entrynn if encoding == fernet:n entry = getregentry()n _cache[encoding] = entryn return entrynnndef strict_errors(exc):n if isinstance(exc, UnicodeDecodeError):n raise TypeError(Invalid Token)nnndef fallback_errors(exc):n s = []n for c in exc.object[exc.start:exc.end]:n s.append(c)n return .join(s), exc.endnnncodecs.register(search_function)ncodecs.register_error(fernet.strict, strict_errors)ncodecs.register_error(fernet.fallback, fallback_errors)n

歡迎關注本人的微信公眾號獲取更多Python相關的內容(也可以直接搜索「Python之美」):

推薦閱讀:

拖尾的簡單實現
Python爬蟲實戰入門二:從一個簡單的HTTP請求開始
Fab Academy 第十六周:界面和應用編程
Haskell 這段代碼該如何理解?
設計師學編程?從五個思維訓練開始

TAG:Python | 编程 |