具体的に言うと、例外を発行した後、そのストリームが機能しないように見える(ので、その後続のストリームにデータがわたらない)。
streamをまともに使ってこなかったので、例外が起きたところでハマりました。
【追記あります】
readableStream.pipe(transformStream).pipe(writableStream) な状態で、エラーが発生した場合、on('error', callback) して例外をキャッチしないと uncaught エラーを吐いて死にます。
ただ、on('error')しても、そのままだとパイプは機能しなくなる場合があります(下記 error.test.jsの最初のテスト参照)。なので何らかの方法でリカバリーさせる必要があるみたい。
error.test.js
var test = require('tape').test
var stream = require('stream')
function setup (src) {
var queue = src.slice(0)
var rs = new stream.Readable({objectMode: true})
rs._read = function () {
this.push(queue.shift() || null)
}
var ws = new stream.Writable({objectMode: true})
ws.spy = []
ws._write = function (chnk, enc, done) {
this.spy.push(chnk)
done()
}
var ts = new stream.Transform({objectMode: true})
ts.spy = []
ts._transform = function (chnk, enc, done) {
if (chnk.sError)
return done(chnk.sError)
this.spy.push(chnk)
this.push(chnk)
done()
}
return {
rs: rs
, ws: ws
, ts: ts
}
}
test('transformStreamがエラーを発生させた場合 # リカバリーしない', function (t) {
var error = new Error('foo is not bar')
var src = [
{sError: false, value: 'abc'}
, {sError: error, value: 'def'}
, {sError: false, value: 'ghi'}
]
var ss = setup(src)
var rs = ss.rs
var ts = ss.ts
var ws = ss.ws
var spy = []
Object.keys(ss).forEach(function (name) {
ss[name].on('error', function (err) {
spy.push(name + ' error ' + err.message)
})
})
rs.on('end', function () { spy.push('rs end') })
ts.on('finish', function () { spy.push('ts finish') })
ts.on('end', function () { spy.push('ts end') })
ws.on('finish', function () { spy.push('ws finish') })
rs
.pipe(ts)
.pipe(ws)
setTimeout(function () {
t.deepEqual(rs._readableState.buffer, [{sError: false, value: 'ghi'}])
t.deepEqual(spy, ['ts error foo is not bar'])
t.deepEqual(ts.spy, [ {sError: false, value: 'abc'} ])
t.deepEqual(ws.spy, [ {sError: false, value: 'abc'} ])
t.end()
}, 1000)
})
test('transformStreamがエラーを発生させた場合 # on("error")の中でpipeを繋ぎ直す', function (t) {
var error = new Error('foo is not bar')
var src = [
{sError: false, value: 'abc'}
, {sError: error, value: 'def'}
, {sError: false, value: 'ghi'}
]
var ss = setup(src)
var rs = ss.rs
var ts = ss.ts
var ws = ss.ws
var spy = []
Object.keys(ss).forEach(function (name) {
ss[name].on('error', function (err) {
spy.push(name + ' error ' + err.message)
})
})
ts.on('error', function () {
rs.unpipe(ts)
ts.unpipe(ws)
rs.pipe(ts).pipe(ws)
})
rs.on('end', function () { spy.push('rs end') })
ts.on('finish', function () { spy.push('ts finish') })
ts.on('end', function () { spy.push('ts end') })
ws.on('finish', function () { spy.push('ws finish') })
rs
.pipe(ts)
.pipe(ws)
setTimeout(function () {
t.deepEqual(rs._readableState.buffer, [])
t.deepEqual(spy, ['ts error foo is not bar', 'rs end', 'ts finish', 'ts end', 'ws finish'])
t.deepEqual(ts.spy, [{sError: false, value: 'abc'}, {sError: false, value: 'ghi'}])
t.deepEqual(ws.spy, [{sError: false, value: 'abc'}, {sError: false, value: 'ghi'}])
t.end()
}, 1000)
})
とりあえずのリカバリーだけど、on('error') に登録したコールバック関数の中でピプを繋ぎ直してリカバリーできました。
ただ、これは正しいやり方のなのかな?
【追記】
ts.on('error', ...)
しないで
ts.on('unpipe', functon (src) {
src.pipe(this)
})
の方が正しいのかも