如何在Node.js或TypeScript中實現for..of形式的按行讀文件?
每次用Node.js按行讀文件都感覺有點怪怪的,今日突發奇想,想問一下如何實現for..of形式地按行讀文件(就是python那種按行讀文件的方式)
之前關注了這個問題,寫了個草稿回來發現這個問題被舉報關閉了。不知道這個問題哪裡不合適了???
前面的「js全棧工程師」答主明顯是在強答,是沒用過 Python Java 等其他語言嗎?按行讀取是很多語言標準庫裡面的功能,Node 也提供了一個 readline 模塊實現按行讀取文件流,怎麼就奇葩了?
按行讀有兩個思路實現,一種是簡單粗暴的方式是先讀取整個文件,再 split 成數組,這樣就能迭代了。缺點是讀大文件比較吃內存。另一種更合理的方式是每次從文件讀一點,放進緩衝區,然後讀行。當不足一行的時候再從文件讀一點。
Node.js 讀文件也分同步和非同步兩種方式。如果使用同步方式,IO 操作會阻塞線程。但是要實現 for...of 讀文件,就只能用同步的方式。
Iteration
for...of 是應用於可迭代對象的,所以首先你得遵循協議構造一個可迭代對象,才能對它 for...of。在可迭代對象生成的迭代器中用一個緩衝區存儲讀到的數據,在 next 方法裡面返回一行文本(如果不足一行,就繼續讀文件,直到大於一行,再返回第一行)。
另外也可以使用 Generator 生成可迭代對象,在實現上就變「被動」為「主動」,在一個函數裡面重複執行「讀一點文件 + yield 一行」。
參考 Generator 實現如下,比較粗糙:
const fs = require(fs)
const LF = 10
const CR = 13
function* readLines(filePath) {
const fd = fs.openSync(filePath, r)
const chunkSize = 64 // for test, should be larger
let buffer = new Buffer(0)
while (true) {
const chunk = new Buffer(chunkSize)
const bytesRead = fs.readSync(fd, chunk, 0, chunkSize)
buffer = Buffer.concat([buffer, chunk])
let startIndex = 0
for (let i = 0; i &< buffer.length; i++) {
if (
buffer[i] === LF || (
buffer[i] === CR
buffer[i + 1] === LF
)
) {
yield buffer.slice(startIndex, i).toString()
if (buffer[i] === LF) {
startIndex = i + 1
} else {
startIndex = i + 2
i = i + 1
}
}
}
if (startIndex &> 0) {
buffer = buffer.slice(startIndex)
}
if (bytesRead &< chunkSize) { const restSize = buffer.length + bytesRead - chunkSize if (restSize) { yield buffer.slice(0, restSize).toString() } return } } } for (let line of readLines(text.txt)) { console.log(line.length) }
在這個例子里,打開文件(fs.open)和讀取文件(fs.read)使用的都是同步方法,會阻塞線程。然而未來會有非同步的 for...of,對應需要實現非同步的可迭代對象+迭代器.
Async iteration
TC39 有一個 proposal-async-iteration 提案,目前處於 Stage 3,這個提案給出了非同步的 for...of 的使用語法和對應的迭代器實現,其使用語法如下:
// await 需要在 async 函數內
async function readLineByLine() {
for await (const line of readLines(filePath)) {
console.log(line);
}
}
非同步迭代器的 next 方法會返回一個 Promise&<{ value: any, done: boolean }&>
,而不是同步迭代器的 { value: any, done: boolean }
同樣地,我們還是使用 Generator,但是它變成了 Async Generator. 同時,fs 的 sync 方法變成 async 方法:
async function* readLines(filePath) {
const fd = await fs.open(filePath, r)
...
while (true) {
const chunk = new Buffer(chunkSize)
// fs-extra 返回了一個對象
const { bytesRead } = await fs.read(fd, chunk, 0, chunkSize)
...
}
}
在這段代碼里我使用 fs-extra 替換內置的 fs,因為 fs-extra 的非同步方法能夠返回 Promise. 你也可以用其他的 Promisify 庫像 mz。也可以用 Node 8 的 util.promisify 包一下 fs 的非同步方法。
這段代碼通過 babel(stage-3) 轉一下,然後 polyfill 一下 Symbol.asyncIterator 就能跑了。
其實我覺得暫時用不了新特性的情況下,用非同步高階函數也蠻好的,而且實際上高階函數比for of要好用很多:
const fs = require(fs)
const split = require(split)//https://www.npmjs.com/package/split
const stream = require(stream)
function eachLineOf(fileName, iterator) {
return new Promise(function(resolve, reject) {
let lineNum = 1
let lineStream = fs.createReadStream(fileName).pipe(split())
let lastProcessingLine
lineStream.on(drain, async () =&> {
await lastProcessingLine
resolve()
})
lineStream.pipe(new stream.Writable({
objectMode: true,
highWaterMark: 1,
write: async function(line, encoding, done) {
var shouldStop = await (lastProcessingLine = iterator(line, lineNum++))
done()
}
}))
})
}
function delay(duration) {
return new Promise(function(resolve) {
setTimeout(resolve, duration)
})
}
(async function(){
console.log(------start)
await eachLineOf(./each-line.js, async (line, lineNum) =&> {
await delay(Math.random() * 1000)
console.log(line)
})
console.log(------end)
})()
const readline = require(readline);
const fs = require(fs)
const arr = []
const rl = readline.createInterface({
input: fs.createReadStream(xxx.js),
});
rl.on(line, (line) =&> {
arr.push(line)
})
rl.on(close, () =&> {
for (let line of arr) {
console.log(接收到的是:, line);
}
})
推薦閱讀:
※js中Async/Await 怎麼做錯誤處理更好?
※GitHub上有哪些值得關注學習的NodeJS開源項目?
※node.js 入門請推薦本好的入門書籍?
※為什麼nodejs不需要IO功能?