はじめに
Kinesis Streamに入れたログをlambdaで処理するときlambdaはシャードに対して直列で動きます
Kinesisにログが入るスピード < ログ1件あたりのlambdaの処理時間となると、ログが入ってから処理が完了するまでの時間がどんどん開いていきます
シャードを分割すればlambdaも並列で動くのですが、分割するほどログも多くない
ということで、シャードは一つですが非同期に並列で処理できる仕組みを考えました
言語はpython
自分自身を再帰的に呼び出します
参考
lambdaとkinesisの設定方法は他を参考にしてください
AWS Lambda編~Kinesisと連携してみる~
ストリーム型(kinesis stream)のAWS Lambdaの同時起動数とデータの取り方を整理
設定
デプロイしたlambdaにkinisisを紐づけます
一度に処理するログの数は10
コード
非同期処理へ投げる
- Kinesisへ投げるログはjson
- Kinesisから流れてきたログは配列に入れ、async_flagをつけて非同期実行へ投げる
- async_flagが付いている場合は、データを取得してループを回す
run.py
import json
import base64
import time
from aws_lambda import AwsLambdaModule
def run(event, context):
data_dict = {}
data_dict['async_flag'] = True
data_dict['data'] = []
if 'async_flag' in event:
# 非同期で呼び出される場合
main(event['data'])
else:
# 非同期で呼び出す場合
# キネシスストリームから起動した場合
for record in event['Records']:
# kinesisのデータを戻す
kinesis_data = load_kinesis_data(record['kinesis']['data'])
# 詰め替える
data_dict['data'].append(kinesis_data)
# 非同期で再帰呼び出し
lambda_function = AwsLambdaModule()
lambda_function.invoke_async(data_dict)
def main(data_):
for data in data_:
time.sleep(2)
print(data)
def load_kinesis_data(data):
"""
kinesisから受け取ったデータはエンコードされているため
"""
payload = base64.b64decode(data)
return json.loads(payload)
lambdaの非同期呼び出し
aws_lambda.py
import os
import json
import boto3
class AwsLambdaModule(object):
def __init__(self):
self.client = boto3.client('lambda')
self.function_name = os.environ.get('AWS_LAMBDA_FUNCTION_NAME')
def invoke_async(self, data):
"""
lambdaを非同期呼び出し
"""
if self.function_name is None:
return
print('### Asynchronous execution ###')
self.client.invoke(
FunctionName=self.function_name,
InvocationType="Event",
Payload=json.dumps(data)
)
結果
100連続でKinesisへログを挿入
{"test":"momonga"}
非同期処理への命令と2秒ごとにログをダンプしている処理が非同期で動作していることがわかります