Kinesisを使うと、例えばデータファイルを行毎にストリームとして流すことが可能です。
またKinesisをLambdaトリガーとして使うことで、後続処理をLambdaで作ることができます。
アップロード
テストとして、以下のLambdaFunctionを作成し、10,000件のデータファイルを流してみました。
1行あたりのデータ量は、平均で2KiB程度です。
Lambda Function
import boto3
import urllib.parse
s3 = boto3.resource('s3',
aws_access_key_id='XXXXXXXXXX',
aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXX',
region_name='ap-northeast-1'
)
kinesis = boto3.client('kinesis',
aws_access_key_id='XXXXXXXXXX',
aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXX',
region_name='ap-northeast-1'
)
def put_kinesis(json):
try:
result = client.put_record(
StreamName = 'TEST_KINESIS',
Data = json,
PartitionKey = 'test'
)
print(result)
except Exception as e:
print('Kinesis put record excption')
print(e.message)
return False
def lambda_handler(event, context):
print('Start lambda_handler')
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
file_path = '/tmp/' + key
try:
bucket = s3.Bucket(bucket)
bucket.download_file(key, file_path)
print('Download completed.')
except Exception as e:
print(e.message)
print('!!download error!!')
return False
try:
print('Now uploading to Kinesis...')
f = open(file_path, 'r')
for line in f.readlines():
put_kinesis(line)
f.close
print('Upload completed.')
except Exception as e:
print(e.message)
return
print('End lambda_handler')
CloudWatch Log
ログ上でのLambdaFunctionの結果です。
約1分43秒で1万件のデータの流し込みが終わったことが分かります。
Kinesis Monitoring
またこの時、PutレコードのLatencyは最大で0.2msとなっています。
アップロードまとめ
Lambdaのメモリ上からKinesisへのアップロードは、殆どリアルタイムでアップロードされると考えていいでしょう。
Latencyだけで考えるなら、1万件のデータ(20MiB)も2秒でアップロードできることになります。
実際では合計で1分43秒かかっていますので、LambdaFunctionのポイントとしてはファイルを開く際にreadlinesを使ってメモリ上にファイルを全て展開することですが、それでも1秒間に100件程度(200KiB/s=1.6Mbps)のアップロードが限界のようです。
この事を考えると、1つ2KiB程度のデータ(json等)をストリームとして流す場合、シャード数は最小の1で十分だという事になります。
ダウンロード
LambdaでKinesisのデータを取得する場合、同時に最大10,000件まで(デフォルト100件)取得することができます。
今回は1,000件まで上げて確認してみました。
Lambda Function
import boto3
import base64
import json
s3 = boto3.resource('s3',
aws_access_key_id='XXXXXXXXXX',
aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXX',
region_name='ap-northeast-1'
)
def lambda_handler(event, context):
cnt = len(event['Records'])
print('Record count: {}'.format(str(cnt)))
for record in event['Records']:
# Kinesis data is base64 encoded so decode here
payload = base64.b64decode(record['kinesis']['data']).decode('UTF-8')
json_obj = json.loads(payload)
key = 'product_{}.json'.format(json_obj['original_id'])
s3_obj = s3.Object('test-kinesis-data', key)
result = s3_obj.put(Body = payload)
return 'Successfully processed {} records.'.format(len(event['Records']))
CloudWatch Log
ログ上でのLambdaFunctionの結果です。
上図のログは一部ですが、最終的に8分20秒で全ての処理が終わりました。
Kinesis Monitoring
またこの時、GetレコードのLatencyは最大で18msとなっています。
アップロードはリアルタイムと言っていい速度でしたが、ダウンロードはそれに較べて100倍ほどのLatencyがある事が分かります。
ダウンロードまとめ
S3にオブジェクトを書き込む処理に時間がかかる事が分かります。
だいたい1,000件で50秒かかっているので、40KiB/sといったところでしょうか。
また、KinesisStreamからLambdaを起動した場合、前のLambdaの処理が正常終了するまでKinesisのトリムが終わらないことが分かります。
この為、LambdaFunctionを並列起動する為には、まだ何か工夫が必要なようです。
具体的には、LambdaFunctionがイベントを取得したタイミングでトリムを走らせる必要がありそうです。
まとめ
1万件のデータを1件ずつに分解しS3にアップロードするという単純な処理をテスト環境として用意してみましたが、Lambda単体では5分の壁を超える事はできませんでした。
ですが間にKinesisを挿れる事で、5分の壁を超えて処理を継続することができます。
後は並行処理の為の手段さえあれば、KinesisはLambdaを並行実行する為の最適なトリガーとなるのではないでしょうか。