Edited at

AWS Data Pipelineをスケジューラとして使う

More than 3 years have passed since last update.

AWS Data Pipeline。名前からするとETLやデータの移動のために使われるイメージが強いんだけど、実はCronぽいスケジューラとしても使えるので、ポイントをまとめておく。

方針は単純で、 Data Pipelineをシェルスクリプトを起動するためだけに利用するという感じ。

Data Pipeline、いろいろ覚えることがあって若干めんどくさいところもあるが 個別のサーバーに依存しないスケジューラを使える というのはそれなりに価値があるところだと思う。


処理の定義(アクティビティオブジェクト)

Data PipelineのアクティビティにはShellCommandActivityというものがあって、これはOS上で単純にシェルスクリプトを実行してくれる。

dp1.png



  • Commandというパラメータに直接シェルスクリプトを渡してやればOK。もう少しちゃんと管理したいならScriptUrlというパラメータにS3上のシェルスクリプトのパスを渡してやることもできる。


  • RunsOnまたはWorkerGroupでこのスクリプトを実行するリソースを定義する。


  • Scheduleで実行するスケジュールを定義。


  • On SuccessOn Failで成功時、失敗時の通知などを定義できる。


リソース(リソースオブジェクト)

リソースを用意する方法は2つある。


Data Pipeline側でEC2を起動して使う

アクティビティオブジェクトにてRunOnを使う場合。下記のようにEC2を定義して起動できる。

dp2.png

ポイントはここにもScheduleがあること。この場合、上記のSchellCommandActivityと同じEvery 15 minutesという名前を持ったスケジュールオブジェクトを参照しているので、以下のような流れで処理が実行される。


  1. 15分に1回、このEC2が起動される

  2. そしてシェルスクリプトが実行される


  3. Terminate Afterで指定されているように10分後、このEC2は削除される

もちろん、アクティビティオブジェクトとリソースオブジェクトを、別々のスケジュールで管理することもできる。


既存のサーバーを利用する

アクティビティオブジェクトにてWorkerGroupを指定する場合、自分で用意したEC2やオンプレミスのサーバー上でアクティビティを実行することができる。仕組みは単純で、TaskRunnerと呼ばれるエージェントを任意のサーバーで起動してやればOK。

TaskRunnerは下記から入手できる。

http://aws.amazon.com/developertools/AWS-Data-Pipeline/1920924250474601

そして以下のようにWorkerGroupの名前を指定して起動してやればOK。

$ java -jar TaskRunner-1.0.jar --config ~/credentials.json --workerGroup=myWorkerGroup --region=MyRegion --logUri=s3://mybucket/foldername


スケジューリング(スケジュールオブジェクト)

スケジュールについてはスクリーンショットを見てもらえば特に解説するようなところはない。

dp3.png


成否通知(アクションオブジェクト)

下記のようにSNSのTopicARNを指定してあれば、アクティビティの成功時、失敗時に通知を飛ばせる。

dp4.png


実行状況の確認

もちろん実行状況の確認もできる。。。が、若干見づらいというか、慣れが必要な気がする。

dp5.png

CLIで見たほうがなんとなくわかりやすい気がする。

$ aws datapipeline list-runs --pipeline-id df-022729633UUVCN2OHKNX

Name Scheduled Start Status
ID Started Ended
---------------------------------------------------------------------------------------------------
1. ResourceId_VkJ2b 2015-06-07T05:57:13 FINISHED
@ResourceId_VkJ2b_2015-06-07T05:57:13 2015-06-07T05:57:19 2015-06-07T06:08:53

2. ActivityId_vRKcJ 2015-06-07T05:57:13 FINISHED
@ActivityId_vRKcJ_2015-06-07T05:57:13 2015-06-07T05:57:19 2015-06-07T06:06:50

3. ResourceId_VkJ2b 2015-06-07T06:12:13 FINISHED
@ResourceId_VkJ2b_2015-06-07T06:12:13 2015-06-07T06:12:28 2015-06-07T06:20:02

