LoginSignup
0

More than 5 years have passed since last update.

Node.jsのStream APIとRxJSでテキストファイルを1行ずつ読み込みながら10行毎にファイル書き込みする

Last updated at Posted at 2018-01-31

ちきさんです。

RxJSを使っていい感じのコードが書けないものかと思っていたら丁度良いテーマがありました。

Node.jsのStream APIです。

Streamでファイルから読み込みしつつRxでデータを一定量まとめてStreamでファイルに書き込むというのをやってみます。

用意したフェイクデータ

度々お世話になっている https://github.com/arangodb/example-datasets のRandomUsersのデータを使います。今回はPCスペックがしょぼいので1000行のデータを使いましたが、もっと負荷をかけたければ30万行のデータも用意されています。
ファイルの中身はこんな感じで人物のフェイクデータが1行1JSONの形式でどっさり収録されています。

names_1000.json
{"name":{"first":"Shawna","last":"Matheney"},"gender":"female","birthday":"1950-08-06","contact":{"address":{"street":"10 17th St","zip":"62218","city":"Bartelso","state":"IL"},"email":["shawna.matheney@nosql-matters.org","shawna@nosql-matters.org"],"region":"618","phone":["618-9063984","618-8108742"]},"likes":["driving","skiing"],"memberSince":"2010-07-07"}
{"name":{"first":"Burton","last":"Siaperas"},"gender":"male","birthday":"1941-05-16","contact":{"address":{"street":"3 Spring garden Aly","zip":"14608","city":"Rochester","state":"NY"},"email":["burton.siaperas@nosql-matters.org","siaperas@nosql-matters.org"],"region":"716","phone":["716-5428351"]},"likes":["shopping"],"memberSince":"2009-02-02"}
{"name":{"first":"Ernie","last":"Levinson"},"gender":"male","birthday":"1955-07-25","contact":{"address":{"street":"9 Neumann Pl","zip":"44482","city":"Warren","state":"OH"},"email":["ernie.levinson@nosql-matters.org"],"region":"330","phone":["330-2486637"]},"likes":[],"memberSince":"2008-08-05"}
...(省略)

テキストをStreamで読み込んでRxでまとめてStreamで書き込む

  • ReadStreamをReadLineでラップしてテキストファイルから1行ずつ読み込む。
  • 1行ずつSubjectに流し込んでいくが、10行毎にWriteStreamを使って別ファイルに書き込む。
  • ReadStreamが完了したらSubjectに残っているデータを全てWriteStreamで書き込んでWriteStreamを完了させる。

10行毎にファイルに書き込むのをRxなしで書こうとすると地味にめんどくさそうです。

back-pressure-read-write.ts
import * as path from 'path';
import * as fs from 'fs';
import { WriteStream } from 'fs';
import * as readline from 'readline';
import { Subject } from 'rxjs/Rx';

const filepath = path.join(path.resolve(), 'files', 'names_1000.json');

const reader$ = new Subject<string>();
readline.createInterface(fs.createReadStream(filepath))
  .on('line', (input: string) => reader$.next(input))
  .on('error', (err) => reader$.error(err))
  .on('close', () => { reader$.complete(); console.log('ReadStream is closed.'); });

const ws: WriteStream =
  fs.createWriteStream(path.join(path.resolve(), 'output', 'result.json'))
    .on('close', () => console.log('WriteStream is closed.'));

const buffer$ = new Subject<string>();
const flusher$ = new Subject<void>();
let counter = 0;

// ここからRx
reader$
  .finally(() => { // reader$.complete()が呼ばれるとここに到達する。
    flusher$.next(); // まだ残っているバッファーを流す。
    flusher$.complete(); // flusher$とbuffer$をcompleteさせる。
  })
  .subscribe(line => {
    buffer$.next(line + '\n'); // 読み込んだ1行をバッファーに追加する。
    counter++;
    if (counter % 10 === 0) {
      flusher$.next(); // バッファーを流す。
    }
  });

buffer$
  .buffer(flusher$) // flusher$.next()されるまでバッファーを蓄積する。
  .filter(data => data.length > 0)
  .finally(() => { // flusher$.complete()が呼ばれるとここに到達する。
    ws.end();
  })
  .subscribe(data => {
    console.log('chunk:', data.length, counter);
    ws.write(data.join('')); // 10行単位でファイルに書き込みする。
  });

1行読み込まれる度にreader$のストリームが流れて、その途中でbuffer$のストリームに流れてbuffer()で止まるが、10行毎にflusher$のストリームが流されてbuffer()に貯まっていたものが流されてws.write()を通じてファイル書き込みされて...を延々繰り返しています。

そして出力はこんな感じ。

OUTPUT
chunk: 10 10
chunk: 10 20
chunk: 10 30
chunk: 10 40
chunk: 10 50
chunk: 10 60
...(省略)
chunk: 10 970
chunk: 10 980
chunk: 10 990
chunk: 10 1000
ReadStream is closed.
WriteStream is closed.

まとめ

Node.jsのStream APIとRxJSを使って1000行のテキストファイルを1行ずつ読み込んで10行毎にファイル書き込みできました。

これを応用すればCluster APIを使って10行毎にWorkerに割り振ってマルチCPUコアでMapReduceとかもできそうです。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0