yieldとstreamを行き来する
coやaaなどyield + generator + Promiseによってcallback地獄から開放された、やった!
となったはいいけどPromisifyで囲めるものはいいけど既存のStreamのコードを書き直して入れるの面倒でない?
と思ったので、yieldの中でstreamを使いやすいようにパッケージにしました。
ちなみにasync-awaitのライブラリはaaを使います
aa.Channel()が愛しい。
詳細はLightSpeedC氏の投稿へ。
aa-util
aa-utilではArrayとGeneratorをStreamに変換できます。
例文内では const au = require('aa-util')
で読み込まれているとしてください。
- stream生成 :
au.stream.a2s(Array)
で配列をStreamでくるんだものが返ってくるので今までどおりpipeできます - stream結果受取 :
au.stream.reduce
でstreamの結果が配列になって返ってきます
Array/Generator -> Stream
const au = require('aa-util')
function *main(){
const target = [0,1,2,3,4,5]
// Arrayの場合
const stream = yield au.stream.a2s(target); // array-> stream
// Generatorの場合
const g = function *(){
yield* target
}
const stream = yield au.stream.g2s(g());
// pipe your custom stream
stream.pipe(stream_A)
// get stream result
const result = yield au.stream.reduce(stream_A);
// result = [0,1,2,3,4,5]
}
Stream -> Result
ちょっとまって! 別の形で受け取りたいんだけど。
という時には第二引数に自分でReducer(Transform)を指定できます。
自分で好きにreduceしてflush(cb)
が呼ばれたときに一度だけpushしてください。
const stream = require('stream');
function *main(){
const target = [0,1,2,3,4,5]
const g = function *(){
yield* target
}
const stream = yield au.stream.g2s(g());
// set custom reducer to second argument
const result = yield au.stream.reduce(stream, sumStream());
// result = 15
}
// すべての値を加算して返す
function sumStream(){
return stream.Transform({
objectMode:true,
transform:function(chunk, env, cb){
// 自前のreduce処理を書く
if(this._stack === undefined){
this._stack = 0
}
this._stack += chunk
cb()
},
flush: function(cb){
this.push(this._stack) // 結果を返す
cb()
}
})
}
おまけ
yieldでも呼べるしcallbackでも呼べると便利なのでは?
という思いつきから au.promise.wrap
も作りました。 でもあんまり使ってない
// yield でうごかす
function *main(){
const time = 100
const result = yield proc(time)
}
// call backで動かす
proc(time, (err, result)=>{
console.log(err, result)
done();
})
// cbにFunctionが入っていたらaaでラップしてcbに値を返す
// cbに何もなければmain(opt)をそのまま返す
function proc(opt, cb){
return au.promise.wrap(main(opt), cb)
function *main(arg){
return yield wait(arg)
}
}
function wait(arg){
return new Promise((res, rej)=>{
if(arg > 1500) return rej(new Error("over >1500"))
if(arg > 500) return res(new Error("over >500"))
setTimeout(()=>{
return res(`wait:${arg}`)
}, arg)
})
}
それでは、よりよいnodejsライフを