目的・これは何か?
これは、100個とか10000個とかのジョブ集合からなるジョブ並列タスクをクラスタ型コンピュータ上に走らせて、その計算結果を集めるためのルーズなフレームワークです。
ルーズなので、たぶんカスタマー向けのアプリケーションには向きませんが、学生や研究者による試行錯誤を含む開発には逆にそのルーズさが便利です。
特徴はインタラクティブ性です。途中の状況を見るとか、エラーで止まったジョブを再実行するとか、バグを見つけて全部止めてやり直すとか、並列実行中のジョブを途中で止めるとか、プログラム開発の途中でよくあるイベントでのストレスが少なくなるようにデザインされています。
100万個とかの細かいタスクをひとつひとつクラスタ上のタスク管理システムに投げると、クラスタの管理ノードが苦しむじゃないですか?走らせている間にバグに気づいて直して再実行とか、面倒くさいじゃないですか?
数十個のノードを持つクラスタマシンで使うことを想定して MATLAB用に作りましたが、実際には3ノードぐらいで実行するのに便利に用いています。
考え方自体はどんなプラットフォームでも使えると思いますので、参考にしてもらえれば嬉しいです。
(追記)
予想してはいましたが、並列度が大きくなるとファイル一つを pool として寄ってたかって更新するっていう方式はやはり良くないようで、pool ファイルが壊れる確率が高いようです。案2としてMySQLとか建てる気になればデータベースに聞きに行く方式がもっともカッコイイのですが、案3としてディレクトリをpoolとして各ユニットジョブのパラメタ情報やリザルト情報をディレクトリ内の細かいファイルの形で持つようにするのが簡単でよいかも。
(この投稿は満足いくまで不定期に書き換えると思います。あしからずご了承ください)
おおまかな仕組み
ジョブと JobPool
各ジョブには識別番号 id=1:NumIDs
が振られており、対応するパラメタを並べたセル配列 param = params{ id }
で定義されています。
ここで各パラメタ param
の値はジョブの挙動を定義した構造体です。
このセル配列のことをJobPoolと呼び、これを保存した .mat ファイル名のことを JobPoolFileName と呼びます。
実行者
MATLAB上で
taskmanager( JobPoolFileName, 'run' )
を実行すると、実行者が起動します。
複数ノードで、同様にして実行者を起動することができます。
ひとつひとつの実行者は、JobPool から未実行のジョブをひとつずつ受け取って実行して結果を保存します。未実行のジョブが無くなったとき、すべての実行者は停止します。
ジョブには 0:waiting, 1:running, 2:finished の3種類のステータスがあります。ジョブが作られたとき 0, 実行者に渡されたときに 1, 実行者から結果が送られてきたときに 2 となります。
実行中にジョブを加えたり、削除したり、running や finished のジョブを waiting に戻したりすることができます。
注意点
タイミング制御はしていません。
- 複数の実行者が同じジョブを実行してしまう(小さな)可能性は排除していません。後から実行した結果で、結果報告が上書きされてしまいます。
- JobPoolに対する同時アクセスのせいで、書き込み中のファイルを読み込もうとしてエラーで止まることがあります。
パラメタ設定やバグのせいで、無限ループやエラーのせいで途中で止まってしまうジョブが含まれる可能性も排除していません。
出力されるテキストなどから見て、挙動のおかしい実行者は Ctrl-C で止めてデバッグします。致命的でなければ、ただ実行者を再起動すれば他の実行者に影響はありません。
途中で止まったせいで running 状態のままになってしまったジョブが生じたときは
taskmanager( JobPoolFileName, 'reset', -1 )
で waiting 状態にリセットできます。
使い方
準備
単位 m-script を用意する
パラメタを設定した構造体 param
を受け取り、計算結果を構造体 result
を出力する m-script を作ります。
result = dosomething( param )
result.image_file_name = sprintf('archive/image%d.png', param.id);
saveImage( result.image_file_name )
JobPoolのファイルを小さく保つために、計算結果が大きい場合には構造体 result
には含めずに、ファイルに保存しつつファイル名のみを result.image_file_name
に書き込んでおくなどすると良いでしょう。
パラメタ設定をまとめて JobPoolに書き込む m-script を用意する
100個とか10000個のパラメタ設定のもとで unit_job.m
を働かせたいものとします。
JobPool = './hoge/hemo/jobpool001.mat'; % JobPool の実体となる .mat fileのファイル名
for i=1:10000
param.K = rand(1);
param.script = 'unit_job'; % この param を実行する単位 m-script の名前
taskmanager( JobPool, 'set', param ); % Job Pool に param をセット
end
実行
実行者を起動
taskmanager( './hoge/hemo/jobpool001.mat', 'run' )
複数ノードで実行者が起動していれば、waiting のジョブがなくなるまで手分けしてジョブを処理してくれます。
実行者起動中のインタラクティブ操作
- 全 JobPool の簡単な統計情報が得られます
>> taskmanager( JobPoolFileName, 'status' )
Waiting: 10
Running: 3
Finished: 4
- 計算途中経過を詳しく知りたければ以下のようにして JobPool の中を見てみましょう
>> load( JobPoolFileName )
>> pool.status
>> pool.results
-
エラーや、JobPoolファイルへの書き込みタイミングのバッティングなどで実行者が止まったときには、ただたんに起動しなおせばOK
-
実行途中で止まったせいで running 状態のままになってしまったジョブが生じたときは
taskmanager( JobPoolFileName, 'reset', -1 )
で waiting 状態にリセットできます。すでに働いている実行者が、waiting ジョブを見つけて処理してくれます。
- unit_job.m のバグを直して、finished 状態になった特定IDのジョブだけ再実行したいとき、
taskmanager( JobPoolFileName, 'reset', [1, 3:5, 9] )
のようにして選択的にリセットかけることができます.
ソースコード
上に示していない機能がついていたりしますが、必要に応じて適当に使ってみてください。
function o = taskmanager( jobpool, action, varargin )
% taskmanager handles multiple jobs that are to run as job-parallel task
% in cluster machine
% JobPool is a mat-file made of a structure variable pool
% pool.params{id}
% pool.result{id}
% pool.status(id) in {0,1,2}, where values of 1, 2, and 3
% stands for corresponding job-status,
% Waiting, Running, and Finished, respectively.
%
% Usage:
%
% >> taskmanager( JobPool, 'set', parameter )
% adds a set of parameters (structure variable)
% that defines a job in a JobPool.
% An ID is given and status is set Waiting.
% >> param = taskmanager( JobPool, 'next' )
% gives a set of parameters of the next waiting job,
% with setting corresponding job status Running
% >> taskmanager( JobPool, 'report', result )
% stores a result of the job, at ID = resut.id.
% Status is set at Finished
switch action
case 'next'
load( jobpool )
id = find( pool.status == 0, 1, 'first'); % 0 stands for waiting
if length(id)==0
o = [];
else
pool.status( id ) = 1; % 1 stands for running
o = pool.params{id};
o.id = id;
end
save( jobpool, 'pool' )
case 'set'
if exist( jobpool, 'file' )
load( jobpool )
else
pool.status = [];
pool.params = cell(0);
end
nid = length( pool.status );
id = nid + 1
pool.status( id ) = 0;
pool.params{id} = varargin{1};
save( jobpool, 'pool')
fprintf('Added jobID%d\n',id)
case 'report'
load( jobpool )
result = varargin{1};
id = result.id;
pool.report{id} = result;
pool.status(id) = 2; % 2 stands for finished
save( jobpool, 'pool' )
case 'reset'
load( jobpool )
id0 = varargin{1};
for id = id0
pool.status( id ) = 0;
end
save( jobpool, 'pool' )
case 'status'
load( jobpool )
s = pool.status;
nid = length( s );
fprintf( 'Waiting %d\n', sum( s==0 ) )
fprintf( 'Running %d\n', sum( s==1 ) )
fprintf( 'Finished %d\n', sum( s==2 ) )
case 'run'
while true
param = taskmanager(jobpool,'next');
if length(param)==0
disp('No more job.')
break
else
fprintf('Calling jobID=%d',param.id)
eval( param.script )
taskmanager(jobpool, 'report', result )
end
end
end