LoginSignup
76
70

More than 5 years have passed since last update.

IPython clusterによるバカパラ(分散並列処理)

Last updated at Posted at 2014-12-22

分散並列処理の基本かつ強力なアプローチとして,Single Program Multiple Data (SPMD) があります.これは,複数のプロセッサが同一のプログラムを実行し,その際に各プロセッサは別々のデータを処理する,というモデルです.大規模なデータを独立で処理できる単位に分割し,小分けのデータを複数のプロセッサで並列に処理することで,データ全体の処理時間を大幅に短縮できます.

例えば,a01.txt, a02.txt, a03.txt, a04.txtを入力ファイルとしてjobというコマンドを実行し,実行結果(出力)をそれぞれ,b01.txt, b02.txt, b03txt, b04.txtに格納することを考えます.次のコードは,この処理をbashシェルスクリプトで実現するものです.

#!/bin/bash
for src in $@
do
   job < $src > ${src/a/b}
done

このforループ内の処理内容は互いに独立ですので,容易に並列化できますし,全体に要する処理時間を短縮できます.

bakapara.png

自然言語処理や機械学習では,形態素解析や素性抽出など,データ分割による並列化が容易な処理が沢山あります.このような(分散)並列処理のことを(日本では)バカパラと呼ぶようです.この記事では,コマンド実行によるバカパラをIPython clusterで実現する方法,バカパラに特化したライブラリBakaparaを紹介します.

はじめに

公式ドキュメントのArchitecture overviewによると,IPython clusterは以下の4つの要素から構成されています.

  1. エンジン: 並列に実行するPythonコードを動かすIPythonインタプリタ.実行したいホストの上で並列に実行したい数だけ起動されます.ユーザのプログラムを実行している間,各エンジンは他の操作をブロックします.
  2. コントローラ: エンジン群を操作するインタフェース.ユーザはコントローラを操作することで,エンジンの動作を管理します.内部的に,1つのハブと複数のスケジューラから構成されます.
  3. ハブ: クラスタ実行環境の心臓部.エンジンへの接続,スケジューラ,クライアント,実行結果などを集中管理します.
  4. スケジューラ: エンジンのジョブを管理.

IPythonで並列実行環境を構築するには,1つのコントローラと複数のエンジンを起動する必要があります.コントローラとエンジンの起動方法は2つあります.
1. ipclusterコマンドでコントローラとエンジンを自動的に立ち上げる
2. ipcontrollerコマンドでコントローラ,ipengineコマンドでエンジンを手動で立ち上げる
今回は,ipclusterコマンドを使ってお手軽にクラスタ環境を構築したいと思います.

本記事ではサーバ群の環境として以下の事項を仮定します.
* 各サーバにはsshでパスワードなしで(ssh-agentなどを用いて)ログインできる
* 各サーバはNFS等で共有されたhomeディレクトリにアクセスできる
* 各サーバにIPythonがインストールされている

具体的な説明を心がけるため,この記事では以下のサーバ構成を例に説明します.
* エンジン用のサーバとして,mezcal01.cl.ecei.tohoku.ac.jpからmezcal12.cl.ecei.tohoku.ac.jpの12台を利用する.
* 各ノードで4つのエンジンを起動する(4並列/ノード)

IPython clusterの設定

公式ドキュメントのUsing ipcluster in SSH modeを参考に設定ファイルを作成・編集します.IPython clusterはクラスタ実行環境をプロファイルという単位で管理すると便利です.プロファイルの名前は任意ですが,ここではエンジンのサーバ群の名前に基いて"mezcal"という名前のプロファイルを作成します.

$ ipython profile create --parallel --profile=mezcal
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipython_nbconvert_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcontroller_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipengine_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/ipcluster_config.py'
[ProfileCreate] Generating default config file: u'/home/okazaki/.ipython/profile_mezcal/iplogger_config.py'

