LoginSignup
29
20

More than 5 years have passed since last update.

AWS S3にアップロードされたExcelファイルをLambdaで処理

Posted at

概要

  • AWSのS3にアップされたExcelファイルをLamda関数を使って処理する方法を試したので、手順を共有する。

課題

  • 定型のExcelファイルを使ってデータを納品する業務をシステム化したい。
  • なぜExcelか? 古い業界なので、いまでもExcelが活躍している。Excelマクロを組めるとヒーローだ。CSV/TSVのほうが機械処理には楽だが、人間にとってはExcelのほうが運用性・業務親和性が高い。
  • そこで担当者がExcelファイルをS3にアップしたら、その内容を自動的にRDSやDynamoに登録するような仕組みを実現したい

課題解決の方針

  • S3にExcelファイルがアップされたらEvent機能でSQSに通知して、それをLamda関数で処理するまでの範囲を本稿で検討する。S3EventからLambdaを直接キックしないのは、エラー再処理などの運用性を考慮しているからである。
  • システム構成図
    システム構成.png
  • 運用者がExcelファイルをS3の特定パスにアップすることは別のソリューションで行う。S3互換のファイル転送ソフトなどを用いることも可能。

手段の検討

  • Lambdaは、ランタイムでPython3系を選択する1。同期処理を考えなくてもよいバッチ処理のようなコーディングには簡易だからである。
  • Excelファイル読み取りでxlrdというライブラリを選択する。もしかすると古いExcelファイルも処理したくなるかもしれないので。他のライブラリは試していないので、これがベストな選択かどうかはわからない。そのあとの処理で集計するならPandasもよさそうだ。
  • xlrdを使うために、対象ファイルをローカルに取得する必要がある。Lambdaでは一般的にローカル領域として/tmpを使うのだが、インスタンスを共有しているのでファイル名がぶつからないように考慮する。/tmpが満杯になるともちろん処理が失敗するので、処理が終わったらファイルは削除するべき。
    参考 AWS Lambda の制限

構築手順

  • AWSで 実用的なLambda関数をつくったことがあるレベルの人を前提に以降で手順を紹介する。

S3バケットの作成

  • 新規バケットを作成する。バケット名、ARNをメモしておく。

SQSキューを作成

  • S3バケットと同じリージョンで、マネジメントコンソールからSQSを選択。
  • 新規キューを作成する。
    キュー名は notify_upload_excelfile (任意)
    標準キューを指定
    キューの詳細設定はデフォルトのままでよい。後で実際の業務にあわせて調整すればよい
  • キューが作成されたら、キューのURLとARNをメモしておく。
  • キューを選択して、アクセス許可でS3を追加する。
    方法は以下を参考に。サンプルのポリシーをコピペして、環境にあわせて修正すればよい。
    バケットを通知用に設定する (メッセージの宛先: SNS トピックおよび SQS キュー)

S3バケットにイベント通知を設定する

  • 作成したバケットのプロパティからEventsをクリック
    通知の追加
    cap001.png
    名前 : NotifyExcelFilePut(任意)
    「すべてのオブジェクト作成イベント」にチェック
    サフィックス: .xlsx
    送信先 SQSで上記で作成したキューARNを指定する。

Lambda 関数の作成

  • マネジメントコンソールのLambda画面で「一から作成」を選択
    名前 : function_readExcelfile
    ランタイム : Python 3.7
  • ロールは新規作成
    ロール名 : lambdaexec_readExcelfile(任意)
    作成後に以下アクセス許可を追加
    sqs:ChangeMessageVisibility
    sqs:DeleteMessage
    sqs:GetQueueAttributes
    sqs:ReceiveMessage
IAMポリシーの例
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "s3:GetObject",
                "logs:CreateLogStream",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "sqs:GetQueueAttributes",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*",
                "arn:aws:sqs:us-west-2:xxxxxxxxxxx:notify_upload_excelfile(キューARN)",
                "arn:aws:s3:::xxxxxxxxxxxxx(バケットARN)/*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

参考: チュートリアル: AWS Lambda 関数をトリガーするように Amazon SQS キューへの着信メッセージを設定する

  • Lambda関数の作成
    ローカルPC上で適当なフォルダで以下のPythonファイルを作成する。
