分散並列処理の基本かつ強力なアプローチとして,Single Program Multiple Data (SPMD) があります.これは,複数のプロセッサが同一のプログラムを実行し,その際に各プロセッサは別々のデータを処理する,というモデルです.大規模なデータを独立で処理できる単位に分割し,小分けのデータを複数のプロセッサで並列に処理することで,データ全体の処理時間を大幅に短縮できます.
例えば,a01.txt
, a02.txt
, a03.txt
, a04.txt
を入力ファイルとしてjob
というコマンドを実行し,実行結果(出力)をそれぞれ,b01.txt
, b02.txt
, b03txt
, b04.txt
に格納することを考えます.次のコードは,この処理をbashシェルスクリプトで実現するものです.
#!/bin/bash
for src in $@
do
job < $src > ${src/a/b}
done
このforループ内の処理内容は互いに独立ですので,容易に並列化できますし,全体に要する処理時間を短縮できます.
自然言語処理や機械学習では,形態素解析や素性抽出など,データ分割による並列化が容易な処理が沢山あります.このような(分散)並列処理のことを(日本では)バカパラと呼ぶようです.この記事では,コマンド実行によるバカパラをIPython clusterで実現する方法,バカパラに特化したライブラリBakaparaを紹介します.
はじめに
公式ドキュメントのArchitecture overviewによると,IPython clusterは以下の4つの要素から構成されています.
- エンジン: 並列に実行するPythonコードを動かすIPythonインタプリタ.実行したいホストの上で並列に実行したい数だけ起動されます.ユーザのプログラムを実行している間,各エンジンは他の操作をブロックします.
- コントローラ: エンジン群を操作するインタフェース.ユーザはコントローラを操作することで,エンジンの動作を管理します.内部的に,1つのハブと複数のスケジューラから構成されます.
- ハブ: クラスタ実行環境の心臓部.エンジンへの接続,スケジューラ,クライアント,実行結果などを集中管理します.
- スケジューラ: エンジンのジョブを管理.
IPythonで並列実行環境を構築するには,1つのコントローラと複数のエンジンを起動する必要があります.コントローラとエンジンの起動方法は2つあります.
-
ipcluster
コマンドでコントローラとエンジンを自動的に立ち上げる -
ipcontroller
コマンドでコントローラ,ipengine
コマンドでエンジンを手動で立ち上げる
今回は,ipcluster
コマンドを使ってお手軽にクラスタ環境を構築したいと思います.
本記事ではサーバ群の環境として以下の事項を仮定します.
- 各サーバには
ssh
でパスワードなしで(ssh-agent
などを用いて)ログインできる - 各サーバはNFS等で共有されたhomeディレクトリにアクセスできる
- 各サーバにIPythonがインストールされている
具体的な説明を心がけるため,この記事では以下のサーバ構成を例に説明します.
- エンジン用のサーバとして,mezcal01.cl.ecei.tohoku.ac.jpからmezcal12.cl.ecei.tohoku.ac.jpの12台を利用する.
- 各ノードで4つのエンジンを起動する(4並列/ノード)
IPython clusterの設定
公式ドキュメントのUsing ipcluster in SSH modeを参考に設定ファイルを作成・編集します.IPython clusterはクラスタ実行環境をプロファイルという単位で管理すると便利です.プロファイルの名前は任意ですが,ここではエンジンのサーバ群の名前に基いて"mezcal"という名前のプロファイルを作成します.
$ ipython profile create --parallel --profile=mezcal
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_nbconvert_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcontroller_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipengine_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcluster_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/iplogger_config.py'
$HOME/.ipython/profile_{プロファイル名}
というディレクトリが作成され,IPython clusterの設定ファイルipcluster_config.py
が作成されます.このファイルをエディタで開き,各ノードの実行エンジンをSSH経由で起動するように設定します.設定箇所をコメントを残したまま示します.c.IPClusterStart.engine_launcher_class
を'SSH'
とします.
# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
# Local : start engines locally as subprocesses [default]
# MPI : use mpiexec to launch engines in an MPI environment
# PBS : use PBS (qsub) to submit engines to a batch queue
# SGE : use SGE (qsub) to submit engines to a batch queue
# LSF : use LSF (bsub) to submit engines to a batch queue
# SSH : use SSH to start the controller
# Note that SSH does *not* move the connection files
# around, so you will likely have to do this manually
# unless the machines are on a shared file system.
# HTCondor : use HTCondor to submit engines to a batch queue
# WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
# c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
# ipcluster start --engines=MPI
c.IPClusterStart.engine_launcher_class = 'SSH'
先ほど設定したc.IPClusterStart.engine_launcher_class
との差異が不明ですが,c.IPClusterEngines.engine_launcher_class
も'SSH'
とします.さらに,c.SSHEngineSetLauncher.engines
という辞書オブジェクトに分散並列処理を行うホスト名とエンジンの数(並列実行数)を指定します.engines
辞書オブジェクトのキーにホスト名,値にエンジン数をセットします.mezcal[[01-12]].cl.ecei.tohoku.ac.jp
で各4個の実行エンジンを起動し,最大で48並列処理を行うための設定例を示します.
# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
# Local : start engines locally as subprocesses [default]
# MPI : use mpiexec to launch engines in an MPI environment
# PBS : use PBS (qsub) to submit engines to a batch queue
# SGE : use SGE (qsub) to submit engines to a batch queue
# LSF : use LSF (bsub) to submit engines to a batch queue
# SSH : use SSH to start the controller
# Note that SSH does *not* move the connection files
# around, so you will likely have to do this manually
# unless the machines are on a shared file system.
# HTCondor : use HTCondor to submit engines to a batch queue
# WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
# c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
# ipcluster start --engines=MPI
c.IPClusterEngines.engine_launcher_class = 'SSH'
c.SSHEngineSetLauncher.engines = {
'mezcal01.cl.ecei.tohoku.ac.jp': 4,
'mezcal02.cl.ecei.tohoku.ac.jp': 4,
'mezcal03.cl.ecei.tohoku.ac.jp': 4,
'mezcal04.cl.ecei.tohoku.ac.jp': 4,
'mezcal05.cl.ecei.tohoku.ac.jp': 4,
'mezcal06.cl.ecei.tohoku.ac.jp': 4,
'mezcal07.cl.ecei.tohoku.ac.jp': 4,
'mezcal08.cl.ecei.tohoku.ac.jp': 4,
'mezcal09.cl.ecei.tohoku.ac.jp': 4,
'mezcal10.cl.ecei.tohoku.ac.jp': 4,
'mezcal11.cl.ecei.tohoku.ac.jp': 4,
'mezcal12.cl.ecei.tohoku.ac.jp': 4,
}
ターミナルやIPython notebookを実行するサーバと,コントローラのサーバが異なる場合は,他のサーバからコントローラへの接続を許可する必要があります.サーバ群が信頼できるLANに配置されている場合は,すべてのホストからコントローラに接続できるようにしておくと便利です.ipcontroller
の起動オプションに"--ip='*'"
を追加します(デフォルトはローカルホストのみ接続可).
#------------------------------------------------------------------------------
# LocalControllerLauncher configuration
#------------------------------------------------------------------------------
# Launch a controller as a regular external process.
# command-line args to pass to ipcontroller
c.LocalControllerLauncher.controller_args = ["--ip='*'", '--log-to-file', '--log-level=20']
今回は,コントローラとエンジンのホストでhomeディレクトリが共有されているので,以下の設定を追加します.
#------------------------------------------------------------------------------
# SSHLauncher configuration
#------------------------------------------------------------------------------
# A minimal launcher for ssh.
#
# To be useful this will probably have to be extended to use the ``sshx`` idea
# for environment variables. There could be other things this needs as well.
# hostname on which to launch the program
# c.SSHLauncher.hostname = ''
# command for starting ssh
# c.SSHLauncher.ssh_cmd = ['ssh']
# user@hostname location for ssh in one setting
# c.SSHLauncher.location = ''
# List of (local, remote) files to send before starting
c.SSHLauncher.to_send = []
# command for sending files
# c.SSHLauncher.scp_cmd = ['scp']
# List of (remote, local) files to fetch after starting
c.SSHLauncher.to_fetch = []
# args to pass to ssh
# c.SSHLauncher.ssh_args = ['-tt']
# username for ssh
# c.SSHLauncher.user = ''
この設定の意味を理解するには,IPython clusterが開始されるまでの流れを理解する必要があります.IPython clusterが起動するまでの流れを示します.
- コントローラを起動する
- コントローラが作成した
ipcontroller-engine.json
をエンジンのホストにscpで転送する - 各エンジンを起動する
ここで,ホームディレクトリが共有されている場合は,手順2が不要になるわけです.
クラスタの開始
コントローラを動かしたいホスト上でipcluster
コマンドを実行し,コントローラとエンジンをまとめて起動します.このとき,プロファイル名を--profile
オプションで指定します.
$ ipcluster start --profile=mezcal
2014-12-11 14:15:49.891 [IPClusterStart] Using existing profile dir: u'/home/okazaki/.ipython/profile_mezcal'
2014-12-11 14:15:50.023 [IPClusterStart] Starting ipcluster with [daemon=False]
2014-12-11 14:15:50.025 [IPClusterStart] Creating pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
2014-12-11 14:15:50.025 [IPClusterStart] Starting Controller with LocalControllerLauncher
2014-12-11 14:15:51.024 [IPClusterStart] Starting 48 Engines with SSH
2014-12-11 14:16:25.117 [IPClusterStart] Engines appear to have started successfully
"Engines appear to have started successfully"と表示されれば成功です."Starting 48 Engines with SSH"という表示から,$12 \times 4 = 48$個のエンジンを始動したことを確認できます.
クラスタ上でプログラムを実行する
IPython.parallel
モジュールをインポートします.
In [1]: from IPython.parallel import Client
コントローラとエンジンを操作するためのClient
オブジェクトを作成します.profile
引数にプロファイル名を指定します.
In [2]: rc = Client(profile='mezcal')
Client
オブジェクトが接続しているエンジンのIDを確認すると,48台のエンジンに接続していることが確認できます.
In [3]: rc.ids
Out[3]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]
以下の手順はバカパラとは関係がありませんが,IPython clusterの一般的な利用方法として解説します.タスクスケジューラを介さず,各エンジンで直接的にコードを実行するにはDirectView
インスタンスを用います.
In [4]: dview = rc[:]
以下のコードは$x^2$を$x \in \{0, 1, ..., 49\}$に対して(並列化なしで)計算します.
In [5]: serial_result = map(lambda x: x**2, range(50))
この計算を各要素$x \in \{0, 1, ..., 49\}$ごとに並列化してみます.
In [6]: parallel_result = dview.map_sync(lambda x: x**2, range(50))
parallel_result
には,各エンジンで実行した結果が集約・格納されます.
In [7]: parallel_result
Out[7]:
[0,
1,
4,
9,
...
2401]
当然のことながら,並列化の有無に関係なく,計算結果は同じです.
In [8]: serial_result == parallel_result
Out[8]: True
remote
デコレータを使えば,各エンジンで実行される関数(remote function)を定義できます.以下のgethostname
関数は,ホスト名をsocket.getfqdn()
で取得して返す関数ですが,関数内でsocket
モジュールをインポートしていることに注意してください.クライアント側でモジュールをインポートしても,エンジン上のIPythonプロセスではモジュールをインポートしたことにならないため,関数内でモジュールをインポートする必要が生じます.
In [9]: @dview.remote(block=True)
...: def gethostname():
...: import socket
...: return socket.getfqdn()
...:
クライアントでgethostname
関数を呼び出すと,各エンジンのホスト名が取得できます.順番はバラバラですが,mezcal01からmezcal12までのホストで,各4つのエンジンが起動していることが確認できます.
In [10]: gethostname()
Out[10]:
['mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal06.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal09.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
'mezcal04.cl.ecei.tohoku.ac.jp',
...
'mezcal01.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp',
'mezcal03.cl.ecei.tohoku.ac.jp']
他にも,並列に実行する関数を定義するparallel
デコレータもあります.詳しくは,IPython's Direct interfaceを参照してください.
スケジューラを介した実行
LoadBalancedView
インスタンスは,動的な負荷分散を用いてジョブを実行します.個別のエンジンに直接アクセスすることは出来なくなりますが,multiprocessing.Pool
のジョブ・キューのように,クラスタのエンジンでジョブ・キューを実現できます.
単純な例として,sleep 10
というコマンドを各エンジンで実行してみましょう.実行したいジョブを格納したリストを作成します.
In [11]: jobs = [dict(cmd='sleep 10') for i in range(100)]
このリストの各要素は辞書型で,cmd
キーの値に実行したいコマンドを格納します.今回はすべてのジョブでsleep 10
を実行していますが,実際にバカパラを行うときは,入力データに応じてコマンドの内容が変わるはずです.先頭の5個のジョブはこんな感じです.
In [12]: jobs[:5]
Out[12]:
[{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'},
{'cmd': 'sleep 10'}]
辞書オブジェクトで表現されたジョブ(コマンド)を実行する関数runjob
を実装します.受け取った辞書オブジェクトのcmd
キーの値をシェル上で実行し,その戻り値を辞書オブジェクトに格納して返します.
In [13]: def runjob(job):
....: import subprocess
....: try:
....: returncode = subprocess.call(job['cmd'], shell=True)
....: return dict(code=returncode)
....: except OSError, e:
....: return dict(error=str(e))
....:
このrunjob
関数をジョブキューで順々に実行するため,クライアントからLoadBalancedView
インスタンスを取得します.
In [14]: lview = rc.load_balanced_view()
そして,jobs
リストの各要素に対してrunjob
関数を非同期で実行します.
In [15]: ar = lview.map_async(runjob, jobs)
このコードの実行はブロックされず,直ちにAsyncResult
オブジェクトが返されます.ジョブの実行結果や進捗状況は,このAsyncResult
オブジェクトを介して確認できます.例えば,ジョブの実行状況をインタラクティブシェル上に表示してみます(IPython notebook上でも利用できます).
In [16]: ar.wait_interactive()
48/100 tasks finished after 15 s
このwait_interactive
関数を呼び出すと,1秒毎にジョブの実行状況がシェル上に表示されます.上の表示は,ジョブの実行開始から15秒経過した時点のもので,100個中48個のジョブが完了したことを表しています.1つのジョブの所要時間が10秒で,48個のエンジンを同時に利用しているため,実行開始から10秒で48個のジョブが完了し,10秒から20秒の間でさらに48個のジョブを実行することになります.すべてのジョブが完了すると,以下の様に表示されます.
In [16]: ar.wait_interactive()
100/100 tasks finished after 30 s
done
各ジョブの実行は非同期で行われていますが,wait_interactive
関数はすべてのジョブが完了するまで終了しません.ジョブの進捗状況の表示を止めたいときは,[Ctrl]+[c](IPython notebookならば"Kernel"-"Interrupt")で中断すればOKです.表示を中断してもジョブの実行は継続されます.
ジョブの実行結果は,AsyncResult
オブジェクトから確認できます.先頭から5個のジョブの実行結果を確認してみます.
In [17]: ar[:5]
Out[17]: [{'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}]
code
が0
とは,sleep
コマンドを実行したシェル(/bin/sh
)の返り値が0
であったことを示しています.
Bakaparaモジュール
シェル上のコマンドによるジョブキューをIPython cluster上でバカパラ実行する処理をモジュール化したものが,Bakaparaです.PythonのインタラクティブシェルやIPython notebookから利用できるように設計されています.また,コマンドラインから単体で動かすことも可能です.
インストール
モジュールとして利用する場合は,bakapara.pyをダウンロードして,PYTHONPATH
の通ったディレクトリに配置してください.コマンドラインから実行する場合は,PATH
の通ったディレクトリに配置しておくと便利です.
ジョブの仕様
Bakaparaオブジェクトは,ジョブのリストを受け取り,クラスタのエンジンで実行します.各ジョブは以下の仕様を満たす辞書オブジェクトであれば,どのような形式でも構いません.
キー | 値 | 例 |
---|---|---|
cmd |
エンジン上で実行したいコマンド.実際にはシェルを介してコマンドを実行するので,パイプやリダイレクトも利用可能. | wc -l /work/001.txt |
cd |
(省略可)エンジンがコマンドを実行するときの作業ディレクトリ.このキーが存在しない場合は,エンジンの作業ディレクトリを変更しません.なお,各エンジンの作業ディレクトリは,Bakaparaオブジェクト構築時の作業ディレクトリで初期化されます. | /home/okazaki/projects/bakapara |
out |
(省略可)この値にファイル名を指定すると,コマンド実行時の標準出力がファイルに保存されます.このキーが存在しない場合は,標準出力の内容を保存しません. | /work/001.txt.out |
err |
(省略可)この値にファイル名を指定すると,コマンド実行時の標準エラー出力がファイルに保存されます.このキーが存在しない場合は,標準エラー出力の内容を保存しません. | /work/001.txt.err |
host |
(省略可)コマンドを実行できるホスト(エンジン)のリスト.このキーが存在しない場合は,このジョブはすべてのエンジンで実行可能であるとみなします. | ['mezcal03.cl.ecei.tohoku.ac.jp',] |
host
は,特定のジョブを特定のホストで実行したいときに使います.例えば,処理したいデータを各サーバのローカルディスクに分散配置した場合,ジョブの実行に必要なデータが配置されているホストをhost
で指定できます.また,分散ファイルシステムGFarmでは,分散ファイルシステム上の各ファイルがどのホストに配置されているか調べることが出来ますので,処理データが配置されているホストを指定することで,HDFS+Hadoopのようなデータ局所性を考慮した分散並列処理を実現できます.
各ジョブの実行結果は,辞書オブジェクトのresult
キーの値として格納(上書き)されます.result
の値は以下の仕様の辞書オブジェクトです.
キー | 値 | 例 |
---|---|---|
code |
終了コード | 0 |
submitted |
ジョブがクライアントから発行された日時 | '2014-12-13T01:17:05.593718' |
started |
エンジン上でジョブの実行が開始された日時 | '2014-12-13T01:17:04.559970' |
completed |
エンジン上でジョブの実行が完了した日時 | '2014-12-13T01:17:14.566251' |
received |
クライアントがジョブの実行結果を受け取った日時 | '2014-12-13T01:17:15.614301' |
elapsed |
ジョブの実行に要した時間(completed -started ) |
'0:00:10.006281' |
engine_id |
ジョブを実行したエンジンのID(インデックス) | 3 |
host |
ジョブを実行したエンジンのホスト名 | 'mezcal06.cl.ecei.tohoku.ac.jp' |
pyerr |
Pythonの例外(あれば).ジョブを実行するホスト名を指定した時にUnmetDependency という例外が表示されることがありますが,これは正常です. |
None |
pyout |
Pythonインタプリタの出力(あれば) | None |
status |
'ok' もしくは'error'
|
'ok' |
msg_id |
クライアントとエンジンがやりとりしたメッセージのUUID | u'48fbc58b-ef73-4815-9f32-1932c01421ad' |
error |
(致命的なエラー発生時のみ存在)エラーメッセージ | '' |
status
が'ok'
でもコマンドの実行に失敗しているケースがありますので,注意が必要です.例えば,cmd
のコマンドを書き間違えて実行出来なかった場合でも,status
は'ok'
になりますが,標準エラー出力には'/bin/bash: 1: hoge: not found\n'
のようなメッセージが残ります.なお,ジョブのコマンドは/bin/bash -o pipefail
経由で実行されます.したがって,パイプで結ばれたコマンドのいずれかが0
以外のステータスコードを返した場合は,その戻り値がcode
に格納されることになります.したがって,戻り値code
を確認することは重要です.
Pythonインタラクティブシェル上での利用
まず,bakaparaモジュールをインポートして,Bakapara
インスタンスを作成します.Bakapara
クラスのコンストラクタの仕様はIPython.parallel.Clientと同一です.通常は,以下のようにプロファイル名を指定することになるかと思います.
In [1]: from bakapara import Bakapara
In [2]: bp = Bakapara(profile='mezcal')
実行したいジョブのリストを作成します.以下のコードは,sleep 10
というコマンドを100回実行するジョブを作成しています.
In [3]: jobs = [dict(cmd='sleep 10') for i in range(100)]
説明のため,先頭のジョブを確認してみます.
In [4]: jobs[0]
Out[4]: {'cmd': 'sleep 10'}
ジョブを開始するにはrun
メソッドを用います.返り値True
はジョブの登録に成功したことを表します.
In [5]: bp.run(jobs)
Out[5]: True
ジョブの進捗状況はstatus
メソッドで確認できます.ジョブの進捗状況が1秒おきに表示されます.
In [6]: bp.status()
48/100 tasks finished after 0:00:12.554905
このstatus
メソッドはすべてのジョブが完了するまで実行が終了しません.他の作業をしたくなったら,[Ctrl]+[c]キー(IPython notebookでは”Kernel"-"Interrupt")を押して中断してください.
ジョブの実行が完了すると,以下のように表示されます.
In [7]: bp.status()
100/100 tasks finished after 0:00:30.137117
100 tasks completed in 0:00:30.137117
ジョブが完了すると,ジョブリスト(Bakapara
クラスのrun
メソッドに与えたジョブリスト)に結果が書き込まれます.正確には,ジョブの完了後にstatus
メソッドもしくはwait
メソッドを呼び出すことで実行結果がjobs
リストに書き込まれます.全体のジョブが完了していなくても,status
メソッドもしくはwait
メソッドを呼び出した時点までの結果がジョブリストに書き込まれます.ジョブの実行結果を取得するためにも,status
メソッドを呼び出し,"xx/xxx tasks finished"というメッセージが表示されることを確認しましょう.
実行結果を確認してみましょう.ジョブを実行したホスト名,所要時間,終了コードなどの情報にアクセスできます.
In [8]: jobs[0]
Out[8]:
{'cmd': 'sleep 10',
'result': {'code': 0,
'completed': '2014-12-13T12:26:12.665525',
'elapsed': '0:00:10.005647',
'engine_id': 11,
'host': 'mezcal11.cl.ecei.tohoku.ac.jp',
'msg_id': u'72666439-9364-4a37-9dfb-8a04921d9a0c',
'pyerr': None,
'pyout': None,
'received': '2014-12-13T12:24:38.638917',
'started': '2014-12-13T12:26:02.659878',
'status': u'ok',
'submitted': '2014-12-13T12:23:58.320781'}}
コマンドラインからの利用
今まで見てきたように,Bakapara
モジュールを使うのは簡単です.ただ,中にはPythonは全く分からないけど,バカパラの分散並列処理を行いたい人もいるかと思います.そんな方のために,コマンドライン・インタフェースも準備しました.bakapara.py
を単体で実行すると,ジョブのJSONを標準入力から読み込み,ジョブの実行結果のJSONを標準出力に書き出します.入力と出力のJSONは,1行1ジョブで,各行にジョブの辞書オブジェクトをJSONで記述した形式を採用しています.全体のジョブをリスト形式で格納しないのは,ジョブが完了すると直ちにその実行結果を書き出せるようにするためです.
例えば,sleep 10
を100件実行するジョブのJSONファイルは,次のようになります.
$ head -n5 sleep.task.json
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
このジョブを実行し,その実行結果をsleep.result.json
に書き出すには,次のようなコマンドを実行します.IPython clusterのプロファイル名は-p
オプションで指定します.
$ python bakapara.py -p mezcal < sleep.task.json > sleep.result.json
2014-12-21 10:39:49,231 total 100 jobs on 48 engines
2014-12-21 10:39:59,288 [1/100] returned 0 in 0:00:10.007444 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [2/100] returned 0 in 0:00:10.005645 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [3/100] returned 0 in 0:00:10.005994 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [4/100] returned 0 in 0:00:10.006593 sec on mezcal01.cl.ecei.tohoku.ac.jp: sleep 10
...
2014-12-21 10:40:19,282 [97/100] returned 0 in 0:00:10.005299 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [98/100] returned 0 in 0:00:10.005097 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [99/100] returned 0 in 0:00:10.005758 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [100/100] returned 0 in 0:00:10.004995 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 completed
標準出力から,実行したジョブが(JSONでエンコードされた)辞書オブジェクトとして返ってきます.result
キーに実行結果が格納されていることが分かると思います.
$ head -n5 sleep.result.json
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262566", "code": 0, "engine_id": 9, "started": "2014-12-21T10:40:46.649199", "completed": "2014-12-21T10:40:56.656643", "msg_id": "22d664c5-793a-44f1-b29d-c74f2aa434c1", "submitted": "2014-12-21T10:39:49.235879", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.007444", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262205", "code": 0, "engine_id": 11, "started": "2014-12-21T10:40:46.650998", "completed": "2014-12-21T10:40:56.656643", "msg_id": "e8eb5db2-ac9b-481b-b0a4-fdb2ef15be62", "submitted": "2014-12-21T10:39:49.236327", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005645", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.287773", "code": 0, "engine_id": 8, "started": "2014-12-21T10:40:46.679033", "completed": "2014-12-21T10:40:56.685027", "msg_id": "8a7e6fe0-482a-4ae0-a2ff-8321849aa8b0", "submitted": "2014-12-21T10:39:49.244347", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005994", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.284039", "code": 0, "engine_id": 46, "started": "2014-12-21T10:40:46.698136", "completed": "2014-12-21T10:40:56.704729", "msg_id": "f03f9b93-4a60-494b-9a21-625cdcac252e", "submitted": "2014-12-21T10:39:49.242042", "pyerr": null, "host": "mezcal01.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006593", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.259553", "code": 0, "engine_id": 28, "started": "2014-12-21T10:40:46.889807", "completed": "2014-12-21T10:40:56.895995", "msg_id": "bc9e7b74-64ba-45f4-ac0e-31b27db5d862", "submitted": "2014-12-21T10:39:49.234939", "pyerr": null, "host": "mezcal07.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006188", "pyout": null}}
ジョブの中断について
クラスタ上で実行されているジョブの実行結果を眺めているうちに,ジョブの内容が間違っていたこと,もしくはジョブが暴走していることに気づくことがあります.この場合,ジョブの実行をキャンセルすることになります.Bakaparaでジョブの実行をキャンセルする方法は2つあります.
-
abort()
メソッドを呼び出す. -
ipcluster
プロセスを停止し,すべてのエンジンとコントローラを強制終了する.
方法1の場合,実行中のジョブはキャンセルされず,ジョブキューに入っているジョブのみがキャンセルされることになります.したがって,非常に時間のかかるジョブを実行している場合や,ジョブが暴走した場合には使えません.
方法2は,クラスタ実行環境のエンジンとコントローラを終了することで,実行中のジョブも強制終了させる方法です.具体的には,ipcluster
を実行したコンソール上で[Ctrl]+[c]キーを押すことになります.
^C2014-12-14 17:16:40.729 [IPClusterStart] ERROR | IPython cluster: stopping
2014-12-14 17:16:40.730 [IPClusterStart] Stopping Engines...
2014-12-14 17:16:43.734 [IPClusterStart] Removing pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
残念ながら,現バーションのLoadBalancedView
インタフェースには,実行中のジョブを停止する方法が用意されていません(参考文献: [IPython-dev] Fwd: interrupt/abort parallel jobs).緊急の場合は方法2のようにipcluster
そのものを再起動する必要があります.
おわりに
私がクラスタ環境での分散並列処理を始めた頃(2005年頃)はGXP Grid & Cluster Shellを利用していました.GXPにはmakeのようなワークフローを実行する機能がありますが,私はもっぱらバカパラの用途で使っていました.便利なツールですが,現在は開発が停止しているようです.この程度の用途であれば,GNU Parallalでも十分だったかもしれません.
2011年頃,Hadoopの利用を真剣に検討したことがありますが,大学の研究室レベルでHadoopを管理・運用するのは大変でした.また,Pythonで実装した小規模なプログラムをバカパラで並列化するには,Hadoop向けの追加作業が要るため,学生さんにはハードルが高いように感じました.ただ,分散ファイルシステム(HDFS)は非常に便利に感じました.そこで,分散ファイルシステムとしてGFarmを使うようになりました.分散ファイルシステム上に置いたデータを局所性を考慮して処理するため,独自のジョブキューシステムをParamikoを使って実装しました.そのジョブキューシステムの残骸がgfqsubです.
ところが最近,実験やデータ分析作業をIPython notebook上で行うようになりました.IPython notebookは大変便利なのですが,コマンドラインツールとの親和性はイマイチで,バカパラを実現するPythonライブラリを探しました.調べてみると,IPython clusterで分散並列処理がサポートされているものの,公式ドキュメント以外の情報が少なかったため,自分でドキュメントを読みながらBakaparaを作りました.
今回はSSHを使ってバカパラを実現しましたが,IPython clusterはipcluster_config.py
を設定することで,MPI, PBS (qsub) , Windows HPC Server, Amazon EC2を統一的に利用できるようです.おそらく,Bakaparaモジュールはこれらの環境でもそのまま動作すると思います(私自身はテストをしておりません).また,IPython clusterはコントローラとエンジン間のコード・データのやり取りなど,かなり高度な分散並列処理が実現できます.これらの機能を使ってどのような面白いことができるか,今後も探求を続けていきたいと思います.