《Node.js設計模式(第2版)》試讀 & 送書活動
發點小福利,外刊君給大家帶來《Node.js設計模式(第2版)》部分章節的試讀。如果大家覺得不錯,歡迎參加文末的活動,獲得本書的紙質版!
第5章 流編程
流是Node.js最重要的組成和設計模式之一。社區流行這樣一句格言「stream all the things!」,這就足以描述流在Node.js中扮演的重要角色。Dominic Tarr是一位著名的Node.js社區貢獻者,他形容流是Node.js中最棒的想法,同時也是最容易讓人產生誤解的部分。有許多不同的原因使得Node.js中的流如此有吸引力,不僅因為在技術上表現出的良好性能和高效率,更多的是在於它的優雅,以及能完美融入Node.js的編程思想。
在本章中,你將學習以下這些內容:
- 為什麼流在Node.js中如此重要
- 使用和創建流
- 流編程範式:展現流除了I/O操作以外在很多不同編程領域的優勢
- 管道模式以及在不同使用場景中進行流的拼接
流的重要性
在例如Node.js這樣以事件為基礎的平台,處理I/O操作最高效的方法就是實時處理,儘快地接收處理輸入內容,並經過程序的處理儘快地輸出結果。
在這部分,我們將對Node.js的流以及流的功能進行一個最初始的介紹。請記住這只是一個概述,更多關於如何使用和組合流的分析將在本章後面部分被講解到。
緩衝和流
到目前為止,你在本書中看到的幾乎所有非同步API都使用了緩衝模式。比如要完成一個輸入操作,使用buffer讓所有的源數據被存放到緩存當中,當整個數據源讀取完畢後,會將緩存中的數據立即傳遞給回調函數處理。下圖生動地展示了這個處理過程:
在上圖中,我們可以看到在t1時刻,有些數據被讀取到緩存中。在t2時刻,另一個數據塊也就是最後一個數據塊被接收到,完成了本次讀取數據的過程並將整個緩存區的數據發送給處理程序。
不同的是,流允許你儘可能快地處理接收到的數據。下圖很好地展示了這一過程:
這一次,圖表展示了如何從數據源讀取每一個數據塊,然後被立即提供給後續的處理流程,這時就可以立即處理讀取到的數據而不需要等待所有的數據被先存放在緩存中。
但是這兩種處理數據的方式到底有什麼不一樣?我們可以從兩個主要的方面來總結:
- 空間效率
- 時間效率
除此之外,Node.js流有另外一個重要的優勢:可組合性。現在讓我們來看下這些屬性是如何影響我們設計和編寫程序的。
空間效率
首先,流可以幫助我們實現一些無法通過緩存數據並一次性處理來實現的功能。例如,考慮這樣一種情況,我們需要讀取一個很大的文件,比方說有幾百M甚至幾百G的大小。很明顯,讀取整個文件內容,然後從緩存中一次性返回的方式並不好。設想一下如果我們的程序同時讀取很多這樣的大文件,很容易導致內存溢出。除此之外,V8中的緩存區最大不能超過0x3FFFFFFF位元組(略小於1G)。所以我們根本無法去完全耗盡物理內存。
通過緩存實現Gzip
來舉個具體的例子,讓我們考慮實現一個簡單的命令行介面(CLI)應用程序,它使用Gzip格式來壓縮一個文件。在Node.js中使用緩存API,程序代碼會是這樣的(為了代碼簡潔,省略了錯誤的處理):
const fs = require(fs);const zlib = require(zlib);const file = process.argv[2];fs.readFile(file, (err, buffer) => { zlib.gzip(buffer, (err, buffer) => { fs.writeFile(file + .gz, buffer, err => { console.log(File successfully compressed); }); });});
現在,我們可以將上述代碼保存到gzip.js文件中並使用以下命令來執行:
node gzip <path to file>
如果我們選擇一個足夠大的文件,比如大於1GB,我們會得到預想的錯誤,提示我們嘗試讀取的文件大小超過了緩存允許的最大值,比如下面的輸出:
RangeError: File size is greater than possible Buffer:0x3FFFFFFF bytes
這正是我們能預想到的錯誤,說明我們使用了錯誤的方法。
通過流實現Gzip
修改我們的Gzip程序使其能夠處理大文件的方法就是使用流。讓我們來看下具體怎麼實現,修改下我們剛剛創建的文件內容:
const fs = require(fs);const zlib = require(zlib);const file = process.argv[2];fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(fs.createWriteStream(file + .gz)) .on(finish, () => console.log(File successfully compressed));
你也許會問,就這麼簡單?是的,正如我們之前說的,流的神奇也在於它提供的介面和可組合性,能使代碼更加整潔和優雅。接下來我們會了解更多的細節,但現在你需要知道的是,我們的程序可以順利的處理任何大小的文件,同時內存的使用率能夠保持恆定。你可以自己嘗試一下(但同時你需要知道壓縮一個大文件會耗費很長的時間)。
時間效率
現在讓我們來考慮這樣的情況,一個應用程序壓縮一個文件並將其上傳到遠程的HTTP伺服器,接著伺服器會解壓縮這個文件並將文件保存到文件系統中。如果你在客戶端使用緩存的方式去實現,只有在整個文件被讀取並壓縮之後才會開始執行上傳操作。也就是說,伺服器端只有接受到所有的數據之後才能開始解壓縮文件。使用流來實現這個功能應該是一個更好的方案。在客戶端,一旦從文件系統讀取到數據塊,流允許你立即進行壓縮和發送這些數據塊,而同時,伺服器上你也可以立即解壓縮從遠程收到的每個數據塊。讓我們創建一個這樣的應用程序來具體說明,先從服務端開始吧。
讓我們創建一個gzipReceive.js文件,代碼如下:
const http = require(http);const fs = require(fs);const zlib = require(zlib);const server = http.createServer((req, res) => { const filename = req.headers.filename; console.log(File request received: + filename); req .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream(filename)) .on(finish, () => { res.writeHead(201, {Content-Type: text/plain}); res.end(Thats itn); console.log(`File saved: ${filename}`);});server.listen(3000, () => console.log(Listening));
使用Node.js的流,伺服器能夠迅速處理從網路上接受到的數據塊,解壓縮並且保存到文件。
創建一個gzipSend.js的文件作為我們應用程序的客戶端模塊,代碼如下:
const fs = require(fs);const zlib = require(zlib);const http = require(http);const path = require(path);const file = process.argv[2];const server = process.argv[3];const options = { hostname: server, port: 3000, path: /, method: PUT, headers: { filename: path.basename(file), Content-Type: application/octet-stream, Content-Encoding: gzip }};const req = http.request(options, res => { console.log(Server response: + res.statusCode);});fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(req) .on(finish, () => { console.log(File successfully sent);});
在上面的代碼中,我們再一次使用流來從文件系統讀取文件內容,並立即壓縮發送每一個數據塊。
現在,可以試運行一下我們的應用,先使用以下命令啟動服務端:
node gzipReceive
然後,啟動客戶端程序並指定要發送的文件和伺服器的地址(比如localhost):
node gzipSend <path to file> localhost
如果選擇的文件足夠大,我們就能更容易明白數據是怎樣從客戶端傳遞到服務端,但是到底為什麼使用流會比使用緩存來處理髮送數據更加高效呢?下圖會給我們一些啟示:
文件處理會經過以下一系列的步驟:
- [客戶端]從文件系統讀取數據
- [客戶端]對數據進行壓縮
- [客戶端]發送到服務端
- [服務端]接受客戶端發送的數據
- [服務端]解壓縮接收到的數據
- [服務端]將數據寫入磁碟
為了完成整個處理過程,我們必須像流水線一樣按順序完成以上所有的步驟。如上圖所示,使用緩存,整個過程是完全順序執行的。首先必須等待整個文件被讀取之後才能進行數據壓縮,然後必須等待文件讀取完畢以及數據壓縮完成之後才可以向服務端發送數據。相反,如果我們使用流,當讀取到第一個數據塊的時候,整條流水線就開始運行起來,而不需要等到整個文件內容被讀取到。但是更加驚奇的是,當下一個數據塊到達的時候,不需要等待之前的任務完成,相反,另一條流水線並行啟動。之所以能這樣是由於每個任務都是非同步執行的,在Node.js中可以並行來處理。唯一的限制就是數據塊到達每個階段的順序必須被保存(這一點Node.js的流模塊已經幫我們實現了)。
從上圖中我們可以看到,使用流的方式,整個處理流程花費了更少的時間,因為我們不需要等待所有的數據被讀取之後再一次性地進行處理。
組合性
我們前面看到的代碼已經大概展示了如何將流組合起來使用,這要歸功於pipe()這個方法,允許我們將不同的處理單元連接起來,而每一個處理單元只實現單一的功能,這一點很符合Node.js的編程風格。這之所以可行是因為流提供了統一的處理介面,從API層面來看流都是互通的。唯一的前提就是管道中的下一個流必須支持上一個流輸出的數據類型,有可能是二進位流,文本甚至對象,這些在後面的章節中都會講到。
通過另一個例子來看下這一屬性的應用,我們嘗試在致歉構建的gzipReceive/gzipSend應用中添加一個加密層。
為了說明這一點,我們只需要簡單更新下客戶端程序,在管道中增加一個流;具體來說,添加crypto.createChipher()對現有的流進行處理。最終代碼是這樣的:
const crypto = require(crypto);// ...fs.createReadStream(file) .pipe(zlib.createGzip()) .pipe(crypto.createCipher(aes192, a_shared_secret)) .pipe(req) .on(finish, () => console.log(File succesfully sent));
同樣的方式,我們修改一下服務端程序使數據在解壓縮之前先進行解密:
const crypto = require(crypto);// ...const server = http.createServer((req, res) => {// ... req .pipe(crypto.createDecipher(aes192, a_shared_secret)) .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream(filename)) .on(finish, () => { /* ... */ });});
只要很少的修改(事實上只是幾行代碼),就在我們的應用程序中增加了一個加密層;我們簡單地將一個已有的轉換流到嵌入到已經搭建的流管道中。用類似的方式,我們可以像玩樂高積木一樣隨意地添加和組合其他流。
顯然,這種方法的主要優點是可重用性,但同時,從這個例子可以看出,流能使得代碼更加清晰和模塊化。正因為如此,流不僅僅可以用來處理純I/O問題,也可以用來對代碼進行簡化和模塊化處理。
......
送書,送書!
掃碼尾圖二維碼關注前端外刊評論公眾號,在公眾號本文的留言區留言,獲得點贊最多的前三位童鞋每人一本,第一位童鞋將會獲得本書的外刊君簽名版!(點贊數據以本文發出後24小時整為準)
推薦閱讀: