LoginSignup
37
19

More than 5 years have passed since last update.

Run CommandとStep Functionsでサーバレス(?)にEC2のバッチを動かす

Last updated at Posted at 2017-06-11

動機

cronやdigdag-serverは便利で使い勝手がいいですが、常時サーバを動作させておかないといけないのが難点です。一日一回の業務のために、インスタンスを立ち上げっぱなしにしたり冗長構成にするのはコスト的にやめたいなという場合もあると思います。
そこで本記事では、EC2 Run CommandとStep Functionsを使って、EC2インスタンス起動、Dockerを使ったバッチ起動、EC2シャットダウンまでの一連の処理の設定の仕方を説明していこうと思います。

処理実装

State Machine

State Machineの概要は下記になります。基本的には、処理開始→数秒待つ→状態を監視→状態がOKなら次へ進む、という構成になっています。

※ Run Command実行時にたまにエラー(InvalidInstanceId)になります。実行前のWaitの時間を長くするなど対策が必要かもしれないです。

スクリーンショット 2017-06-12 2.53.26.png

下記のJSONをStep Functionsへ登録します。

step-functions.json
{
  "Comment": "Invoke job",
  "StartAt": "StartInstance",
  "States": {
    "StartInstance": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:889476386829:function:qiita-sample-start-instance",
      "Next": "WaitInstanceState"
    },
    "WaitInstanceState": {
      "Type": "Wait",
      "Seconds": 60,
      "Next": "ConfirmInstanceState"
    },
    "ConfirmInstanceState": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:889476386829:function:qiita-sample-confirm-instance-state",
      "Next": "ChoiceInstanceState"
    },
    "ChoiceInstanceState": {
      "Type": "Choice",
      "Default": "FailInstanceState",
      "Choices": [
        {
          "Variable": "$.instance_state",
          "StringEquals": "pending",
          "Next": "WaitInstanceState"
        },
        {
          "Variable": "$.instance_state",
          "StringEquals": "running",
          "Next": "StartJob"
        }
      ]
    },
    "FailInstanceState": {
      "Type": "Fail",
      "Cause": "Failed to launch instance"
    },
    "StartJob": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:889476386829:function:qiita-sample-start-job",
      "Retry": [
        {
          "ErrorEquals": [
            "States.TaskFailed"
          ],
          "IntervalSeconds": 30,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Next": "WaitJob"
    },
    "WaitJob": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "ConfirmJobStatus"
    },
    "ConfirmJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:889476386829:function:qiita-sample-confirm-job-status",
      "Next": "ChoiceJobStatus"
    },
    "ChoiceJobStatus": {
      "Type": "Choice",
      "Default": "WaitJob",
      "Choices": [
        {
          "Or": [
            {
              "Variable": "$.job_status",
              "StringEquals": "Failed"
            },
            {
              "Variable": "$.job_status",
              "StringEquals": "TimedOut"
            },
            {
              "Variable": "$.job_status",
              "StringEquals": "Cancelled"
            },
            {
              "Variable": "$.job_status",
              "StringEquals": "Success"
            }
          ],
          "Next": "TerminateInstance"
        }
      ]
    },
    "TerminateInstance": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:889476386829:function:qiita-sample-terminate-instance",
      "End": true
    }
  }
}

Lambda関数

前準備

Lambda関数へ割り当てられているrole(大抵の場合はlambda-default-roleという名前になっている)へ、下記の権限を割り振ってください。

  • ec2:RunInstances
  • iam:PassRole
  • ec2:DescribeInstances
  • ssm:SendCommand
  • ec2:TerminateInstances
  • states:StartExecution

また、同じroleへ下記のポリシーもアタッチしてください。

  • AmazonSSMReadOnlyAccess

さらに、EC2インスタンスにアタッチするIAM roleを作成し、下記ポリシーをアタッチしておいてください。(本記事ではssm-roleと名前をつけます)

  • AmazonSSMFullAccess

EC2インスタンスを起動する

EC2インスタンスを起動します。取得したインスタンスIDを次のLambdaへ渡します。

start_instance.py
import boto3
import base64


