node中的精髓Stream(流)
在前端工程化中產生了很多工具,例如grunt,gulp,webpack,babel…等等,這些工具都是通過node中的stream實現。
在node中stream也是非常非常非常重要的模塊,比如我們常用的console就是基於stream的實例,還有net,http等核心模塊都是基於stream來實現的,可見stream是多麼的重要。1.什麼是stream?
是一種數據傳輸手段,從一個地方傳輸到另一個地方。
在寫node的時候會存在讀取文件,比如現在我們有一個非常大的文件,50G吧
const fs = require(fs); // test文件50個G fs.readFileSync(./test.text);
這個時候需要消耗大量的時候去讀取這個文件,然而我們可能關心的並不是文件所有內容,還會存在直接讀取失敗。stream就是為了解決這些問題而產生,我們讀一些數據處理一些數據,當讀到所關心數據的時候,則可以不再繼續讀取。
stream翻譯成中文『流』,就像水一樣,從水龍頭流向水杯。
2. Stream模塊
stream繼承於EventEmitter,擁有事件觸發和事件監聽功能。主要分為4種基本流類型:
- Readable (可讀流)
- Writable (可寫流)
- Duplex (讀寫流)
- Transform (轉換流) 在流中默認可操作的類型string和Buffer,如果需要處理其他類型的js值需要傳入參數objectMode: true(默認為false)
在流中存在一個重要的概念,緩存區,就像拿水杯去接水,水杯就是緩存區,當水杯滿,則會關閉水龍頭,等把水杯裡面的水消耗完畢,再打開水龍頭去接水。
stream默認緩存區大小為16384(16kb),可以通過highWaterMark參數設置緩存區大小,但設置encoding後,以設置的字元編碼為單位衡量。3. Readable
首先創建一個可讀流,可接收5個參數:
- highWaterMark 緩存區位元組大小,默認16384
- encoding 字元編碼,默認為null,就是buffer
- objectMode 是否操作js其他類型 默認false
- read 對內部的_read()方式實現 子類實現,父類調用
- destroy 對內部的_ destroy()方法實現 子類實現,父類調用
可讀流中分為2種模式流動模式和暫停模式。
監聽data事件,觸發流動模式,會源源不斷生產數據觸發data事件:
const { Readable } = require(stream); let i = 0; const rs = Readable({ encoding: utf8, // 這裡傳入的read方法,會被寫入_read() read: (size) => { // size 為highWaterMark大小 // 在這個方法裡面實現獲取數據,讀取到數據調用rs.push([data]),如果沒有數據了,push(null)結束流 if (i < 10) { rs.push(`當前讀取數據: ${i++}`); } else { rs.push(null); } }, // 源代碼,可覆蓋 destroy(err, cb) { rs.push(null); cb(err); } }); rs.on(data, (data) => { console.log(data); // 每次push數據則觸發data事件 // 當前讀取數據: 0 // 當前讀取數據: 1 // 當前讀取數據: 2 // 當前讀取數據: 3 // 當前讀取數據: 4 // 當前讀取數據: 5 // 當前讀取數據: 6 // 當前讀取數據: 7 // 當前讀取數據: 8 // 當前讀取數據: 9 })
監聽readable事件,觸發暫停模式,當流有了新數據或到了流結束之前觸發readable事件,需要顯示調用read([size])讀取數據:
const { Readable } = require(stream); let i = 0; const rs = Readable({ encoding: utf8, highWaterMark: 9, // 這裡傳入的read方法,會被寫入_read() read: (size) => { // size 為highWaterMark大小 // 在這個方法裡面實現獲取數據,讀取到數據調用rs.push([data]),如果沒有數據了,push(null)結束流 if (i < 10) { // push其實是把數據放入緩存區 rs.push(`當前讀取數據: ${i++}`); } else { rs.push(null); } } }); rs.on(readable, () => { const data = rs.read(9); console.log(data); // })
read([size]) size參數:
- 不傳代表讀取緩存區所有數據。
- 傳入0 填充緩存區, 但返回null
- size < 當前緩存區數據 返回所需數據
- size > 當前緩存區數據 返回null 並改變highWaterMark值
這裡的緩存區數據不是指highWaterMark,獲取緩存區數據大小rs._readableState.length。
流的模式可以自由切換: 通過rs._readableState.flowing的值獲取當前狀態
- null 初始狀態
- false 暫停模式
- true 流動模式
rs.pause()切換到暫停模式 rs.resume()切換到流動模式
在可讀流裡面還可以監聽其他事件:
rs.on(close, () => { // 流關閉時或文件關閉時觸發 }) rs.on(end, () => { // 在流中沒有數據可供消費時觸發 }) rs.on(error, (err) => { // 發生錯誤時候 })
4. Writable
可寫流可接受參數:
- highWaterMark 緩存區位元組大小,默認16384
- decodeStrings 是否將字元編碼傳入緩衝區
- objectMode 是否操作js其他類型 默認false
- write 子類實現,供父類調用 實現寫入底層數據
- writev 子類實現,供父類調用 一次處理多個chunk寫入底層數據
- destroy 可以覆蓋父類方法,不能直接調用,銷毀流時,父類調用
- final 完成寫入所有數據時父類觸發
在實現流除了用上面直接傳入參數的方式,還可以用繼承類
class WS extends stream.Writable { constructor() { super({ highWaterMark: 1 }); } _write(chunk, encoding, cb) { console.log(this._writableState.length); // chunk 為需要寫入的數據 // encoding 字元編碼 // cb 回調函數, 如果寫入成功需要調用cb去執行下一次寫入,如果發生錯誤,可以cb(new Error([錯誤信息])) if (chunk.length < 4) { fs.writeFileSync(./2.text, chunk, { flag: a }); cb(); } else{ cb(new Error(超出4個位元組)); } }}const ws = new WS();let i = 0;function next() { let flag = true; // write() 會返回boolean false -> 緩存區沒滿 true —> 已滿,需要暫停寫入數據 while(i < 10 && flag) { flag = ws.write(`${i++}`); console.log(flag, flag); }}next();// 當所有緩存區數據已經成功寫入底層數據,緩存區沒有數據了,觸發drain事件ws.on(drain, () => { console.log(drain); // 繼續寫入緩存區數據 next();})
可寫流的end事件,一旦觸發end事件,後續不能再寫入數據.
ws.write(start); ws.end(end); ws.wrtie(test); // 報錯 write after end
finish事件:
ws.write(start); ws.end(end); ws.on(finish, () => { console.log(調用end方法後,並且所有數據已經寫入底層) })
cork()與uncork(),強制所有數據先寫入緩存區,直到調用uncork()或end(),這時一併寫入底層:
const ws = stream.Writable({ writev(chunks, encoding, cb) { // 這時chunks為一個數組,包含所有的chunk // 現在length為10 console.log(chunk.length); } }); // 寫入數據之前,強制寫入數據放入緩存區 ws.cork(); // 寫入數據 for (let i = 0; i < 10; i++) { ws.write(i.toString()); } // 寫入完畢,可以觸發寫入底層 ws.uncork();
5. Duplex
讀寫流,該方法繼承了可寫流和可讀流,但相互之間沒有關係,各自獨立緩存區,擁有Writable和Readable所有方法和事件,同時實現_read()和_write()方法。
const fs = require(fs); const stream = require(stream); const duplex = stream.Duplex({ write(chunk, encoding, cb) { console.log(chunk.toString(utf8)); // 寫入 }, read() { this.push(讀取); this.push(null); } }); console.log(duplex.read(6).toString(utf8)); // 讀取 duplex.write(寫入);
6. Transform
轉換流,這個流在前端工程化中用到最多,從一個地方讀取數據,轉換數據後輸出到一個地方,該流繼承於Duplex。
const fs = require(fs); const stream = require(stream); const transform = stream.Transform({ transform(chunk, encoding, cb){ // 把數據轉換成大寫字母,然後push到緩存區 this.push(chunk.toString().toUpperCase()); cb(); } }); transform.write(a); console.log(transform.read(1).toString()); // A
7. fs快速創建可讀/可寫流
可讀流和可寫流都需要我們去實現父類的方法,那麼fs這個模塊幫我們做了這件事情,fs裡面實現了高效並且可靠的可讀/可寫流,提供快速創建流,不再去實現父類_write()或_read()。下面我們來看看如何使用:
const fs = require(fs); /** * 創建可讀流 * * 第一個參數文件路徑 * * 第二個參數為options * flags?: string; encoding?: string; 字元編碼 fd?: number; 文件打開後的標識符 mode?: number; 文件的許可權 autoClose?: boolean; 讀取完畢後,是否自動關閉文件 start?: number; 從哪個位置開始讀取 end?: number; 讀到什麼時候結束 highWaterMark?: number; 最高水位線 */ const rs = fs.createReadStream(1.text); rs.on(data, data => { console.log(data); }) /** * 創建可寫流 * * 第一個參數文件路徑 * * 第二個參數為options * flags?: string; encoding?: string; 字元編碼 fd?: number; 文件打開後的標識符 mode?: number; 文件的許可權 autoClose?: boolean; 寫入完畢後,是否自動關閉文件 start?: number; 從什麼位置開始寫入 */ const ws = fs.createWriteStream(2.text); ws.write(123);
8. pipe
在流中搭建一條管道,從可讀流中到可寫流。
可讀流中有pipe()方法,在可寫流中可以監聽pipe事件,下面實現了從可讀流中通過管道到可寫流:
const fs = require(fs); const stream = require(stream); const rs = stream.Readable({ read() { this.push(fs.readFileSync(./1.text)); // 文件內容 test this.push(null); } }); const ws = stream.Writable({ write(chunk, encoding, cb) { // chunk為test buffer fs.writeFileSync(./2.text, chunk.toString()); cb(); } }); ws.on(pipe, data => { // 觸發pipe事件 console.log(data); }); rs.pipe(ws);
9. 總結
流分為四種基本類型,兩種模式。流中的數據不是直接寫入或讀取,有緩存區的概念。
推薦閱讀:
※技術分享——ES2017繼發與並發!
※大齡電力汪前端學習路(頁面渲染篇)
※愛搞事情的webpack
※HTML5入門教程之HTML5新特性