標籤:

如何在Node.js或TypeScript中實現for..of形式的按行讀文件?

每次用Node.js按行讀文件都感覺有點怪怪的,今日突發奇想,想問一下如何實現for..of形式地按行讀文件(就是python那種按行讀文件的方式)


之前關注了這個問題,寫了個草稿回來發現這個問題被舉報關閉了。不知道這個問題哪裡不合適了???

前面的「js全棧工程師」答主明顯是在強答,是沒用過 Python Java 等其他語言嗎?按行讀取是很多語言標準庫裡面的功能,Node 也提供了一個 readline 模塊實現按行讀取文件流,怎麼就奇葩了?


按行讀有兩個思路實現,一種是簡單粗暴的方式是先讀取整個文件,再 split 成數組,這樣就能迭代了。缺點是讀大文件比較吃內存。另一種更合理的方式是每次從文件讀一點,放進緩衝區,然後讀行。當不足一行的時候再從文件讀一點。

Node.js 讀文件也分同步和非同步兩種方式。如果使用同步方式,IO 操作會阻塞線程。但是要實現 for...of 讀文件,就只能用同步的方式。

Iteration

for...of 是應用於可迭代對象的,所以首先你得遵循協議構造一個可迭代對象,才能對它 for...of。在可迭代對象生成的迭代器中用一個緩衝區存儲讀到的數據,在 next 方法裡面返回一行文本(如果不足一行,就繼續讀文件,直到大於一行,再返回第一行)。

另外也可以使用 Generator 生成可迭代對象,在實現上就變「被動」為「主動」,在一個函數裡面重複執行「讀一點文件 + yield 一行」。

參考 Generator 實現如下,比較粗糙:

const fs = require(fs)
const LF = 10
const CR = 13

function* readLines(filePath) {
const fd = fs.openSync(filePath, r)
const chunkSize = 64 // for test, should be larger
let buffer = new Buffer(0)

while (true) {
const chunk = new Buffer(chunkSize)
const bytesRead = fs.readSync(fd, chunk, 0, chunkSize)
buffer = Buffer.concat([buffer, chunk])

let startIndex = 0

for (let i = 0; i &< buffer.length; i++) { if ( buffer[i] === LF || ( buffer[i] === CR buffer[i + 1] === LF ) ) { yield buffer.slice(startIndex, i).toString() if (buffer[i] === LF) { startIndex = i + 1 } else { startIndex = i + 2 i = i + 1 } } } if (startIndex &> 0) {
buffer = buffer.slice(startIndex)
}

if (bytesRead &< chunkSize) { const restSize = buffer.length + bytesRead - chunkSize if (restSize) { yield buffer.slice(0, restSize).toString() } return } } } for (let line of readLines(text.txt)) { console.log(line.length) }

在這個例子里,打開文件(fs.open)和讀取文件(fs.read)使用的都是同步方法,會阻塞線程。然而未來會有非同步的 for...of,對應需要實現非同步的可迭代對象+迭代器.

Async iteration

TC39 有一個 proposal-async-iteration 提案,目前處於 Stage 3,這個提案給出了非同步的 for...of 的使用語法和對應的迭代器實現,其使用語法如下:

// await 需要在 async 函數內
async function readLineByLine() {
for await (const line of readLines(filePath)) {
console.log(line);
}
}

非同步迭代器的 next 方法會返回一個 Promise&<{ value: any, done: boolean }&>,而不是同步迭代器的 { value: any, done: boolean }

同樣地,我們還是使用 Generator,但是它變成了 Async Generator. 同時,fs 的 sync 方法變成 async 方法:

async function* readLines(filePath) {
const fd = await fs.open(filePath, r)
...
while (true) {
const chunk = new Buffer(chunkSize)
// fs-extra 返回了一個對象
const { bytesRead } = await fs.read(fd, chunk, 0, chunkSize)
...
}
}

在這段代碼里我使用 fs-extra 替換內置的 fs,因為 fs-extra 的非同步方法能夠返回 Promise. 你也可以用其他的 Promisify 庫像 mz。也可以用 Node 8 的 util.promisify 包一下 fs 的非同步方法。

這段代碼通過 babel(stage-3) 轉一下,然後 polyfill 一下 Symbol.asyncIterator 就能跑了。


其實我覺得暫時用不了新特性的情況下,用非同步高階函數也蠻好的,而且實際上高階函數比for of要好用很多:

const fs = require(fs)
const split = require(split)//https://www.npmjs.com/package/split
const stream = require(stream)

function eachLineOf(fileName, iterator) {
return new Promise(function(resolve, reject) {
let lineNum = 1

let lineStream = fs.createReadStream(fileName).pipe(split())

let lastProcessingLine

lineStream.on(drain, async () =&> {
await lastProcessingLine
resolve()
})

lineStream.pipe(new stream.Writable({
objectMode: true,
highWaterMark: 1,
write: async function(line, encoding, done) {
var shouldStop = await (lastProcessingLine = iterator(line, lineNum++))
done()
}
}))
})
}

function delay(duration) {
return new Promise(function(resolve) {
setTimeout(resolve, duration)
})
}

(async function(){
console.log(------start)
await eachLineOf(./each-line.js, async (line, lineNum) =&> {
await delay(Math.random() * 1000)
console.log(line)
})
console.log(------end)
})()


const readline = require(readline);
const fs = require(fs)

const arr = []

const rl = readline.createInterface({
input: fs.createReadStream(xxx.js),
});

rl.on(line, (line) =&> {
arr.push(line)
})

rl.on(close, () =&> {
for (let line of arr) {
console.log(接收到的是:, line);
}
})


推薦閱讀:

js中Async/Await 怎麼做錯誤處理更好?
GitHub上有哪些值得關注學習的NodeJS開源項目?
node.js 入門請推薦本好的入門書籍?
為什麼nodejs不需要IO功能?

TAG:編程 | Nodejs |