2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ジェネレータを使った非同期処理

Posted at

はじめに

ES6以降、JavaScriptの非同期処理は、GeneratorやPromiseを使って書きやすくはなりました。しかし、直列と並列を変更したり、混在させたり、何層にも入れ子にしたい場合があるので、より書きやすく読みやすい記述を検討します。

サンプル用非同期処理関数(sleep)の準備

関数内の実行状況がわかるサンプルを用意してテストします。実際のコードで使用する実用的なものとしては、画像の読み込み、AjaxでのAPI呼び出し、タイマーで起動させるものなどがあります。

テスト用サンプル関数の準備
const sleep = (time, callback) => {
  console.log('sleep time : ' + time + ' start');
  const completion = () => {
    console.log('sleep time : ' + time + ' callback');
    callback();
  };
  if (time === 0) { // 関数終了前にコールバックが呼ばれる場合のテスト用
    completion();
  } else {
    setTimeout(completion, time);
  }
};

原始的な非同期処理の例

サンプル
// 直列非同期処理
sleep(300, () => {
  sleep(200, () => {
    sleep(100, () => {
      console.log('completed.');
    });
  });
});

// 並列非同期処理
let n = 3;
const check = () => { if (--n === 0) console.log('completed.'); };
sleep(300, check);
sleep(200, check);
sleep(100, check);
テスト結果
// 直列非同期処理
sleep time : 300 start
sleep time : 300 callback
sleep time : 200 start
sleep time : 200 callback
sleep time : 100 start
sleep time : 100 callback
completed.

// 並列非同期処理
sleep time : 300 start
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback
sleep time : 300 callback
completed.

ジェネレータを使った非同期処理

前述の非同期処理は直列と並列で構造が異なるため、ジェネレータを使って、直列と並列の非同期処理を同じ構造になるように書きます。直列か並列かは、serial/parallelで切り替えます。

(準備)現在実行している処理が完了してから、関数を実行する仕組み(postpone)の用意

基本的な動作としては、setTimeout(callback, 0);等と同じものです。
ここではジェネレータを使用する時に、再帰的呼び出し状態を回避するために使います。

postpone
const postpone = (callback) => {
  // setTimeout(callback, 0); は4msの制限があるため
  // gif 1x1
  const img = new Image();
  img.onload = callback;
  img.onerror = callback;
  img.src = 'data:image/gif;base64,R0lGODlhAQABAIAAAAUEBAAAACwAAAAAAQABAAACAkQBADs=';
};

非同期処理ジェネレータ serial/parallel

serial/parallel
// 直列用
const serial = function (generator, completion) {
  function proceeder() {
    postpone(() => { g.next().done && completion && completion(); });
  }
  var g = generator(proceeder);
  proceeder();
};

// 並列用
const parallel = function (generator, completion) {
  let n = 0;
  function proceeder() {
    postpone(() => {
      if (n-- !== 0) return;
      completion && completion();
    });
  }
  var g = generator(proceeder);
  postpone(() => {
    while (!g.next().done) n++;
    proceeder();
  });
};

このserial/parallelを使って、前述の原始的非同期処理を書いたサンプルです。

サンプル
// 直列
serial(function* (cb) {
  yield sleep(300, cb);
  yield sleep(200, cb);
  yield sleep(100, cb);
}, () => {
  console.log('serial : end');
});

// 並列
parallel(function* (cb) {
  yield sleep(300, cb);
  yield sleep(200, cb);
  yield sleep(100, cb);
}, () => {
  console.log('parallel : end');
});
テスト結果
// 直列
sleep time : 300 start
sleep time : 300 callback
sleep time : 200 start
sleep time : 200 callback
sleep time : 100 start
sleep time : 100 callback
serial : end

// 並列
sleep time : 300 start
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback
sleep time : 300 callback
parallel : end

serial/parallel関数の引数completionは必要がなければ記述を省略できます。

サンプル
serial(function* (cb) {
  yield sleep(200, cb);
  yield sleep(100, cb);
});

parallel(function* (cb) {
  yield sleep(200, cb);
  yield sleep(100, cb);
});

