AWS Data Pipeline。名前からするとETLやデータの移動のために使われるイメージが強いんだけど、実はCronぽいスケジューラとしても使えるので、ポイントをまとめておく。
方針は単純で、 Data Pipelineをシェルスクリプトを起動するためだけに利用するという感じ。
Data Pipeline、いろいろ覚えることがあって若干めんどくさいところもあるが 個別のサーバーに依存しないスケジューラを使える というのはそれなりに価値があるところだと思う。
処理の定義(アクティビティオブジェクト)
Data PipelineのアクティビティにはShellCommandActivityというものがあって、これはOS上で単純にシェルスクリプトを実行してくれる。
-
Command
というパラメータに直接シェルスクリプトを渡してやればOK。もう少しちゃんと管理したいならScriptUrl
というパラメータにS3上のシェルスクリプトのパスを渡してやることもできる。 -
RunsOn
またはWorkerGroup
でこのスクリプトを実行するリソースを定義する。 -
Schedule
で実行するスケジュールを定義。 -
On Success
やOn Fail
で成功時、失敗時の通知などを定義できる。
リソース(リソースオブジェクト)
リソースを用意する方法は2つある。
Data Pipeline側でEC2を起動して使う
アクティビティオブジェクトにてRunOn
を使う場合。下記のようにEC2を定義して起動できる。
ポイントはここにもSchedule
があること。この場合、上記のSchellCommandActivityと同じEvery 15 minutes
という名前を持ったスケジュールオブジェクトを参照しているので、以下のような流れで処理が実行される。
- 15分に1回、このEC2が起動される
- そしてシェルスクリプトが実行される
-
Terminate After
で指定されているように10分後、このEC2は削除される
もちろん、アクティビティオブジェクトとリソースオブジェクトを、別々のスケジュールで管理することもできる。
既存のサーバーを利用する
アクティビティオブジェクトにてWorkerGroup
を指定する場合、自分で用意したEC2やオンプレミスのサーバー上でアクティビティを実行することができる。仕組みは単純で、TaskRunnerと呼ばれるエージェントを任意のサーバーで起動してやればOK。
TaskRunnerは下記から入手できる。
http://aws.amazon.com/developertools/AWS-Data-Pipeline/1920924250474601
そして以下のようにWorkerGroup
の名前を指定して起動してやればOK。
$ java -jar TaskRunner-1.0.jar --config ~/credentials.json --workerGroup=myWorkerGroup --region=MyRegion --logUri=s3://mybucket/foldername
スケジューリング(スケジュールオブジェクト)
スケジュールについてはスクリーンショットを見てもらえば特に解説するようなところはない。
成否通知(アクションオブジェクト)
下記のようにSNSのTopicARNを指定してあれば、アクティビティの成功時、失敗時に通知を飛ばせる。
実行状況の確認
もちろん実行状況の確認もできる。。。が、若干見づらいというか、慣れが必要な気がする。
CLIで見たほうがなんとなくわかりやすい気がする。
$ aws datapipeline list-runs --pipeline-id df-022729633UUVCN2OHKNX
Name Scheduled Start Status
ID Started Ended
---------------------------------------------------------------------------------------------------
1. ResourceId_VkJ2b 2015-06-07T05:57:13 FINISHED
@ResourceId_VkJ2b_2015-06-07T05:57:13 2015-06-07T05:57:19 2015-06-07T06:08:53
2. ActivityId_vRKcJ 2015-06-07T05:57:13 FINISHED
@ActivityId_vRKcJ_2015-06-07T05:57:13 2015-06-07T05:57:19 2015-06-07T06:06:50
3. ResourceId_VkJ2b 2015-06-07T06:12:13 FINISHED
@ResourceId_VkJ2b_2015-06-07T06:12:13 2015-06-07T06:12:28 2015-06-07T06:20:02
4. ActivityId_vRKcJ 2015-06-07T06:12:13 FINISHED
@ActivityId_vRKcJ_2015-06-07T06:12:13 2015-06-07T06:03:28 2015-06-07T06:17:59
5. ResourceId_VkJ2b 2015-06-07T06:27:13 FINISHED
@ResourceId_VkJ2b_2015-06-07T06:27:13 2015-06-07T06:27:28 2015-06-07T06:31:49
6. ActivityId_vRKcJ 2015-06-07T06:27:13 FINISHED
@ActivityId_vRKcJ_2015-06-07T06:27:13 2015-06-07T06:20:01 2015-06-07T06:29:46
マネージメントコンソールなしで利用する。
マネジメントコンソールのArchitectと呼ばれるGUIは好き嫌いが別れるところ・・・というか嫌いな人のほうが多そうな気がする笑。コレなしで利用するための方針としてはPutPipelineDefinitionというAPIを使ってJSON形式で定義したパイプラインを渡してやればよい。とはいえいきなりこれをソラで書くのはつらすぎるので、一度ArchitectにJSON出力機能がついているので、一度ざっくり定義したパイプラインを下記のように吐き出しておいて、あとは適当に必要に応じた修正をしてCLIを使ってaws datapipeline put-pipeline-definition
あたりで放り込んでやればOK。
{
"objects": [
{
"failureAndRerunMode": "CASCADE",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "general",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "s3://YOUR_LOG_BUCKET/datapipeline/",
"scheduleType": "cron",
"name": "Default",
"id": "Default"
},
{
"role": "DataPipelineDefaultRole",
"subject": "Success: ShellCommandActivety",
"name": "DefaultAction1",
"id": "ActionId_kK5b2",
"message": "Success: ShellCommandActivety",
"type": "SnsAlarm",
"topicArn": "arn:aws:sns:ap-northeast-1:111222333444:ShellCommandActivityTest"
},
{
"schedule": {
"ref": "DefaultSchedule"
},
"onFail": {
"ref": "ActionId_dkw9J"
},
"name": "date",
"id": "ActivityId_vRKcJ",
"runsOn": {
"ref": "ResourceId_VkJ2b"
},
"type": "ShellCommandActivity",
"command": "aws s3 cp s3://YOUR_BUCKET/dptest ./; date >> dptest; aws s3 cp ./dptest s3://YOUR_BUCKET/",
"onSuccess": {
"ref": "ActionId_kK5b2"
}
},
{
"subnetId": "YOUR_SUBNET",
"role": "DataPipelineDefaultRole",
"imageId": "YOUR_AMI",
"instanceType": "t2.micro",
"type": "Ec2Resource",
"terminateAfter": "10 Minutes",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "YOUR_EC2_ROLE",
"instanceCount": "1",
"securityGroupIds": "YOUR_SG_ID",
"name": "DefaultResource1",
"keyPair": "YOUR_EC2_KEYPAIR",
"id": "ResourceId_VkJ2b",
"region": "ap-northeast-1"
},
{
"role": "DataPipelineDefaultRole",
"subject": "Fail: ShellCommandActivety",
"name": "DefaultAction2",
"id": "ActionId_dkw9J",
"message": "Fail: ShellCommandActivety",
"type": "SnsAlarm",
"topicArn": "arn:aws:sns:ap-northeast-1:111222333444:ShellCommandActivityTest"
},
{
"period": "15 Minutes",
"name": "Every 15 minutes",
"id": "DefaultSchedule",
"type": "Schedule",
"startAt": "FIRST_ACTIVATION_DATE_TIME"
}
],
"parameters": []
}
これは実際の吐き出したパイプラインの定義。ところどころオブジェクトの固有名のようなものが入っているが、これそのまま使えたようが気がする。(ちゃんとは確認していない)
Disclaimer
このメモは私の個人のものであり、私の雇用者を代表するものではありません。