http上傳協議之文件流實現,輕鬆支持大文件上傳
來自專欄 Python中文社區20 人贊了文章
最近在公司進行業務開發時遇到了一些問題,當需要上傳一個較大的文件時,經常會遇到內存被大量佔用的情況。公司之前使用的web框架是一個老前輩實現的。在實現multipart/form-data類型的post請求解析時, 是將post請求體一次性讀到內存中再做解析的,從而導致內存佔用過大。而我之前為公司開發的框架
ShichaoMa/star_builder是基於apistar這個asgi框架的,而apistar在解析mutilpart時使用的時flask作者編寫的
https://github.com/pallets/werkzeugflask和django在對待multipart報文解析使用的方案基本是一致的,通過持續的解析請求體,將解析出來的文件內容放入一個工廠類創建的類文件對象中,工廠類在django中返回uploader的子類,在flask中叫作stream_factory。可以使用基於內存的,也可以使用基於臨時文件的。
但是apistar作者在借用werkzeug的FormDataParser解析時,卻直接將一個BytesIO傳入了!而BytesIO中存放的是全量請求體,這勢必會全部存在於內存中!那麼帶來的問題就是,當上傳大文件時,內存會被撐爆!代碼如下:
class MultiPartCodec(BaseCodec): media_type = multipart/form-data def decode(self, bytestring, headers, **options): try: content_length = max(0, int(headers[content-length])) except (KeyError, ValueError, TypeError): content_length = None try: mime_type, mime_options = parse_options_header(headers[content-type]) except KeyError: mime_type, mime_options = , {} body_file = BytesIO(bytestring) parser = FormDataParser() stream, form, files = parser.parse(body_file, mime_type, content_length, mime_options) return ImmutableMultiDict(chain(form.items(multi=True), files.items(multi=True)))
其實想必這也是不得已的事情,因為apistar支持ASGI協議,這就導致了每次請求IO都是非同步的,非同步read介面和同步介面調用方式肯定不一樣,所以作者想偷懶不自己實現一套非同步解析方案,那麼只能么做。
作者想偷懶我可以理解,但是公司對我的要求讓我感覺鴨梨山大,之前基於s3文件的上傳服務是由我開發的,使用的框架也是我依賴apistar開發的star_builder,現在公司要求廢棄掉公司之前的文件上傳服務(也就是基於老前輩web框架開發的那個),將所有介面全部轉移到我開發的服務上來。那麼勢必要求我一併解決掉大文件上傳的問題。所以沒有辦法,只能為apistar的作者造個輪子接上先用著了。
在我簡單了解了multipart/form-data協議之後,實現了一個FileStream類和File類,每個類都返回可非同步迭代對象,FileStream迭代File對象,File對象迭代數據,迭代過程實時解析請求體,實時發現文件對象,實時處理得到的文件數據。以這種方式處理上傳的文件,對內存不會產生任何壓力。
FIleStream的實現如下:
class FileStream(object): def __init__(self, receive, boundary): self.receive = receive self.boundary = boundary self.body = b"" self.closed = False def __aiter__(self): return self async def __anext__(self): return await File.from_boundary(self, self.receive, self.boundary)
FileStream支持非同步迭代,每次返回一個File對象。同時FIleStream存儲已讀但未返回到應用層的請求體數據。
File的實現如下:
class File(object): mime_type_regex = re.compile(b"Content-Type: (.*)") disposition_regex = re.compile( rb"Content-Disposition: form-data;" rb"(?: name="(?P<name>[^;]*?)")?" rb"(?:; filename*?="?" rb"(?:(?P<enc>.+?)" rb"(?P<lang>w*))?" rb"(?P<filename>[^"]*)"?)?") def __init__(self, stream, receive, boundary, name, filename, mimetype): self.mimetype = mimetype self.receive = receive self.filename = filename self.name = name self.stream = stream self.tmpboundary = b"
--" + boundary self.boundary_len = len(self.tmpboundary) self._last = b"" self._size = 0 self.body_iter = self._iter_content() def __aiter__(self): return self.body_iter def __str__(self): return f"<{self.__class__.__name__} " f"name={self.name} " f"filename={self.filename} >" __repr__ = __str__ def iter_content(self): return self.body_iter async def _iter_content(self): stream = self.stream while True: # 如果存在read過程中剩下的,則直接返回 if self._last: yield self._last continue index = self.stream.body.find(self.tmpboundary) if index != -1: # 找到分隔線,返回分隔線前的數據 # 並將分隔及分隔線後的數據返回給stream read, stream.body = stream.body[:index], stream.body[index:] self._size += len(read) yield read if self._last: yield self._last break else: if self.stream.closed: raise RuntimeError("Uncomplete content!") # 若沒有找到分隔線,為了防止分隔線被讀取了一半 # 選擇只返回少於分隔線長度的部分body read = stream.body[:-self.boundary_len] stream.body = stream.body[-self.boundary_len:] self._size += len(read) yield read await self.get_message(self.receive, stream) async def read(self, size=10240): read = b"" assert size > 0, (999, "Read size must > 0") while len(read) < size: try: buffer = await self.body_iter.asend(None) except StopAsyncIteration: return read read = read + buffer read, self._last = read[:size], read[size:] return read @staticmethod async def get_message(receive, stream): message = await receive() if not message[type] == http.request: raise RuntimeError( f"Unexpected ASGI message type: {message[type]}.") if not message.get(more_body, False): stream.closed = True stream.body += message.get("body", b"") def tell(self): return self._size @classmethod async def from_boundary(cls, stream, receive, boundary): tmp_boundary = b"--" + boundary while not stream.closed: await cls.get_message(receive, stream) if b"
" in stream.body and tmp_boundary in stream.body or stream.closed: break return cls(stream, receive, boundary, *cls.parse_headers(stream, tmp_boundary)) @classmethod def parse_headers(cls, stream, tmp_boundary): end_boundary = tmp_boundary + b"--" body = stream.body index = body.find(tmp_boundary) if index == body.find(end_boundary): raise StopAsyncIteration body = body[index + len(tmp_boundary):] header_str = body[:body.find(b"
")] body = body[body.find(b"
") + 4:] groups = cls.disposition_regex.search(header_str).groupdict() filename = groups["filename"] and unquote(groups["filename"].decode()) if groups["enc"]: filename = filename.encode().decode(groups["enc"].decode()) name = groups["name"].decode() mth = cls.mime_type_regex.search(header_str) mimetype = mth and mth.group(1).decode() stream.body = body assert name, "FileStream iterated without File consumed. " return name, filename, mimetype
File實例也是一個非同步可迭代對象,每次迭代從receive中實時獲取數據,receive的實現請參見
ASGI - Channels 2.1.2 documentation同時File還支持非同步read,但read本質上也是對File對象的迭代。
那麼正確的使用姿勢是怎樣的呢?
下面是star_builder構建的項目中關於FileStream在一次請求中action的demo實現。
@post("/test_upload") async def up(stream: FileStream): async for file in stream: if file.filename: with open(file.filename, "wb") as f: async for chuck in file: f.write(chuck) else: # 沒有filename的是其它類型的form參數 arg = await file.read() print(f"Form參數:{file.name}={arg.decode()}")
使用方法非常簡單,不會生成臨時文件,也不會佔用內存來存儲。實時非同步從socket中讀取數據,非要說有什麼缺點的話,就是不全部迭代完的話,是無法知道這一次請求中一共上傳了幾個文件的。如果需要提前知道的話,可以通過前端配合通過url傳入params參數來獲取文件相關屬性信息。
這種實時從socket讀取的實現方案,應該是基於http協議性能最好的文件上傳方案。歡迎評論區發表意見和建議。
推薦閱讀: