
Asynchronous parallel resource management in TypeScript with createWithFunc, withMany, and withManyParallel

Posted at 2019-09-21

createWithFunc

Given a function that opens a resource, function openSomeResource(openOpt: OpenOption): Promise<SomeResource>,
and a function that closes it, function closeSomeResource(closeOpt: CloseOption): Promise<void>,
createWithFunc composes a function that manages the resource within a scope, function withSomeResource<T>(openOpt: OpenOption, withBlock: (r: SomeResource)=> Promise<T>): Promise<T>.

Usage

declare function openSomeResource(openOpt: OpenOption): Promise<SomeResource>;
declare function closeSomeResource(closeOpt: CloseOption): Promise<void>;

export const withSomeResource = createWithFunc(
  openSomeResource,
  closeSomeResource,
  // Builds the close options. Argument order follows compose_delete_arg: (openOpt, resource).
  (openOpt: OpenOption, r: SomeResource): CloseOption => ({...})
);

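Use it as shown above.
Haskell has a similar function, bracket
(explained in 継続モナドによるリソース管理#bracket関数).
createWithFunc differs in two ways: it composes a with function without opening the file at composition time,
and it requires a function (openOpt: OpenOption, r: SomeResource)=> CloseOption that builds the options needed for close
(the argument order follows compose_delete_arg in the implementation).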

Implementation

export function createWithFunc<
  T,
  U,
  V,
  W,
>(
  create: (o: T) => Promise<U>,
  delete_: (o: V) => Promise<W>,
  compose_delete_arg: (
    o: T,
    p: U
  ) => V
): <RET>(
  o: T,
  callback: (o: U) => Promise<RET>
) => Promise<RET> {
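  // The returned promise may reject with an error from either callback or delete_.
  // If both fail, the error thrown by delete_ in the finally block is the one reported.
  return async (o, callback) => {
    const resource = await create(o);
    const delopt = compose_delete_arg(o, resource);
    try {
      return await callback(resource);
    } finally {
      // Always release the resource, whether callback succeeded or threw.
      await delete_(delopt);
    }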
  };
}
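
The scoping behavior can be checked with a small in-memory resource. The following is only a sketch: Conn, openConn, closeConn, withConn, and demo are hypothetical names introduced for illustration, and createWithFunc is repeated in compact form (with the inner generic written explicitly) so that the block runs on its own.

```typescript
// Compact copy of createWithFunc so that this sketch is self-contained.
function createWithFunc<T, U, V, W>(
  create: (o: T) => Promise<U>,
  delete_: (o: V) => Promise<W>,
  compose_delete_arg: (o: T, p: U) => V
): <RET>(o: T, callback: (o: U) => Promise<RET>) => Promise<RET> {
  return async <RET>(o: T, callback: (o: U) => Promise<RET>): Promise<RET> => {
    const resource = await create(o);
    const delopt = compose_delete_arg(o, resource);
    try {
      return await callback(resource);
    } finally {
      await delete_(delopt);
    }
  };
}

// Hypothetical in-memory resource, used only for this illustration.
type Conn = { id: number; name: string };
type OpenOption = { name: string };
type CloseOption = { id: number };

async function demo(): Promise<string[]> {
  const events: string[] = [];
  let nextId = 1;
  const openConn = async (opt: OpenOption): Promise<Conn> => {
    const conn = { id: nextId++, name: opt.name };
    events.push(`open ${conn.name}#${conn.id}`);
    return conn;
  };
  const closeConn = async (opt: CloseOption): Promise<void> => {
    events.push(`close #${opt.id}`);
  };
  // The third argument derives the close options from the open options
  // and the opened resource.
  const withConn = createWithFunc(openConn, closeConn, (_opt, conn) => ({ id: conn.id }));

  const value = await withConn({ name: "a" }, async (conn) => {
    events.push(`use ${conn.name}`);
    return conn.id * 10;
  });
  events.push(`result ${value}`);

  // The resource is closed even when the callback throws.
  try {
    await withConn({ name: "b" }, async () => {
      throw new Error("boom");
    });
  } catch (err) {
    events.push(`caught ${(err as Error).message}`);
  }
  return events;
}

demo().then((events) => console.log(events.join("\n")));
```

In this sketch each close event is recorded before the result or the caught error, which is the ordering the finally block is meant to guarantee.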

withMany

A naive port of Haskell's withMany.

Usage

await withMany(withSomeResource, [o1,o2,o3], async ([r1,r2,r3])=>{
  ...
});

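await withSomeResource(o1, (r1)=>{
  return withSomeResource(o2, (r2)=>{
    return withSomeResource(o3, (r3)=>{
      return (async ([r1,r2,r3])=>{
        ...
      })([r1,r2,r3]);
    });
  });
});

The withMany call expands into the nested calls shown here, so each resource is opened only after the previous one has been opened. The resources cannot be acquired asynchronously in parallel, and acquisition takes a long time.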

Implementation

export function withMany<
  A,
  B,
  RES,
  WITH_FUNC extends (x: A, g: (y: B) => RES) => RES
>(withFoo: WITH_FUNC, xs: Array<A>, f: (ys: Array<B>) => RES): RES {
  if (xs.length === 0) {
    return f([]);
  }
  const [head, ...rest] = xs;
  return withFoo(head, (y) =>
    withMany<A, B, RES, WITH_FUNC>(withFoo, rest, (ys) => f([y].concat(ys)))
  );
}
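
The sequential acquisition can be observed by logging when each resource is opened. This is a sketch under assumptions: withSlot, sleep, and demoSequential are hypothetical helpers, and withMany is repeated with a simplified type parameter list (no WITH_FUNC) so that the block runs on its own and the types are inferred at the call site.

```typescript
// Copy of withMany with simplified type parameters (a simplification made
// for this sketch, not the signature used in the article).
function withMany<A, B, RES>(
  withFoo: (x: A, g: (y: B) => RES) => RES,
  xs: A[],
  f: (ys: B[]) => RES
): RES {
  if (xs.length === 0) {
    return f([]);
  }
  const [head, ...rest] = xs;
  return withFoo(head, (y) => withMany(withFoo, rest, (ys) => f([y, ...ys])));
}

async function demoSequential(): Promise<string[]> {
  const events: string[] = [];
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

  // Hypothetical scoped resource: opening takes 10 ms, the "resource" is
  // just the upper-cased name.
  const withSlot = async (
    name: string,
    block: (slot: string) => Promise<string>
  ): Promise<string> => {
    events.push(`open ${name} start`);
    await sleep(10);
    events.push(`open ${name} done`);
    try {
      return await block(name.toUpperCase());
    } finally {
      events.push(`close ${name}`);
    }
  };

  const result = await withMany(withSlot, ["a", "b", "c"], async (slots) => {
    events.push(`use ${slots.join(",")}`);
    return slots.join("+");
  });
  events.push(`result ${result}`);
  return events;
}

demoSequential().then((events) => console.log(events.join("\n")));
```

In the log, "open b start" appears only after "open a done", and the resources are closed in reverse order (c, b, a), which matches the nested expansion.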

withManyParallel

The asynchronous parallel version of withMany.
It is implemented with a pseudo continuation (call/cc) built from Promises.
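
The core of the idea can be sketched as a shared "gate" promise: every with callback registers its resource and then waits on the gate, and the callback that arrives last runs the body and opens the gate for the others. The names withAllParallel, gate, openGate, breakGate, withSlot, and demoParallel are hypothetical. This is a simplified illustration, not the article's implementation: f receives only the resources, and a failure while opening one resource does not release the scopes that are already waiting.

```typescript
async function withAllParallel<A, B, RES>(
  xs: A[],
  withFoo: (x: A, g: (y: B) => Promise<RES>) => Promise<RES>,
  f: (ys: B[]) => Promise<RES>
): Promise<RES> {
  if (xs.length === 0) {
    return f([]);
  }
  const ys = new Map<number, B>();
  let openGate!: (r: RES) => void;
  let breakGate!: (e: unknown) => void;
  const gate = new Promise<RES>((resolve, reject) => {
    openGate = resolve;
    breakGate = reject;
  });
  const all = await Promise.all(
    xs.map((x, i) =>
      withFoo(x, async (y) => {
        ys.set(i, y);
        if (ys.size === xs.length) {
          // Last resource to arrive: run the body, then settle the gate.
          f(xs.map((_, j) => ys.get(j) as B)).then(openGate, breakGate);
        }
        // Every scope stays open until the body has finished.
        return gate;
      })
    )
  );
  return all[0];
}

async function demoParallel(): Promise<string[]> {
  const events: string[] = [];
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
  // Hypothetical scoped resource that takes 10 ms to open.
  const withSlot = async (
    name: string,
    block: (slot: string) => Promise<string>
  ): Promise<string> => {
    events.push(`open ${name} start`);
    await sleep(10);
    try {
      return await block(name.toUpperCase());
    } finally {
      events.push(`close ${name}`);
    }
  };
  const result = await withAllParallel(["a", "b", "c"], withSlot, async (slots) => {
    events.push(`use ${slots.join(",")}`);
    return slots.join("+");
  });
  events.push(`result ${result}`);
  return events;
}

demoParallel().then((events) => console.log(events.join("\n")));
```

Here all three opens start before any of them completes, the body runs once with every resource available, and only then are the scopes closed.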

Implementation

export async function withManyParallel<
  A,
  B,
  RES,
  WITH_FUNC extends (x: A, f: (y: B) => Promise<RES>) => Promise<RES>
>(
  xs: Array<A>,
  withFoo: WITH_FUNC,
  f: (
    xs: Array<Parameters<WITH_FUNC>[0]>,
    ys: Array<Parameters<Parameters<WITH_FUNC>[1]>[0]>
  ) => Promise<RES>
): Promise<RES> {
  const ys = new Map<number, Parameters<Parameters<WITH_FUNC>[1]>[0]>();
  let result: RES;
  let ok: Function;
  const allok = new Promise((resolve) => {
    ok = resolve;
  });
  const prm = withParallel<A, B, RES, WITH_FUNC>(
    xs,
    withFoo,
    async (i, x, y) => {
      ys.set(i, y);
      if (ys.size === xs.length) {
        const _ys = xs.map((x, i) => ys.get(i) as B);
        result = await f(xs, _ys);
        ok(); // call ok only after result has been set
      }
      // Once ok is called, the other tasks get past the await below
      await new Promise((resolve, reject) => {
        prm.catch(reject);
        allok.then(resolve);
      });
      return result;
    }
  );
  await prm;
  return result!;
}

import lodash = require("lodash");
// Helper function for withManyParallel
export async function withParallel<
  A,
  B,
  RES,
  WITH_FUNC extends (x: A, f: (y: B) => Promise<RES>) => Promise<RES>
>(
  xs: Array<A>,
  withFoo: WITH_FUNC,
  f: (index: number, x: A, y: B) => Promise<RES>
): Promise<Array<RES>> {
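  // Chunk size only: all chunks are started at once below, so this does not
  // cap how many withFoo calls run concurrently.
  const parallelism = 5;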
  const chunks = lodash.chunk(
    xs.map<[number, A]>((x, index) => [index, x]),
    parallelism
  );
  return lodash.flatten(
    await Promise.all(
      chunks.map((xs: [number, A][]) =>
        Promise.all(
          xs.map(async ([index, x]) => withFoo(x, (y) => f(index, x, y)))
        )
      )
    )
  );
}