serial/parallelは混在させることができます。入れ子にする時は、completionで上位階層のコールバックを呼びます。

サンプル
serial(function* (cb) {
  yield sleep(700, cb);
  yield parallel(function* (cb) {
    yield sleep(600, cb);
    yield serial(function* (cb) {
      yield sleep(500, cb);
      yield sleep(400, cb);
    }, cb);
    yield parallel(function* (cb) {
      yield sleep(300, cb);
      yield sleep(200, cb);
    }, cb);
  }, cb);
  yield sleep(100, cb);
}, () => {
  console.log('serial + parallel: end');
});
テスト結果
sleep time : 700 start
sleep time : 700 callback
sleep time : 600 start
sleep time : 500 start
sleep time : 300 start
sleep time : 200 start
sleep time : 200 callback
sleep time : 300 callback
sleep time : 500 callback
sleep time : 400 start
sleep time : 600 callback
sleep time : 400 callback
sleep time : 100 start
sleep time : 100 callback
serial + parallel: end

ジェネレータを使った非同期処理の改良その1

前述のserial/parallelを入れ子にする場合、completionで上位階層のコールバックを呼びますが、

  • どの階層のジェネレータ関数の引数で与えられているコールバックが呼ばれているのかわかりにくい。
  • completion部分の記述を忘れた場合に正しく動かない。
  • そもそもcompletion部分でコールバックを指定するのが面倒である。

という問題があります。
そのため、completion部分でコールバックを呼ばなくて済むように改良します。具体的には、serial/parallelが外からコールバックを指定できる関数を返すことで、それを経由してコールバック関数を受け取り、自動で呼ばれるようにします。

非同期処理ジェネレータ serial/parallel 改良版その1

serial/parallel
const serial = (generator, completion) => {
  let callback = () => { callback = void 0; };
  const set_callback = cb => {
    callback || cb();
    callback = cb;
  };
  const proceed = () => {
    postpone(() => {
      const r = g.next();
      if (typeof r.value === 'function' && r.value.name === 'set_callback') r.value(proceed);
      if (r.done) {
        completion && completion();
        callback();
      }
    });
  };
  const g = generator(proceed);
  proceed();
  return set_callback;
};

const parallel = (generator, completion) => {
  let n = 0;
  let callback = () => { callback = void 0; };
  const set_callback = cb => {
    callback || cb();
    callback = cb;
  };
  const proceed = () => {
    postpone(() => {
      if (n-- !== 0) return;
      completion && completion();
      callback();
    });
  };
  const g = generator(proceed);
  postpone(() => {
    for (let r = g.next(); !r.done; r = g.next(), n++) {
      if (typeof r.value === 'function' && r.value.name === 'set_callback') r.value(proceed);
    }
    proceed();
  });
  return set_callback;
};
サンプル
serial(function* (cb) {
  yield sleep(700, cb);
  yield parallel(function* (cb) {
    yield sleep(600, cb);
    yield serial(function* (cb) {
      yield sleep(500, cb);
      yield sleep(400, cb);
    });
    yield parallel(function* (cb) {
      yield sleep(300, cb);
      yield sleep(200, cb);
    });
  });
  yield sleep(100, cb);
}, () => {
  console.log('serial + parallel: end');
});
テスト結果
sleep time : 700 start
sleep time : 700 callback
sleep time : 600 start
sleep time : 500 start
sleep time : 300 start
sleep time : 200 start
sleep time : 200 callback
sleep time : 300 callback
sleep time : 500 callback
sleep time : 400 start
sleep time : 600 callback
sleep time : 400 callback
sleep time : 100 start
sleep time : 100 callback
serial + parallel: end

ジェネレータを使った非同期処理の改良その2

さらに改良を加えて、ジェネレータ関数の引数及び、非同期処理関数のコールバック関数を省略できるようにします。コールバック関数を遅延設定、実行するために自前クラスTransmitterを用意しました。自前関数transmitはそれを使いやすくするヘルパ関数になります。

非同期処理ジェネレータ serial/parallel 最終形

