2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Dartで並列バッチ

Last updated at Posted at 2022-08-29

はじめに

Dart 2はブラウザあらためクライアント(全般)に最適化された言語との触れ込みであり、サーバアプリに関する発信はほとんど見かけませんが、Dartで書いたバッチ処理をマルチIsolateで並列化する機会がありましたので、その骨子を紹介します。

バッチの概要

  • 各行にコマンドが記載された入力ファイルを順次読み込む
  • 各コマンドを処理して出力ファイルに順次書き込む
  • 各コマンドの処理は互いに独立している
  • 各コマンドの処理はCPUインテンシブ
  • 出力は入力ファイルのコマンドの順を保つ

並列化の基本方針

  • 下記の複数のサーバIsolateを用いて並列化
    • コマンドをメッセージとして受信する
    • 処理した結果をメッセージとして送信する
    • バッチ終了まで常駐してこの繰り返し
  • メインIsolateの下記の非同期処理(ここではファイバと呼ぶ)が各サーバIsolateを担当
    • 全ファイバ共有のコマンドキューをフェッチ
    • コマンドを担当サーバに送信
    • 結果を担当サーバから受信
    • 結果を整流して出力
    • バッチ終了まで常駐してこの繰り返し

コード

クライアントのメイン処理

import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';

import 'package:async/async.dart';

const loadParam = 10000000; // 処理すべきコマンドの平均的負荷CPUインテンシブ)
const dataSize = 100; // 処理すべきコマンドの数
final serverCount = Platform.numberOfProcessors; // CPUコア数ぐらいが適当

void main(List<String> arguments) async {
  final commands = StreamQueue(Stream.fromIterable(
      Iterable<String>.generate(dataSize, (i) => '[$i]'))); // ファイルストリームのつもり
  final servers = <Server>[];

  // サーバIsolate起動
  for (var id = 0; id < serverCount; id++) {
    var server = Server();
    await server.spawn(id);
    servers.add(server);
  }
  await Dispatcher(servers, commands).dispatch(); // サーバをコマンドにディスパッチ

  for (var id = 0; id < serverCount; id++) {
    servers[id].stop();
  }
}

コマンド一覧の入ったファイルストリームを模したストリームからStreamQueueを作成しています。
次に、並列度分だけサーバIsolateを起動しています。並列度はCPUコア数程度が適当なので、それをPlatformから取得しています。
上記のコマンドストリームとサーバ一覧をディスパッチャに渡し、ディスパッチを要求しています。
なお、処理終了を待ち合わせるためにdispatch()awaitしています(任意)。

訂正

なお、main Isolateにおけるserver IsolateからのReceivePortを閉じないと、プログラムが終了しません。

サーバの処理

class Server {
  final ReceivePort crp; // クライアント <= サーバの受信ポート
  late final StreamIterator cri; // 上記のストリームイテレーター
  late final SendPort csp; // クライアント => サーバの送信ポート
  late final Isolate isolate; // 使わないけど

  Server() : crp = ReceivePort() {
    cri = StreamIterator(crp); // Streamはfirstすると閉じちゃうのでイテレータを作成
  }

  Future<void> spawn(int id) async {
    isolate =
        await Isolate.spawn<List<dynamic>>(main, <dynamic>[id, crp.sendPort]);
    await cri.moveNext();
    csp = await cri.current as SendPort;
  }

  void stop() {
    csp.send(null);
    crp.close();
  }

  static Future<void> main(List<dynamic> message) async {
    final id = message[0] as int;
    final ssp = message[1] as SendPort; // サーバ => クライアントの送信ポート
    final srp = ReceivePort(); // サーバ <= クライアントの受信ポート
    ssp.send(srp.sendPort); // クライアント => サーバの送信ポートをクライアントに受渡し
    final rand = Random();
    await for (var command in srp /* コマンド受信 */) {
      if (command == null) {
        break; // サーバ終了コマンドの場合
      }
      var c = rand.nextInt(loadParam) + loadParam ~/ 2 + 1;
      var r = somethingHeavy(c); // 何やら重い処理 (0.5〜1.5 loadParam 回ループする)
      ssp.send('Server($id) recived "$command", Σ(1, $c) = $r'); //結果返信
    }
  }

