ちきさんです。
RxJSを使っていい感じのコードが書けないものかと思っていたら丁度良いテーマがありました。
Node.jsのStream APIです。
Streamでファイルから読み込みしつつRxでデータを一定量まとめてStreamでファイルに書き込むというのをやってみます。
用意したフェイクデータ
度々お世話になっている https://github.com/arangodb/example-datasets のRandomUsersのデータを使います。今回はPCスペックがしょぼいので1000行のデータを使いましたが、もっと負荷をかけたければ30万行のデータも用意されています。
ファイルの中身はこんな感じで人物のフェイクデータが1行1JSONの形式でどっさり収録されています。
{"name":{"first":"Shawna","last":"Matheney"},"gender":"female","birthday":"1950-08-06","contact":{"address":{"street":"10 17th St","zip":"62218","city":"Bartelso","state":"IL"},"email":["shawna.matheney@nosql-matters.org","shawna@nosql-matters.org"],"region":"618","phone":["618-9063984","618-8108742"]},"likes":["driving","skiing"],"memberSince":"2010-07-07"}
{"name":{"first":"Burton","last":"Siaperas"},"gender":"male","birthday":"1941-05-16","contact":{"address":{"street":"3 Spring garden Aly","zip":"14608","city":"Rochester","state":"NY"},"email":["burton.siaperas@nosql-matters.org","siaperas@nosql-matters.org"],"region":"716","phone":["716-5428351"]},"likes":["shopping"],"memberSince":"2009-02-02"}
{"name":{"first":"Ernie","last":"Levinson"},"gender":"male","birthday":"1955-07-25","contact":{"address":{"street":"9 Neumann Pl","zip":"44482","city":"Warren","state":"OH"},"email":["ernie.levinson@nosql-matters.org"],"region":"330","phone":["330-2486637"]},"likes":[],"memberSince":"2008-08-05"}
...(省略)
テキストをStreamで読み込んでRxでまとめてStreamで書き込む
- ReadStreamをReadLineでラップしてテキストファイルから1行ずつ読み込む。
- 1行ずつSubjectに流し込んでいくが、10行毎にWriteStreamを使って別ファイルに書き込む。
- ReadStreamが完了したらSubjectに残っているデータを全てWriteStreamで書き込んでWriteStreamを完了させる。
10行毎にファイルに書き込むのをRxなしで書こうとすると地味にめんどくさそうです。
import * as path from 'path';
import * as fs from 'fs';
import { WriteStream } from 'fs';
import * as readline from 'readline';
import { Subject } from 'rxjs/Rx';
const filepath = path.join(path.resolve(), 'files', 'names_1000.json');
const reader$ = new Subject<string>();
readline.createInterface(fs.createReadStream(filepath))
.on('line', (input: string) => reader$.next(input))
.on('error', (err) => reader$.error(err))
.on('close', () => { reader$.complete(); console.log('ReadStream is closed.'); });
const ws: WriteStream =
fs.createWriteStream(path.join(path.resolve(), 'output', 'result.json'))
.on('close', () => console.log('WriteStream is closed.'));
const buffer$ = new Subject<string>();
const flusher$ = new Subject<void>();
let counter = 0;
// ここからRx
reader$
.finally(() => { // reader$.complete()が呼ばれるとここに到達する。
flusher$.next(); // まだ残っているバッファーを流す。
flusher$.complete(); // flusher$とbuffer$をcompleteさせる。
})
.subscribe(line => {
buffer$.next(line + '\n'); // 読み込んだ1行をバッファーに追加する。
counter++;
if (counter % 10 === 0) {
flusher$.next(); // バッファーを流す。
}
});
buffer$
.buffer(flusher$) // flusher$.next()されるまでバッファーを蓄積する。
.filter(data => data.length > 0)
.finally(() => { // flusher$.complete()が呼ばれるとここに到達する。
ws.end();
})
.subscribe(data => {
console.log('chunk:', data.length, counter);
ws.write(data.join('')); // 10行単位でファイルに書き込みする。
});
1行読み込まれる度にreader$
のストリームが流れて、その途中でbuffer$
のストリームに流れてbuffer()
で止まるが、10行毎にflusher$
のストリームが流されてbuffer()
に貯まっていたものが流されてws.write()
を通じてファイル書き込みされて...を延々繰り返しています。
そして出力はこんな感じ。
chunk: 10 10
chunk: 10 20
chunk: 10 30
chunk: 10 40
chunk: 10 50
chunk: 10 60
...(省略)
chunk: 10 970
chunk: 10 980
chunk: 10 990
chunk: 10 1000
ReadStream is closed.
WriteStream is closed.
まとめ
Node.jsのStream APIとRxJSを使って1000行のテキストファイルを1行ずつ読み込んで10行毎にファイル書き込みできました。
これを応用すればCluster APIを使って10行毎にWorkerに割り振ってマルチCPUコアでMapReduceとかもできそうです。