本記事は「AWS LambdaとServerless Advent Calendar 2020」の17日目の記事です。
やりたいこと
- Amazon Redshiftにある10億レコードを越えるデータを1レコードずつ、特定の列を判定したりという、シンプルな処理だが全レコードを見ていく必要のある処理をしたい
従来やってた方法
- RedshiftからSQLでELTしてUNLOAD
- UNLOAD結果をEC2へS3からダウンロード
- JavaやPythonなどのプログラムでバッチ処理実行
- 結果をS3に格納
この方法でも良かったのですが、大規模データをEC2で高速に並行で処理する、、、前に少し考えてみると、あれLambdaが相性良いのでは?と思い試してみました。
具体的な方法
- MAXFILESIZEを指定してUNLOAD
- UNLOAD先をLambdaのイベントトリガで発火
Lambdaのおおよそのコード
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
outputdata = "";
try:
dlfilename ='/tmp/'+key.replace("/","")
s3client.download_file(bucket, key, dlfilename)
gzipfile = gzip.open(dlfilename, 'rt')
csvreader = csv.reader(gzipfile, delimiter=SEP, lineterminator=L_SEP, quoting=csv.QUOTE_NONE)
for line in csvreader:
#略
uploadbinary = gzip.compress(bytes(outputdata , 'utf-8'))
obj = s3resource.Object(processedbucket,processedkey)
obj.put( Body=uploadbinary )
except Exception as e:
print(e)
os.remove(dlfilename)
os.remove(dlfilename)
流れとしては、gz圧縮でUNLOADされたファイルをダウンロードし、処理したものをgzip圧縮し、再びS3に格納しています。
Redhisftのクエリ
(割愛)
PARALLEL ON gzip MAXFILESIZE 50MB allowoverwrite;
この時のUNLOAD先を、先のLambda関数のトリガに設定します。
注意点
タイムアウト
Lambdaは最大実行時間が15分のため、それ以内で収まるファイルサイズでのUNLOADが必要です。もちろん15分ギリギリは狙わず、余裕を持たせて終わるサイズでチューニングするのが良いと思います。
メモリ不足
アウトプット用のファイルデータをつくっていくうちに設定したメモリを越えるとOutOfMemoryErrorとなります。
ちょくちょく以下のようなものでデバッグしました。
print("memory size at outputdata:"+str(sys.getsizeof(outputdata)))
対策としては、メモリが上がると処理性能も上がるようなのと、多少のお金を掛けてエラーが減るならありと考え、今回の処理においてはギリギリをチューニングする意味は無いと考え、最大値の3008MBで全振りしてました。
(って書いてたら12月のアップデートで10,240MBまで上限がアップしておりました...)
今回の場合、データセットの特殊性も相まって事前のファイルサイズから処理後のデータ量が読み辛く「想定の倍」くらいにしています。それが丁度3GBくらいだったので、当時の3008MB全振りが丁度よかった感じです。
ディスクサイズ不足
Lambdaの/tmp ディレクトリのサイズは512MBです。
当然ですが、MAXFILESIZEを指定せず=分割せずにUNLOADして処理結果がこれを越えるとダウンロード時点でエラーとなります。
また、開発時にハマった点としては「予期せぬエラーで終了した時にファイルが消えてない状態」になり、その後コンテナが再利用されると、イベント自体は別のものでも過去に処理したファイルが残っている、という点です。
1コンテナで複数のイベントは処理されないことは保証されているのですが、イベントが終わった時に例えば/tmp/領域が綺麗になる訳では無い、ということですね。
なのでディスクサイズにおける対策は
・適切に小さいファイルサイズで処理する
・/tmp/を都度消す処理を入れる
と2点になるかと思います。開発時点ではEFSでハンドリングしやすくするのもありなのかもなぁ、と思わなくもないです。
適切なMAXFILESIZEのMB数があるかとは思うのですが、処理するデータの中身次第で処理の重さが変わることがあった点に加え、大きめのメモリで速攻で1イベントを終わらせる分にはコストが跳ね上がることもないため「メモリ多め」「ファイルサイズ少な目」というざっくりとしたチューニング方針を採りました。
感想
無事安定稼働しています。サーバーレスということで、気を遣わなければいけない点はあるものの、並列部分を任せられるのはやはり大きいですし、1並列あたりの処理も512MBという十分なファイル容量と潤沢なメモリは魅力的ですね。文中に書いてある2020/12のアップデートによる10,240MBのユースケースが気になります...