3
4

More than 3 years have passed since last update.

AWS SAMとStepFunctionでAthenaの集計処理を自動化する

Last updated at Posted at 2020-08-07

2020-10-23 追記
StepFuntionにAthenaが統合され、ステートマシーンから直接AthenaのAPIを呼べるようになった。
これでLambdaを実装しなくてもクエリーを実行したりできる。
Call Athena with Step Functions | AWS Step Functions Developer Guide


AWS SAMを見ていたら AWS::Serverless::StateMachine というStepFunctionのコンポーネントがあったので、それを使ってAthenaの集計処理を実装してみた。
実際は集計期間に応じて処理を分岐してリトライ間隔など調整できるようにしたが、ここではシンプルにした。

余談:VSCode + AWS Toolkit

*.asl.json ファイルを編集するとlintやグラフの描画をリアルタイムに行ってくれるので便利だった。
できれば Fail タスクはもっと軽く扱って欲しい。

vscode+stepfunction

やりたいこと

下記の5タスクをステートマシーンにした。
この処理を週次/月次で実行する。

  1. 集計期間の計算
  2. Athenaへのクエリー
  3. クエリー進捗の確認/待機
  4. クエリー結果の加工
  5. S3への保存

StepFunctionでやるメリット

  • 各タスクがシンプルになる
    タスクはLambda(node.js)で実装したが、1つのファンクションで1つのことしかしないので各ファンクションがシンプルになる。

  • コードの分岐がなくなる
    Choise など使うとステートマシーン上で分岐できるので、コード上ではその分岐がなくなる。

  • コードのリトライ処理がなくなる
    エラー時のリトライやクエリー完了まで待機するのにエクスポネンシャルバックオフを使ったりできる。こういった処理を自分で実装する必要がなくなる。

  • テストがしやすい
    各Lambdaは処理が小さくなるのでローカルでのテストもしやすくなる。
    StepFunction自体が入出力の管理をしっかりしないといけないサービスなので、当然各Lambdaの入出力も分かりやすくせざるを得なくなる。
    また、AWS上でステートマシーンを実行すると各タスクの入出力と例外が表示されるので、どこでどういう結果になっているのか把握しやすい。

初期化

AWS SAMにはStepFunctionを利用したテンプレートが用意されているので、それを元に開発した。
sam init で下記のように選択していく。

$ sam init
Which template source would you like to use?
    1 - AWS Quick Start Templates
    2 - Custom Template Location
Choice: 1

Which runtime would you like to use?
    1 - nodejs12.x
    2 - python3.8
    3 - ruby2.7
    4 - go1.x
    5 - java11
    6 - dotnetcore3.1
    7 - nodejs10.x
    8 - python3.7
    9 - python3.6
    10 - python2.7
    11 - ruby2.5
    12 - java8
    13 - dotnetcore2.1
Runtime: 1

Project name [sam-app]:

Cloning app templates from https://github.com/awslabs/aws-sam-cli-app-templates.git

AWS quick start application templates:
    1 - Hello World Example
    2 - Step Functions Sample App (Stock Trader)
    3 - Quick Start: From Scratch
    4 - Quick Start: Scheduled Events
    5 - Quick Start: S3
    6 - Quick Start: SNS
    7 - Quick Start: SQS
    8 - Quick Start: Web Backend
Template selection: 2

-----------------------
Generating application:
-----------------------
Name: sam-app
Runtime: nodejs12.x
Dependency Manager: npm
Application Template: step-functions-sample-app
Output Directory: .

Next steps can be found in the README file at ./sam-app/README.md

この時点でこのような構成になっている。
functions 以下のLambdaを適当に追加・削除し、stock_trader.asl.jsontemplate.yaml を編集していく。

.
└── sam-app/
    ├── functions/
    │   ├── stock-buyer/
    :   :   :
    │   │   ├── app.js
    │   │   └── package.json
    │   ├── stock-checker/
    :   :   :
    │   │   ├── app.js
    │   │   └── package.json
    │   └── stock-seller/
    :       :
    │       ├── app.js
    │       └── package.json
    ├── statemachine/
    │   └── stock_trader.asl.json
    :
    └── template.yaml

最終的にはこうなった。

.
└── sam-app/
    ├── functions/
    │   ├── aggregation/
    :   :   :
    │   │   ├── app.js
    │   │   └── package.json
    │   ├── period-calculation/
    :   :   :
    │   │   ├── app.js
    │   │   └── package.json
    │   ├── store-result/
    :   :   :
    │   │   ├── app.js
    │   │   └── package.json
    │   ├── transform/
    :   :   :
    │   │   ├── app.js
    │   │   └── package.json
    │   └── wait-aggregation/
    :       :
    │       ├── app.js
    │       └── package.json
    ├── statemachine/
    │   └── athenaquery.asl.json
    :
    ├── samconfig.toml
    └── template.yaml

ステートマシーン

全体は省略するが、例えばAthenaのクエリーが完了するのを待つタスクはこのような定義になった。

ParametersQueryExecutionId.$ が対象クエリーを特定するIDで、前工程のタスクの出力を入力値として受け取っている。

Retry のところで InProgressError を受け取ってエクスポネンシャルバックオフでリトライするよう定義している。Lambdaのコードではステータスを確認して実行中であれば InProgressError をスローするだけ。長時間実行してクエリー完了を待つ必要がないのでタイムアウトの心配もない。