  static int somethingHeavy(int c) {
    int r = 0;
    for (var i = 1; i < c; i++) {
      r += i;
    }
    return r;
  }
}

mainより前はクライアントにおけるサーバ関連情報、いわばサーバハンドラです。その本質はクライアントにおけるサーバとのメッセージ送受信ポートです。サーバへの送信ポートを取り出すためにサーバからの受信ポート(シングルサブスクリプション型のStream)をfirstすると、これが閉じてしまうので、StreamIteratorであるcri作成しています。

なお、IsolateをspawnするとシステムからからそのハンドラであるIsolateオブジェクトを受け取りますが、ここでは用途がありませんでした。

Serverクラスの静的なmainがサーバIsolateで動く処理のエントリポイントです。
mainの中ではクラアントとの通信ポートを初期化した後に、クライアントからの受信ポートをawait forで監視します。先述のfirstと異なりawait forは受信ポートから複数のメッセージを受信できます。

メッセージ(コマンド)を受信したら何やら重い処理を行い、結果をクライアントに送信します。

サーバの処理は単純で、以上です。

クライアントにおけるディスパッチ処理

class Dispatcher {
  final List<Server> servers;
  final StreamQueue<String> commands;
  final results = <int, String>{};
  var maxResultsLength = 0;
  var ixS = 0; // 次に送信するコマンドのインデックス
  var ixO = 0; // 次に結果を出力するコマンドのインデックス

  Dispatcher(this.servers, this.commands);

  // コンストラクタではawaitできないので外出し
  Future<void> dispatch() async {
    var futures = <Future<void>>[];
    for (var id = 0; id < servers.length; id++) {
      futures.add(sendReceive(servers[id])); // サーバと送受信をする非同期処理を呼び出すだけ
    }
    await Future.wait<void>(futures); // 終了を待ち合わせる場合はここでawait
    print(maxResultsLength);
  }

  Future<void> sendReceive(Server server) async {
    // コマンドがなくなるまでサーバーと送受信
    while (await commands.hasNext) {
      var ix = ixS; // 結果受信のawait以降はixSは他非同期処理に更新済みかも
      ixS++; // 結果受信のawait前に更新しておく
      var command = await commands.next; // これ以降ixSは変わっているかも
      server.csp.send(command); // コマンド送信
      await server.cri.moveNext();
      var result = await server.cri.current as String; // 結果受信
      results[ix] = result; // 結果はコマンドの順にリストに格納
      maxResultsLength = results.length > maxResultsLength ? results.length : maxResultsLength;
      printResultsInOrder();
    }
    server.csp.send(null); // サーバ終了コマンド送信
  }

  // バラバラと受信する結果を並べ替え
  void printResultsInOrder() {
    for (; ixO < ixS; ixO++) /* await禁止 */ {
      var result = results[ixO];
      if (result == null) {
        return; // 歯抜けなら次受信まで出力中断
      }
      print('$ixO $result');
      results.remove(ixO); // 出力した結果はGC対象に
    }
  }
}

ここが並列化の肝です。
プロパティとしてディスパッチ対象であるサーバ群とコマンドキューを持ちます。

この他に、結果出力を整流するために結果リスト(実際にはマップ)があります。メモリ節約のために出力した結果はその場で破棄します。当初はマップより添字参照が高速と思いリストとしましたが、単純化のためにマップとしました。平均107 ms、最大658 ms、標準偏差63 ms、約4万件のデータで最大長は40程度までしか伸びませんでしたし、速度も遜色なかったのでマップで十分です。これに関連するものとして、ixSは次にサーバに送信するコマンドのインデックス、ixOは次に結果出力するコマンドのインデックスです。これらは本質的にサーバIsolate対応の全ファイバから共有されるので、参照・更新には注意が必要です(後述)。

dispatch()は基本的に各サーバIsolate対応のファイバを起動するだけです。ファイバの処理を並列化する必要が有るので、ファイバの終了を待ち合わる場合も、awaitforループの外で行う必要があります。

各ファイバの行う処理がsendReceive()の中のループです。全ファイバ共有のコマンドキューが無くなるまでコマンドをフェッチしてはサーバに送信し、結果を受信・出力します。
commands.hasNexttrueの場合は次のcommands.nextの成功が保証されますが、その間に他ファイバに横取りされては元も子もないのでcommands.hasNextからcommands.nextの間はawait禁止です。また、await以降はixSの値として他ファイバにより更新済みのものを参照してしまうかもしれないので、出力整流のためにローカル変数ixに保存しておきます。
サーバから帰ってきた結果はとりあえず先述の結果リストの該当インデックスresults[ix]に格納しておきます。

結果を整流して出力するのがprintResultsInOrder()です。
今結果を受け取ったコマンドのインデクスとは関係なく、次回結果出力するコマンドのインデクスixOから、次回サーバに送信するコマンドのインデックスixOの一つ前までループして結果を出力します。ただし、途中で結果未受信のものがあったら出力処理を中断し、以降の出力は次の結果受信時に委ねます。その際どのファイバが担当するかは不明です。
また、結果を出力したらその場でその結果リストエントリを削除し、格納されていた結果オブジェクトを結果リストから切り離しGC対象とします。
この結果出力処理も全ファイバ共通のixOixSを読み書きするのでawait時は要注意です。

訂正

クライアントにおけるサーバからの受信ポートからブロードバンド型のストリームを作成し複数回firstしていましたが、APIを確認したところ2回目以降のfirstではサーバからのメッセージを聞き逃す可能性があることがわかりました。対策として、StreamIteratorを作成してこれ経由でメッセージを受信するよう変更しました。上記はそれに合わせて修正済みです。なお、コマンドキューのようにStreamQueueでも良かったのですがオーバースペックなので採用していません。

補足

Isolateとか使うのでDartPadでは動きません。
Dart SDK version: 2.18.0-271.4.beta (beta) (Unknown timestamp) on "linux_x64"で正規表現を利用するサーバ処理を並列化したところ、JIT版でVMがセグメンテーションフォールトを起こしました(AOT版は動きます)。FFIとか関係する?5月ぐらいから有る長寿命バグのようだけど、2.18がなかなかリリースされない原因だったりする?
Isolateによる並列化では、ライブラリトップレベル変数はいかにも邪道ですね。Javaよろしくすべてクラスに閉じ込めました(しかも当然インスタンスメンバ)。おかげさまで良いリファクタリングができました。

全体ソースコード

以上ですが、最後に全ソースを載せておきます。

import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';

import 'package:async/async.dart';

const loadParam = 10000000; // 処理すべきコマンドの平均的負荷CPUインテンシブ)
const dataSize = 100; // 処理すべきコマンドの数
final serverCount = Platform.numberOfProcessors; // CPUコア数ぐらいが適当

void main(List<String> arguments) async {
  final commands = StreamQueue(Stream.fromIterable(
      Iterable<String>.generate(dataSize, (i) => '[$i]'))); // ファイルストリームのつもり
  final servers = <Server>[];

  // サーバIsolate起動
  for (var id = 0; id < serverCount; id++) {
    var server = Server();
    await server.spawn(id);
    servers.add(server);
  }
  await Dispatcher(servers, commands).dispatch(); // サーバをコマンドにディスパッチ

  for (var id = 0; id < serverCount; id++) {
    servers[id].stop();
  }
}

class Server {
  final ReceivePort crp; // クライアント <= サーバの受信ポート
  late final StreamIterator cri; // 上記のストリームイテレーター
  late final SendPort csp; // クライアント => サーバの送信ポート
  late final Isolate isolate; // 使わないけど

  Server() : crp = ReceivePort() {
    cri = StreamIterator(crp); // Streamはfirstすると閉じちゃうのでイテレータを作成
  }

  Future<void> spawn(int id) async {
    isolate =
        await Isolate.spawn<List<dynamic>>(main, <dynamic>[id, crp.sendPort]);
    await cri.moveNext();
    csp = await cri.current as SendPort;
  }