4. ActivityId_vRKcJ 2015-06-07T06:12:13 FINISHED
@ActivityId_vRKcJ_2015-06-07T06:12:13 2015-06-07T06:03:28 2015-06-07T06:17:59

5. ResourceId_VkJ2b 2015-06-07T06:27:13 FINISHED
@ResourceId_VkJ2b_2015-06-07T06:27:13 2015-06-07T06:27:28 2015-06-07T06:31:49

6. ActivityId_vRKcJ 2015-06-07T06:27:13 FINISHED
@ActivityId_vRKcJ_2015-06-07T06:27:13 2015-06-07T06:20:01 2015-06-07T06:29:46


マネージメントコンソールなしで利用する。

マネジメントコンソールのArchitectと呼ばれるGUIは好き嫌いが別れるところ・・・というか嫌いな人のほうが多そうな気がする笑。コレなしで利用するための方針としてはPutPipelineDefinitionというAPIを使ってJSON形式で定義したパイプラインを渡してやればよい。とはいえいきなりこれをソラで書くのはつらすぎるので、一度ArchitectにJSON出力機能がついているので、一度ざっくり定義したパイプラインを下記のように吐き出しておいて、あとは適当に必要に応じた修正をしてCLIを使ってaws datapipeline put-pipeline-definitionあたりで放り込んでやればOK。

{

"objects": [
{
"failureAndRerunMode": "CASCADE",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "general",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "s3://YOUR_LOG_BUCKET/datapipeline/",
"scheduleType": "cron",
"name": "Default",
"id": "Default"
},
{
"role": "DataPipelineDefaultRole",
"subject": "Success: ShellCommandActivety",
"name": "DefaultAction1",
"id": "ActionId_kK5b2",
"message": "Success: ShellCommandActivety",
"type": "SnsAlarm",
"topicArn": "arn:aws:sns:ap-northeast-1:111222333444:ShellCommandActivityTest"
},
{
"schedule": {
"ref": "DefaultSchedule"
},
"onFail": {
"ref": "ActionId_dkw9J"
},
"name": "date",
"id": "ActivityId_vRKcJ",
"runsOn": {
"ref": "ResourceId_VkJ2b"
},
"type": "ShellCommandActivity",
"command": "aws s3 cp s3://YOUR_BUCKET/dptest ./; date >> dptest; aws s3 cp ./dptest s3://YOUR_BUCKET/",
"onSuccess": {
"ref": "ActionId_kK5b2"
}
},
{
"subnetId": "YOUR_SUBNET",
"role": "DataPipelineDefaultRole",
"imageId": "YOUR_AMI",
"instanceType": "t2.micro",
"type": "Ec2Resource",
"terminateAfter": "10 Minutes",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "YOUR_EC2_ROLE",
"instanceCount": "1",
"securityGroupIds": "YOUR_SG_ID",
"name": "DefaultResource1",
"keyPair": "YOUR_EC2_KEYPAIR",
"id": "ResourceId_VkJ2b",
"region": "ap-northeast-1"
},
{
"role": "DataPipelineDefaultRole",
"subject": "Fail: ShellCommandActivety",
"name": "DefaultAction2",
"id": "ActionId_dkw9J",
"message": "Fail: ShellCommandActivety",
"type": "SnsAlarm",
"topicArn": "arn:aws:sns:ap-northeast-1:111222333444:ShellCommandActivityTest"
},
{
"period": "15 Minutes",
"name": "Every 15 minutes",
"id": "DefaultSchedule",
"type": "Schedule",
"startAt": "FIRST_ACTIVATION_DATE_TIME"
}
],
"parameters": []
}

これは実際の吐き出したパイプラインの定義。ところどころオブジェクトの固有名のようなものが入っているが、これそのまま使えたようが気がする。(ちゃんとは確認していない)


Disclaimer

このメモは私の個人のものであり、私の雇用者を代表するものではありません。