"WaitAggregation": {
  "Type": "Task",
  "Resource": "${WaitAggregationArn}",
  "Parameters": {
    "Source.$": "$$.Execution.Input",
    "QueryExecutionId.$": "$.QueryExecutionId"
  },
  "Next": "Transform",
  "Retry": [
    {
      "ErrorEquals": [
        "States.TaskFailed"
      ],
      "IntervalSeconds": 3,
      "MaxAttempts": 5,
      "BackoffRate": 1
    },
    {
      "ErrorEquals": [
        "InProgressError"
      ],
      "IntervalSeconds": 5,
      "MaxAttempts": 10,
      "BackoffRate": 1.2
    }
  ],
  "Catch": [
    {
      "ErrorEquals": [
        "InvalidRequestException"
      ],
      "Next": "Fail"
    }
  ]
}

テンプレート

CloudFormationテンプレートの抜粋。
デフォルトだとStepFunctionのStandardWorkflowで構築される。

DefinitionSubstitutions に設定した値は、ステートマシーンの定義で ${WaitAggregationArn} のようにプレースホルダーとして利用できる。

Events のところでEventBridgeのスケジュールも同時に定義している。
UTCで毎週土曜日15:05(JSTで毎週日曜00:05)と毎月月末15:05(JSTで毎月1日00:05)。

また Input のところでステートマシーンの初期入力値を渡している。
さきほどのステートマシーンの定義にある $$.Execution.Input にこの値が含まれる。

AthenaQueryStateMachine:
  Type: AWS::Serverless::StateMachine
  Properties:
    DefinitionUri: statemachine/athenaquery.asl.json
    DefinitionSubstitutions:
      PeriodCalculationArn: !GetAtt PeriodCalculation.Arn
      AggregationArn: !GetAtt Aggregation.Arn
      WaitAggregationArn: !GetAtt WaitAggregation.Arn
      TransformArn: !GetAtt Transform.Arn
      StoreResultArn: !GetAtt StoreResult.Arn
    Events:
      Weekly:
        Type: Schedule
        Properties:
          Description: Schedule to run the user ranking state machine every week
          Enabled: True
          Input: !Sub '{"IntervalUnit":"Weekly","Env":"${Environment}"}'
          Schedule: "cron(5 15 ? * SAT *)"
      Monthly:
        Type: Schedule
        Properties:
          Description: Schedule to run the user ranking state machine every month
          Enabled: True
          Input: !Sub '{"IntervalUnit":"Monthly","Env":"${Environment}"}'
          Schedule: "cron(5 15 L * ? *)"
    Policies:
      - LambdaInvokePolicy:
          FunctionName: !Ref PeriodCalculation
      - LambdaInvokePolicy:
          FunctionName: !Ref Aggregation
      - LambdaInvokePolicy:
          FunctionName: !Ref WaitAggregation
      - LambdaInvokePolicy:
          FunctionName: !Ref Transform
      - LambdaInvokePolicy:
          FunctionName: !Ref StoreResult

Lambda

Athena関連のタスクだけ抜粋。

Aggregation

ここでAthenaのクエリーを実行している。
startQueryExecution を実行すると QueryExecutionId が返ってくるので、それを次タスクに渡す。

const Athena = require("aws-sdk/clients/athena");
const athena = new Athena();

exports.lambdaHandler = async (event, context) => {
  const env = event.Source.Env;
  const query = "select ...";

  const params = {
    QueryString: query,
    QueryExecutionContext: {
      Database: `database_${env}`,
    },
    ResultConfiguration: {
      OutputLocation: `s3://athena-query-result-${env}/`,
    },
    WorkGroup: env,
  };
  const result = await athena.startQueryExecution(params).promise();
  const qid = result.QueryExecutionId;

  const ret = {
    QueryExecutionId: qid,
  };
  return ret;
}

WaitAggregation

QueryExecutionId を受け取りクエリーが完了するまで監視する。
完了したら結果を取得して次のタスクに渡している。
前述の通り InProgressError が出ている間は何度も実行されるタスク。

今回は結果セットが小さかったので直接次のタスクに渡した。サイズが大きい場合は QueryExecutionId だけ渡すなり、別のDBを経由するなりしないといけないかもしれない。

getQueryExecution(QueryExecutionId) で実行情報を取得し、ステータスを見て完了していれば getQueryResults(QueryExecutionId) で結果を取得する。

const Athena = require("aws-sdk/clients/athena");
const athena = new Athena();

class InProgressError extends Error {
  constructor(...params) {
    super(...params);
    this.name = this.constructor.name;
  }
}

exports.lambdaHandler = async (event, context) => {
  const qid = event.QueryExecutionId;

  const params = { QueryExecutionId: qid };
  const exec = await athena.getQueryExecution(params).promise();

  const list = [];
  const status = exec.QueryExecution.Status.State;

  switch (status) {
    case "QUEUED":
    case "RUNNING":
      throw new InProgressError("InProgress");
      break;
    case "FAILED":
      throw new Error("Failed");
      break;
    case "CANCELLED":
      throw new Error("Canceled");
      break;
    default:
      const result = await athena.getQueryResults(params).promise();
      const rs = result.ResultSet;
      const fields = rs.Rows.shift();
      for (const row of rs.Rows) {
        const processed = processRow(fields, row);
        list.push(processed);
      }
      break;
  }

  const ret = {
    QueryResult: list,
  };
  return ret;
}

デプロイ

初回デプロイは sam deploy -g で行う。
設定を保存すると samconfig.toml が生成され、次回からは sam deploy だけでデプロイできる。

sam build
sam deploy -g
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4