AWS LambdaがPythonに対応したので試しに使ってみました。
今回はS3のバケット間ファイルコピーに使ったのですが、色々とはまりどころがあったので共有したいと思います。
やりたいこと
- s3のバケット内に存在するファイルを別バケットにコピーしたい
- シングルプロセスでコピーすると遅いのでマルチプロセスで同時にバケットコピーしたい
- AWS Lambda Pythonを使いたい
主に3番目の理由からやってみました。
やったこと
Lambda functionを作ってs3バケットを取得、コピーを並列で行うスクリプトを実装しました。
Lambda Functionの作成
Lambda Functionを作成します。
-
Create a Lambda Function
をクリック
Select blue print
使用するテンプレートを選択します。
-
hello-world-python
を選択
Configure function
Lambda functionの基本的な設定をします。
- Name: Lambda Function名
- 例:s3LogBucketCopy
- Description: Lambda Functionの説明
- 例:copy logs between buckets
- Runtime: 実行環境
- Python2.7
Lambda function code
実行されるプログラムコードを提供します。
以下の3種類の中から選択することができます。
- 画面上でコードを編集
- 自身のマシンからコードをアップロード
- s3からコードをアップロード
pythonの標準ライブラリやboto3以外のライブラリをインポートする必要がある場合は、2または3の方法を選択する必要があります。
詳細はこちらにまとまっていますので、興味のある方は参照してください。
ちなみに今回は、標準ライブラリとboto3のみを使用するため、1の方法で実装しています。
後で実装するので、最初は特に変更しません。
Lambda function handler and role
- Handler: 実行するハンドラの名前(module名.function名)
- 例:lambda_function.s3_log_copy_handler
- Role: Lambdaの実行権限(s3などのリソースへのアクセス権限など)
- 例:S3 execution Role
Advanced settings
使用可能なメモリやタイムアウト時間の設定をします。
- Memory(MB): 使用可能なメモリ
- 例:128MB
- Timeout: タイムアウト時間
- 例:5 min
Review
設定の確認をします。
問題なければCreate Function
を選択します
スクリプトの実装
multi_processでコピーするスクリプトを実装します。
以下、簡単なサンプルです。
#! /user/local/bin/python
# -*- coding:utf-8 -*-
import boto3
from multiprocessing import Process
def parallel_copy_bucket(s3client, source_bucket, dest_bucket, prefix):
'''
s3バケットを並列でコピーする
'''
# バケットをコピーする
def copy_bucket(s3client, dest_bucket, copy_source, key):
s3client.copy_object(Bucket=dest_bucket, CopySource=copy_source, Key=key)
# list_objectだと1000件までしかデータが取れないので注意
result = s3client.list_objects(
Bucket=source_bucket,
Prefix=prefix
)
# コピー元のkey一覧を取得してコピー
if 'Contents' in result:
keys = [content['Key'] for content in result['Contents']]
p = None
for key in keys:
copy_source = '{}/{}'.format(source_bucket, key)
p = Process(target=copy_bucket, args=(s3client, dest_bucket, copy_source, key))
p.start()
if p:
p.join()
# 実行時に呼ばれるハンドラ
def s3_log_copy_handler(event, context):
source_bucket = event["source_bucket"] # コピー元バケット
dest_bucket = event["dest_bucket"] # コピー先バケット
prefixes = event["prefixes"] # コピー元のファイル名の条件
s3client = boto3.client('s3')
for prefix in prefixes:
print("Start loading {}".format(prefix))
parallel_copy_bucket(s3client, source_bucket, dest_bucket, prefix)
print("Complete loading")
テスト実行
Actions
ボタンからConfigure Sample Event
を設定
ハンドラに渡すパラメータを設定
例えば、s3の構成が以下のようになっていた場合
- samplelogs.source # コピー元バケット
- /key1
- hogehoge.dat
- /key2
- fugafuga.dat
- samplelogs.dest # コピー先バケット
以下のようにJSONを設定します。
{
"source_bucket": "samplelogs.source",
"dest_bucket": "samplelogs.dest",
"prefixes" : [
"key1",
"key2"
]
}
ハマったところ
Roleでのs3バケットへの処理の許可
デフォルトのS3 Execution Ruleでは、s3:GetObject
とs3:PutObject
しか定義されていません。
このときに、s3client.list_objects()
を呼び出すと、A client error (AccessDenied) occurred: Access Denied
というエラーがでてしまいます。
このメソッドは、S3:GetObject
では実行できず、S3:ListObejct
という別の実行権限が必要になります。
そのため、Policyにs3:ListObject
を追加する必要があります。
multiprocessing.Pool
マルチプロセスでの実行時に、プールを指定するとOSErrors - [Errno 38] Function not implemented
というエラーが出てしまいます。
これは、poolを保持するために必要なOS権限をLambdaで実行する場合に持っていないため起きる問題です。
Poolの設定を外して実行する必要があります。
TimeOutの設定
Lambdaでは、実行時間が指定した値を超えたときにタイムアウトするように設定する必要があります。
タイムアウトの最大値は300sec(5min)となっているため、それ以上実行に時間がかかるものについては、実行を完了することができません。
そのため、ある程度大きなサイズのファイルを持つバケットの場合は、数回に分けてLambda Functionを実行する必要があります。
所感
やっぱり使いどころかなという気がしますが、アラートやプッシュ通知、小さめのファイル転送などの軽めの処理に向いているのかなと思いました。
反対に、重い処理を書くのには適さないようです。
また、APIエンドポイントを持てるようになったので、超軽量なAPIにも向いているかもしれません。今度試してみようと思います。
こちらLambda Functionを使って嬉しいところと残念なところをまとめました。
- 嬉しいところ
- 簡単なバッチ処理を書くのにインスタンスを立てる必要がない
- 最低限の設定で簡単にバッチを実装できる
- aws内のリソースにアクセスするのがIAM Roleで管理できるので簡単
- boto3が標準で使える
- 残念なところ
- Python3系に未対応
- 標準パッケージやboto3以外のインポートが面倒
- 作ったコードの管理が難しい
- 最大のタイムアウト時間が短い
参考
https://boto3.readthedocs.org/en/latest/
http://qiita.com/m-sakano/items/c53ba194a8574f44e78a
http://www.perrygeo.com/running-python-with-compiled-code-on-aws-lambda.html