Help us understand the problem. What is going on with this article?

AWS Kinesis を使った Lambda 並行実行検証

More than 1 year has passed since last update.

Kinesisを使うと、例えばデータファイルを行毎にストリームとして流すことが可能です。
またKinesisをLambdaトリガーとして使うことで、後続処理をLambdaで作ることができます。

アップロード

テストとして、以下のLambdaFunctionを作成し、10,000件のデータファイルを流してみました。
1行あたりのデータ量は、平均で2KiB程度です。

Lambda Function

kinesis_upload.py
import boto3
import urllib.parse

s3 = boto3.resource('s3',
    aws_access_key_id='XXXXXXXXXX',
    aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXX',
    region_name='ap-northeast-1'
)

kinesis = boto3.client('kinesis',
    aws_access_key_id='XXXXXXXXXX',
    aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXX',
    region_name='ap-northeast-1'
)

def put_kinesis(json):
    try:
        result = client.put_record(
            StreamName = 'TEST_KINESIS',
            Data = json,
            PartitionKey = 'test'
        )
        print(result)

    except Exception as e:
        print('Kinesis put record excption')
        print(e.message)
        return False

def lambda_handler(event, context):
    print('Start lambda_handler')

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')

    file_path = '/tmp/' + key

    try:
        bucket = s3.Bucket(bucket)
        bucket.download_file(key, file_path)
        print('Download completed.')

    except Exception as e:
        print(e.message)
        print('!!download error!!')
        return False

    try:
        print('Now uploading to Kinesis...')
        f = open(file_path, 'r')
        for line in f.readlines():
            put_kinesis(line)

        f.close
        print('Upload completed.')

    except Exception as e:
        print(e.message)
        return

    print('End lambda_handler')

CloudWatch Log

ログ上でのLambdaFunctionの結果です。

スクリーンショット 2018-02-16 1.27.03.png

約1分43秒で1万件のデータの流し込みが終わったことが分かります。

Kinesis Monitoring

またこの時、PutレコードのLatencyは最大で0.2msとなっています。
スクリーンショット 2018-02-16 1.34.38.png

アップロードまとめ

Lambdaのメモリ上からKinesisへのアップロードは、殆どリアルタイムでアップロードされると考えていいでしょう。
Latencyだけで考えるなら、1万件のデータ(20MiB)も2秒でアップロードできることになります。
実際では合計で1分43秒かかっていますので、LambdaFunctionのポイントとしてはファイルを開く際にreadlinesを使ってメモリ上にファイルを全て展開することですが、それでも1秒間に100件程度(200KiB/s=1.6Mbps)のアップロードが限界のようです。
この事を考えると、1つ2KiB程度のデータ(json等)をストリームとして流す場合、シャード数は最小の1で十分だという事になります。

ダウンロード

LambdaでKinesisのデータを取得する場合、同時に最大10,000件まで(デフォルト100件)取得することができます。
今回は1,000件まで上げて確認してみました。

Lambda Function

kinesis_stream.py
import boto3
import base64
import json

s3 = boto3.resource('s3',
    aws_access_key_id='XXXXXXXXXX',
    aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXX',
    region_name='ap-northeast-1'
)

def lambda_handler(event, context):
    cnt = len(event['Records'])
    print('Record count: {}'.format(str(cnt)))

    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record['kinesis']['data']).decode('UTF-8')

        json_obj = json.loads(payload)
        key = 'product_{}.json'.format(json_obj['original_id'])

        s3_obj = s3.Object('test-kinesis-data', key)
        result = s3_obj.put(Body = payload)

    return 'Successfully processed {} records.'.format(len(event['Records']))

CloudWatch Log

ログ上でのLambdaFunctionの結果です。

スクリーンショット 2018-02-16 1.55.29.png

上図のログは一部ですが、最終的に8分20秒で全ての処理が終わりました。

Kinesis Monitoring

またこの時、GetレコードのLatencyは最大で18msとなっています。
アップロードはリアルタイムと言っていい速度でしたが、ダウンロードはそれに較べて100倍ほどのLatencyがある事が分かります。
スクリーンショット 2018-02-16 1.58.41.png

ダウンロードまとめ

S3にオブジェクトを書き込む処理に時間がかかる事が分かります。
だいたい1,000件で50秒かかっているので、40KiB/sといったところでしょうか。
また、KinesisStreamからLambdaを起動した場合、前のLambdaの処理が正常終了するまでKinesisのトリムが終わらないことが分かります。
この為、LambdaFunctionを並列起動する為には、まだ何か工夫が必要なようです。
具体的には、LambdaFunctionがイベントを取得したタイミングでトリムを走らせる必要がありそうです。

まとめ

1万件のデータを1件ずつに分解しS3にアップロードするという単純な処理をテスト環境として用意してみましたが、Lambda単体では5分の壁を超える事はできませんでした。
ですが間にKinesisを挿れる事で、5分の壁を超えて処理を継続することができます。
後は並行処理の為の手段さえあれば、KinesisはLambdaを並行実行する為の最適なトリガーとなるのではないでしょうか。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away