node.js中Stream的理解

一、流的概念

  • 流是一組有序的,有起點和終點的位元組數據傳輸手段
  • 它不關心文件的整體內容,只關注是否從文件中讀到了數據,以及讀到數據之後的處理
  • 流是一個抽象介面,被 Node 中的很多對象所實現。比如HTTP 伺服器request和response對象都是流。

二、Stream 有四種流類型

  • Readable 可讀操作 可讀流
  • Writable 可寫操作 可寫流
  • Duplex 可讀可寫操作 雙工流
  • Transform 轉換流

所有的 Stream 對象都是 EventEmitter 的實例。常用的事件有:

  • data 當有數據可讀時候觸發
  • end 沒有數據可讀時觸發
  • error 在接收和寫入過程中發生錯誤時觸發
  • finish 所有數據已被寫入到底層系統時觸發

三、可讀流的操作 Readable

可讀的流有兩種模式:流動模式和暫停模式。流動模式下,數據會自動從來源流出,直到來源的數據耗盡。暫停模式下,你得通過stream.read()主動去要數據,你要了它才從來源讀,你不要它就在那兒等你。可讀流在創建時都是暫停模式。暫停模式和流動模式可以互相轉換

Readable流的一些常見實例如下:

  • 客戶端的HTTP響應
  • 服務端的HTTP請求
  • fs讀取流
  • zlib流
  • crypto(加密)流
  • TCP套接字
  • 子進程的stdout和stderr
  • process.stdin

Readable提供了一些函數,我們可以用它們讀取或操作流:

  • read([size]):如果你給read方法傳遞了一個大小作為參數,那它會返回指定數量的數據,如果數據不足,就會返回null。如果你不給read方法傳參,它會返回內部緩衝區里的所有數據,如果沒有數據,會返回null,此時有可能說明遇到了文件末尾。read返回的數據可能是Buffer對象,也可能是String對象。
  • setEncoding(encoding):給流設置一個編碼格式,用於解碼讀到的數據。調用此方法後,read([size])方法返回String對象。
  • pause():暫停可讀流,不再發出data事件
  • resume():恢復可讀流,繼續發出data事件
  • pipe(destination,[options]):把這個可讀流的輸出傳遞給destination指定的Writable流,兩個流組成一個管道。options是一個JS對象,這個對象有一個布爾類型的end屬性,默認值為true,當end為true時,Readable結束時自動結束Writable。注意,我們可以把一個Readable與若干Writable連在一起,組成多個管道,每一個Writable都能得到同樣的數據。這個方法返回destination,如果destination本身又是Readable流,就可以級聯調用pipe(比如我們在使用gzip壓縮、解壓縮時就會這樣,馬上會講到)。
  • unpipe([destination]):埠與指定destination的管道。不傳遞destination時,斷開與這個可讀流連在一起的所有管道

fs.createReadStream(path[, options])用來打開一個可讀的文件流,它返回一個fs.ReadStream對象。path參數指定文件的路徑,可選的options是一個JS對象,可以指定一些選項,類似下面這樣:

{ flags: r, //用什麼模式打開文件,』w』代表寫,』r』代表讀encoding: utf8, //指定打開文件時使用編碼格式fd: null, //屬性默認為null,當你指定了這個屬性時,createReadableStream會根據傳入的fd創建一個流,忽略pathmode: 0666,start: , // 指定起始的位元組偏移end: , // 指定結束(包含在內)的位元組偏移autoClose: true // 屬性為true(默認行為)時,當發生錯誤或文件讀取結束時會自動關閉文件描述符}

下面看一個栗子,先創建一個文件1.txt文件,文件內容如下

1234567890

let fs = require(fs)let rs = fs.createReadStream(./1.txt,{ flags: r, // 以什麼方式 mode: 0o666, start: 3, // 開始讀取位置 end: 8, // 結束位置 encoding: utf8, //字元編碼 highWaterMark: 3 //緩存區位元組大小});rs.setEncoding(utf8)rs.on(open, function(){ console.log(文件打開)})rs.on(data, function(data){ console.log(data) rs.pause() setTimeout(function(){ rs.resume() }, 2000)})rs.on(error, function(){ console.log(error)})rs.on(end, function(){ console.log(讀完了)})rs.on(close, function(){ console.log(文件關閉)})// 輸出結果文件打開456789讀完了文件關閉

四、可寫流的操作 Writable

Writable流提供了一個介面,用來把數據寫入到目的設備(或內存)中。Writable流的一些常見實例:

  • 客戶端的HTTP請求
  • 伺服器的HTTP響應
  • fs寫入流
  • zlib流
  • crypto(加密)流
  • 子進程的stdin
  • process.stdout和process.stderr

Writable流的write(chunk[,encoding] [,callback])方法可以把數據寫入流中。其中,chunk是待寫入的數據,是Buffer或String對象。這個參數是必須的,其它參數都是可選的。如果chunk是String對象,encoding可以用來指定字元串的編碼格式,write會根據編碼格式將chunk解碼成位元組流再來寫入。callback是數據完全刷新到流中時會執行的回調函數。write方法返回布爾值,當數據被完全處理後返回true

Writable流的end([chunk] [,encoding] [,callback])方法可以用來結束一個可寫流。它的三個參數都是可選的。chunk和encoding的含義與write方法類似。callback是一個可選的回調,當你提供它時,它會被關聯到Writable的finish事件上,這樣當finish事件發射時它就會被調用。

現在我們來看看Writable的事件:

  • finish: 在end()被調用、所有數據都已被寫入底層設備後發射。對應的處理器函數沒有參數。
  • pipe: 當你在Readable流上調用pipe()方法時,Writable流會發射這個事件,對應的處理器函數有一個參數,類型是Readable,指向與它連接的那個Readable流。
  • unpipe: 當你在Readable流上調用unpipe()方法時,Writable流會發射這個事件,對應的處理器函數有一個參數,類型是Readable,指向與剛與它斷開連接的那個Readable流。
  • error: 出錯時發射,對應的處理器函數的參數是Error對象。

fs.createWriteStream(path[,options])用來創建一個可寫的文件流,它返回fs.WriteStream對象。第一個參數path是路徑,第二個參數options是JS對象,是可選的,指定創建文件時的選項,類似:

{ flags: w, defaultEncoding: utf8, fd: null, mode: 0666 }

var fs = require(fs);var writable = fs.createWriteStream(example.txt,{ flags: w, defaultEncoding: utf8, mode: 0666,});writable.on(finish, function(){ console.log(write finished); process.exit(0);});writable.on(error, function(err){ console.log(write error, err.message);});writable.write(My name is xxxx, utf8);writable.end();

五、Duplex

讀寫流,該方法繼承了可寫流和可讀流,但相互之間沒有關係,各自獨立緩存區,擁有Writable和Readable所有方法和事件,同時實現_read()和_write()方法。

const fs = require(fs); const stream = require(stream); const duplex = stream.Duplex({ write(chunk, encoding, cb) { console.log(chunk.toString(utf8)); // 寫入 }, read() { this.push(讀取); this.push(null); } }); console.log(duplex.read(6).toString(utf8)); // 讀取 duplex.write(寫入);

推薦閱讀:

全面了解TCP/IP到HTTP
汪汪汪,抓緊啦,年前最後一期周刊來啦
低仿vue-async-computed
React填坑記(三):國際化方案

TAG:Nodejs | StreamProcessing | 前端開發 |