$HOME/.ipython/profile_{プロファイル名}というディレクトリが作成され,IPython clusterの設定ファイルipcluster_config.pyが作成されます.このファイルをエディタで開き,各ノードの実行エンジンをSSH経由で起動するように設定します.設定箇所をコメントを残したまま示します.c.IPClusterStart.engine_launcher_class'SSH'とします.

ipcluster_config.py
# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
#     Local : start engines locally as subprocesses [default]
#     MPI : use mpiexec to launch engines in an MPI environment
#     PBS : use PBS (qsub) to submit engines to a batch queue
#     SGE : use SGE (qsub) to submit engines to a batch queue
#     LSF : use LSF (bsub) to submit engines to a batch queue
#     SSH : use SSH to start the controller
#                 Note that SSH does *not* move the connection files
#                 around, so you will likely have to do this manually
#                 unless the machines are on a shared file system.
#     HTCondor : use HTCondor to submit engines to a batch queue
#     WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
#     c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
#     ipcluster start --engines=MPI
c.IPClusterStart.engine_launcher_class = 'SSH'

先ほど設定したc.IPClusterStart.engine_launcher_classとの差異が不明ですが,c.IPClusterEngines.engine_launcher_class'SSH'とします.さらに,c.SSHEngineSetLauncher.enginesという辞書オブジェクトに分散並列処理を行うホスト名とエンジンの数(並列実行数)を指定します.engines辞書オブジェクトのキーにホスト名,値にエンジン数をセットします.mezcal[[01-12]].cl.ecei.tohoku.ac.jpで各4個の実行エンジンを起動し,最大で48並列処理を行うための設定例を示します.

ipcluster_config.py
# The class for launching a set of Engines. Change this value to use various
# batch systems to launch your engines, such as PBS,SGE,MPI,etc. Each launcher
# class has its own set of configuration options, for making sure it will work
# in your environment.
#
# You can also write your own launcher, and specify it's absolute import path,
# as in 'mymodule.launcher.FTLEnginesLauncher`.
#
# IPython's bundled examples include:
#
#     Local : start engines locally as subprocesses [default]
#     MPI : use mpiexec to launch engines in an MPI environment
#     PBS : use PBS (qsub) to submit engines to a batch queue
#     SGE : use SGE (qsub) to submit engines to a batch queue
#     LSF : use LSF (bsub) to submit engines to a batch queue
#     SSH : use SSH to start the controller
#                 Note that SSH does *not* move the connection files
#                 around, so you will likely have to do this manually
#                 unless the machines are on a shared file system.
#     HTCondor : use HTCondor to submit engines to a batch queue
#     WindowsHPC : use Windows HPC
#
# If you are using one of IPython's builtin launchers, you can specify just the
# prefix, e.g:
#
#     c.IPClusterEngines.engine_launcher_class = 'SSH'
#
# or:
#
#     ipcluster start --engines=MPI
c.IPClusterEngines.engine_launcher_class = 'SSH'

c.SSHEngineSetLauncher.engines = {
    'mezcal01.cl.ecei.tohoku.ac.jp': 4,
    'mezcal02.cl.ecei.tohoku.ac.jp': 4,
    'mezcal03.cl.ecei.tohoku.ac.jp': 4,
    'mezcal04.cl.ecei.tohoku.ac.jp': 4,
    'mezcal05.cl.ecei.tohoku.ac.jp': 4,
    'mezcal06.cl.ecei.tohoku.ac.jp': 4,
    'mezcal07.cl.ecei.tohoku.ac.jp': 4,
    'mezcal08.cl.ecei.tohoku.ac.jp': 4,
    'mezcal09.cl.ecei.tohoku.ac.jp': 4,
    'mezcal10.cl.ecei.tohoku.ac.jp': 4,
    'mezcal11.cl.ecei.tohoku.ac.jp': 4,
    'mezcal12.cl.ecei.tohoku.ac.jp': 4,
}

