Help us understand the problem. What is going on with this article?

S3トリガーでLambdaを使うときの、イベント抜けに対する一つの解法

More than 1 year has passed since last update.

これは初老丸Advent Calendar 2017の15日目の記事です。

S3トリガーのLambda

皆さんLambda使ってますか?Lambdaいいですよね!
特にS3をトリガーに使うと、外部からの処理開始の契機はS3にファイル置くだけだし、どんどん置いていったら適当に並列にLambdaが呼ばれてスケールするしで性能のことも何も考えなくていいので最高です(たぶん)。

ただ、ちょっと注意点もあります。それは、「S3のイベント発火はたまに抜けが出る」という問題です。公式ドキュメントのどこに書いてあるかは探しきれてませんが、実際結構抜けます。

ここでは例として以下のようなアーキテクチャを考えます。

  • 適当なバケットを作って、そこにファイルが置かれたらLambdaが起動する
  • 起動したLambdaはそのファイルを処理する
  • 処理が終わったらそのファイルを同じバケット内の「bak」というフォルダに移動させる

これ、しばらく動かしてみたら分かりますが、結構bakに移動しないで残ってるファイルが出てくるんですね。これがS3のイベント抜け(と私は呼んでる)です。これは困りました。

解法1:SQSを併せて使って処理を開始し、S3のイベント発火に頼らない

一つの解決案として、cloudpackさんのWhitePaperでは、「SQSにファイル名を同時に書き込み、時間起動したLambdaでpollして処理する」というアーキテクチャを挙げられています。

ここでは別解として、S3とLambdaだけで解決する方法を考えてみます。

解法2:S3を別のLambdaで見張る

要はS3にいつまでも処理されずに残っているファイルがあるのが問題なわけですから、それを見張っておいて抜けたイベントを再度無理矢理起こしてやればいいわけです。

処理としては

  • 時間起動するLambdaを使う
  • S3を見て、一定時間以上残っているファイルを一度bakフォルダに移す
  • bakフォルダからそのファイルを戻す

でいけるはずです。
以下がサンプルコードになります。

python
# coding=utf-8
import boto3
import os
import re
from datetime import tzinfo, timedelta, datetime

BUCKET_NAME = '対象バケット名'

ZERO = timedelta(0)

class UTC(tzinfo):
  def utcoffset(self, dt):
    return ZERO
  def tzname(self, dt):
    return "UTC"
  def dst(self, dt):
    return ZERO

s3 = boto3.resource('s3');
s3c = boto3.client('s3', region_name='リージョン名')
bucket = s3.Bucket(BUCKET_NAME);

time_threshold = timedelta(minutes=5) # 5分以上残っているものを対象にする

for object in bucket.objects.all():
  if not re.findall(r'^bak/', object.key, flags=re.I):
        key = object.key
        gap = datetime.now(UTC()) - object.last_modified
        if gap > time_threshold:
            res = s3c.copy_object(Bucket = BUCKET_NAME, Key = 'back/'+key+'.bak',CopySource = {'Bucket': BUCKET_NAME,'Key': key})
            res = s3c.copy_object(Bucket = BUCKET_NAME, Key = key,CopySource = {'Bucket': BUCKET_NAME,'Key': 'back/'+key+'.bak'})

とりあえずこいつを5分おきにでも動かしておくことで、イベント抜けはなくなりました。最大でも5分の遅延でちゃんと処理されていました。

利点

S3とLambdaだけで完結しているのでSQSまで使うのに比べて導入が楽(全体のアーキテクチャをいじらず後付けでいける)のは楽ちんです。
また、S3イベントから呼ばれるLambdaも、処理に失敗したら最後のファイル移動をしないで終了してしまえば5分後にはこいつがイベントを再発火してくれてリトライがかかるので、エラーハンドリングで手を抜けます(それがいいかどうかは別のお話ですが)。

注意点

bakフォルダ以下にファイルがたまってくると動作は遅くなってきますので、S3のアーカイブ機能で適当に消してやるなり、そもそもの処理済みファイルの移動先を別のバケットにしてやる等で適宜解決する必要があります。
また、copy_objectが成功するのを前提にしてるので、万一「bakに動かすのは成功したけど、戻すのに失敗した」時はどうにもなりません。そのあたりはご自身でエラーハンドリングを書き足してください。

ファイルを移動しないでもTAGを書き足すとかでもイベントを発火できないかなーと思ったりもしましたが試してませんのでごめんなさい。

kkimura
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした