LoginSignup
2

More than 5 years have passed since last update.

api-gateway+lambdaからlambda・AWS batchを非同期でキックしてみた記事

Last updated at Posted at 2017-12-06

最近はやりのchatopsをrubyで書いていたのですが、botが死ぬたびに原因調査を行うのがめんどくさくて、slackでメッセージを受け取るところと内部の処理を分離しました。
分離した処理はjenkinsのapiを使って処理していたのですが、せっかくなのでAWSに全てあげてしまうことにしました。
そこでlambdaからLambda・AWS Batchを非同期でキックした時のことを書こうと思います。

はじめに

本稿はslackにbotを多数動かす想定で設計しているので、個人でちゃちゃっと作りたい人はbotで完結したほうがいいと思います。
またlambdaの非同期でキックするところを中心に書くのでbot部分や周辺は割愛します。
(litaをECS上で動作 passなどはkmsで管理)

想定ユーザー

  • データ基盤の機能などの一部を社内ユーザーに解放したい
  • いくつかのbotを動かしたい
  • terraformで環境構築している
  • サーバーレスが好き

全体的な構成

以下構成図になります
- メッセージは全てlita一台で受け取ってApi-GatewayにPOST
- LambdaからLambdaかAWS batchを非同期で起動

スクリーンショット 2017-12-05 12.42.21.png

post.json
{
    "method":{
         "channel_name" : "****",
         "req_usr" : "***",
         "magic_words" : "特定の情報を渡す場所(ddl取る場合はスキーマとテーブル名など)"
    }
}

api-gatewayの部分

以下の記事にlambdaをキックするところまでを書いたので割愛します。
API Gatewayを叩いてLambdaからRedshiftにSQLを投げる(ついでにslackにsnippet通知)

lambda(kicker)の部分

  • post内容から呼び出したい関数名を取り出す
  • 関数名のlambdaがあればそれをキック
  • 関数名に対応するAWS batchがあればそれをキック
kicker.py
import simplejson as json
import boto3
import logging


logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    event_dict = json.loads(event["body"])
    function_name = list(event_dict.keys())[0]
    if function_name == "kicker":
        status_code = 404
        body_text = "Kicker loop error : " + function_name
        logger.error(body_text)
    elif kick_lambda(function_name, event):
        status_code = 201
        body_text = "kick lambda : " + function_name
        logger.info(body_text)
    elif kick_batch(function_name, event):
        status_code = 201
        body_text = "kick batch : " + function_name
        logger.info(body_text)
    else:
        status_code = 404
        body_text = "No Method Error : " + function_name
        logger.error(body_text)

    responseObj = {
        "statusCode": status_code,
        "body": body_text
    }
    return responseObj


def kick_lambda(function_name, event):
    try:
        clientLambda = boto3.client("lambda")
        res = clientLambda.invoke(
            FunctionName=function_name,
            InvocationType="Event",
            Payload=json.dumps(event)
        )
        return True
    except:
        return False


def kick_batch(function_name, event):
    try:
        clientBatch = boto3.client("batch")
        res = clientBatch.submit_job(
            jobName=function_name,
            jobQueue=function_name + "_queue",
            jobDefinition=function_name + "_job_definition",
            parameters={
                'event' : json.dumps(event)
            }
        )
        return True
    except:
        return False
    return False


if __name__ == '__main__':
    handler('', '')

非同期で呼び出されたlambda(右側)の部分

eventをそのまま渡しているので以下の記事のように色々作ればOK
API Gatewayを叩いてLambdaからRedshiftにSQLを投げる(ついでにslackにsnippet通知)

非同期で呼び出されたAWS batch(右側)の部分

kickerで定義された名前に対応するものを作成しておけばOK
例のものであれば
- aws_batch_job_queueのname属性が{function_name}_queue
- aws_batch_job_definitionのname属性が{function_name}_job_definition

kicker.pyの一部
def kick_batch(function_name, event):
    try:
        clientBatch = boto3.client("batch")
        res = clientBatch.submit_job(
            jobName=function_name,
            jobQueue=function_name + "_queue",
            jobDefinition=function_name + "_job_definition",
            parameters={
                'event' : json.dumps(event)
            }
        )
        return True
    except:
        return False
    return False

終わりに

全体的にスッキリして機能追加などがかなり簡単になりました。
あとはterraform・lambdaのzipファイル・AWS batchのコンテナイメージをどうやって運用するかが考え所な感じがしました

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
2