ターミナルやIPython notebookを実行するサーバと,コントローラのサーバが異なる場合は,他のサーバからコントローラへの接続を許可する必要があります.サーバ群が信頼できるLANに配置されている場合は,すべてのホストからコントローラに接続できるようにしておくと便利です.ipcontrollerの起動オプションに"--ip='*'"を追加します(デフォルトはローカルホストのみ接続可).

ipcluster_config.py
#------------------------------------------------------------------------------
# LocalControllerLauncher configuration
#------------------------------------------------------------------------------

# Launch a controller as a regular external process.

# command-line args to pass to ipcontroller
c.LocalControllerLauncher.controller_args = ["--ip='*'", '--log-to-file', '--log-level=20']

今回は,コントローラとエンジンのホストでhomeディレクトリが共有されているので,以下の設定を追加します.

ipcluster_config.py
#------------------------------------------------------------------------------
# SSHLauncher configuration
#------------------------------------------------------------------------------

# A minimal launcher for ssh.
#
# To be useful this will probably have to be extended to use the ``sshx`` idea
# for environment variables.  There could be other things this needs as well.

# hostname on which to launch the program
# c.SSHLauncher.hostname = ''

# command for starting ssh
# c.SSHLauncher.ssh_cmd = ['ssh']

# user@hostname location for ssh in one setting
# c.SSHLauncher.location = ''

# List of (local, remote) files to send before starting
c.SSHLauncher.to_send = []

# command for sending files
# c.SSHLauncher.scp_cmd = ['scp']

# List of (remote, local) files to fetch after starting
c.SSHLauncher.to_fetch = []

# args to pass to ssh
# c.SSHLauncher.ssh_args = ['-tt']

# username for ssh
# c.SSHLauncher.user = ''

この設定の意味を理解するには,IPython clusterが開始されるまでの流れを理解する必要があります.IPython clusterが起動するまでの流れを示します.
1. コントローラを起動する
2. コントローラが作成したipcontroller-engine.jsonをエンジンのホストにscpで転送する
3. 各エンジンを起動する
ここで,ホームディレクトリが共有されている場合は,手順2が不要になるわけです.

クラスタの開始

コントローラを動かしたいホスト上でipclusterコマンドを実行し,コントローラとエンジンをまとめて起動します.このとき,プロファイル名を--profileオプションで指定します.

$ ipcluster start --profile=mezcal
2014-12-11 14:15:49.891 [IPClusterStart] Using existing profile dir: u'/home/okazaki/.ipython/profile_mezcal'
2014-12-11 14:15:50.023 [IPClusterStart] Starting ipcluster with [daemon=False]
2014-12-11 14:15:50.025 [IPClusterStart] Creating pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid
2014-12-11 14:15:50.025 [IPClusterStart] Starting Controller with LocalControllerLauncher
2014-12-11 14:15:51.024 [IPClusterStart] Starting 48 Engines with SSH
2014-12-11 14:16:25.117 [IPClusterStart] Engines appear to have started successfully

"Engines appear to have started successfully"と表示されれば成功です."Starting 48 Engines with SSH"という表示から,$12 \times 4 = 48$個のエンジンを始動したことを確認できます.

クラスタ上でプログラムを実行する

IPython.parallelモジュールをインポートします.

In [1]: from IPython.parallel import Client

コントローラとエンジンを操作するためのClientオブジェクトを作成します.profile引数にプロファイル名を指定します.

In [2]: rc = Client(profile='mezcal')

Clientオブジェクトが接続しているエンジンのIDを確認すると,48台のエンジンに接続していることが確認できます.

In [3]: rc.ids
Out[3]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47]

以下の手順はバカパラとは関係がありませんが,IPython clusterの一般的な利用方法として解説します.タスクスケジューラを介さず,各エンジンで直接的にコードを実行するにはDirectViewインスタンスを用います.

In [4]: dview = rc[:]

以下のコードは$x^2$を$x \in \{0, 1, ..., 49\}$に対して(並列化なしで)計算します.

In [5]: serial_result =  map(lambda x: x**2, range(50))

