Node.js
RxJS

Node.jsのStream APIとRxJSでテキストファイルを1行ずつ読み込みながら10行毎にファイル書き込みする

ちきさんです。

RxJSを使っていい感じのコードが書けないものかと思っていたら丁度良いテーマがありました。

Node.jsのStream APIです。

Streamでファイルから読み込みしつつRxでデータを一定量まとめてStreamでファイルに書き込むというのをやってみます。

用意したフェイクデータ

度々お世話になっている https://github.com/arangodb/example-datasets のRandomUsersのデータを使います。今回はPCスペックがしょぼいので1000行のデータを使いましたが、もっと負荷をかけたければ30万行のデータも用意されています。
ファイルの中身はこんな感じで人物のフェイクデータが1行1JSONの形式でどっさり収録されています。

names_1000.json
{"name":{"first":"Shawna","last":"Matheney"},"gender":"female","birthday":"1950-08-06","contact":{"address":{"street":"10 17th St","zip":"62218","city":"Bartelso","state":"IL"},"email":["shawna.matheney@nosql-matters.org","shawna@nosql-matters.org"],"region":"618","phone":["618-9063984","618-8108742"]},"likes":["driving","skiing"],"memberSince":"2010-07-07"}
{"name":{"first":"Burton","last":"Siaperas"},"gender":"male","birthday":"1941-05-16","contact":{"address":{"street":"3 Spring garden Aly","zip":"14608","city":"Rochester","state":"NY"},"email":["burton.siaperas@nosql-matters.org","siaperas@nosql-matters.org"],"region":"716","phone":["716-5428351"]},"likes":["shopping"],"memberSince":"2009-02-02"}
{"name":{"first":"Ernie","last":"Levinson"},"gender":"male","birthday":"1955-07-25","contact":{"address":{"street":"9 Neumann Pl","zip":"44482","city":"Warren","state":"OH"},"email":["ernie.levinson@nosql-matters.org"],"region":"330","phone":["330-2486637"]},"likes":[],"memberSince":"2008-08-05"}
...(省略)

テキストをStreamで読み込んでRxでまとめてStreamで書き込む

  • ReadStreamをReadLineでラップしてテキストファイルから1行ずつ読み込む。
  • 1行ずつSubjectに流し込んでいくが、10行毎にWriteStreamを使って別ファイルに書き込む。
  • ReadStreamが完了したらSubjectに残っているデータを全てWriteStreamで書き込んでWriteStreamを完了させる。

10行毎にファイルに書き込むのをRxなしで書こうとすると地味にめんどくさそうです。

back-pressure-read-write.ts
import * as path from 'path';
import * as fs from 'fs';
import { WriteStream } from 'fs';
import * as readline from 'readline';
import { Subject } from 'rxjs/Rx';

const filepath = path.join(path.resolve(), 'files', 'names_1000.json');

const reader$ = new Subject<string>();
readline.createInterface(fs.createReadStream(filepath))
  .on('line', (input: string) => reader$.next(input))
  .on('error', (err) => reader$.error(err))
  .on('close', () => { reader$.complete(); console.log('ReadStream is closed.'); });

const ws: WriteStream =
  fs.createWriteStream(path.join(path.resolve(), 'output', 'result.json'))
    .on('close', () => console.log('WriteStream is closed.'));

const buffer$ = new Subject<string>();
const flusher$ = new Subject<void>();
let counter = 0;

// ここからRx
reader$
  .finally(() => { // reader$.complete()が呼ばれるとここに到達する。
    flusher$.next(); // まだ残っているバッファーを流す。
    flusher$.complete(); // flusher$とbuffer$をcompleteさせる。
  })
  .subscribe(line => {
    buffer$.next(line + '\n'); // 読み込んだ1行をバッファーに追加する。
    counter++;
    if (counter % 10 === 0) {
      flusher$.next(); // バッファーを流す。
    }
  });

buffer$
  .buffer(flusher$) // flusher$.next()されるまでバッファーを蓄積する。
  .filter(data => data.length > 0)
  .finally(() => { // flusher$.complete()が呼ばれるとここに到達する。
    ws.end();
  })
  .subscribe(data => {
    console.log('chunk:', data.length, counter);
    ws.write(data.join('')); // 10行単位でファイルに書き込みする。
  });

1行読み込まれる度にreader$のストリームが流れて、その途中でbuffer$のストリームに流れてbuffer()で止まるが、10行毎にflusher$のストリームが流されてbuffer()に貯まっていたものが流されてws.write()を通じてファイル書き込みされて...を延々繰り返しています。

そして出力はこんな感じ。

OUTPUT
chunk: 10 10
chunk: 10 20
chunk: 10 30
chunk: 10 40
chunk: 10 50
chunk: 10 60
...(省略)
chunk: 10 970
chunk: 10 980
chunk: 10 990
chunk: 10 1000
ReadStream is closed.
WriteStream is closed.

まとめ

Node.jsのStream APIとRxJSを使って1000行のテキストファイルを1行ずつ読み込んで10行毎にファイル書き込みできました。

これを応用すればCluster APIを使って10行毎にWorkerに割り振ってマルチCPUコアでMapReduceとかもできそうです。