def lambda_handler(event, context):
    client = boto3.client('ec2', region_name='us-east-1')
    resp = client.run_instances(
        ImageId='ami-898b1a9f',
        MinCount=1,
        MaxCount=1,
        KeyName='my-key',
        SecurityGroups=['default'],
        UserData=_make_user_data(),
        InstanceType='m3.medium',
        IamInstanceProfile={
            'Name': 'ssm-role',
        },
    )

    event['instance_id'] = resp['Instances'][0]['InstanceId']

    return event

def _make_user_data():
    with open('user-data.txt', 'rb') as f:
        return base64.b64encode(f.read())
user-data.txt
#!/bin/bash -xe

cd /tmp
curl https://amazon-ssm-us-east-1.s3.amazonaws.com/latest/linux_amd64/amazon-ssm-agent.rpm -o amazon-ssm-agent.rpm
yum install -y amazon-ssm-agent.rpm

EC2インスタンスの起動状況を確認する

confirm_instance_state.py
import boto3


def lambda_handler(event, context):
    ec2 = boto3.resource('ec2', region_name='us-east-1')
    instance = ec2.Instance(event['instance_id'])

    event['instance_state'] = instance.state['Name']

    return event

バッチを起動する

EC2 Run CommandでEC2インスタンスへコマンドを発行していきます。本記事では説明のためにpublicなrepositoryからイメージをpullしていますが、本番利用ではprivate repositoryからイメージを利用することが多いと思うので、そのための処理も入れています。
また、dockerの起動オプションで、標準出力をCloudWatch Logsへログ転送するように設定すると便利です(あらかじめLog Groupを作っておく必要があると思います)。

start_job.py
import boto3


def lambda_handler(event, context):
    client = boto3.client('ssm', region_name='us-east-1')

    command = "docker run --log-driver=awslogs" \
              " --log-opt awslogs-group=hello-world" \
              " --log-opt awslogs-region=us-east-1 --rm" \
              " -i alpine:latest /bin/echo 'hello, world'"

    resp = client.send_command(
        InstanceIds=[event['instance_id']],
        DocumentName='AWS-RunShellScript',
        MaxConcurrency='1',
        Parameters={
            'commands': [
                "yum install -y docker",
                "service docker start",
                "eval $(aws ecr get-login --no-include-email --region us-east-1)",
                command,
            ],
        },
        TimeoutSeconds=3600,
    )

    event['command_id'] = resp['Command']['CommandId']

    return event

バッチ処理の状態を確認する

confirm_job_status.py
import boto3


def lambda_handler(event, context):
    client = boto3.client('ssm', region_name='us-east-1')

    resp = client.get_command_invocation(
        CommandId=event['command_id'],
        InstanceId=event['instance_id'],
    )

    event['job_status'] = resp['Status']

    return event

EC2インスタンスをシャットダウンする

terminate_instance.py
import boto3


def lambda_handler(event, context):
    ec2 = boto3.resource('ec2', region_name='us-east-1')
    instance = ec2.Instance(event['instance_id'])
    resp = instance.terminate()

    return event

Step Functionsを呼び出す

下記Lambda関数を定期実行するよう設定します。State Machine名は変わりやすいので環境変数にしておくと便利です。

invoke-step-functions.py
import boto3
import uuid
import os


def lambda_handler(event, context):
    account_id = event.get('account')

    state_machine_arn = 'arn:aws:states:us-east-1:%s:stateMachine:%s'\
                        % (account_id, os.environ.get("STATE_MACHINE_NAME"))
    client = boto3.client('stepfunctions', region_name='us-east-1')
    resp = client.start_execution(
        stateMachineArn=state_machine_arn,
        name=str(uuid.uuid4()),
    )

    return 'invoked step functions'

まとめ

本記事では、EC2 Run CommandとStep Functionsを使って定期的にバッチ処理を起動するやり方を説明しました。今回は簡単な実装にしましたが、スポットインスタンスの価格を調べて10%上乗せして落札できたらそちらを利用するなど、工夫すれば複雑な処理も可能です。
従来のPaaSサービスでは待ち時間もCPU課金されますが、Step Functionsは状態遷移回数での課金ですので新たな使い方ができると思います。

追記

Githubへ公開しました
https://github.com/runtakun/qiita-sample-step-functions

追記その2 (2017.11.30)

AWS Fargate( https://aws.amazon.com/jp/blogs/news/aws-fargate-a-product-overview/ )を使いましょう :)

37
19
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
37
19