この計算を各要素$x \in \{0, 1, ..., 49\}$ごとに並列化してみます.

In [6]: parallel_result = dview.map_sync(lambda x: x**2, range(50))

parallel_resultには,各エンジンで実行した結果が集約・格納されます.

In [7]: parallel_result
Out[7]:
[0,
 1,
 4,
 9,
 ...
 2401]

当然のことながら,並列化の有無に関係なく,計算結果は同じです.

In [8]: serial_result == parallel_result
Out[8]: True

remoteデコレータを使えば,各エンジンで実行される関数(remote function)を定義できます.以下のgethostname関数は,ホスト名をsocket.getfqdn()で取得して返す関数ですが,関数内でsocketモジュールをインポートしていることに注意してください.クライアント側でモジュールをインポートしても,エンジン上のIPythonプロセスではモジュールをインポートしたことにならないため,関数内でモジュールをインポートする必要が生じます.

In [9]: @dview.remote(block=True)
   ...: def gethostname():
   ...:     import socket
   ...:     return socket.getfqdn()
   ...:

クライアントでgethostname関数を呼び出すと,各エンジンのホスト名が取得できます.順番はバラバラですが,mezcal01からmezcal12までのホストで,各4つのエンジンが起動していることが確認できます.

In [10]: gethostname()
Out[10]: 
['mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal06.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal09.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 'mezcal04.cl.ecei.tohoku.ac.jp',
 ...
 'mezcal01.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp',
 'mezcal03.cl.ecei.tohoku.ac.jp']

他にも,並列に実行する関数を定義するparallelデコレータもあります.詳しくは,IPython's Direct interfaceを参照してください.

スケジューラを介した実行

LoadBalancedViewインスタンスは,動的な負荷分散を用いてジョブを実行します.個別のエンジンに直接アクセスすることは出来なくなりますが,multiprocessing.Poolのジョブ・キューのように,クラスタのエンジンでジョブ・キューを実現できます.

単純な例として,sleep 10というコマンドを各エンジンで実行してみましょう.実行したいジョブを格納したリストを作成します.

In [11]: jobs = [dict(cmd='sleep 10') for i in range(100)]

このリストの各要素は辞書型で,cmdキーの値に実行したいコマンドを格納します.今回はすべてのジョブでsleep 10を実行していますが,実際にバカパラを行うときは,入力データに応じてコマンドの内容が変わるはずです.先頭の5個のジョブはこんな感じです.

In [12]: jobs[:5]
Out[12]:
[{'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'},
 {'cmd': 'sleep 10'}]

辞書オブジェクトで表現されたジョブ(コマンド)を実行する関数runjobを実装します.受け取った辞書オブジェクトのcmdキーの値をシェル上で実行し,その戻り値を辞書オブジェクトに格納して返します.

In [13]: def runjob(job):
   ....:     import subprocess
   ....:     try:
   ....:         returncode = subprocess.call(job['cmd'], shell=True)
   ....:         return dict(code=returncode)
   ....:     except OSError, e:
   ....:         return dict(error=str(e))
   ....:

このrunjob関数をジョブキューで順々に実行するため,クライアントからLoadBalancedViewインスタンスを取得します.

In [14]: lview = rc.load_balanced_view()

そして,jobsリストの各要素に対してrunjob関数を非同期で実行します.

In [15]: ar = lview.map_async(runjob, jobs)

このコードの実行はブロックされず,直ちにAsyncResultオブジェクトが返されます.ジョブの実行結果や進捗状況は,このAsyncResultオブジェクトを介して確認できます.例えば,ジョブの実行状況をインタラクティブシェル上に表示してみます(IPython notebook上でも利用できます).

In [16]: ar.wait_interactive()
  48/100 tasks finished after   15 s

