LoginSignup
10
0

More than 1 year has passed since last update.

AWS Step Functionsでサーバレスなデータパイプラインを作った話

Last updated at Posted at 2022-12-12

こちらはAWS for Games Advent Calendar 2022 13日目の記事です。

はじめに

私は普段UBISOFT OSAKAでサーバーエンジニアとして働いています。
UBISOFT OSAKAではRocksmith+というギター学習ツールを開発しています。

image.png

私自身これまでギターを触ったことはありませんでしたが、入社をきっかけにRocksmith+でギターの練習を始めました。
練習のメニューが充実しており、初心者から上級者まで楽しめる内容になっています。

残念なのは今のところ日本ではプレイできないところでしょうか…
ただいま(執筆時点)日本でのリリースは準備中です。

Rocksmith+の紹介はさておき、データパイプラインのお話です。

ゲームで楽曲が使われるまで大まかな流れは次の通りです。

  1. ライセンサーから(楽曲の権利を持つ企業)から楽曲のメタ情報(タイトルやアーティスト名等)を受け取る
  2. 音源や画像等のデータを保存する
  3. ゲームで利用する形に加工する

「メタ情報を受け取る」というイベントから処理が始まることもあり、全体的にイベント駆動アーキテクチャで設計しました。

全体の設計について

イベント駆動のアーキテクチャのパターンの一つであるMediator Toporogyを採用しました。
ワークフローを管理・コントロールするEvent Mediatorが存在するのが特徴です。

採用の理由としては次の点が挙げられます。

  • ワークフローがコントロールしやすい
  • エラーハンドリング、リカバリー、リスタートがしやすい

一方でワークフローのモデリングが複雑になったり、スケーラビリティやパフォーマンスがトレードオフになります。
この点はStep Functionsやその他のAWSのサービスを利用することで解消できていると実感しています。

O'Reillyの「ソフトウェアアーキテクチャの基礎」1の第14章に出てくる図に置き換えてみると全体像は次のようなイメージです。

mediator.drawio.png

イベントを受け取るとSTEP1から順番に処理が実行されていきます。
Mediatorの仕事はあくまで全体のワークフローのコントロールなので、実際の処理はEvent Channelを通して別の処理に任せます。

Event Mediatorの実装方法は様々ですが、我々はStep Functionsで実装しました。
最終的にAWSのサービスで置き換えるとこのようになりました。
mediator_aws.drawio.png

Step Functions

AWSのサービスを組み合わせてビジネスロジックを作成できるサービスです。
Workflow Studio(GUI)も用意されているのでドラッグ&ドロップでぽちぽちするだけで簡単にロジックを組むことができます。
視覚的に作成する以外にJSONの定義を書いて作成することも出来ます。

Step Functionsを構成する要素

Step Functionsでは一連のワークフローをState Machineと呼び、State Machineの作業単位をStateと呼びます。
Stateにはいくつか種類がありますが、自分が使ったStateは主に以下の4つです。
(その他のStateは https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/concepts-states.html を参照)

  • Task
    • Lambdaの実行したりパラメータを渡してAWSの各種サービスを実行します
  • Choice
    • 入力によって処理を分岐させます
  • Parallel
    • 単一の入力に対して複数の処理を並行処理します
  • Map
    • 複数の入力に対して単一の処理を並行処理します

Step Functionsの作成方法

Design your workflow visually

Step Functions Workflow Studioを利用して視覚的に作成します。
AWSのサービスのリストから必要なサービスをドラッグ&ドロップで中央に移動してWorkflowを構築します。

image.png

Write your workflow in code

JSONの定義から作成します。
定義したWorkflowが右側に表示されます。

image.png

環境構築はTerraformで行ったのでほとんどJSONを編集してWorkflowを構築しました。
ただ最初のテンプレート作成のためにWorkflow Studioも利用しました。
「S3のPutObjecを利用したいけど、どう書けばいい?」となったときはこんな感じで定義だけコピーしてました。

