はじまり
元旦にぼけーっと 相棒 - 元日スペシャル を見ていました。「まったく無関係で独立して実行された一連の出来事が、実は全て計画された通りに動いていたんだよ!!」というシーンを見て、急激に SWF と戯れたくなりました。いつも通りパイセンのお導きにより Python と boto を使って戯れたいと思います。
Amazon SWF とは?
SWF は複数のコンポーネント(EC2でもオンプレのサーバでも)上で実行される各処理を確実に重複なく順番に実行するためのワークフロー管理のサービス。主な登場人物は以下の通りです。
The Amazon Simple Workflow Service - ワークフロー、実行タスク管理、タスク間の処理ルーティング、一貫性の担保、実行履歴・リトライ・タイムアウトなどの状態管理。ここの部分は AWS 側で冗長構成をとっているので利用者側で冗長構成を考える必要はありません。
Workflow Executors - ワークフローを開始するもの。ボタンをクリックするなどユーザーのアクションであったり cronjob であったりなど。
Deciders - アクティビティをどの順序でやっていくかというロジック部分。状態やアクティビティの結果に基づいて判断することができる。EC2 インスタンスやサーバ上で動かすプログラムなので冗長構成は利用者側で設計する必要がある。AutoScaling で起動するとよい。
Activity Workers - Decider によってセットされたタスクを実際に実行するプログラム。これも EC2 インスタンスなどサーバ上で動かすプログラムなので冗長構成は利用者側で設計する必要がある。これも AutoScaling で起動するとよい。
お戯れの準備
以下の boto の SWF チュートリアルに従って SWF を触ってみました。
Amazon Simple Workflow Tutorial
このチュートリアルにはいくつかのサンプルコードが含まれています。
一番簡単な例は上から三つの Python のコードを動かすだけで試せます。
- register.py - ドメインとワークフローとアクティビティを登録するコード。これは API 叩けるホストであれば、どこから実行しても構いません。同じことをマネジメントコンソールからやっておくでも可。
- hello_decider.py - 一番最初の単純なサンプルのディサイダーとして動く
- 無題 - コードの名前が何も書いていないんですが、一番最初の単純なサンプルのアクティビティワーカーとして動かすもの。タスクが入って来たら Hello World するだけの役割。以下参照。
import boto.swf.layer2 as swf
DOMAIN = 'boto_tutorial'
VERSION = '1.0'
TASKLIST = 'default'
class HelloWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = TASKLIST
def run(self):
activity_task = self.poll()
if 'activityId' in activity_task:
print 'Hello, World!'
self.complete()
return True
今回は何台もインスタンスを立ち上げるのが面倒だったのでディサイダーとアクティビティワーカーのコードを同じインスタンスで動かしてしまいましたが、スケーラビリティやリソースの分離を考えるのであれば別インスタンスやサーバーで動かすべきでしょう。
特にアクティビティワーカーは処理の負荷状態に応じてスケールさせることもありそうなので AutoScaling で立ち上げたほうが良いです。
当然のことながら上記の一連のコードを動かすインスタンスには SWF にアクセス出来る権限を持ったアクセスキー、シークレットアクセスキーを設定しておくか IAM Role で SWF を使う権限を与えます。
お戯れの時間
インタラクティブモードで各ディサイダーとアクティビティワーカーのコードを動かします。
今回は判りやすくするためだと思いますが、インタラクティブモードでやっていますが、普通に実装するのであれば、タスクリストに対してロングポーリングを繰り返し行うループが必要です。タスクリストというのはタスクを受け渡しするために SWF に内包されたキューのようなものだと思って下さい。一回のロングポーリングは60秒で何もなければコネクションが切れてしまいます。
$ python -i hello_decider.py
>>> while HelloDecider().run(): pass
...
$ python -i ./hello_worker.py
>>> while HelloWorker().run(): pass
...
ここで、どのインスタンスでもサーバでもいいのですが SWF の API を叩けるインスタンスからワークフロー実行をしてあげます。実際の例ならユーザーのクリック操作や cron でこれを呼び出してあげる必要があります。
$ python
Python 2.7.8 (default, Nov 12 2014, 02:03:09)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> execution = swf.WorkflowType(name='HelloWorkflow', domain='boto_tutorial', version='1.0', task_list='default').start()
するとアクティビティワーカーのターミナルに Hello, World! の文言が現れました。焦って複数回ワークフロー実行を連打してしまったのでたくさん出てきました(汗)。
$ python -i hello_worker.py
while HelloWorker().run(): pass
...
Hello, World!
Hello, World!
Hello, World!
以下略
今回のディサイダーのサンプルは単純なものなので、以下のようにイベントの状態が WorkflowExecutionStarted ならアクティビティタスクをスケジュールし、アクティビティタスクが終わった状態ならワークフロー実行完了とするようになっています。
decisions = swf.Layer1Decisions()
if last_event['eventType'] == 'WorkflowExecutionStarted':
decisions.schedule_activity_task('saying_hi', ACTIVITY, VERSION, task_list=TASKLIST)
elif last_event['eventType'] == 'ActivityTaskCompleted':
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
これだけだとSWFの面白みが伝わりにくいかもしれません。シリアル実行、パラレル実行、サブワークフロー実行のサンプルも上で紹介したチュートリアルのページにあるので、後でまた取り上げてみようと思います。
免責
こちらは個人の意見で、所属する企業や団体は関係ありません。