5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

async/await(Promise)の並列と直列を関数型プログラミングで書く

Last updated at Posted at 2019-10-01

前提とゴール

  • よくあるasync/awaitのループの書き方がわかるようになる
    • arrayのmapでasync使いたいが、直列で書きたい
      • 普通にmapでasync使うと、前回の結果を待たないので、実は並列になります
      • for文やmapのかわりに reduce 使います
    • 事前に回数のわからないasyncを、直列で書きたい
      • おもにrestでページングがあり、何回で終わるのかrestしないとわからない時とか
      • while文のかわりに 再帰 します
  • 並列と直列を組み合わせたい
    • 基本は直列になっていくとおもいますが、並列でやりたいところだけ並列にします
    • Promise.allで並列の待ちあわせをし、flatでならします
  • 関数型プログラミングについて
    • あまり厳密な雰囲気ではないのでご容赦ください
      • オブジェクトのシャローコピー、console、副作用じゃんみたいなのとか
    • 本当にやりたいことは、for文やwhile文で必要になってしまう以下をスマートにすること
      • mutableな変数を使わない
      • 再代入しない

パターンごとのサンプルコード

Promise.allでasync/awaitを並列実行

どういう時

  • 同時に並列で実行できる

コード

console.log("Promise.allでasync/awaitを並列実行");
const p1: Promise<number> = new Promise(resolve => {
  setTimeout(() => {
    resolve(1); // 本来はRESTする処理など想定
  }, 1000); // 1秒待つ
});
const p2: Promise<number> = new Promise(resolve => {
  setTimeout(() => {
    resolve(2); // 本来はRESTする処理など想定
  }, 100); // 0.1秒待つ
});

// 処理の終わりを待ってないので、直接は使えない
// console.log(p1 + p2); // 3にはならないし、TSではコンパイルエラー

(async () => {
  // p1とp2を並列で実行し、終わるタイミングを待ち合わせる
  const result: number[] = await Promise.all([p1, p2]); // await Promise.allで全て処理終わるのを待つ
  console.log(result); // [1, 2]
})();

reduceでasync/awaitを直列実行

どういう時

  • 実行回数が決まっている
  • 同時に並列で実行ができない

コード

for文で直列実行(reduce前説明)
(async () => {
  const keys: number[] = [1, 2]; // 前処理で、ループしたい値と回数が決まる

  const keyResults: number[] = []; // constなのに怪しい予感

  // いつものfor文
  for (const key of keys) {
    const p: Promise<number> = new Promise(resolve =>
      setTimeout(() => resolve(key * 2), 100), // 本来はREST
    );

    keyResults.push(await p); // keyResultsに副作用がある!
  }
  console.log(keyResults); // [2, 4]
  // 直列で実行できたが副作用が混ざっている...
})();
mapで並列実行(reduce前説明)
(async () => {
  const keys: number[] = [1, 2]; // 前処理で、ループしたい値と回数が決まる

  // map
  const keyResults: Promise<number>[] = keys.map(async key => { // ラムダ式にasyncをつけないとawaitできてない
    return await new Promise(resolve =>
      setTimeout(() => resolve(key * 2), 100),  // 本来はREST
    ) as unknown as Promise<number>;
  });

  console.log(keyResults); // Promiseのarrayになっている
  console.log(await Promise.all(keyResults)); // [2, 4]
  // できたように見えるが、mapでは順番に待つような処理をしないので、並列になってしまった
})();
reduceでasync/awaitを直列実行
(async () => {
  const keys: number[] = [1, 2]; // 前処理で、ループしたい値と回数が決まる

  // reduceはひとつひとつ実行する処理
  const keyResults: Promise<number[]> = keys.reduce(async (previousValue, currentValue) => {
    const pv: number[] = await previousValue; // 一つ前の値をawaitすることで処理を待てる
    const currentResult: number = await new Promise(resolve =>  // 現在の処理待ち
      setTimeout(() => resolve(currentValue * 2), 100), // 本来はREST
    );
    return pv.concat([currentResult]); // 前回の処理に、現在処理した値をつなげていく
  }, Promise.resolve([])); // 1件目を処理する時の初期値

  console.log(await keyResults); // [2, 4]
  // 直列で実行できたし、変数に副作用もない
})();

再帰でasync/awaitを直列実行

どういう時

  • 実行回数が決まっていない
  • 同時に並列で実行ができない

コード

普通の再帰
const result = recursive(2);
console.log(result);