このwait_interactive関数を呼び出すと,1秒毎にジョブの実行状況がシェル上に表示されます.上の表示は,ジョブの実行開始から15秒経過した時点のもので,100個中48個のジョブが完了したことを表しています.1つのジョブの所要時間が10秒で,48個のエンジンを同時に利用しているため,実行開始から10秒で48個のジョブが完了し,10秒から20秒の間でさらに48個のジョブを実行することになります.すべてのジョブが完了すると,以下の様に表示されます.

In [16]: ar.wait_interactive()
 100/100 tasks finished after   30 s
done

各ジョブの実行は非同期で行われていますが,wait_interactive関数はすべてのジョブが完了するまで終了しません.ジョブの進捗状況の表示を止めたいときは,[Ctrl]+[c](IPython notebookならば"Kernel"-"Interrupt")で中断すればOKです.表示を中断してもジョブの実行は継続されます.

ジョブの実行結果は,AsyncResultオブジェクトから確認できます.先頭から5個のジョブの実行結果を確認してみます.

In [17]: ar[:5]
Out[17]: [{'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}, {'code': 0}]

code0とは,sleepコマンドを実行したシェル(/bin/sh)の返り値が0であったことを示しています.

Bakaparaモジュール

シェル上のコマンドによるジョブキューをIPython cluster上でバカパラ実行する処理をモジュール化したものが,Bakaparaです.PythonのインタラクティブシェルやIPython notebookから利用できるように設計されています.また,コマンドラインから単体で動かすことも可能です.

インストール

モジュールとして利用する場合は,bakapara.pyをダウンロードして,PYTHONPATHの通ったディレクトリに配置してください.コマンドラインから実行する場合は,PATHの通ったディレクトリに配置しておくと便利です.

ジョブの仕様

Bakaparaオブジェクトは,ジョブのリストを受け取り,クラスタのエンジンで実行します.各ジョブは以下の仕様を満たす辞書オブジェクトであれば,どのような形式でも構いません.

キー
cmd エンジン上で実行したいコマンド.実際にはシェルを介してコマンドを実行するので,パイプやリダイレクトも利用可能. wc -l /work/001.txt
cd (省略可)エンジンがコマンドを実行するときの作業ディレクトリ.このキーが存在しない場合は,エンジンの作業ディレクトリを変更しません.なお,各エンジンの作業ディレクトリは,Bakaparaオブジェクト構築時の作業ディレクトリで初期化されます. /home/okazaki/projects/bakapara
out (省略可)この値にファイル名を指定すると,コマンド実行時の標準出力がファイルに保存されます.このキーが存在しない場合は,標準出力の内容を保存しません. /work/001.txt.out
err (省略可)この値にファイル名を指定すると,コマンド実行時の標準エラー出力がファイルに保存されます.このキーが存在しない場合は,標準エラー出力の内容を保存しません. /work/001.txt.err
host (省略可)コマンドを実行できるホスト(エンジン)のリスト.このキーが存在しない場合は,このジョブはすべてのエンジンで実行可能であるとみなします. ['mezcal03.cl.ecei.tohoku.ac.jp',]

hostは,特定のジョブを特定のホストで実行したいときに使います.例えば,処理したいデータを各サーバのローカルディスクに分散配置した場合,ジョブの実行に必要なデータが配置されているホストをhostで指定できます.また,分散ファイルシステムGFarmでは,分散ファイルシステム上の各ファイルがどのホストに配置されているか調べることが出来ますので,処理データが配置されているホストを指定することで,HDFS+Hadoopのようなデータ局所性を考慮した分散並列処理を実現できます.

各ジョブの実行結果は,辞書オブジェクトのresultキーの値として格納(上書き)されます.resultの値は以下の仕様の辞書オブジェクトです.

キー
code 終了コード 0
submitted ジョブがクライアントから発行された日時 '2014-12-13T01:17:05.593718'
started エンジン上でジョブの実行が開始された日時 '2014-12-13T01:17:04.559970'
completed エンジン上でジョブの実行が完了した日時 '2014-12-13T01:17:14.566251'
received クライアントがジョブの実行結果を受け取った日時 '2014-12-13T01:17:15.614301'
elapsed ジョブの実行に要した時間(completed-started '0:00:10.006281'
engine_id ジョブを実行したエンジンのID(インデックス) 3
host ジョブを実行したエンジンのホスト名 'mezcal06.cl.ecei.tohoku.ac.jp'
pyerr Pythonの例外(あれば).ジョブを実行するホスト名を指定した時にUnmetDependencyという例外が表示されることがありますが,これは正常です. None
pyout Pythonインタプリタの出力(あれば) None
status 'ok'もしくは'error' 'ok'
msg_id クライアントとエンジンがやりとりしたメッセージのUUID u'48fbc58b-ef73-4815-9f32-1932c01421ad'
error (致命的なエラー発生時のみ存在)エラーメッセージ ''

status'ok'でもコマンドの実行に失敗しているケースがありますので,注意が必要です.例えば,cmdのコマンドを書き間違えて実行出来なかった場合でも,status'ok'になりますが,標準エラー出力には'/bin/bash: 1: hoge: not found\n'のようなメッセージが残ります.なお,ジョブのコマンドは/bin/bash -o pipefail経由で実行されます.したがって,パイプで結ばれたコマンドのいずれかが0以外のステータスコードを返した場合は,その戻り値がcodeに格納されることになります.したがって,戻り値codeを確認することは重要です.

Pythonインタラクティブシェル上での利用

まず,bakaparaモジュールをインポートして,Bakaparaインスタンスを作成します.Bakaparaクラスのコンストラクタの仕様はIPython.parallel.Clientと同一です.通常は,以下のようにプロファイル名を指定することになるかと思います.

In [1]: from bakapara import Bakapara

In [2]: bp = Bakapara(profile='mezcal')

実行したいジョブのリストを作成します.以下のコードは,sleep 10というコマンドを100回実行するジョブを作成しています.

In [3]: jobs = [dict(cmd='sleep 10') for i in range(100)]

説明のため,先頭のジョブを確認してみます.

In [4]: jobs[0]
Out[4]: {'cmd': 'sleep 10'}

ジョブを開始するにはrunメソッドを用います.返り値Trueはジョブの登録に成功したことを表します.

In [5]: bp.run(jobs)
Out[5]: True

ジョブの進捗状況はstatusメソッドで確認できます.ジョブの進捗状況が1秒おきに表示されます.

In [6]: bp.status()
48/100 tasks finished after 0:00:12.554905

このstatusメソッドはすべてのジョブが完了するまで実行が終了しません.他の作業をしたくなったら,[Ctrl]+[c]キー(IPython notebookでは”Kernel"-"Interrupt")を押して中断してください.

ジョブの実行が完了すると,以下のように表示されます.

In [7]: bp.status()
100/100 tasks finished after 0:00:30.137117
100 tasks completed in 0:00:30.137117

ジョブが完了すると,ジョブリスト(Bakaparaクラスのrunメソッドに与えたジョブリスト)に結果が書き込まれます.正確には,ジョブの完了後にstatusメソッドもしくはwaitメソッドを呼び出すことで実行結果がjobsリストに書き込まれます.全体のジョブが完了していなくても,statusメソッドもしくはwaitメソッドを呼び出した時点までの結果がジョブリストに書き込まれます.ジョブの実行結果を取得するためにも,statusメソッドを呼び出し,"xx/xxx tasks finished"というメッセージが表示されることを確認しましょう.

実行結果を確認してみましょう.ジョブを実行したホスト名,所要時間,終了コードなどの情報にアクセスできます.

In [8]: jobs[0]
Out[8]:
{'cmd': 'sleep 10',
 'result': {'code': 0,
  'completed': '2014-12-13T12:26:12.665525',
  'elapsed': '0:00:10.005647',
  'engine_id': 11,
  'host': 'mezcal11.cl.ecei.tohoku.ac.jp',
  'msg_id': u'72666439-9364-4a37-9dfb-8a04921d9a0c',
  'pyerr': None,
  'pyout': None,
  'received': '2014-12-13T12:24:38.638917',
  'started': '2014-12-13T12:26:02.659878',
  'status': u'ok',
  'submitted': '2014-12-13T12:23:58.320781'}}

コマンドラインからの利用

今まで見てきたように,Bakaparaモジュールを使うのは簡単です.ただ,中にはPythonは全く分からないけど,バカパラの分散並列処理を行いたい人もいるかと思います.そんな方のために,コマンドライン・インタフェースも準備しました.bakapara.pyを単体で実行すると,ジョブのJSONを標準入力から読み込み,ジョブの実行結果のJSONを標準出力に書き出します.入力と出力のJSONは,1行1ジョブで,各行にジョブの辞書オブジェクトをJSONで記述した形式を採用しています.全体のジョブをリスト形式で格納しないのは,ジョブが完了すると直ちにその実行結果を書き出せるようにするためです.

例えば,sleep 10を100件実行するジョブのJSONファイルは,次のようになります.

$ head -n5 sleep.task.json
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}
{"cmd": "sleep 10"}

このジョブを実行し,その実行結果をsleep.result.jsonに書き出すには,次のようなコマンドを実行します.IPython clusterのプロファイル名は-pオプションで指定します.

$ python bakapara.py -p mezcal < sleep.task.json > sleep.result.json
2014-12-21 10:39:49,231 total 100 jobs on 48 engines
2014-12-21 10:39:59,288 [1/100] returned 0 in 0:00:10.007444 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [2/100] returned 0 in 0:00:10.005645 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [3/100] returned 0 in 0:00:10.005994 sec on mezcal06.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:39:59,289 [4/100] returned 0 in 0:00:10.006593 sec on mezcal01.cl.ecei.tohoku.ac.jp: sleep 10
...
2014-12-21 10:40:19,282 [97/100] returned 0 in 0:00:10.005299 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [98/100] returned 0 in 0:00:10.005097 sec on mezcal08.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [99/100] returned 0 in 0:00:10.005758 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 [100/100] returned 0 in 0:00:10.004995 sec on mezcal02.cl.ecei.tohoku.ac.jp: sleep 10
2014-12-21 10:40:19,283 completed

標準出力から,実行したジョブが(JSONでエンコードされた)辞書オブジェクトとして返ってきます.resultキーに実行結果が格納されていることが分かると思います.

$ head -n5 sleep.result.json
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262566", "code": 0, "engine_id": 9, "started": "2014-12-21T10:40:46.649199", "completed": "2014-12-21T10:40:56.656643", "msg_id": "22d664c5-793a-44f1-b29d-c74f2aa434c1", "submitted": "2014-12-21T10:39:49.235879", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.007444", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.262205", "code": 0, "engine_id": 11, "started": "2014-12-21T10:40:46.650998", "completed": "2014-12-21T10:40:56.656643", "msg_id": "e8eb5db2-ac9b-481b-b0a4-fdb2ef15be62", "submitted": "2014-12-21T10:39:49.236327", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005645", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.287773", "code": 0, "engine_id": 8, "started": "2014-12-21T10:40:46.679033", "completed": "2014-12-21T10:40:56.685027", "msg_id": "8a7e6fe0-482a-4ae0-a2ff-8321849aa8b0", "submitted": "2014-12-21T10:39:49.244347", "pyerr": null, "host": "mezcal06.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.005994", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.284039", "code": 0, "engine_id": 46, "started": "2014-12-21T10:40:46.698136", "completed": "2014-12-21T10:40:56.704729", "msg_id": "f03f9b93-4a60-494b-9a21-625cdcac252e", "submitted": "2014-12-21T10:39:49.242042", "pyerr": null, "host": "mezcal01.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006593", "pyout": null}}
{"cmd": "sleep 10", "result": {"status": "ok", "received": "2014-12-21T10:39:59.259553", "code": 0, "engine_id": 28, "started": "2014-12-21T10:40:46.889807", "completed": "2014-12-21T10:40:56.895995", "msg_id": "bc9e7b74-64ba-45f4-ac0e-31b27db5d862", "submitted": "2014-12-21T10:39:49.234939", "pyerr": null, "host": "mezcal07.cl.ecei.tohoku.ac.jp", "elapsed": "0:00:10.006188", "pyout": null}}

ジョブの中断について

クラスタ上で実行されているジョブの実行結果を眺めているうちに,ジョブの内容が間違っていたこと,もしくはジョブが暴走していることに気づくことがあります.この場合,ジョブの実行をキャンセルすることになります.Bakaparaでジョブの実行をキャンセルする方法は2つあります.

  1. abort()メソッドを呼び出す.
  2. ipclusterプロセスを停止し,すべてのエンジンとコントローラを強制終了する.

方法1の場合,実行中のジョブはキャンセルされず,ジョブキューに入っているジョブのみがキャンセルされることになります.したがって,非常に時間のかかるジョブを実行している場合や,ジョブが暴走した場合には使えません.

方法2は,クラスタ実行環境のエンジンとコントローラを終了することで,実行中のジョブも強制終了させる方法です.具体的には,ipclusterを実行したコンソール上で[Ctrl]+[c]キーを押すことになります.

^C2014-12-14 17:16:40.729 [IPClusterStart] ERROR | IPython cluster: stopping
2014-12-14 17:16:40.730 [IPClusterStart] Stopping Engines...
2014-12-14 17:16:43.734 [IPClusterStart] Removing pid file: /home/okazaki/.ipython/profile_mezcal/pid/ipcluster.pid

残念ながら,現バーションのLoadBalancedViewインタフェースには,実行中のジョブを停止する方法が用意されていません(参考文献: [IPython-dev] Fwd: interrupt/abort parallel jobs).緊急の場合は方法2のようにipclusterそのものを再起動する必要があります.

おわりに

私がクラスタ環境での分散並列処理を始めた頃(2005年頃)はGXP Grid & Cluster Shellを利用していました.GXPにはmakeのようなワークフローを実行する機能がありますが,私はもっぱらバカパラの用途で使っていました.便利なツールですが,現在は開発が停止しているようです.この程度の用途であれば,GNU Parallalでも十分だったかもしれません.

2011年頃,Hadoopの利用を真剣に検討したことがありますが,大学の研究室レベルでHadoopを管理・運用するのは大変でした.また,Pythonで実装した小規模なプログラムをバカパラで並列化するには,Hadoop向けの追加作業が要るため,学生さんにはハードルが高いように感じました.ただ,分散ファイルシステム(HDFS)は非常に便利に感じました.そこで,分散ファイルシステムとしてGFarmを使うようになりました.分散ファイルシステム上に置いたデータを局所性を考慮して処理するため,独自のジョブキューシステムをParamikoを使って実装しました.そのジョブキューシステムの残骸がgfqsubです.

ところが最近,実験やデータ分析作業をIPython notebook上で行うようになりました.IPython notebookは大変便利なのですが,コマンドラインツールとの親和性はイマイチで,バカパラを実現するPythonライブラリを探しました.調べてみると,IPython clusterで分散並列処理がサポートされているものの,公式ドキュメント以外の情報が少なかったため,自分でドキュメントを読みながらBakaparaを作りました.

今回はSSHを使ってバカパラを実現しましたが,IPython clusterはipcluster_config.pyを設定することで,MPI, PBS (qsub) , Windows HPC Server, Amazon EC2を統一的に利用できるようです.おそらく,Bakaparaモジュールはこれらの環境でもそのまま動作すると思います(私自身はテストをしておりません).また,IPython clusterはコントローラとエンジン間のコード・データのやり取りなど,かなり高度な分散並列処理が実現できます.これらの機能を使ってどのような面白いことができるか,今後も探求を続けていきたいと思います.

76
70
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
76
70