LoginSignup
10
0

More than 1 year has passed since last update.

PrefectにData PipelineのShellCommandActivityで実装した処理を移植する

Last updated at Posted at 2022-12-19

前提・背景

Prefect Cloud + Fargateでバッチ処理のワークフローを作成するまで」に記載したように、現在我々のチームではAWS Data PipelineのShellCommandActivityを利用して「独自のAMIでEC2サーバーを立ち上げ、S3にあるソースコードをダウンロードして実行する」ような形で多数のバッチ処理を実行しています。

この処理をPrefectのFlowで移植するとどのようになるか試してみました。利用しているのはPrefect 2です。

実装

我々のユースケースによくある「EC2インスタンスを立ち上げて、そのインスタンス上で複数のコマンドを並列で実行する。終わったらシャットダウンする」というよう処理をPrefectで実装してみました。小規模なバッチがたくさんあるイメージです。

Taskの実装

そのためには、次の3つのコマンドを実装する必要があります。ドキュメントによると、成功・失敗時の通知はPrefect Cloud側で設定できるようです。

  1. EC2インスタンスを起動する
  2. EC2インスタンス上でコマンドを実行する
  3. EC2インスタンスを終了する

実はPrefectのTaskはasyncの記法に対応していて、そちらの書き方にトライしてみたかったのですが、boto3が未対応で、サードパーティ製のaioboto3もEC2は未対応だったので諦めました。

tasks.py
from __future__ import annotations
import boto3
from prefect import flow, task

@task
def run_ec2_instance(image_id: str, instance_type: str, instance_profile: str) -> str:
    """EC2インスタンスを起動する。

    Args:
        image_id (str): AMIのID
        instance_type (str): インスタンスタイプ
        instance_profile (str): インスタンスプロファイル

    Returns:
        str: 起動したEC2インスタンスのID
    """
    client = boto3.client('ec2')
    result = client.run_instances(
        ImageId=image_id,
        MinCount=1,
        MaxCount=1,
        InstanceType=instance_type,
        KeyName="キー名",
        SecurityGroupIds=["sg-<セキュリティグループID>"],
        SubnetId="subnet-<サブネットID>",
        IamInstanceProfile={
            'Name': instance_profile,
        },
        TagSpecifications=[
            {
                'ResourceType': 'instance',
                'Tags': [
                    {
                        'Key': 'Name',
                        'Value': 'prefect-test', # 実行するインスタンス名
                    },
                ]
            },
        ],
    )

    waiter = client.get_waiter('instance_status_ok')
    instance_id = result['Instances'][0]['InstanceId']
    waiter.wait(
        InstanceIds = [instance_id],
        WaiterConfig = {
            'Delay': 30,
            'MaxAttempts': 60
        }
    )
    return instance_id

@task
def execute_commands(instance_id: str, commands: list[str], terminate_after: int) -> str:
    """コマンドを実行する

    Args:
        instance_id (str): インスタンスID
        commands (list[str]): 実行するコマンド
        terminate_after (int): この時間を過ぎたら強制終了する(単位は分)

    Returns:
        str: 標準出力
    """
    client = boto3.client('ssm')
    response = client.send_command(
        DocumentName="AWS-RunShellScript",
        Parameters={'commands': commands},
        InstanceIds=[instance_id],
        CloudWatchOutputConfig={
            'CloudWatchLogGroupName': '/prefect-agent/ec2-batch', # ログ名は仮
            'CloudWatchOutputEnabled': True
        },
    )

    waiter = client.get_waiter('command_executed')
    command_id = response['Command']['CommandId']
    # バッチが失敗した場合は、waiterが例外を出してくれる
    waiter.wait(
        CommandId=command_id,
        InstanceId=instance_id,
        WaiterConfig={
            'Delay': 60, # 60秒ごとに確認
            'MaxAttempts': terminate_after
        }
    )

    output = client.get_command_invocation(
        CommandId=command_id,
        InstanceId=instance_id,
    )

    return output["StandardOutputContent"].strip()

@task
def terminate_ec2_instance(instance_id: str):
    """EC2インスタンスを終了する。

    Args:
        instance_id (str): インスタンスID
    """
    client = boto3.client('ec2')
    client.terminate_instances(
        InstanceIds=[instance_id]
    )
    waiter = client.get_waiter('instance_terminated')
    waiter.wait(
        InstanceIds = [instance_id],
        WaiterConfig = {
            'Delay': 30,
            'MaxAttempts': 60
        }
    )

