LoginSignup
6
9

More than 5 years have passed since last update.

pipeで繋いだStreamが例外を発行した場合の処理

Last updated at Posted at 2014-10-28

具体的に言うと、例外を発行した後、そのストリームが機能しないように見える(ので、その後続のストリームにデータがわたらない)。
streamをまともに使ってこなかったので、例外が起きたところでハマりました。

【追記あります】

readableStream.pipe(transformStream).pipe(writableStream) な状態で、エラーが発生した場合、on('error', callback) して例外をキャッチしないと uncaught エラーを吐いて死にます。
ただ、on('error')しても、そのままだとパイプは機能しなくなる場合があります(下記 error.test.jsの最初のテスト参照)。なので何らかの方法でリカバリーさせる必要があるみたい。

error.test.js
var test   = require('tape').test
var stream = require('stream')

function setup (src) {
    var queue = src.slice(0)
    var rs    = new stream.Readable({objectMode: true})
    rs._read  = function () {
        this.push(queue.shift() || null)
    }

    var ws = new stream.Writable({objectMode: true})
    ws.spy = []
    ws._write = function (chnk, enc, done) {
        this.spy.push(chnk)
        done()
    }

    var ts = new stream.Transform({objectMode: true})
    ts.spy = []
    ts._transform = function (chnk, enc, done) {
        if (chnk.sError)
            return done(chnk.sError)

        this.spy.push(chnk)
        this.push(chnk)
        done()
    }

    return {
        rs: rs
      , ws: ws
      , ts: ts
    }
}

test('transformStreamがエラーを発生させた場合 # リカバリーしない', function (t) {
    var error = new Error('foo is not bar')
    var src = [
        {sError: false, value: 'abc'}
      , {sError: error, value: 'def'}
      , {sError: false, value: 'ghi'}
    ]
    var ss = setup(src)
    var rs = ss.rs
    var ts = ss.ts
    var ws = ss.ws
    var spy = []

    Object.keys(ss).forEach(function (name) {
        ss[name].on('error', function (err) {
            spy.push(name + ' error ' + err.message)
        })
    })

    rs.on('end', function () { spy.push('rs end') })
    ts.on('finish', function () { spy.push('ts finish') })
    ts.on('end', function () { spy.push('ts end') })
    ws.on('finish', function () { spy.push('ws finish') })

    rs
      .pipe(ts)
      .pipe(ws)

    setTimeout(function () {
        t.deepEqual(rs._readableState.buffer, [{sError: false, value: 'ghi'}])
        t.deepEqual(spy, ['ts error foo is not bar'])
        t.deepEqual(ts.spy, [ {sError: false, value: 'abc'} ])
        t.deepEqual(ws.spy, [ {sError: false, value: 'abc'} ])
        t.end()
    }, 1000)
})

test('transformStreamがエラーを発生させた場合 # on("error")の中でpipeを繋ぎ直す', function (t) {
    var error = new Error('foo is not bar')
    var src = [
        {sError: false, value: 'abc'}
      , {sError: error, value: 'def'}
      , {sError: false, value: 'ghi'}
    ]
    var ss = setup(src)
    var rs = ss.rs
    var ts = ss.ts
    var ws = ss.ws
    var spy = []

    Object.keys(ss).forEach(function (name) {
        ss[name].on('error', function (err) {
            spy.push(name + ' error ' + err.message)
        })
    })

    ts.on('error', function () {
        rs.unpipe(ts)
        ts.unpipe(ws)
        rs.pipe(ts).pipe(ws)
    })

    rs.on('end', function () { spy.push('rs end') })
    ts.on('finish', function () { spy.push('ts finish') })
    ts.on('end', function () { spy.push('ts end') })
    ws.on('finish', function () { spy.push('ws finish') })

    rs
      .pipe(ts)
      .pipe(ws)

    setTimeout(function () {
        t.deepEqual(rs._readableState.buffer, [])
        t.deepEqual(spy, ['ts error foo is not bar', 'rs end', 'ts finish', 'ts end', 'ws finish'])
        t.deepEqual(ts.spy, [{sError: false, value: 'abc'}, {sError: false, value: 'ghi'}])
        t.deepEqual(ws.spy, [{sError: false, value: 'abc'}, {sError: false, value: 'ghi'}])
        t.end()
    }, 1000)
})

とりあえずのリカバリーだけど、on('error') に登録したコールバック関数の中でピプを繋ぎ直してリカバリーできました。

ただ、これは正しいやり方のなのかな?

【追記】

ts.on('error', ...)

しないで

ts.on('unpipe', functon (src) {
    src.pipe(this)
})

の方が正しいのかも

6
9
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
9