問題
ある日,高頻度(10ms間隔)で更新されるCSVファイルを読んで更新内容を取得したいという事がありました.
これはreadStreamとreadlineで行けるのではと思ってやると,ファイルの末尾まで読んだ瞬間にストリームがクローズしてそれ以降追記されるデータが読み込めなくなってしまいました.
そこで,fs.watchして変更検知した瞬間に再度ストリームを開いて更新内容を取得すればいいのではと思ったのですが,これはストリームをオープンするコストが高く,全く使い物になりませんでした(streamをオープンしている間にデータが追記されてfs.watchのイベントが次々と発火され処理が間に合わない).
やること
- ファイルの末尾まで読んでもファイルをオープンしたままにする.
- ファイルが更新された時に低コストで更新内容を読む.
設計?
- node.jsのfsでファイルをオープンしてEOFまで読み取る.
- 読み取ったEOFの位置を記録しておく. - fs.watchでファイル変更を検知した時に前回のEOFの位置からまた次のEOFまで内容を読む.
- 再度読み取ったEOFの位置を記録しておく. - 2.へ
実装
コード↓
main.js
const csvFile = 'hogehoge.csv';
const CsvObserver = require('./csvObserver.js').CsvObserver;
const csvObserver = new CsvObserver(csvFile);
//windowsで動作確認をしています.
//コンストラクタでファイルをオープンした時に最初からあるコンテンツ
//行ごとに分割された内容 -> lines(Array)
csvObserver.events.on('initial', lines => {
console.log('file opened, initial content is');
console.log(lines);
});
//ファイルがアップデートされたときの更新内容
//行ごとに分割された内容 -> lines(Array)
//linesは大体長さ1の配列ですが,タイミングによって2行になったりします.
csvObserver.events.on('update', lines => {
console.log('file updated, lines are');
console.log(lines);
});
csvObserver.js
const fs = require('fs');
const EventEmitter = require('events').EventEmitter;
const Buffer = require('node:buffer').Buffer;
const decEOF = 0; //decimal of EOF code. represented by node:buffer
/**
* @class
* コンストラクタの引数に指定したCSVファイルを監視し,
* 追記された場合に内容を行に分割してイベントで通知します.
*/
class CsvObserver {
/**
* @constructor
* @param {String} filename - file path of csv file to observe
*/
constructor(filename) {
this.events = new EventEmitter();
this.targetFile = filename;
this.currentReading = 0;
this.openFile(this.targetFile)
.then(fd => {
this.fd = fd;
this.readUntilEOF(this.fd, this.currentReading)
.then(streamInfo => {
this.currentReading = streamInfo.posEnd;
this.events.emit('initial', this.getLines(streamInfo.content));
})
.catch(err => {
console.error(err);
});
})
.catch(err => {
console.error('file could not open, csv observer aborted');
throw err;
});
//observe file and emit event with lines added
fs.watch(this.targetFile, () => {
this.readUntilEOF(this.fd, this.currentReading)
.then(streamInfo => {
this.currentReading = streamInfo.posEnd;
this.events.emit('update', this.getLines(streamInfo.content));
})
.catch(err => {
console.error(err);
});
});
}
/**
* open file and get file descriptor
* @param {String} path - path of file to open
* @returns {Promise} - resolve: fd, reject: err
*/
openFile = function (path) {
return new Promise((resolve, reject) => {
fs.open(path, (err, fd) => {
if (err) {
reject(err);
} else {
resolve(fd);
}
});
});
}
/**
* close file that is opening on this.fd
*/
closeFile = function(){
fs.close(this.fd, err => {
if(err){
console.error('error occurred while closing file');
console.error(err);
}
});
}
/**
* read file until EOF appear
* @param {Number} fd - file descriptor
* @param {Number} startPosition - position of start to read
* @returns {Object} - object.content = string, object.posEnd = Number
*/
readUntilEOF = async function(fd, startPosition){
let string = '';
for(let i = startPosition;; i++){
try{
const buf = await this.getCharFromFile(fd, i);
//if char is EOF, return string.
//else, add char to string
if(this.isEOF(buf)){
return {
content: string,
posEnd: i
}
}else{
string += this.getChar(buf);
}
}catch(err){
return err;
}
}
}
/**
* get 1 byte of char from file in specified position
* @param {Number} fd - file descriptor
* @param {Number} readPosition - position of file to read
* @returns {Promise} - resolve: buffer, reject: err
*/
getCharFromFile = function (fd, readPosition) {
return new Promise((resolve, reject) => {
const buf = Buffer.alloc(1);
fs.read(fd, buf, 0, 1, readPosition, (err, bytesRead, buffer) => {
if (err) {
reject(err);
} else {
//console.log(buffer[0]);
resolve(buffer);
}
});
});
}
/**
* read buf and judge the content is EOF
* @param {Buffer} buf - buffer from fs.read
* @param {Boolean} - is the buffer has EOF or not
*/
isEOF = function (buf) {
if (buf.length !== 1) {
throw 'buffer length should be 1';
}
if (buf[0] === decEOF) {
return true;
} else {
return false;
}
}
/**
* return ascii content of buffer
* @param {Buffer} buf - buffer of fs.read
* @returns {String} stringified content of buffer
*/
getChar = function (buf) {
return buf.toString()[0];
}
/**
* split string by \n and return as array
* @param {String} string
* @returns {Array} - array of string that is splited by \n
*/
getLines = function(string){
const lines = string.split('\n');
//delete line which has no content
for(let i = 0; i < lines.length; i++){
if(!lines[i]){
lines.splice(i, 1);
}
}
return lines;
}
/**
* parse stringified csv line to array
* @param {String} line
* @returns {Array} - array of csv content
*/
parseCsvLine = function(line){
return line.split(',');
}
}
exports.CsvObserver = CsvObserver;
おわりに
- 既知のバグ?
- windowsでの動作確認はしていますが,wsl(Ubuntu)でのfs.watchの挙動が違うのかwsl(Ubuntu)では更新内容があったりなかったり,または重複したりします.