image.png

Tips

Stateについて開発時に得たTipsをご紹介します。

  • Task
    • 非同期処理
  • Choice
    • Passの使い方
  • Parallel/Map

Task 非同期処理

Lambdaで外部のサービスのAPIを呼んだり、時間のかかる処理をしたいケース。
SNSでファンアウトしたりSQS+Lambdaの非同期処理黄金パターンをはさみたかったので非同期処理が必要でした。
(非同期処理黄金パターンについては https://pages.awscloud.com/rs/112-TZM-766/images/E-1_devday.pdf を参照)

同期で処理するタスクに以下の設定が必要です。

Step Functionsの定義

  • Resource.waitForTaskToken を付ける
  • HeartbeatSecondsを設定する
  • パラメータに$$.Task.Tokenを渡す

こちらにも注記されています。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/connect-to-resource.html#connect-wait-token
image.png
無期限に待機し続けるのでHeartbeatSecondsを忘れずにつけておきましょう。

callback_task.json
{
  "Comment": "Callback Sample",
  "StartAt": "Sample Topic",
  "States": {
    "Sample Topic": {
      "Type": "Task",
-     "Resource": "arn:aws:states:::sns:publish",
+     "Resource": "arn:aws:states:::sns:publish.waitForTaskToken",
+     "HeartbeatSeconds": 60,
      "Parameters": {
        "TopicArn": "sample-topic-arn",
        "Message": {
          "Input.$": "$",
+         "TaskToken.$": "$$.Task.Token"
        }
      },
      "End": true
    }
  }
}

処理側(Lambda)

  • 処理完了後、SendTaskSuccess(SendTaskFailure)のAPIをコールする
    • Task実行時にパラメータとして渡したTaskTokenが必要
send_task_token.py
import boto3

def lambda_handler(context, event)
    sfn = boto3.client("stepfunctions", region_name="us-east-1")
    sfn.send_task_success(taskToken=event["TaskToken"], output=json.dumps())
    return

Choice

ある条件に合致したら何か処理をしたい場合に利用します。

wrong_choice_task.json
{
  "Comment": "Choice Sample",
  "StartAt": "Sample Choice",
  "States": {
    "Sample Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Param",
          "StringEquals": "Hoge",
          "Next": "Next Task"
        }
      ],
      "End": true //この設定はできない
    },
    "Next Task": {
      // 略
    }
  }
}

としたいところですが、出来ません :no_good:

ドキュメントにもはっきりと書いてありました。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-choice-state.html
image.png
Passを使います。Passは何もせず入力を出力に渡す処理です。
こんな感じです :ok_woman:

correct_choice_task.json
{
  "Comment": "Choice Sample",
  "StartAt": "Sample Choice",
  "States": {
    "Choice": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Param",
          "StringEquals": "Hoge",
          "Next": "Next Task"
        }
      ],
-     "End": true //この設定はできない
+     "Default": "Pass"
    },
    "Next Task": {
      // 略
    },
+   "Pass": {
+     "Type": "Pass",
+     "End": true
+   }
  }
}

Parallel/Map 並行処理

  • 同じ入力で複数のステップを実行(Parallel)
{
  "Comment": "Parallel Sample",
  "StartAt": "Sample Parallel",
  "States": {
    "Sample Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Parallel Task 1",
          "States": {
            "Parallel Task 1": {
              "Type": "Task",
              // 略
              "End": true
            }
          }
        },
        {
          "StartAt": "Parallel Task 2",
          "States": {
            "Parallel Task 2": {
              "Type": "Task",
              // 略
              "End": true
            }
          }
        }
      ]
    }
  },
  "End": true
}

一つの入力に対しBranch内の処理を行います。

  • 複数の入力に対して同じステップを実行(Map)
