search
LoginSignup
55

More than 5 years have passed since last update.

posted at

updated at

AWS Data Pipelineをスケジューラとして使う

AWS Data Pipeline。名前からするとETLやデータの移動のために使われるイメージが強いんだけど、実はCronぽいスケジューラとしても使えるので、ポイントをまとめておく。

方針は単純で、 Data Pipelineをシェルスクリプトを起動するためだけに利用するという感じ。

Data Pipeline、いろいろ覚えることがあって若干めんどくさいところもあるが 個別のサーバーに依存しないスケジューラを使える というのはそれなりに価値があるところだと思う。

処理の定義(アクティビティオブジェクト)

Data PipelineのアクティビティにはShellCommandActivityというものがあって、これはOS上で単純にシェルスクリプトを実行してくれる。

dp1.png

  • Commandというパラメータに直接シェルスクリプトを渡してやればOK。もう少しちゃんと管理したいならScriptUrlというパラメータにS3上のシェルスクリプトのパスを渡してやることもできる。
  • RunsOnまたはWorkerGroupでこのスクリプトを実行するリソースを定義する。
  • Scheduleで実行するスケジュールを定義。
  • On SuccessOn Failで成功時、失敗時の通知などを定義できる。

リソース(リソースオブジェクト)

リソースを用意する方法は2つある。

Data Pipeline側でEC2を起動して使う

アクティビティオブジェクトにてRunOnを使う場合。下記のようにEC2を定義して起動できる。

dp2.png

ポイントはここにもScheduleがあること。この場合、上記のSchellCommandActivityと同じEvery 15 minutesという名前を持ったスケジュールオブジェクトを参照しているので、以下のような流れで処理が実行される。

  1. 15分に1回、このEC2が起動される
  2. そしてシェルスクリプトが実行される
  3. Terminate Afterで指定されているように10分後、このEC2は削除される

もちろん、アクティビティオブジェクトとリソースオブジェクトを、別々のスケジュールで管理することもできる。

既存のサーバーを利用する

アクティビティオブジェクトにてWorkerGroupを指定する場合、自分で用意したEC2やオンプレミスのサーバー上でアクティビティを実行することができる。仕組みは単純で、TaskRunnerと呼ばれるエージェントを任意のサーバーで起動してやればOK。

TaskRunnerは下記から入手できる。
http://aws.amazon.com/developertools/AWS-Data-Pipeline/1920924250474601

そして以下のようにWorkerGroupの名前を指定して起動してやればOK。

$ java -jar TaskRunner-1.0.jar --config ~/credentials.json --workerGroup=myWorkerGroup --region=MyRegion --logUri=s3://mybucket/foldername

スケジューリング(スケジュールオブジェクト)

スケジュールについてはスクリーンショットを見てもらえば特に解説するようなところはない。

dp3.png

成否通知(アクションオブジェクト)

下記のようにSNSのTopicARNを指定してあれば、アクティビティの成功時、失敗時に通知を飛ばせる。

dp4.png

実行状況の確認

もちろん実行状況の確認もできる。。。が、若干見づらいというか、慣れが必要な気がする。

dp5.png

CLIで見たほうがなんとなくわかりやすい気がする。

$ aws datapipeline list-runs --pipeline-id df-022729633UUVCN2OHKNX

       Name                                                Scheduled Start      Status
       ID                                                  Started              Ended