serial/parallel
const Transmitter = class {
  constructor(executor) { // executor = callback => {...};
    this.value = void 0;  // 同期部分のreturnで返す値
    this.result = void 0; // 非同期部分のcallbackで返す値
    this._callback = result => {
      this.result = result;
      this._callback = void 0;
    };
    this.value = executor(result => { this._callback(result); });
  }
  connect(callback) {
    this._callback || callback(this.result);
    this._callback = callback;
    return this.value;
  }
};

// 最後の引数がコールバック関数となる関数のみ使用可能
// 仮のコールバック関数を強制設定する
const transmit = (func, ...args) => {
  return new Transmitter(callback => {
    if (func.length === 0) postpone(callback);
    if (func.length > 0) {
      const org = args[func.length - 1];
      if (typeof org === 'function') {
        args[func.length - 1] = () => { org(); callback(); };
      } else if (org === void 0 || org === null) {
        args[func.length - 1] = callback;
      } else {
        postpone(callback);
      }
    }
    return func(...args);
  });
};

const serial = (generator, completion) => {
  return new Transmitter(callback => {
    let y = void 0;
    const proceed = () => {
      postpone(() => {
        const r = g.next(y);
        if (!r.done) {
          y = (r.value instanceof Transmitter) ? r.value.connect(proceed) : r.value;
        } else {
          completion && completion();
          callback();
        }
      });
    };
    const g = generator(proceed);
    proceed();
  });
};

const parallel = (generator, completion) => {
  return new Transmitter(callback => {
    let n = 0;
    const proceed = () => {
      postpone(() => {
        if (n-- !== 0) return;
        completion && completion();
        callback();
      });
    };
    const g = generator(proceed);
    postpone(() => {
      let y = void 0;
      for (let r = g.next(y); !r.done; r = g.next(y), n++) {
        y = (r.value instanceof Transmitter) ? r.value.connect(proceed) : r.value;
      }
      proceed();
    });
  });
};
サンプル
serial(function* () {
  yield transmit(sleep, 700);
  yield parallel(function* () {
    yield transmit(sleep, 600);
    yield serial(function* () {
      yield transmit(sleep, 500);
      yield transmit(sleep, 400);
    });
    yield parallel(function* () {
      yield transmit(sleep, 300);
      yield transmit(sleep, 200);
    });
  });
  yield transmit(sleep, 100);
}, () => {
  console.log('serial + parallel: end');
});
テスト結果
sleep time : 700 start
sleep time : 700 callback
sleep time : 600 start
sleep time : 500 start
sleep time : 300 start
sleep time : 200 start
sleep time : 200 callback
sleep time : 300 callback
sleep time : 500 callback
sleep time : 400 start
sleep time : 600 callback
sleep time : 400 callback
sleep time : 100 start
sleep time : 100 callback
serial + parallel: end

対象となる非同期処理関数で、Transmitterを使用したり、transmitでラップしておけば、transmitの記述をする必要も無くなります。

サンプル
const t1Sleep = (time) => { return new Transmitter(callback => { sleep(time, callback); }); };
const t2Sleep = (...args) => { return transmit(sleep, ...args); };

serial(function* () {
  yield t1Sleep(400);
  yield t2Sleep(300);
  yield parallel(function* () {
    yield t1Sleep(200);
    yield t2Sleep(100);
  });
});
テスト結果
sleep time : 400 start
sleep time : 400 callback
sleep time : 300 start
sleep time : 300 callback
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback

transmitを使用せずに明示的にコールバック呼ぶ形式(改良版その1の書き方)も混在できます。

サンプル
serial(function* (cb) {
  yield t1Sleep(600);
  yield t2Sleep(500);
  yield sleep(400, cb);
  yield parallel(function* (cb) {
    yield t1Sleep(300);
    yield t2Sleep(200);
    yield sleep(100, cb);
  });
});
テスト結果
sleep time : 600 start
sleep time : 600 callback
sleep time : 500 start
sleep time : 500 callback
sleep time : 400 start
sleep time : 400 callback
sleep time : 300 start
sleep time : 200 start
sleep time : 100 start
sleep time : 100 callback
sleep time : 200 callback
sleep time : 300 callback

以上です。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?