map_task.json
{
  "Comment": "Map Sample",
  "StartAt": "Sample Map",
  "States": {
    "Sample Map": {
      "Type": "Map",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "Sample Iterator",
        "States": {
          "Sample Iterator": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            // 略
            "End": true
          }
        }
      },
      "End": true
    }
  },
  "End": true
}

複数の入力に対してIterator内の処理を行います。
定義の書き方自体は直感的だと思います。

実際の処理ではParallelやMapの前後に別の処理があります。

  1. 入力
  2. 処理A
  3. Paralllel/Map
  4. 処理B

処理Aの出力とParallel/Mapの出力を処理Bの入力として利用したいケースもあると思います。

input.json
{
  "Payload": {
    "Id": "abc1234", // この値を次に渡したい
    "List": [
      "value_1",
      "value_2"
    ]
  }
}
map_task_add_path.json
{
  "Comment": "Map Sample",
  "StartAt": "Sample Map",
  "States": {
    "Map": {
      "Type": "Map",
      "MaxConcurrency": 10,
+     "ItemsPath": "$.Payload.List", // 配列のJSONパスを渡す
      "Iterator": {
        "StartAt": "Sample Iterator",
        "States": {
          "Sample Iterator": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            // 略
            "End": true
          }
        }
      },
+     "ResultPath": "$.TaskResult", // 結果のTaskResultに結果が追加される
      "End": true
    }
  },
  "End": true
}
output.json
{
  "Payload": {
    "Id": "abc1234",
    "List": [
      "value_1",
      "value_2"
    ]
  },
  "TaskResult": [
    "result_1",
    "result_2"
  ]
}

その他にも入出力に関して次のフィールドを利用可能です。
https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/concepts-input-output-filtering.html を参照)

  • InputPath
  • OutputPath
  • ResultPath
  • Parameters
  • ResultSelector

入出力の設定をするのが一番うまく行かなかった記憶があります。
実装当初は入出力の受け渡しは実際に動かして試しながら設定しました。

今ではData Flow Simulatorを利用して入出力を確認しながら進めています。
なれるまで少し癖があるように思います。

DBへのアクセス(おまけ)

並行処理が実行される(Lambdaが同時実行される)ことでコネクションが大量に発生し接続エラーになることがあります。
幸いRDSを利用する場合はLambdaからRDS Proxyを利用して接続することでコネクションの管理をRDS Proxyに任せることができます。

私の場合LambdaからAWS DocumentDBへの接続をしています。
global変数にコネクションを定義し同じ実行環境でコネクションを使いまわすことで対応しました。
(Lambdaの実行環境のライフサイクルについては https://aws.amazon.com/jp/blogs/news/operating-lambda-performance-optimization-part-1/ を参照)

詳しくはこちらのブログで紹介されています。
https://aws.amazon.com/jp/blogs/database/creating-a-rest-api-for-amazon-documentdb-with-mongodb-compatibility-with-amazon-api-gateway-and-aws-lambda/

connect_docdb.py
from pymongo import MongoClient

# GLOBAL
docdb_client = None
DOCDB_URL=xxx


def get_docdb_client():
    global docdb_client
    if docdb_client is None:
        docdb_client = MongoClient(DOCDB_URL)
    return docdb_client

まとめ

  • イベント駆動アーキテクチャのEvent MediatorとしてStep Functionsを利用できる。
  • Step FunctionsはAWSのサービスを組み合わせて(比較的)簡単にビジネスロジックを構築できる。
  • 簡単とはいえハマりどころもある。ドキュメントをよく確認する。

マイクロサービスと同じで、どの機能をどの処理に任せるのかが悩みどころかもしれません。

今思うともう少しLambdaを細かく分けてもよかったかなと思う部分もあります。
そういうときそのLambdaだけを修正すればいいという点もStep Functionsを利用する利点かなと思います。

それでは皆さんも良きStep Functionsライフを!

  1. https://www.oreilly.co.jp/books/9784873119823/

10
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
0