18
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Prefect Cloud + Fargateでバッチ処理のワークフローを作成するまで

Last updated at Posted at 2022-12-15

前提・背景

LIFULLにはMAMという有料集客支援を行う内製ツールがあり、広告出稿関連のデータをBigQueryで管理していて、100個以上のバッチ処理を定期実行しています。MAMは「マーケティングオートメーション|CRMではない、MAを考えてみた。」にもある通り、このような責務を担っています。

・インハウスデータ蓄積・分析基盤の構築
・広告適性評価のためのデータドリブンアトリビューションモデル構築
・適切な広告宣伝費の試算

MAMではバッチ処理の管理のため、初期実装時からAWS Data Pipelineを利用していたのですが、ついこの間「2023年2月28日からmaintenance modeになる」という表示がされ始めてしまいました。AWSのサポートによると「コンソールからアクセスできなくなる」状態になるらしく、この状態で運用していくのは正直きつそうです。

AWS Data Pipeline へのコンソールアクセスは、2023 年 2 月 28 日に廃止されます。この日以降、お客様はコンソールから AWS Data Pipeline にアクセスできなくなります。なお、引き続き、コマンドラインインターフェイスと API を通じて AWS Data Pipeline にアクセスできます。AWS Data Pipeline サービスはメンテナンスモードであり、新しいリージョンにサービスを拡張する予定はないことにご注意頂ければと存じます。

前述の通り、CLIやAPIを介した利用は可能となっておりますので、引き続き当該サービスをご利用いただく際には、CLIやAPIからご利用いただけるよう準備を進めていただければと存じます。
また、ご懸念の既存の環境につきましては、廃止されることはなく、引き続きご利用いただけますのでご安心いただければと存じます。

なお、AWS Data Pipeline の代替をご検討の際には、AWS Glue または Amazon Managed Workflows for Apache Airflow をご覧いただければと存じます。

AWSサポートからはMWAA (Amazon Managed Workflows for Apache Airflow)を薦められているのですが、我々はPrefectというデータフローツールの利用を検討しています。次のような観点で良さそうだと判断したからです。

  • 元々ShellCommandActivityを利用し、「独自のAMIでEC2サーバーを立ち上げ、S3にあるソースコードをダウンロードして実行する」ような形で実行していた。一旦そのまま「EC2インスタンスを立ち上げてシェルスクリプトを実行する」形で移植したいため、Airflowはちょっとオーバースペックだ
  • Airflowに比べ、構文やUIがシンプルで使い勝手が良さそう
  • バッチの運用メンバーの人数は少なく、3人以内のユースケースなので無料のPersonalプランで収まる(※プライベートの開発や、少人数のベンチャー企業でも利用しやすそうです)
  • 将来独自管理のAWSで構築した環境から「データ活用を促進するためのデータプラットフォーム開発」にある全社のKubernets + ArgoWorkflowのバッチ基盤に移行したいと思っていて、その前段階としてDocker化を行いたいと思っていた。バッチをECSで起動するものと併存でき、この方針について問題を産まない

ところが、そこでドキュメントや利用者のブログを参考に環境を整えようとすると、いくつかの点で詰まってしまっていました。

  • Prefect 2系が最新だが、Prefect 1系の解説記事が多い。1と2では設計やコマンドもかなり変わっている
  • ワークフローの管理はクラウドサービス(Prefect Cloud)が、コードの実行管理はユーザーが自前の環境にインストールして実行する(Agent)ような役割分担のHybrid Model1と呼ばれる構成に困惑してしまった。Hybrid Modelの要点を掴みづらい
  • Agentを自前のAWS環境で実行するために試行錯誤が必要だった。様々な環境(AWS, GCP, Azureなど)向けのサンプルはあるものの、まだ間違いがあることがある

また、この記事はAWSの設定方法などを説明する(また、チーム内への説明に使う)のが目的のため、Prefect自体のインストール方法や用語(Flowなど)の説明は疎かになっていると思います。公式ドキュメントを参照してください。

Prefectとは

Prefectの概要

既に「次世代のワークフロー管理ツールPrefectでMLワークフローを構築する」という素晴らしい記事があり、Airflowと比較した形での説明がまとめられています。主にPrefect 1の解説ですが、この点の内容は変わっていないはずです。詳しく知りたい方はリンク先を読んでください。

PrefectはAirFlowArgoWorkflowのようなワークフロー管理ツールで、2018年にリリースされました。
CEOのJeremiah Lowin氏が元Airflowの開発者ということもあり、Airflowのイケていない部分を解決する思想の元作られています。
公式ドキュメントでも、Why Not Airflow?という強気な内容でAirflowと対比してPrefectの特徴を取り上げています。
Airflowと同様、PrefectでもPythonを用いてワークフローを記述します。

PrefectのHybrid Modelについて

まず「Python+Prefectでゼロから始めるデータパイプライン構築」から引用します。