lambda_function.py
import logging
import os
import json
import boto3
import urllib.parse
from datetime import datetime
import xlrd
import random
import pprint

logger = logging.getLogger()
logger.setLevel(logging.INFO)

count_success=0
s3 = boto3.resource('s3')

def read_file(bucket,key):
    global count_success
    # ローカルの一時ファイルパスを生成する
    local_file_path = '/tmp/tmp_file_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-') + str(random.randint(0,999999))
    logger.info('local_path: ' + local_file_path)
    try:
        # S3からファイルを一時パスにダウンロード
        bucket = s3.Bucket(bucket)
        bucket.download_file(key, local_file_path)
        #  ダウンロードしたファイルをExcelと解釈して読み込む(エラー処理はしていない)
        wb = xlrd.open_workbook(local_file_path)
        sheet = wb.sheet_by_index(0)
        # 特定のセルから値を取得する例
        #logger.info("Cell A01 is {0}".format(sheet.cell_value(rowx=0, colx=0)))
        # 各行を取り出して成形して表示(実際の処理ではこの部分をカスタマイズする)
        for rx in range(sheet.nrows):
            print(sheet.row(rx))
        count_success+=1
        # 不要な一時ファイルを削除
        if os.path.exists(local_file_path):
            os.remove(local_file_path)
            # logger.info('tmp-file removed : ' + local_file_path)
    except Exception as e:
        logger.info(str(e))

def parce_message(messages):
    messages_dict = json.loads(messages)
    if ('Records' in messages_dict):
        for record in messages_dict['Records']:
            input_backet = record['s3']['bucket']['name']
            input_key = urllib.parse.unquote_plus(record['s3']['object']['key'], encoding='utf-8')
            # logger.info('update:' + input_backet + ':' + input_key)
            read_file(input_backet,input_key)
    else:
        logger.warning('ignore invalid SQS message: ' + str(messages_dict))

def lambda_handler(event, context):
    for record in event['Records']:
       parce_message(record["body"])
    logger.info('Success ' + str(count_success) + ' files')
  • ライブラリをダウンロードしZipファイルを作成
    以下でローカルフォルダにxlrdライブラリを取得。キャッシュファイルは削除して、Zipファイルに固める
pip install xlrd -t .
  • ZipファイルをマネジメントコンソールのLambda関数の関数コードの箇所で、Zipファイルをロードして保存

  • Lamda関数のトリガーにSQSを指定
    上記キューのARNを指定
    バッチサイズは 1

  • S3バケットにExcelファイルをアップして、CloudWatchログで出力を確認する。

まとめ

  • S3にExcelファイルをアップして、Lambda関数で自動的に取得しExcelファイルの内容にアクセスできることを確認した。
  • 日本語処理については完全性は保証できないが、Windows10環境のOffice2016で作成したExcelファイル名が日本語、データに日本語を含む場合に日本語を処理できたことは確認した。

今後の課題

  • Excelファイルはxlrdの処理前にチェックしていないが、Lambda関数の実行リソースに制限があるのでExcelファイルが信用できない場合は、対策が必要である2
  • 現状のコードだと、コード中のtryで囲った範囲外でエラーした場合は、何度もリトライして処理が詰まる状態になる。キューのメッセージ数の監視と、エラーになった場合の処理フローや、キュー設定のチューニング、デッドレターキュー設定を検討すべきである。
  • SQSの仕様上、標準キューはメッセージ順序、排他取得を保証していない。またLambdaの処理が開始する前に、同じExcelファイルが更新される可能性があるので、Excelの処理ロジックは疎連携を考慮した内容にすべきである3

参考文献


  1. 慣れているのと個人的な趣向でLambdaはnodeかPython3の2択である 

  2. 例えばすごく巨大なExcelファイルがアップされたら、/tmpが溢れるので、上記コードだと攻撃可能である 

  3. メッセージ処理のトラブルは今回のテストでは観測できなかったので、そんなに心配しなくてもよいかも? 排他処理により複雑化して保守性が下がるのであれば、実際の業務システムでは、「一度に大量のExcelファイルをアップしたら動作保証しない」という運用ルールでカバーするのもアリ。FIFOキュー採用も検討してもよい。 

29
20
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
29
20