function recursive(num: number, limit: number = 10000): number {
  // 2乗
  const square = num * num;
  // 上限を超えているか
  return square > limit
    // 超えたら終わり
    ? square
    // 超えてなければもう一度、自分自身を実行
    : recursive(square, limit);
}
再帰でasync/awaitを直列実行
(async () => {
  const result: number = await recursivePromise(2);

  console.log(result);
})();

async function recursivePromise(num: number, limit: number = 10000): Promise<number> {
  // 都度awaitしている
  const square: number = await new Promise(resolve =>
    setTimeout(() => resolve(num * num), 100),  // 本来はREST
  );
  return square > limit
    ? square
    // awaitしてからもう一度実行なので直列
    : await recursivePromise(square, limit);
}

async/awaitで並列と直列を組み合わせる

どういう時

  • 同時にできない直列処理と、同時にできる並列処理が混ざっている

コード

async/awaitで並列と直列を組み合わせる
(async () => {
  // [1, 2]を直列で実行したいが、[1, 2]の終了を待たずに、[3, 4]と[5]を実行したい
  const keys: number[][] = [[1, 2], [3, 4], [5]];

  const keyPromises: Promise<number[]>[] = keys.map(key => { // mapで並列に
    return key.reduce(async (previousValue, currentValue) => { // reduceで直列に
      const pv: number[] = await previousValue;
      const currentResult: number = await new Promise(resolve =>
        setTimeout(() => resolve(currentValue * 2), 100), // 本来はREST
      );
      return pv.concat([currentResult]);
    }, Promise.resolve([])) as Promise<number[]>;
  });

  console.log(keyPromises); // [Promise.resolve([2, 4]), Promise.resolve([6, 8]), Promise.resolve([10])] の状態

  // await Promise.allで並列の待ち合わせ
  console.log(await Promise.all(keyPromises)); // [[2, 4], [6, 8], [10]]
})();

具体的にやってみる

コードを書く上で、いい感じの要件なにかないかなーと思いましたが、
GitHubの単一のOrganization内にあるPRの一覧を出すことにします
自分が関係ないPR含めて何が進行しているか見たい、的な感じです

  • GitHubのOrganizationのリポジトリ一覧を取得(再帰)
  • リポジトリの開いているPRを取得(直列のみ)
  • リポジトリの開いているPRを取得(並列+直列)
  • ちょっと見やすくして表示

GitHubのOrganizationのリポジトリ一覧を取得(再帰)

id:passwordはbase64にして環境変数から取得
repository.getOrgRepos でOrganizationのリポジトリ一覧をとっていますが、
APIの仕様として、デフォルトは30件までしか取れません!
GitHubRepository#getRecursive にて全件取りきる 再帰処理 を書いています

(async () => {
  // encode
  // new Buffer('a').toString('base64') // => YQ==
  // decode
  // new Buffer("YQ==",'base64').toString() // => a
  const auth: string = process.env["GITHUB_AUTH"] || ""; // "userId:password" のbase64
  const owner: string = process.env["GITHUB_OWNER"] || "";
  const repository = new GitHubRepository(auth);
  console.log({auth: auth ? "あり" : "", owner});

  // 再帰で全件とる
  const rRepos: GitHubRepo[] = await repository.getOrgRepos(owner).catch(() => []);
  const repoNames: string[] = rRepos.map(r => r.name);
  console.log({repoNames});

  // ...
})();
GitHubRepo
interface GitHubRepo {
  name: string,
}
GitHubRepository
import fetch, {BodyInit, RequestInit} from "node-fetch";
import * as querystring from "querystring";

class GitHubRepository {
  constructor(
    readonly auth: string,
  ) {
  }

  createHeaders(usePostParam: boolean): object {
    return {
      "Authorization": `Basic ${this.auth}`,
      "Content-Type": usePostParam ? "application/json" : undefined,
    };
  }

  async getRecursive<E>(
    baseUrl: string,
    baseParam: object = {},
    page: number = 1, // 現在のページ
    per_page: number = 100, // 取得件数(デフォルト30, 最大100)
    beforeResult: E[] = [],
  ): Promise<E[]> {
    const url: string = baseUrl + "?" + querystring.stringify({
      ...baseParam,
      page,
      per_page,
    });

    const currentResult: E[] = await fetch(url, {
      method: "GET",
      headers: this.createHeaders(false),
    } as RequestInit).then(async r => {
      if (r.status !== 200) {
        console.error({url: r.url, status: r.status, text: await r.text()});
        return [];
      }
      console.debug({url: r.url, status: r.status});
      return r.json();
    });
    const totalResult = beforeResult.concat(currentResult);
    return currentResult.length === per_page
      // 最大件数まで取得したので、もう一度
      ? this.getRecursive(baseUrl, baseParam, page + 1, per_page, totalResult)
      // 最大件数と取得件数が合わなければ終わり
      : totalResult;
  }