また、Prefectでは、ハイブリッドモデルを採用しております。ワークフローの実施を担う基盤とワークフローの監視やスケジュール実施が基盤(Prefect Cloud)が分離しており、Prefect Cloud上には、データやソースコードが保存されずに、タスクのスケジュール実施や監視のみが行われます。

以下は私が社内ドキュメントのために作った図なのですが、素直にPrefectを使うと次のような形になると思います。

スクリーンショット 2022-12-15 16.35.14.png

  • Prefect Cloudは、データ自体やソースコードの内容は知らず、ymlの設定ファイルのみを受け取っている。設定ファイルには「どんなタイミングでどのflowを実行するか」程度の内容しかない
  • ソースコード自体はユーザー環境にインストールされたAgentにデプロイされる
  • 「Prefect Cloudが適切なタイミングでコードの実行内容をキューイングし、Agentがその内容を取りに行く」ような設計思想になっている

そのため、Prefectのユーザーは、実行管理のソフトウェアを管理せずに済みつつ、機密性の高いデータについての処理を安全に実行することができます(と主張されています1)。またPrefect CloudがAWSの権限を持たずに済むのも良いです。

最初、私はこのモチベーションが分からなかったため、「ソースコードとは別に、なぜわざわざymlの設定ファイルを生成するんだろう?」みたいにチュートリアルの手順も非常に複雑に見えてしまっていました。公式ドキュメント(Deployments)にもある図を引用すると、デプロイコマンドの実行時に「Prefect API(Prefect Cloud)には設定のみ(※実線)が、AgentにはStorage(S3などの自前環境)を経由して実際のコード(※点線)が送信される」ことが示されています。

スクリーンショット 2022-12-15 16.33.12.png

また、私は試していませんが、自分の環境にUIをインストールして実行することもできるようです。

実際にFargateでAgentを起動する

サンプルとして、Slack APIでメッセージ投稿するだけのFlowを作って、Prefect Cloudから手動実行してみます。(※Slack投稿なのは深い意味はなく、何かIOのある簡単な処理のサンプルとしてちょうどよかっただけです)

基本的にPrefect RecipesにあるAgent on ECS Fargate with AWS CLIを踏襲しているのですが、いくつか補足しています。

①Blockの設定

まず、Agentにプログラムを送信するときに使うBlocksを作ります。Prefect Cloudの画面のBlocksからS3を選択すると、次の画面に行くはずなので、利用したいS3のパスとBlock Nameを入力しましょう。ひとまず sample-s3-block という名前で作成したことにします。

スクリーンショット 2022-12-15 17.20.46.png

※Access Key IDなどはサンプルとして表示されているもので、流出してるわけではありません😅

②ECRにAgentのDockerイメージを登録

公式のイメージがあり、基本的にそのまま立ち上げるだけなのです。ただ、ストレージとしてS3を利用するためには追加でs3fsをインストールする必要があり、S3以外でも同様みたいです。

次のDockerfileを用意し、ECRに登録します。

Dockerfile
FROM prefecthq/prefect:2-latest
RUN pip install s3fs

ECRへのpush方法などは、AWSの公式ドキュメントを参考にしてください。コンソールでECRを見ると具体的なコマンドが表示されるので、そこまで困らないはずです。

③IAM Roleを作成

prefect-recipesの指示通り、trust-policy.jsonを元にIAM Roleを作成します。

登録コマンド
aws iam create-role --role-name PrefectECSRole --assume-role-policy-document file://trust-policy.json

# StorageにS3を利用するので、2番目のコマンドでS3の参照権限も追加する必要があります
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess --role-name PrefectECSRole
trust-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "ecs-tasks.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

④タスク定義を作成

サンプルコードの prefect-agent-td.jsonの以下の値を補完します。

  1. WORK_QUEUE_ID: 利用したいキューの名前(この記事ではsample-queueとします)
  2. PREFECT_API_KEY: こちらのドキュメントに生成方法が載っています
  3. PREFECT_API_URL: こちらのドキュメントを参照。ACCOUNT-IDWORKSPACE-IDが何か分からなかったのですが、Prefect CloudにアクセスするURLに入ってました。

また、次の点も変更する必要があります。

  • "taskRoleArn""executionRoleArn"にある<>も、自分のAWS環境のAccount IDに変更する必要があります
  • "image"にある"prefecthq/prefect:2-latest"を、先ほど作成したECRのパスに変更する
  • サンプルの"taskRoleArn"がおそらく間違っていて、実際はprefectTaskRoleではなくPrefectECSRoleに修正する必要がある。(以下は修正済み)