---------------------------------------------------------------------------------------------------
   1.  ResourceId_VkJ2b                                    2015-06-07T05:57:13  FINISHED
       @ResourceId_VkJ2b_2015-06-07T05:57:13               2015-06-07T05:57:19  2015-06-07T06:08:53

   2.  ActivityId_vRKcJ                                    2015-06-07T05:57:13  FINISHED
       @ActivityId_vRKcJ_2015-06-07T05:57:13               2015-06-07T05:57:19  2015-06-07T06:06:50

   3.  ResourceId_VkJ2b                                    2015-06-07T06:12:13  FINISHED
       @ResourceId_VkJ2b_2015-06-07T06:12:13               2015-06-07T06:12:28  2015-06-07T06:20:02

   4.  ActivityId_vRKcJ                                    2015-06-07T06:12:13  FINISHED
       @ActivityId_vRKcJ_2015-06-07T06:12:13               2015-06-07T06:03:28  2015-06-07T06:17:59

   5.  ResourceId_VkJ2b                                    2015-06-07T06:27:13  FINISHED
       @ResourceId_VkJ2b_2015-06-07T06:27:13               2015-06-07T06:27:28  2015-06-07T06:31:49

   6.  ActivityId_vRKcJ                                    2015-06-07T06:27:13  FINISHED
       @ActivityId_vRKcJ_2015-06-07T06:27:13               2015-06-07T06:20:01  2015-06-07T06:29:46

マネージメントコンソールなしで利用する。

マネジメントコンソールのArchitectと呼ばれるGUIは好き嫌いが別れるところ・・・というか嫌いな人のほうが多そうな気がする笑。コレなしで利用するための方針としてはPutPipelineDefinitionというAPIを使ってJSON形式で定義したパイプラインを渡してやればよい。とはいえいきなりこれをソラで書くのはつらすぎるので、一度ArchitectにJSON出力機能がついているので、一度ざっくり定義したパイプラインを下記のように吐き出しておいて、あとは適当に必要に応じた修正をしてCLIを使ってaws datapipeline put-pipeline-definitionあたりで放り込んでやればOK。

{
  "objects": [
    {
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "general",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://YOUR_LOG_BUCKET/datapipeline/",
      "scheduleType": "cron",
      "name": "Default",
      "id": "Default"
    },
    {
      "role": "DataPipelineDefaultRole",
      "subject": "Success: ShellCommandActivety",
      "name": "DefaultAction1",
      "id": "ActionId_kK5b2",
      "message": "Success: ShellCommandActivety",
      "type": "SnsAlarm",
      "topicArn": "arn:aws:sns:ap-northeast-1:111222333444:ShellCommandActivityTest"
    },
    {
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "onFail": {
        "ref": "ActionId_dkw9J"
      },
      "name": "date",
      "id": "ActivityId_vRKcJ",
      "runsOn": {
        "ref": "ResourceId_VkJ2b"
      },
      "type": "ShellCommandActivity",
      "command": "aws s3 cp s3://YOUR_BUCKET/dptest ./; date >> dptest; aws s3 cp ./dptest s3://YOUR_BUCKET/",
      "onSuccess": {
        "ref": "ActionId_kK5b2"
      }
    },
    {
      "subnetId": "YOUR_SUBNET",
      "role": "DataPipelineDefaultRole",
      "imageId": "YOUR_AMI",
      "instanceType": "t2.micro",
      "type": "Ec2Resource",
      "terminateAfter": "10 Minutes",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "YOUR_EC2_ROLE",
      "instanceCount": "1",
      "securityGroupIds": "YOUR_SG_ID",
      "name": "DefaultResource1",
      "keyPair": "YOUR_EC2_KEYPAIR",
      "id": "ResourceId_VkJ2b",
      "region": "ap-northeast-1"
    },
    {
      "role": "DataPipelineDefaultRole",
      "subject": "Fail: ShellCommandActivety",
      "name": "DefaultAction2",
      "id": "ActionId_dkw9J",
      "message": "Fail: ShellCommandActivety",
      "type": "SnsAlarm",
      "topicArn": "arn:aws:sns:ap-northeast-1:111222333444:ShellCommandActivityTest"
    },
    {
      "period": "15 Minutes",
      "name": "Every 15 minutes",
      "id": "DefaultSchedule",
      "type": "Schedule",
      "startAt": "FIRST_ACTIVATION_DATE_TIME"
    }
  ],
  "parameters": []
}

これは実際の吐き出したパイプラインの定義。ところどころオブジェクトの固有名のようなものが入っているが、これそのまま使えたようが気がする。(ちゃんとは確認していない)

Disclaimer

このメモは私の個人のものであり、私の雇用者を代表するものではありません。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
55