使用codecs自定義編/解碼方案
相信很多使用Python的同學熟悉編解碼,比如:
In : print uU0001F3F4.encode(utf-8)n??nIn : 哈哈.decode(utf8)nnOut: u哈哈n
這樣可以在字元串和unicode之前轉換,不過細心的同學可能發現了,我使用了「utf-8」和「utf8」,這2個詞長得很像。事實上都能正常使用是由於他們都是「utf_8」的別名,這些別名的對應關係可以用如下方法找到:
In : import encodingsnnIn : encodings.aliases.aliases[utf8]nOut: utf_8nnIn : 哈哈.decode(u8)nOut: u哈哈nnIn : 哈哈.decode(utf)nOut: u哈哈nnIn : 哈哈.decode(utf8_ucs2)nOut: u哈哈nnIn : 哈哈.decode(utf8_ucs4)nOut: u哈哈n
encodings是標準庫中自帶的編碼庫,其中包含了上百個標準的編碼轉換方案。但是通常並不需要使用encodings,而是使用codecs。
codecs包含了編碼解碼器的註冊和其他基本的類,開發者還可以通過codecs提供的介面自定義編/解碼方案,也就是可以創造一個新的編解碼轉換方案,使用encode(XX)和decode(XX)的方式使用。今天我給大家演示直接進行Fernet對稱加密的例子。
互聯網安全的重要性不必在複述了,大家都應該接觸過一些加密技術,可能聽過M2Crypto、PyCrypto、Cryptography之類的庫。在這裡歪個樓,現在的主流是使用Cryptography,它的出現就是為了替代之前的那些庫,具體的可以看官方文檔。我們使用Cryptography提供的Fernet類來實現,首先實現一個Codec類:
import codecsnnfrom cryptography.fernet import Fernetnfrom cryptography.fernet import InvalidTokennnKEY = Fernet.generate_key()nf = Fernet(KEY)nnnclass FernetCodec(codecs.Codec):n def encode(self, input, errors=fernet.strict):n return f.encrypt(input), len(input)nn def decode(self, input, errors=fernet.strict):n try:n return f.decrypt(input), len(input)n except InvalidToken:n error = codecs.lookup_error(errors)n return error(UnicodeDecodeError(fernet, input, 0, len(input) + 1,n Invalid Token))n
當然也不必在類中實現encode和decode方法,單獨的2個函數也可以。我這裡是為了給之後的演示到的類復用。
如果你看過內置的字元串encode的方法,它的文檔說還接收第二個參數,讓你告訴它當出現出錯的時候如何去處理,默認是strict,直接就會拋出來錯誤。
其餘可選的還有ignore、replace、xmlcharrefreplace:
In [35]: x80abc.decode(utf-8, strict)n---------------------------------------------------------------------------nUnicodeDecodeError Traceback (most recent call last)n<ipython-input-35-974ebc908d50> in <module>()n----> 1 x80abc.decode(utf-8, strict)nn/Users/dongweiming/dae/venv/lib/python2.7/encodings/utf_8.pyc in decode(input, errors)n 14n 15 def decode(input, errors=strict):n---> 16 return codecs.utf_8_decode(input, errors, True)n 17n 18 class IncrementalEncoder(codecs.IncrementalEncoder):nnUnicodeDecodeError: utf8 codec cant decode byte 0x80 in position 0: invalid start bytennIn [36]: x80abc.decode(utf-8, replace)nOut[36]: u?cnnIn [37]: x80abc.decode(utf-8, ignore)nOut[37]: uabcn
事實上Python還內置了其他的選項,如backslashreplace、namereplace、surrogatepass、surrogateescape等,有興趣的可以看源碼實現
我也會定義2種錯誤函數,因為在解密(執行decrypt)的時候可能會報InvalidToken錯誤,但是InvalidToken不包含任何參數,而對錯誤處理的時候需要知道起始和結束的位置,所以我就直接拋一個UnicodeDecodeError錯誤了。
接著我們定義遞增式和流式的編碼類:
class IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):n passnnnclass IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):n passnnnclass StreamReader(FernetCodec, codecs.StreamReader):n passnnnclass StreamWriter(FernetCodec, codecs.StreamWriter):n pass n
由於這個要加密的字元串不會很長,沒有必要實現這些類,我就簡單的繼承然後pass了。接著開放入口:
def getregentry():n return codecs.CodecInfo(n name=fernet,n encode=FernetCodec().encode,n decode=FernetCodec().decode,n incrementalencoder=IncrementalEncoder,n incrementaldecoder=IncrementalDecoder,n streamwriter=StreamWriter,n streamreader=StreamReader,n )n
其實incrementalencoder、streamwriter這些參數不傳遞也是可以的,默認是None,今天只是為了讓大家知道是有這部分介面的。然後是註冊這個入口,為了提供更好的性能,我創建一個函數加上緩存功能:
_cache = {}n_unknown = --unknown--nndef search_function(encoding):n import encodingsn encoding = encodings.normalize_encoding(encoding)n entry = _cache.get(encoding, _unknown)n if entry is not _unknown:n return entrynn if encoding == fernet:n entry = getregentry()n _cache[encoding] = entryn return entryn n ncodecs.register(search_function) n
這裡簡單介紹下normalize_encoding的意義,之前我說到了別名,我們再感受下:
In : u哈哈.encode(utf-8)nOut: xe5x93x88xe5x93x88nnIn : u哈哈.encode(utf_8)nOut: xe5x93x88xe5x93x88n
這種中劃線和下劃線最後無差別對待,就是通過這個函數標準化的。
codecs模塊底層維護了一個搜索函數的列表,通過調用codecs.register方法就把上述函數append進去了。最後我們註冊2個錯誤處理的函數:
def strict_errors(exc):n if isinstance(exc, UnicodeDecodeError):n raise TypeError(Invalid Token)nnndef fallback_errors(exc):n s = []n for c in exc.object[exc.start:exc.end]: # 只是為了演示提供的介面,實施上就是返回原來的inputn s.append(c)n return .join(s), exc.endnncodecs.register_error(fernet.strict, strict_errors)ncodecs.register_error(fernet.fallback, fallback_errors)n
默認使用的是fernet.strict這種處理方案,也可以使用fallback模式。ok,我們現在感受一下:
In [1]: import fernetnnIn [2]: input = hahnnIn [3]: output = input.encode(fernet)nnIn [4]: output # 已經是加密後的結果了nOut[4]: gAAAAABZCFa6Znp2a_e9O0VqP6qToO6T3xRbF7O-adtpFC4QYO7jvVc6Yrcwbo6YGQfL8g5HCXcsaan_THWNhjZAorPTwlQQTA==nnIn [5]: output.decode(fernet) # 解密後還原成原來的字元串nOut[5]: hahnnIn [6]: input.decode(fernet)nfernet codec cant decode bytes in position 0-3: Invalid Tokenn---------------------------------------------------------------------------nTypeError Traceback (most recent call last)n<ipython-input-6-1c20dc246c52> in <module>()n----> 1 input.decode(fernet)nn/Users/dongweiming/test/tmp/fernet.pyc in decode(self, input, errors)n 20 error = codecs.lookup_error(errors)n 21 return error(UnicodeDecodeError(fernet, input, 0, len(input) + 1,n---> 22 Invalid Token))n 23n 24nn/Users/dongweiming/test/tmp/fernet.pyc in strict_errors(exc)n 68 print excn 69 if isinstance(exc, UnicodeDecodeError):n---> 70 raise TypeError(Invalid Token)n 71n 72nnTypeError: Invalid Token # 拋了個自定義錯誤nnIn [7]: input.decode(fernet, fernet.fallback)nOut[7]: hah # 解密不成功,返回原來的字元串n
好了,自定義的加解密方案完成了,整理全部的代碼如下:
import codecsnnfrom cryptography.fernet import Fernetnfrom cryptography.fernet import InvalidTokennnKEY = Fernet.generate_key()nf = Fernet(KEY)n_cache = {}n_unknown = --unknown--nnnclass FernetCodec(codecs.Codec):n def encode(self, input, errors=fernet.strict):n return f.encrypt(input), len(input)nn def decode(self, input, errors=fernet.strict):n try:n return f.decrypt(input), len(input)n except InvalidToken:n error = codecs.lookup_error(errors)n return error(UnicodeDecodeError(fernet, input, 0, len(input) + 1,n Invalid Token))nnnclass IncrementalEncoder(codecs.IncrementalEncoder, FernetCodec):n passnnnclass IncrementalDecoder(codecs.IncrementalDecoder, FernetCodec):n passnnnclass StreamReader(FernetCodec, codecs.StreamReader):n passnnnclass StreamWriter(FernetCodec, codecs.StreamWriter):n passnnnndef getregentry():n return codecs.CodecInfo(n name=fernet,n encode=FernetCodec().encode,n decode=FernetCodec().decode,n incrementalencoder=IncrementalEncoder,n incrementaldecoder=IncrementalDecoder,n streamwriter=StreamWriter,n streamreader=StreamReader,n )nnndef search_function(encoding):n import encodingsn encoding = encodings.normalize_encoding(encoding)n entry = _cache.get(encoding, _unknown)n if entry is not _unknown:n return entrynn if encoding == fernet:n entry = getregentry()n _cache[encoding] = entryn return entrynnndef strict_errors(exc):n if isinstance(exc, UnicodeDecodeError):n raise TypeError(Invalid Token)nnndef fallback_errors(exc):n s = []n for c in exc.object[exc.start:exc.end]:n s.append(c)n return .join(s), exc.endnnncodecs.register(search_function)ncodecs.register_error(fernet.strict, strict_errors)ncodecs.register_error(fernet.fallback, fallback_errors)n
歡迎關注本人的微信公眾號獲取更多Python相關的內容(也可以直接搜索「Python之美」):
推薦閱讀:
※拖尾的簡單實現
※Python爬蟲實戰入門二:從一個簡單的HTTP請求開始
※Fab Academy 第十六周:界面和應用編程
※Haskell 這段代碼該如何理解?
※設計師學編程?從五個思維訓練開始