prefect-agent-td.json
{
    "family": "prefect-agent",
    "requiresCompatibilities": [
        "FARGATE"
    ],
    "networkMode": "awsvpc",
    "cpu": "512",
    "memory": "1024",
    "taskRoleArn": "arn:aws:iam::<>:role/PrefectECSRole",
    "executionRoleArn": "arn:aws:iam::<>:role/ecsTaskExecutionRole",
    "containerDefinitions": [
        {
            "name": "prefect-agent",
            "image": "prefecthq/prefect:2-latest",
            "essential": true,
            "command": [
                "prefect",
                "agent",
                "start",
                "<WORK_QUUE_ID>"
            ],
            "environment": [
                {
                    "name": "PREFECT_API_KEY",
                    "value": "<your-key>"
                },
                {
                    "name": "PREFECT_API_URL",
                    "value": "https://api.prefect.cloud/api/accounts/[ACCOUNT-ID]/workspaces/[WORKSPACE-ID]"
                },
                {
                    "name": "PREFECT_LOGGING_LEVEL",
                    "value": "INFO"
                }
            ],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "/ecs/prefect-agent",
                    "awslogs-region": "us-east-1",
                    "awslogs-stream-prefix": "ecs",
                    "awslogs-create-group": "true"
                }
            }
        }
    ]
}

そして、以下のコマンドでタスク定義を作成します。

aws ecs register-task-definition --cli-input-json file://<full_path_to_task_definition_file>/prefect-agent-td.json

⑤Agentのサービスを起動

以下のコマンドを補完し、実行すると、Fargateのサービスが立ち上がってAgentが起動します。

  • サブネット名
  • セキュリティグループ名
  • クラスター名(※事前にクラスターを立ち上げておく必要があります。AWSのドキュメントを参照してください)
  • タグ
aws ecs create-service
--service-name prefect-agent \
--task-definition prefect-agent:latest \
--desired-count 1 \
--launch-type FARGATE \
--platform-version LATEST \
--cluster <起動したいクラスター名> \
--network-configuration "awsvpcConfiguration={subnets=[subnet-<起動したいサブネット>],securityGroups=[sg-<起動したいセキュリティグループ>],assignPublicIp=ENABLED}" \
--tags key=key1,value=value1 key=key2,value=value2 key=key3,value=value3 # 設定したいタグの内容

これもAgent on ECS Fargate with AWS CLIにある最後のコマンドですが、いくつか修正しています。

⑥デプロイ

最後に、実際にデプロイしてみましょう。

slack.py
# サンプルコード
import json
import requests
from prefect import flow

@flow
def post_slack():
    requests.post(
        "https://hooks.slack.com/services/****Slackの投稿エンドポイント****",
        data=json.dumps({"text": "Hi from Prefect!"}),
        headers={'content-type': 'application/json'}
    )

if __name__ == "__main__":
    post_slack()

まず、以下のコマンドでブラウザ上で認証画面が開きます。もしCI等で自動化する場合は、ドキュメントにある通りAPI_KEYを利用することになります。

prefect cloud login

以下のコマンドでデプロイメントを作成します。

  • slack:post_slack が「 slack.pypost_slack のFlowを実行する」という意味
  • -n がデプロイメントの名前。ここでは slack とする
  • -q がキューの名前。手順④で指定したものと同じものです
  • -sb がプログラムをデプロイするブロック名
prefect deployment build slack:post_slack -n slack -q sample-queue -sb s3/sample-s3-block -a

次に <デプロイメントの名前>-deployment.yml ができているはずなので、それを反映します。

prefect deployment apply slack-deployment.yaml

詳細は公式ドキュメントを参照してください。

⑦手動実行してみる

もしデプロイが成功していれば、次のようにFlowsの画面にslackが追加されているはずです。

スクリーンショット 2022-12-15 18.24.33.png

ここをクリックすると、三点︙からQuick runで実行できます。(Custom runでコマンドライン引数を指定して実行することもできるようです)

スクリーンショット 2022-12-15 18.30.28.png

これでSlackにサンプルメッセージが投稿されていていれば設定完了です。

まとめ・所感

いくつかドキュメントで分かりづらい点があることを除いて、今のところかなり使い勝手がよさそうなプロダクトだと感じています。これからData Pipelineで運用していた既存のバッチ処理を、徐々にPrefectに移行してみて、実運用での使い勝手などを検証していこうと思います。

  • PricingのPersonalプランにある通り、3人以内の小さいチームや個人開発でも始めやすい
  • データやプログラム、AWSの権限などをクラウドサービス側に置かなくていいので安心

今のところ、検証をしていてやや注意が必要そうな点は次の点です。

  • Prefect Cloudで、エラー時のstack traceやログの内容が表示されている。もし機密性の高いデータを扱うなら、(当たり前ですが)ログやエラー内容にそれらの情報が含まれないように気をつける必要がある
  • PersonalプランではService Accountsが作れず、APIキーが個人に紐付いてしまう(※Personalなのでまあそうだなって感じもしますが)

最後に、この記事は細かい部分は再検証できておらず、不備があるかもしれません。もし見つけた方はコメントや編集リクエストで教えて頂けると嬉しいです。

  1. https://medium.com/the-prefect-blog/the-prefect-hybrid-model-1b70c7fd296 2

18
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?