これは初老丸Advent Calendar 2017の15日目の記事です。
S3トリガーのLambda
皆さんLambda使ってますか?Lambdaいいですよね!
特にS3をトリガーに使うと、外部からの処理開始の契機はS3にファイル置くだけだし、どんどん置いていったら適当に並列にLambdaが呼ばれてスケールするしで性能のことも何も考えなくていいので最高です(たぶん)。
ただ、ちょっと注意点もあります。それは、「S3のイベント発火はたまに抜けが出る」という問題です。公式ドキュメントのどこに書いてあるかは探しきれてませんが、実際結構抜けます。
例
ここでは例として以下のようなアーキテクチャを考えます。
- 適当なバケットを作って、そこにファイルが置かれたらLambdaが起動する
- 起動したLambdaはそのファイルを処理する
- 処理が終わったらそのファイルを同じバケット内の「bak」というフォルダに移動させる
これ、しばらく動かしてみたら分かりますが、結構bakに移動しないで残ってるファイルが出てくるんですね。これがS3のイベント抜け(と私は呼んでる)です。これは困りました。
解法1:SQSを併せて使って処理を開始し、S3のイベント発火に頼らない
一つの解決案として、cloudpackさんのWhitePaperでは、「SQSにファイル名を同時に書き込み、時間起動したLambdaでpollして処理する」というアーキテクチャを挙げられています。
ここでは別解として、S3とLambdaだけで解決する方法を考えてみます。
解法2:S3を別のLambdaで見張る
要はS3にいつまでも処理されずに残っているファイルがあるのが問題なわけですから、それを見張っておいて抜けたイベントを再度無理矢理起こしてやればいいわけです。
処理としては
- 時間起動するLambdaを使う
- S3を見て、一定時間以上残っているファイルを一度bakフォルダに移す
- bakフォルダからそのファイルを戻す
でいけるはずです。
以下がサンプルコードになります。
# coding=utf-8
import boto3
import os
import re
from datetime import tzinfo, timedelta, datetime
BUCKET_NAME = '対象バケット名'
ZERO = timedelta(0)
class UTC(tzinfo):
def utcoffset(self, dt):
return ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return ZERO
s3 = boto3.resource('s3');
s3c = boto3.client('s3', region_name='リージョン名')
bucket = s3.Bucket(BUCKET_NAME);
time_threshold = timedelta(minutes=5) # 5分以上残っているものを対象にする
for object in bucket.objects.all():
if not re.findall(r'^bak/', object.key, flags=re.I):
key = object.key
gap = datetime.now(UTC()) - object.last_modified
if gap > time_threshold:
res = s3c.copy_object(Bucket = BUCKET_NAME, Key = 'back/'+key+'.bak',CopySource = {'Bucket': BUCKET_NAME,'Key': key})
res = s3c.copy_object(Bucket = BUCKET_NAME, Key = key,CopySource = {'Bucket': BUCKET_NAME,'Key': 'back/'+key+'.bak'})
とりあえずこいつを5分おきにでも動かしておくことで、イベント抜けはなくなりました。最大でも5分の遅延でちゃんと処理されていました。
利点
S3とLambdaだけで完結しているのでSQSまで使うのに比べて導入が楽(全体のアーキテクチャをいじらず後付けでいける)のは楽ちんです。
また、S3イベントから呼ばれるLambdaも、処理に失敗したら最後のファイル移動をしないで終了してしまえば5分後にはこいつがイベントを再発火してくれてリトライがかかるので、エラーハンドリングで手を抜けます(それがいいかどうかは別のお話ですが)。
注意点
bakフォルダ以下にファイルがたまってくると動作は遅くなってきますので、S3のアーカイブ機能で適当に消してやるなり、そもそもの処理済みファイルの移動先を別のバケットにしてやる等で適宜解決する必要があります。
また、copy_objectが成功するのを前提にしてるので、万一「bakに動かすのは成功したけど、戻すのに失敗した」時はどうにもなりません。そのあたりはご自身でエラーハンドリングを書き足してください。
ファイルを移動しないでもTAGを書き足すとかでもイベントを発火できないかなーと思ったりもしましたが試してませんのでごめんなさい。