はじめに
Dart 2はブラウザあらためクライアント(全般)に最適化された言語との触れ込みであり、サーバアプリに関する発信はほとんど見かけませんが、Dartで書いたバッチ処理をマルチIsolateで並列化する機会がありましたので、その骨子を紹介します。
バッチの概要
- 各行にコマンドが記載された入力ファイルを順次読み込む
- 各コマンドを処理して出力ファイルに順次書き込む
- 各コマンドの処理は互いに独立している
- 各コマンドの処理はCPUインテンシブ
- 出力は入力ファイルのコマンドの順を保つ
並列化の基本方針
- 下記の複数のサーバIsolateを用いて並列化
- コマンドをメッセージとして受信する
- 処理した結果をメッセージとして送信する
- バッチ終了まで常駐してこの繰り返し
- メインIsolateの下記の非同期処理(ここではファイバと呼ぶ)が各サーバIsolateを担当
- 全ファイバ共有のコマンドキューをフェッチ
- コマンドを担当サーバに送信
- 結果を担当サーバから受信
- 結果を整流して出力
- バッチ終了まで常駐してこの繰り返し
コード
クライアントのメイン処理
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'package:async/async.dart';
const loadParam = 10000000; // 処理すべきコマンドの平均的負荷CPUインテンシブ)
const dataSize = 100; // 処理すべきコマンドの数
final serverCount = Platform.numberOfProcessors; // CPUコア数ぐらいが適当
void main(List<String> arguments) async {
final commands = StreamQueue(Stream.fromIterable(
Iterable<String>.generate(dataSize, (i) => '[$i]'))); // ファイルストリームのつもり
final servers = <Server>[];
// サーバIsolate起動
for (var id = 0; id < serverCount; id++) {
var server = Server();
await server.spawn(id);
servers.add(server);
}
await Dispatcher(servers, commands).dispatch(); // サーバをコマンドにディスパッチ
for (var id = 0; id < serverCount; id++) {
servers[id].stop();
}
}
コマンド一覧の入ったファイルストリームを模したストリームからStreamQueue
を作成しています。
次に、並列度分だけサーバIsolateを起動しています。並列度はCPUコア数程度が適当なので、それをPlatform
から取得しています。
上記のコマンドストリームとサーバ一覧をディスパッチャに渡し、ディスパッチを要求しています。
なお、処理終了を待ち合わせるためにdispatch()
をawait
しています(任意)。
訂正
なお、main Isolateにおけるserver IsolateからのReceivePortを閉じないと、プログラムが終了しません。
サーバの処理
class Server {
final ReceivePort crp; // クライアント <= サーバの受信ポート
late final StreamIterator cri; // 上記のストリームイテレーター
late final SendPort csp; // クライアント => サーバの送信ポート
late final Isolate isolate; // 使わないけど
Server() : crp = ReceivePort() {
cri = StreamIterator(crp); // Streamはfirstすると閉じちゃうのでイテレータを作成
}
Future<void> spawn(int id) async {
isolate =
await Isolate.spawn<List<dynamic>>(main, <dynamic>[id, crp.sendPort]);
await cri.moveNext();
csp = await cri.current as SendPort;
}
void stop() {
csp.send(null);
crp.close();
}
static Future<void> main(List<dynamic> message) async {
final id = message[0] as int;
final ssp = message[1] as SendPort; // サーバ => クライアントの送信ポート
final srp = ReceivePort(); // サーバ <= クライアントの受信ポート
ssp.send(srp.sendPort); // クライアント => サーバの送信ポートをクライアントに受渡し
final rand = Random();
await for (var command in srp /* コマンド受信 */) {
if (command == null) {
break; // サーバ終了コマンドの場合
}
var c = rand.nextInt(loadParam) + loadParam ~/ 2 + 1;
var r = somethingHeavy(c); // 何やら重い処理 (0.5〜1.5 loadParam 回ループする)
ssp.send('Server($id) recived "$command", Σ(1, $c) = $r'); //結果返信
}
}
static int somethingHeavy(int c) {
int r = 0;
for (var i = 1; i < c; i++) {
r += i;
}
return r;
}
}
main
より前はクライアントにおけるサーバ関連情報、いわばサーバハンドラです。その本質はクライアントにおけるサーバとのメッセージ送受信ポートです。サーバへの送信ポートを取り出すためにサーバからの受信ポート(シングルサブスクリプション型のStream
)をfirst
すると、これが閉じてしまうので、StreamIterator
であるcri
作成しています。
なお、Isolateをspawn
するとシステムからからそのハンドラであるIsolate
オブジェクトを受け取りますが、ここでは用途がありませんでした。
Server
クラスの静的なmain
がサーバIsolateで動く処理のエントリポイントです。
main
の中ではクラアントとの通信ポートを初期化した後に、クライアントからの受信ポートをawait for
で監視します。先述のfirst
と異なりawait for
は受信ポートから複数のメッセージを受信できます。
メッセージ(コマンド)を受信したら何やら重い処理を行い、結果をクライアントに送信します。
サーバの処理は単純で、以上です。
クライアントにおけるディスパッチ処理
class Dispatcher {
final List<Server> servers;
final StreamQueue<String> commands;
final results = <int, String>{};
var maxResultsLength = 0;
var ixS = 0; // 次に送信するコマンドのインデックス
var ixO = 0; // 次に結果を出力するコマンドのインデックス
Dispatcher(this.servers, this.commands);
// コンストラクタではawaitできないので外出し
Future<void> dispatch() async {
var futures = <Future<void>>[];
for (var id = 0; id < servers.length; id++) {
futures.add(sendReceive(servers[id])); // サーバと送受信をする非同期処理を呼び出すだけ
}
await Future.wait<void>(futures); // 終了を待ち合わせる場合はここでawait
print(maxResultsLength);
}
Future<void> sendReceive(Server server) async {
// コマンドがなくなるまでサーバーと送受信
while (await commands.hasNext) {
var ix = ixS; // 結果受信のawait以降はixSは他非同期処理に更新済みかも
ixS++; // 結果受信のawait前に更新しておく
var command = await commands.next; // これ以降ixSは変わっているかも
server.csp.send(command); // コマンド送信
await server.cri.moveNext();
var result = await server.cri.current as String; // 結果受信
results[ix] = result; // 結果はコマンドの順にリストに格納
maxResultsLength = results.length > maxResultsLength ? results.length : maxResultsLength;
printResultsInOrder();
}
server.csp.send(null); // サーバ終了コマンド送信
}
// バラバラと受信する結果を並べ替え
void printResultsInOrder() {
for (; ixO < ixS; ixO++) /* await禁止 */ {
var result = results[ixO];
if (result == null) {
return; // 歯抜けなら次受信まで出力中断
}
print('$ixO $result');
results.remove(ixO); // 出力した結果はGC対象に
}
}
}
ここが並列化の肝です。
プロパティとしてディスパッチ対象であるサーバ群とコマンドキューを持ちます。
この他に、結果出力を整流するために結果リスト(実際にはマップ)があります。メモリ節約のために出力した結果はその場で破棄します。当初はマップより添字参照が高速と思いリストとしましたが、単純化のためにマップとしました。平均107 ms、最大658 ms、標準偏差63 ms、約4万件のデータで最大長は40程度までしか伸びませんでしたし、速度も遜色なかったのでマップで十分です。これに関連するものとして、ixS
は次にサーバに送信するコマンドのインデックス、ixO
は次に結果出力するコマンドのインデックスです。これらは本質的にサーバIsolate対応の全ファイバから共有されるので、参照・更新には注意が必要です(後述)。
dispatch()
は基本的に各サーバIsolate対応のファイバを起動するだけです。ファイバの処理を並列化する必要が有るので、ファイバの終了を待ち合わる場合も、await
はfor
ループの外で行う必要があります。
各ファイバの行う処理がsendReceive()
の中のループです。全ファイバ共有のコマンドキューが無くなるまでコマンドをフェッチしてはサーバに送信し、結果を受信・出力します。
commands.hasNext
がtrue
の場合は次のcommands.next
の成功が保証されますが、その間に他ファイバに横取りされては元も子もないのでcommands.hasNext
からcommands.next
の間はawait
禁止です。また、await
以降はixS
の値として他ファイバにより更新済みのものを参照してしまうかもしれないので、出力整流のためにローカル変数ix
に保存しておきます。
サーバから帰ってきた結果はとりあえず先述の結果リストの該当インデックスresults[ix]
に格納しておきます。
結果を整流して出力するのがprintResultsInOrder()
です。
今結果を受け取ったコマンドのインデクスとは関係なく、次回結果出力するコマンドのインデクスixO
から、次回サーバに送信するコマンドのインデックスixO
の一つ前までループして結果を出力します。ただし、途中で結果未受信のものがあったら出力処理を中断し、以降の出力は次の結果受信時に委ねます。その際どのファイバが担当するかは不明です。
また、結果を出力したらその場でその結果リストエントリを削除し、格納されていた結果オブジェクトを結果リストから切り離しGC対象とします。
この結果出力処理も全ファイバ共通のixO
、ixS
を読み書きするのでawait
時は要注意です。
訂正
クライアントにおけるサーバからの受信ポートからブロードバンド型のストリームを作成し複数回first
していましたが、APIを確認したところ2回目以降のfirst
ではサーバからのメッセージを聞き逃す可能性があることがわかりました。対策として、StreamIterator
を作成してこれ経由でメッセージを受信するよう変更しました。上記はそれに合わせて修正済みです。なお、コマンドキューのようにStreamQueue
でも良かったのですがオーバースペックなので採用していません。
補足
Isolateとか使うのでDartPadでは動きません。
Dart SDK version: 2.18.0-271.4.beta (beta) (Unknown timestamp) on "linux_x64"で正規表現を利用するサーバ処理を並列化したところ、JIT版でVMがセグメンテーションフォールトを起こしました(AOT版は動きます)。FFIとか関係する?5月ぐらいから有る長寿命バグのようだけど、2.18がなかなかリリースされない原因だったりする?
Isolateによる並列化では、ライブラリトップレベル変数はいかにも邪道ですね。Javaよろしくすべてクラスに閉じ込めました(しかも当然インスタンスメンバ)。おかげさまで良いリファクタリングができました。
全体ソースコード
以上ですが、最後に全ソースを載せておきます。
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'package:async/async.dart';
const loadParam = 10000000; // 処理すべきコマンドの平均的負荷CPUインテンシブ)
const dataSize = 100; // 処理すべきコマンドの数
final serverCount = Platform.numberOfProcessors; // CPUコア数ぐらいが適当
void main(List<String> arguments) async {
final commands = StreamQueue(Stream.fromIterable(
Iterable<String>.generate(dataSize, (i) => '[$i]'))); // ファイルストリームのつもり
final servers = <Server>[];
// サーバIsolate起動
for (var id = 0; id < serverCount; id++) {
var server = Server();
await server.spawn(id);
servers.add(server);
}
await Dispatcher(servers, commands).dispatch(); // サーバをコマンドにディスパッチ
for (var id = 0; id < serverCount; id++) {
servers[id].stop();
}
}
class Server {
final ReceivePort crp; // クライアント <= サーバの受信ポート
late final StreamIterator cri; // 上記のストリームイテレーター
late final SendPort csp; // クライアント => サーバの送信ポート
late final Isolate isolate; // 使わないけど
Server() : crp = ReceivePort() {
cri = StreamIterator(crp); // Streamはfirstすると閉じちゃうのでイテレータを作成
}
Future<void> spawn(int id) async {
isolate =
await Isolate.spawn<List<dynamic>>(main, <dynamic>[id, crp.sendPort]);
await cri.moveNext();
csp = await cri.current as SendPort;
}
void stop() {
csp.send(null);
crp.close();
}
static Future<void> main(List<dynamic> message) async {
final id = message[0] as int;
final ssp = message[1] as SendPort; // サーバ => クライアントの送信ポート
final srp = ReceivePort(); // サーバ <= クライアントの受信ポート
ssp.send(srp.sendPort); // クライアント => サーバの送信ポートをクライアントに受渡し
final rand = Random();
await for (var command in srp /* コマンド受信 */) {
if (command == null) {
break; // サーバ終了コマンドの場合
}
var c = rand.nextInt(loadParam) + loadParam ~/ 2 + 1;
var r = somethingHeavy(c); // 何やら重い処理 (0.5〜1.5 loadParam 回ループする)
ssp.send('Server($id) recived "$command", Σ(1, $c) = $r'); //結果返信
}
}
static int somethingHeavy(int c) {
int r = 0;
for (var i = 1; i < c; i++) {
r += i;
}
return r;
}
}
class Dispatcher {
final List<Server> servers;
final StreamQueue<String> commands;
final results = <int, String>{};
var maxResultsLength = 0;
var ixS = 0; // 次に送信するコマンドのインデックス
var ixO = 0; // 次に結果を出力するコマンドのインデックス
Dispatcher(this.servers, this.commands);
// コンストラクタではawaitできないので外出し
Future<void> dispatch() async {
var futures = <Future<void>>[];
for (var id = 0; id < servers.length; id++) {
futures.add(sendReceive(servers[id])); // サーバと送受信をする非同期処理を呼び出すだけ
}
await Future.wait<void>(futures); // 終了を待ち合わせる場合はここでawait
print(maxResultsLength);
}
Future<void> sendReceive(Server server) async {
// コマンドがなくなるまでサーバーと送受信
while (await commands.hasNext) {
var ix = ixS; // 結果受信のawait以降はixSは他非同期処理に更新済みかも
ixS++; // 結果受信のawait前に更新しておく
var command = await commands.next; // これ以降ixSは変わっているかも
server.csp.send(command); // コマンド送信
await server.cri.moveNext();
var result = await server.cri.current as String; // 結果受信
results[ix] = result; // 結果はコマンドの順にリストに格納
maxResultsLength = results.length > maxResultsLength ? results.length : maxResultsLength;
printResultsInOrder();
}
server.csp.send(null); // サーバ終了コマンド送信
}
// バラバラと受信する結果を並べ替え
void printResultsInOrder() {
for (; ixO < ixS; ixO++) /* await禁止 */ {
var result = results[ixO];
if (result == null) {
return; // 歯抜けなら次受信まで出力中断
}
print('$ixO $result');
results.remove(ixO); // 出力した結果はGC対象に
}
}
}
/*
class _ResultBuffer {
final buffer = <String?>[];
int bufferLength = 0;
int maxBufferLength = 0;
int listLength = 0;
int firstIx = 0;
static const String deleted = "";
void expandTo(int ix) {
assert(ix == listLength);
buffer.add(null);
bufferLength++;
maxBufferLength =
bufferLength > maxBufferLength ? bufferLength : maxBufferLength;
listLength++;
return;
}
void deleteAt(int ix) {
assert(ix >= firstIx && ix < listLength);
buffer[ix - firstIx] = deleted;
if (ix == firstIx) {
int i;
for (i = 0; i < bufferLength; i++) {
if (buffer[i] != deleted) {
break;
}
}
buffer.removeRange(0, i);
bufferLength -= i;
firstIx += i;
return;
}
}
String? operator [](int ix) {
assert(ix >= firstIx && ix < listLength);
return buffer[ix - firstIx];
}
operator []=(int ix, String v) {
assert(ix >= firstIx && ix < listLength);
buffer[ix - firstIx] = v;
}
}