Help us understand the problem. What is going on with this article?

AWSでipython cluster をミニマムに立ち上げてみた

More than 3 years have passed since last update.

ipython cluster は ipyparallelをインストールするとローカルですぐにでも立ち上がるのですが、それでは面白くないので分散先をAWSに置いてみました。という覚書。

登場人物

  • 利用者

    • Client (ローカルマシン)
  • クラスタ

    • Controller/Hub (EC2)
    • Engine (EC2)

概要図

ipcluster_image.png

ここで説明する話は、このような構図です。各ホスト間はSSHでつなぎに行ってますので、それぞれの接続情報は事前にセットアップする必要があります。実際には、起動するだけでEngine用ノードとして機能するAMIを用意するとかの一手間があると楽でしょう。

EngineとController のホストを作成する

適当にAmazon Linuxでインスタンスを作成します。

ここではAmazon Linux AMI 2015.09 (HVM)を使用し、東京リージョンのt2.microで作成しました。

インストールしたら、ipythonipyparallelをインストールします。

sudo yum groupinstall -y 'Development Tools'
sudo pip install ipython ipyparallel

ここではSSH経由で Engine を起動するので、ControllerからEngineのサーバへは ec2-user でパスワードなしで ssh できるようにしておきます。

Controllerで新しく ipython profile を作成する

ipython profile create --parallel --profile=myprofile

これで以下のようなファイルが生成されます。

tree .ipython/profile_myprofile/

~/.ipython/profile_myprofile/
├── ipcluster_config.py
├── ipcontroller_config.py
├── ipengine_config.py
├── ipython_config.py
├── ipython_kernel_config.py
├── log
├── pid
├── security
└── startup
    └── README

クラスタ動作させるための設定をしていきます。

ipcluster_config.py

冒頭に以下のようなコードを書き足しましょう。

ipcluster_config.py
c = get_config()

c.IPClusterEngines.engine_launcher_class = 'SSHEngineSetLauncher'

c.SSHEngineSetLauncher.engines = {
    '<ENGINE_HOST1>' : 3,
    '<ENGINE_HOST2>' : 3
}

ipcontroller_config.py

中のコメントアウト部分を探して該当箇所を書き換えます。

ipcontroller_config.py
... 中略  ...

c.IPControllerApp.reuse_files = True

... 中略  ...

c.RegistrationFactory.ip = u'*'

...

reuse_files を True にしていると、.ipython/profile_/security/ 以下のファイルが使いまわされます。使いまわされると何が起こるかというと、クラスタの Key が再生成されないのでコントローラを再起動するたびに設定ファイルを更新しなくても良くなるのです。

Controller を起動する

コントローラのサーバで以下コマンドを実行します。

ipcluster start --profile=myprofile

より詳細な出力が得たければ--debugオプションを利用します。また、ログファイルは .ipython/profile_myprofile/log/です。

うまく動作すれば、コントローラが起動したのち、ipcontroller-client.jsonipcontroller-engine.jsonを各ノードに転送した感じのログが流れます。コントローラの起動に失敗すると ipcontroller-* 関連のファイルは生成されません。(そのため、コントローラ起動に失敗するとさりげなくコントローラ停止しましたみたいなメッセージを出力後、各Engineのノードにipcontroller-client.json を転送できないよと言われて落ちます。不親切だ)

2015-10-04 11:15:21.568 [IPClusterStart] Engines appear to have started successfully

こんな感じのログが出たら起動完了です。

デーモン化したい場合は --daemonize オプションをつけます。

Clientから接続する

こっからはローカルのマシンからやります。

Controllerからipcontroller-client.jsonを取得する

このファイルはController起動時に作成されます。

scpか何かでこのファイルを Client のローカルに落としてきましょう。

scp ec2-user@<CONTROLLER_HOST>:./.ipython/profile_myprofile/security/ipcontroller-client.json ./

接続確認用スクリプトの作成

ここでは色々参考にしながら以下のような確認スクリプトを用意しました。

cluster_check.py
from ipyparallel import Client
rc = Client("ipcontroller-client.json", sshserver="ec2-user@<CONTROLLER_HOST>")
print rc.ids

dview = rc[:]
@dview.remote(block=True)
def gethostname():
    import socket
    return socket.getfqdn()

print gethostname()

これで実行すると、こんな感じになるはずです。

$ python cluster_check.py

[0, 1, 2, 3, 4, 5]
[<HOSTNAME>, <HOSTNAME>, <HOSTNAME>, <HOSTNAME2>, <HOSTNAME2>, <HOSTNAME2>]

上記の設定通りなら、一つのホスト3つずつ起動してるはずです。

これで ipython cluster で複数ノードに処理を分散させられます。

プログラミング

インポートは関数中に行う

上の動作確認コードのように、複数のノードで実行するコードの中で 必要な import などはする必要があります。

@require@depend デコレータを利用し、そのノードでちゃんと実行できるコードを実行してあげましょう。(このあたりを参考にしてください)

タスクキュー的な使い方がでける

タスクを順番に複数ノードで処理するみたいな使いかたなら LoadBalancedView を使いましょう。

load_balanced_view を取得し、map_asyncで非同期mapをかける

In [1]: from ipyparallel import Client
In [3]: a = Client("ipcontroller-client.json", sshserver="ec2-user@<CONTROLLER_HOST>")
In [4]: views = a.load_balanced_view()
In [5]: result = views.map_async(lambda x: x*x , (x for x in xrange(10000)))
In [6]: result
Out[6]: <AsyncMapResult: <lambda>>

1つの値を別のコントローラに転送し、ラムダ式を転送し、ばらばらと各エンジンに割り振られ、計算して結果を受け取るというのを全てのリスト要素に対して実行するので、この例では無駄もいいところですがまあ例ということで一つ。

実行後は AsyncMapResult の progress と ready() で様子がわかる

In [8]: result.progress
Out[8]: 8617
In [9]: result.progress
Out[9]: 9557

In [11]: result.progress
Out[11]: 10000

In [12]: result.ready()
Out[12]: True

In [13]: result
Out[13]: <AsyncMapResult: finished>

結果の .result に map した結果が格納されている。

In [14]: len(result.result)
Out[14]: 10000
In [15]: type(result.result)
Out[15]: list

なお、.result に不用意にアクセスすると分散処理が終わるまでロックされるので注意しましょう。

まとめ

ということで、クラスタ作って分散処理ができるようになりました。

ipythonは「便利なrepl環境」くらいにしか思ってなかったのですがこれはとてもすごいものですねー。

動的にノードを確保するとか負荷に応じてクラスタ増減するとかだと大変そうだけど、なにより分散先をローカルにして作ったコードがまんま大規模分散に乗せられるというのは夢が広がります。逆か、大規模分散で動くコードをそのままローカルモードで動作させられるのが楽しいのか。規模大きくなってくると、たぶんクラスタ管理はまた別の課題として出てくるけれど。

参考

すごく参考にしました。

http://qiita.com/chokkan/items/750cc12fb19314636eb7

hatena-corp
「知る」「つながる」「表現する」で新しい体験を提供し、人の生活を豊かにする
https://hatenacorp.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした