LoginSignup
0
0

More than 1 year has passed since last update.

readStreamでストリームをクローズしないようにしたい(Node.js)

Posted at

問題

ある日,高頻度(10ms間隔)で更新されるCSVファイルを読んで更新内容を取得したいという事がありました.
これはreadStreamとreadlineで行けるのではと思ってやると,ファイルの末尾まで読んだ瞬間にストリームがクローズしてそれ以降追記されるデータが読み込めなくなってしまいました.
そこで,fs.watchして変更検知した瞬間に再度ストリームを開いて更新内容を取得すればいいのではと思ったのですが,これはストリームをオープンするコストが高く,全く使い物になりませんでした(streamをオープンしている間にデータが追記されてfs.watchのイベントが次々と発火され処理が間に合わない).

やること

  • ファイルの末尾まで読んでもファイルをオープンしたままにする.
  • ファイルが更新された時に低コストで更新内容を読む.

設計?

  1. node.jsのfsでファイルをオープンしてEOFまで読み取る.
     - 読み取ったEOFの位置を記録しておく.
  2. fs.watchでファイル変更を検知した時に前回のEOFの位置からまた次のEOFまで内容を読む.
     - 再度読み取ったEOFの位置を記録しておく.
  3. 2.へ

実装

コード↓

main.js
const csvFile = 'hogehoge.csv';
const CsvObserver = require('./csvObserver.js').CsvObserver;
const csvObserver = new CsvObserver(csvFile);

//windowsで動作確認をしています.
//コンストラクタでファイルをオープンした時に最初からあるコンテンツ
//行ごとに分割された内容 -> lines(Array)
csvObserver.events.on('initial', lines => {
  console.log('file opened, initial content is');
  console.log(lines);
});

//ファイルがアップデートされたときの更新内容
//行ごとに分割された内容 -> lines(Array)
//linesは大体長さ1の配列ですが,タイミングによって2行になったりします.
csvObserver.events.on('update', lines => {
  console.log('file updated, lines are');
  console.log(lines);
});

csvObserver.js
const fs = require('fs');
const EventEmitter = require('events').EventEmitter;
const Buffer = require('node:buffer').Buffer;
const decEOF = 0;   //decimal of EOF code. represented by node:buffer

/**
 * @class
 * コンストラクタの引数に指定したCSVファイルを監視し,
 * 追記された場合に内容を行に分割してイベントで通知します.
 */
class CsvObserver {

  /**
   * @constructor
   * @param {String} filename - file path of csv file to observe
   */
  constructor(filename) {
    this.events = new EventEmitter();
    this.targetFile = filename;
    this.currentReading = 0;

    this.openFile(this.targetFile)
      .then(fd => {
        this.fd = fd;
        this.readUntilEOF(this.fd, this.currentReading)
        .then(streamInfo => {
          this.currentReading = streamInfo.posEnd;
          this.events.emit('initial', this.getLines(streamInfo.content));
        })
        .catch(err => {
          console.error(err);
        });
      })
      .catch(err => {
        console.error('file could not open, csv observer aborted');
        throw err;
      });

    //observe file and emit event with lines added
    fs.watch(this.targetFile, () => {
      this.readUntilEOF(this.fd, this.currentReading)
      .then(streamInfo => {
        this.currentReading = streamInfo.posEnd;
        this.events.emit('update', this.getLines(streamInfo.content));
      })
      .catch(err => {
        console.error(err);
      });

    });
  }


  /**
   * open file and get file descriptor
   * @param {String} path - path of file to open
   * @returns {Promise} - resolve: fd, reject: err
   */
  openFile = function (path) {
    return new Promise((resolve, reject) => {
      fs.open(path, (err, fd) => {
        if (err) {
          reject(err);
        } else {
          resolve(fd);
        }
      });
    });
  }

  /**
   * close file that is opening on this.fd
   */
  closeFile = function(){
    fs.close(this.fd, err => {
      if(err){
        console.error('error occurred while closing file');
        console.error(err);
      }
    });
  }

  /**
   * read file until EOF appear
   * @param {Number} fd - file descriptor
   * @param {Number} startPosition - position of start to read
   * @returns {Object} - object.content = string, object.posEnd = Number
   */
  readUntilEOF = async function(fd, startPosition){
    let string = '';

    for(let i = startPosition;; i++){
      try{
        const buf = await this.getCharFromFile(fd, i);
        
        //if char is EOF, return string.
        //else, add char to string
        if(this.isEOF(buf)){
          return {
            content: string,
            posEnd: i
          }

        }else{
          string += this.getChar(buf);
        }
      }catch(err){
        return err;
      }
    }

  }
  
  /**
   * get 1 byte of char from file in specified position
   * @param {Number} fd - file descriptor
   * @param {Number} readPosition - position of file to read 
   * @returns {Promise} - resolve: buffer, reject: err
   */
  getCharFromFile = function (fd, readPosition) {
    return new Promise((resolve, reject) => {
      const buf = Buffer.alloc(1);
  
      fs.read(fd, buf, 0, 1, readPosition, (err, bytesRead, buffer) => {
        if (err) {
          reject(err);
        } else {
          //console.log(buffer[0]);
          resolve(buffer);
        }
      });
    });
  }

  /**
   * read buf and judge the content is EOF
   * @param {Buffer} buf - buffer from fs.read
   * @param {Boolean} - is the buffer has EOF or not
   */
  isEOF = function (buf) {
    if (buf.length !== 1) {
      throw 'buffer length should be 1';
    }
  
    if (buf[0] === decEOF) {
      return true;
    } else {
      return false;
    }
  }

  /**
   * return ascii content of buffer
   * @param {Buffer} buf - buffer of fs.read
   * @returns {String} stringified content of buffer 
   */
  getChar = function (buf) {
    return buf.toString()[0];
  }

  /**
   * split string by \n and return as array
   * @param {String} string
   * @returns {Array} - array of string that is splited by \n
   */
  getLines = function(string){
    const lines = string.split('\n');
    
    //delete line which has no content
    for(let i = 0; i < lines.length; i++){
      if(!lines[i]){
        lines.splice(i, 1);
      }
    }

    return lines;
  }

  /**
   * parse stringified csv line to array
   * @param {String} line
   * @returns {Array} - array of csv content
   */
  parseCsvLine = function(line){
    return line.split(',');
  }
}

exports.CsvObserver = CsvObserver;

おわりに

  • 既知のバグ?
    • windowsでの動作確認はしていますが,wsl(Ubuntu)でのfs.watchの挙動が違うのかwsl(Ubuntu)では更新内容があったりなかったり,または重複したりします.
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0