  void stop() {
    csp.send(null);
    crp.close();
  }

  static Future<void> main(List<dynamic> message) async {
    final id = message[0] as int;
    final ssp = message[1] as SendPort; // サーバ => クライアントの送信ポート
    final srp = ReceivePort(); // サーバ <= クライアントの受信ポート
    ssp.send(srp.sendPort); // クライアント => サーバの送信ポートをクライアントに受渡し
    final rand = Random();
    await for (var command in srp /* コマンド受信 */) {
      if (command == null) {
        break; // サーバ終了コマンドの場合
      }
      var c = rand.nextInt(loadParam) + loadParam ~/ 2 + 1;
      var r = somethingHeavy(c); // 何やら重い処理 (0.5〜1.5 loadParam 回ループする)
      ssp.send('Server($id) recived "$command", Σ(1, $c) = $r'); //結果返信
    }
  }

  static int somethingHeavy(int c) {
    int r = 0;
    for (var i = 1; i < c; i++) {
      r += i;
    }
    return r;
  }
}

class Dispatcher {
  final List<Server> servers;
  final StreamQueue<String> commands;
  final results = <int, String>{};
  var maxResultsLength = 0;
  var ixS = 0; // 次に送信するコマンドのインデックス
  var ixO = 0; // 次に結果を出力するコマンドのインデックス

  Dispatcher(this.servers, this.commands);

  // コンストラクタではawaitできないので外出し
  Future<void> dispatch() async {
    var futures = <Future<void>>[];
    for (var id = 0; id < servers.length; id++) {
      futures.add(sendReceive(servers[id])); // サーバと送受信をする非同期処理を呼び出すだけ
    }
    await Future.wait<void>(futures); // 終了を待ち合わせる場合はここでawait
    print(maxResultsLength);
  }

  Future<void> sendReceive(Server server) async {
    // コマンドがなくなるまでサーバーと送受信
    while (await commands.hasNext) {
      var ix = ixS; // 結果受信のawait以降はixSは他非同期処理に更新済みかも
      ixS++; // 結果受信のawait前に更新しておく
      var command = await commands.next; // これ以降ixSは変わっているかも
      server.csp.send(command); // コマンド送信
      await server.cri.moveNext();
      var result = await server.cri.current as String; // 結果受信
      results[ix] = result; // 結果はコマンドの順にリストに格納
      maxResultsLength = results.length > maxResultsLength ? results.length : maxResultsLength;
      printResultsInOrder();
    }
    server.csp.send(null); // サーバ終了コマンド送信
  }

  // バラバラと受信する結果を並べ替え
  void printResultsInOrder() {
    for (; ixO < ixS; ixO++) /* await禁止 */ {
      var result = results[ixO];
      if (result == null) {
        return; // 歯抜けなら次受信まで出力中断
      }
      print('$ixO $result');
      results.remove(ixO); // 出力した結果はGC対象に
    }
  }
}

/*
class _ResultBuffer {
  final buffer = <String?>[];
  int bufferLength = 0;
  int maxBufferLength = 0;
  int listLength = 0;
  int firstIx = 0;
  static const String deleted = "";

  void expandTo(int ix) {
    assert(ix == listLength);
    buffer.add(null);
    bufferLength++;
    maxBufferLength =
        bufferLength > maxBufferLength ? bufferLength : maxBufferLength;
    listLength++;
    return;
  }

  void deleteAt(int ix) {
    assert(ix >= firstIx && ix < listLength);
    buffer[ix - firstIx] = deleted;
    if (ix == firstIx) {
      int i;
      for (i = 0; i < bufferLength; i++) {
        if (buffer[i] != deleted) {
          break;
        }
      }
      buffer.removeRange(0, i);
      bufferLength -= i;
      firstIx += i;
      return;
    }
  }

  String? operator [](int ix) {
    assert(ix >= firstIx && ix < listLength);
    return buffer[ix - firstIx];
  }

  operator []=(int ix, String v) {
    assert(ix >= firstIx && ix < listLength);
    buffer[ix - firstIx] = v;
  }
}
2
0
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?