はじめに
x10の標準ライブラリの一つに、タスクをロードバランスして実行してくれるGLB(Global Load Balancer)というライブラリがあります。
MapReduce処理が簡単に記述できます。
多数のタスクがある時に、平行に実行することができ、実行時間が不均一で単純な分割ではうまくいかない場合に特に有用です。また途中でタスクを動的に追加することもできます。
ここではGLBの使い方の基本を紹介します。
参考資料
- 論文:
- http://arxiv.org/abs/1312.5691 (ただし、x10 ver2.5.0と仕様が異なる)
- API
- source code
- sample:
-
https://svn.code.sourceforge.net/p/x10/code/tags/SF_RELEASE_2_5_0/x10.dist/samples/GLB
- 使われていない冗長なコードがいくつかあったりして、あまり分かりやすくないです。
- ちなみにこの中のfibonacci数の計算のコードを元にrefactoringしたコードをこちらにおきました。
- https://gist.github.com/yohm/01a5d34cebc765208593
-
https://svn.code.sourceforge.net/p/x10/code/tags/SF_RELEASE_2_5_0/x10.dist/samples/GLB
最小限のサンプルコード
GLBの最小限の使い方を示したのがこちらのコードです。
https://gist.github.com/yohm/059644106ce1dddcef97
10種類のタスクを作り並列に実行します。10個のタスクそれぞれが1~10の数字を返し、その結果の和を計算します。
このコードを解説していきます。
大まかな流れは、以下のようになります。もちろん挙動をカスタマイズしたい場合には他のメソッドも実装する必要がありますが、以下の手順だけで最小限動きます。
- TaskQueueをimplementしたクラスを作る
- initメソッドの実装
- processメソッドの実装
- getResultメソッドを実装
- GLBResultを継承したクラスを作る
- getResult を実装
- getReduceOperator を実装
- GLBオブジェクトを作って実行する
-
packageのimport。いろいろとありますが、GLBを使う場合以下のものを丸ごと入れておけばよいと思います。
import x10.util.ArrayList; import x10.glb.ArrayListTaskBag; import x10.glb.TaskQueue; import x10.glb.TaskBag; import x10.glb.GLBParameters; import x10.glb.GLB; import x10.util.Team; import x10.glb.Context; import x10.glb.ContextI; import x10.glb.GLBResult;
-
TaskQueueをimplementしたクラスを作る
- 最初の型パラメータはimplementした型(今回はMyTaskQueue)、第二パラメータはresultの型(今回はLong)を指定します。
- このTaskQueueは各プレースで作成されます。
- メンバーとして ArrayListTaskBag を保持する必要があります。今回は各タスクに渡すパラメータはLongなので、型パラメータとしてLongを指定しています。
- さらに結果を保持するメンバーとして
results_of_current_worker
という変数を作っています。各プレースごとに一つの変数に結果を格納しています。
class MyTaskQueue implements TaskQueue[MyTaskQueue, Long] { val bag = new ArrayListTaskBag[Long](); var results_of_current_worker:Long = 0;
-
initを実装する。
- この中でTaskBagの中にTask(に渡すパラメータ)を詰めていきます。ここでは10個分詰めています。
- このメソッドはPlace(0)でのみ実行されます。
- Place(0)のTaskBagに詰めるだけで負荷分散されるのか不安に思うかもしれませんが、Place0のバッグから他のplaceに自動的にタスクが分割されるので問題ありません。
public def init(n: Long) { Console.OUT.println("adding " + n + " at " + here); for( i in 1..n ) { bag.bag().add(i); } }
-
processを実装する。
- このprocessが実際にタスクを処理する部分です。
- 自分のArrayListTaskBagからタスクを取り出して、最大n個のタスクを処理するように実装します。nはライブラリから与えられます。
-
tb.bag().removeLast()
でタスクを取り出しています。- ArrayListTaskBag に対して
.bag()
メソッドを呼ぶとArrayListが返ってきます。そのArrayListから最後の要素を取り出しています。(QueueというよりStackですね)
- ArrayListTaskBag に対して
- とった値に対して何か処理をして、結果を
results_of_current_worker
に入れています。今回の場合は単純に足しているだけです。 -
context.yield()
を各タスクの実行後に読んでいます。これが呼ばれたタイミングで動的に負荷分散されます。- あまりにも頻繁に呼ぶとオーバーヘッドになるかもしれませんが、ほとんどの場合は今回のようにforループの最後に入れておけばよいでしょう。
- 返り値は自分のTaskBagが空になったかどうかを返します。まだTaskが残っている場合はtrueを返します。
- 今回のように
return tb.bag().size() > 0
を機械的に書いておけば問題ないです。
- 今回のように
public def process(var n:Long, context: Context[MyTaskQueue,Long]):Boolean { for( var i:Long = 0; tb.size() > 0 && i < n; i++) { val x = tb.bag().removeLast(); Console.OUT.println("running at " + here + " processing " + x); results_of_current_worker += x; context.yield(); } return tb.bag().size() > 0; }
-
count, merge, split, printLog を実装する。
- 実装が必要ですが、サンプルコードをコピーするだけで良いと思います。
- count は処理したタスクの数を返すように実装すると、統計情報がログが出るようです。実装しなくても良いので、とりあえず
return 0
と書いておけばよいです。 - split , merge は負荷分散のときに呼ばれるメソッドです。
- split で自分のTaskBagを分割して、他のplaceにて分割したものをmergeします。
- 挙動をカスタマイズしたいというのでもなければ、このサンプルコードと全く同じように書けばよいです。
- printLogは全ての処理が終わった後に呼ばれます。各Placeでタスクを実行した時ログなんかを書けばよいです。
-
getResult を実装する。
- タスクの実行が全て終わって、集計するときに実行される。各Placeごとにこのメソッドが呼ばれます。
- 結果はGLBResultを継承したクラス(MyResult)をnewして返すように実装すればよいです。
- ライブラリ側で、MyResultに対してreduction演算を実行します。
-
GLBResultを継承したクラス MyResult を作る
- 今回のコードでは結果を保持するメンバー変数
result
を定義し、コンストラクタでその変数内に入れています。
class MyResult extends GLBResult[Long] { val result: Long; public def this(local_result:Long) { Console.OUT.println("constructor of MyResult"); result = local_result; }
- 今回のコードでは結果を保持するメンバー変数
-
MyResult#getResult を実装する
- getResult は結果の型(ここではLong)の長さ1のRailとして返します。(なぜRailにする仕様??)
public def getResult():x10.lang.Rail[Long] { val r = new Rail[Long](1); r(0) = result; return r; }
-
MyResult#getReduceOperator を実装する
- 使える候補はこちらを確認のこと。 x10doc のTeamのFieldを参照のこと
- ADD,AND,MAX,MIN,MUL,OR,XOR などがあるようです。
public def getReduceOperator():Int { return Team.ADD; }
- 使える候補はこちらを確認のこと。 x10doc のTeamのFieldを参照のこと
あと MyResult#display というメソッドもありますが、表示に使われるだけです。reductionされた結果の表示に使われます。
-
GLBオブジェクトを作って実行する
- initというclosureを作って、GLBの引数に渡します。各placeでinitが呼ばれます。
-
glb.run()
の引数として、タスク処理を初期化する処理を書きます。 - この辺りのコードはサンプルコードをコピーするだけでよいでしょう。
- runというメソッドを作っているのは、staticメソッドからinitでnewするclosureを作れないからです。(?)staticメソッドから実行しようとするとコンパイルエラーになります。
def run(n: Long) { val init = () => { return new MyTaskQueue(); }; val glb = new GLB[MyTaskQueue, Long](init, GLBParameters.Default, true); Console.OUT.println("Starting..."); val start = () => { glb.taskQueue().init(n); }; val r = glb.run(start); Console.OUT.println(r); }
実行方法
- コンパイル
x10c++ StaticTasks.x10
-
実行
-
./a.out [n]
引数nを渡すと、n個のタスクが実行されます。 - プレース数を指定するには環境変数 X10_NPLACES を指定する
X10_NPLACES=4 ./a.out 10
- スレッド数はどうやって指定する?
-
実行結果例
Starting...
adding 10 at Place(0)
running at Place(0) processing 10
MyTaskQueue#split at Place(0)
MyTaskQueue#split at Place(0)
MyTaskQueue#split at Place(0)
running at Place(0) processing 2
running at Place(0) processing 1
MyTaskQueue#merge at Place(3)
running at Place(3) processing 6
running at Place(3) processing 7
running at Place(3) processing 8
running at Place(3) processing 9
MyTaskQueue#merge at Place(1)
running at Place(1) processing 3
MyTaskQueue#merge at Place(2)
running at Place(2) processing 4
running at Place(2) processing 5
MyTaskQueue#getResult at Place(0)
constructor of MyResult
MyResult#getResult at Place(0) : [13]
MyTaskQueue#getResult at Place(1)
constructor of MyResult
MyResult#getResult at Place(1) : [3]
MyTaskQueue#getResult at Place(2)
constructor of MyResult
MyResult#getResult at Place(2) : [9]
MyTaskQueue#getResult at Place(3)
constructor of MyResult
MyResult#getResult at Place(3) : [30]
MyResult#display: 55
Setup time(s):0.001855
Process time(s):0.070904
Result reduce time(s):0.013154
MyTaskQueue#printLog at Place(0)
MyTaskQueue#printLog at Place(1)
MyTaskQueue#printLog at Place(2)
MyTaskQueue#printLog at Place(3)
0 -> 2.7E-4 : 0.0657 : 0.0659 : 0.4106% :: 8.1E-4 : 0.0668 :: 0 :: 7 : 0 : 0 :: 5 : 1 :: 0 : 3 :: 4 : 4 :: 1 : 1
1 -> 8.0E-5 : 0.0700 : 0.0701 : 0.1140% :: 0.0013 : 0.0714 :: 0 :: 0 : 0 : 1 :: 2 : 1 :: 0 : 0 :: 4 : 4 :: 1 : 0
2 -> 1.1E-4 : 0.0700 : 0.0701 : 0.1552% :: 9.7E-4 : 0.0711 :: 0 :: 0 : 0 : 2 :: 5 : 1 :: 0 : 0 :: 4 : 4 :: 1 : 0
3 -> 1.7E-4 : 0.0702 : 0.0703 : 0.2358% :: 8.1E-4 : 0.0712 :: 0 :: 0 : 0 : 4 :: 4 : 1 :: 0 : 0 :: 4 : 4 :: 1 : 0
7 Task items stolen = 0 (direct) + 7 (lifeline).
0 successful direct steals.
3 successful lifeline steals.
[55]
他のサンプル
- GLBで並列に外部プロセスを実行
- Fibonacci数を計算
- https://gist.github.com/yohm/01a5d34cebc765208593
- 途中でタスクを動的に追加している