  async getOrgRepos(org: string): Promise<GitHubRepo[]> {
    return this.getRecursive(`https://api.github.com/orgs/${org}/repos`);
  }

  async getOwnerRepoPulls(owner: string, repo: string): Promise<GitHubPull[]> {
    return this.getRecursive(`https://api.github.com/repos/${owner}/${repo}/pulls`);
  }
}

(余談)再帰はスタックオーバーフローが起きうることを認識する

TS(JS)で再帰は、回数がかなり多い処理に使ってはいけません
普段スタックオーバーフローする件数を扱うことはほとんど無いはずなので、気にすることはないと思いますが、スタックオーバーフローが起きうることは覚えてはおきましょう

末尾再帰最適化という挙動に言語やエンジンが対応していれば、問題はないのですが、
ES6の仕様に末尾再帰最適化がありつつも、ほとんど対応されていないようです
全体的な状況はきちんと調べなおしてはいませんが、少なくともTS3.6.3ではコンパイル後のjsでも再帰のままでした...

詳細はほかの記事を読んだほうがいいです
末尾再帰による最適化
末尾再帰最適化について

この部分
    return currentResult.length === per_page
      // 最大件数まで取得したので、もう一度
      ? this.getRecursive(baseUrl, baseParam, page + 1, per_page, totalResult)
      // 最大件数と取得件数が合わなければ終わり
      : totalResult;

リポジトリの開いているPRを取得(直列のみ)

件数の決まっているループについては、for文やmapではなく reduce を使います(再帰でもいいですが)

GitHubPull
interface GitHubPull {
  url: string,
  title: string,
  user: {login: string},
  assignee: {login: string},
  head: {label: string},
  base: {label: string},
}
(async () => {
  // ...
  const repoNames: string[] = rRepos.map(r => r.name);
  console.log({repoNames});

  // ふつうの直列
  const rPulls: GitHubPull[] = await mapSync(repoNames, cv =>
    repository.getOwnerRepoPulls(owner, cv).catch(() => []),
  );

  // ...
})();
mapSync
function mapSync<E, T>(base: T[], f: (currentValue: T) => Promise<E[]>): Promise<E[]> {
  return base.reduce(async (previousValue: Promise<E[]>, currentValue: T) =>
      // 前のasyncをawaitしてから、次のasyncをawaitすることで直列になる
      (await previousValue).concat(await f(currentValue))
    , Promise.resolve([]));
}

リポジトリの開いているPRを取得(並列+直列)

直列だけでリクエストしているとやっぱり遅いので、ちょっと並列も混ぜたいと思います
Promise.allで並列の待ちあわせをし、flatでならします
TSではflatだけlib指定しないと、デフォルトでは使えないので、設定に注意してください

tsconfig.json
{
  "compilerOptions": {
    "module": "commonjs",
    "target": "es5",
    "lib": [
      "esnext"
    ],
    ...
  }
}
(async () => {
  // ...
  const repoNames: string[] = rRepos.map(r => r.name);
  console.log({repoNames});

  // 並列+直列にしたい
  const chunkedRepoNames: string[][] = chunk(repoNames, 3);
  console.log({chunkedRepoNames});

  // Promise[]つくって、Promise.allでまちあわせ
  const readyPromise: Promise<GitHubPull[]>[] = chunkedRepoNames.map(async repoNames => mapSync(repoNames, cv =>
    repository.getOwnerRepoPulls(owner, cv).catch(() => []),
  ));
  const rPulls: GitHubPull[] = (await Promise.all(readyPromise)).flat();

  // ...
})();
chunk
function chunk<E>(v: E[], parallel: number): E[][] {
  const chunked = v.reduce((pv, cv, ci) => {
    const chunkKey = ci % parallel;
    return {
      ...pv,
      [chunkKey]: (pv[chunkKey] || []).concat([cv]),
    };
  }, {});
  return Object.keys(chunked).map(key => chunked[key]);
}

ちょっと見やすくして表示

特別なことは何もないですが、見やすいように整形して表示して終わりです

うごくコードはこちらに
https://github.com/yakisuzu/sandbox-typescript-fp

ShowPull
interface ShowPull {
  url: string,
  title: string,
  user: string,
  branch: string,
}
(async () => {
  // ...
  const rPulls: GitHubPull[] = (await Promise.all(readyPromise)).flat();

  const pulls: ShowPull[] = rPulls.map(p => ({
    url: p.url,
    title: p.title,
    user: p.user.login,
    branch: `${p.head.label} to ${p.base.label}`,
  }));

  console.log({pulls});
})();
5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?