少し細くすると、次のような注意点がありました。

  • SecurityGroupIdsなど共通でいいものは固定値(※実際は環境変数から読み込むつもり)で、 InstanceType など実行したいバッチによって変わるものはTaskの引数に指定
  • エラー時の調査のために、CloudWatch Logsで /prefect-agent/ec2-batch に出力している。実際はFlowのIDも入れたい…
  • EC2の起動時の確認方法に instance_status_okinstance_running があるが、コマンド実行可能なのは前者です。古い記事ですが「AWS CLIでのEC2インスタンス起動時Wait」が参考になりました

Flowの実装

そして、これらを組み合わせてFlowを実装してみます。前評判の通り、Pythonの普通の実装に近い感覚で書けました。

sample_flow.py
from tasks import run_ec2_instance, execute_commands, terminate_ec2_instance

@flow
def sample_flow():
    instance_id = run_ec2_instance(
        image_id="ami-******", # 自作のAMI
        instance_type="t2.micro",
        instance_profile='SSMInstanceRole', # ssmで接続するため、最低限この権限が必要
    )

    try:
        # taskはmapメソッドで複数の引数で並列実行できる
        futures = execute_commands.map(
            instance_id=instance_id,
            commands=[["echo 'hello task1'"], ["echo 'hello task2'"]],
            terminate_after=30,
        )
        # PrefectFutureのリストが返ってくるので、実行結果を出力
        # このオブジェクトは `.wait()` や `.result()` を呼び出すと実行完了まで待つ
        for x in futures:
            print(x.result())
    finally:
        terminate_ec2_instance(instance_id)


if __name__ == "__main__":
    sample_flow()

ちょっと困っているのが、「バッチの失敗時にも確実に後処理(※インスタンスを落とす)をしたい」ときに finally を使った書き方しかできなさそうな点にちょっと困っています。そのため次の点を調べているところです。

  • 何らかの原因でPrefect Agentが失敗したときに、EC2インスタンスの消し忘れが怖い。時間制限をつけたEC2インスタンスの起動方法や、定期的にチェックできる方法はないか?
  • 具体的にはインスタンス名に起動日時を入れるとか、タスクの実行名を入れる方法が無いか探しています

実行結果

これを実行すると、次のように表示されます。

% python sample_flow.py
20:34:05.094 | INFO    | prefect.engine - Created flow run 'mellow-quokka' for flow 'sample_flow'
20:34:07.534 | INFO    | Flow run 'mellow-quokka' - Created task run 'run_ec2_instance-5cbad45c-0' for task 'run_ec2_instance'
20:34:07.535 | INFO    | Flow run 'mellow-quokka' - Executing 'run_ec2_instance-5cbad45c-0' immediately...
20:36:41.580 | INFO    | Task run 'run_ec2_instance-5cbad45c-0' - Finished in state Completed()
20:36:41.885 | INFO    | Flow run 'mellow-quokka' - Created task run 'execute_commands-649a7a6e-1' for task 'execute_commands'
20:36:41.886 | INFO    | Flow run 'mellow-quokka' - Submitted task run 'execute_commands-649a7a6e-1' for execution.
20:36:41.968 | INFO    | Flow run 'mellow-quokka' - Created task run 'execute_commands-649a7a6e-0' for task 'execute_commands'
20:36:41.969 | INFO    | Flow run 'mellow-quokka' - Submitted task run 'execute_commands-649a7a6e-0' for execution.
20:37:43.843 | INFO    | Task run 'execute_commands-649a7a6e-1' - Finished in state Completed()
20:37:43.969 | INFO    | Task run 'execute_commands-649a7a6e-0' - Finished in state Completed()
hello task1
hello task2
20:37:44.296 | INFO    | Flow run 'mellow-quokka' - Created task run 'terminate_ec2_instance-c26f4e1b-0' for task 'terminate_ec2_instance'
20:37:44.297 | INFO    | Flow run 'mellow-quokka' - Executing 'terminate_ec2_instance-c26f4e1b-0' immediately...
20:38:46.383 | INFO    | Task run 'terminate_ec2_instance-c26f4e1b-0' - Finished in state Completed()
20:38:46.682 | INFO    | Flow run 'mellow-quokka' - Finished in state Completed('All states completed.'

CloudWatch Logsにもきちんと結果が残されていました。

スクリーンショット 2022-12-19 21.07.33.png

まとめと所感

今のところ、Prefectの実装の使い勝手はなかなか好感触です。特に、普段実装しているPythonのスクリプトと同じように、PC上で実際に動かして、試行錯誤しながら実装できるのが嬉しいです。

AgentをFargateで動かした場合、けっこうAgentに渡すRoleの権限に注意しないといけなさそうなことと、実運用を開始すると想定していなかった困り事がありそうな気はしますが、ひとまずDataPipelineからの移植は始められそうです。

10
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
0