はじめに
先日、S3へのファイルアップロードをトリガーに、別のS3にそのファイルをコピーする簡単なLambdaを作成してみたが、今度はAPI Geteway
経由でLambdaを実行する方法を試してみたので、その時の内容をメモとして残しておく。
※ 先日の記事はこちら。【AWSのLambdaを使ってS3間のファイルコピーを試してみた。】
今回やってみたい事
####片方のS3バケットの特定のプレフィックス中身を全て、もう片方のバケットに移動するLabmda①、および移動後のバケット中身(csvレコード)を取り出すLambda②を作成し、それらのLambdaをAPI Gateway経由で実行できる様にする。
今回使用するAWSの各サービスの説明は以下を参考に。(Lambda、S3については先日の記事と同じ説明文。)
■Lambda
イベントの発生に応じてプログラムを実行する環境を提供するクラウドコンピューティングサービス。(wikipedia引用)
(GCPで言う、Cloud Functionみたいなものと予想。裏側でプログラム実行用のコンテナが立ち上がるイメージだったので、VPCに属するサービスな気もしたが、実際はずっと起動しっぱなしという訳ではなく、10~15分起動したら落ちるらしい。)
設定でVPCに接続することもできるみたいで、関数内の処理でインターネットにアクセスする場合は必須らしい。
■S3
Webサービスのインタフェースを介してストレージを提供している。(wikipedia引用)
少しいじってみた感じだと、バケットのバージョニング機能があり、格納された全ドキュメントのバージョンを保存、取得、復元できるらしい。今回は無効にして作成すると思うが・・・
■API Gateway
あらゆる規模の REST、HTTP、および WebSocket API を作成、公開、維持、モニタリング、およびセキュア化するための AWS のサービス(公式サイトから引用)
使用するサービスの説明については以上の様なところで、具体的な手順としては、以下の流れで構築していく。
1."S3" のバケットを2つ作成する
2."Lambda" の作成
3."API Gateway" の作成
1."S3" のバケットを2つ作成する
バケットの作成自体は難しくないので、ここではその手順を書かないが、それぞれ以下の構成のバケットを作成する。
■バケット1
test_folder1(コンテンツ移動前のソース元フォルダ)
・test1.csv
・test2.csv
・test3.csv
・test_folderA.zip
(zip内には、test4.txt~test6.txtが)
※後でREST-APIよりデータを取得するため、csvファイルの中身は以下の様にしてある
■バケット2
test_folder2(コンテンツ移動先のフォルダ)
2."Lambda" の作成
Lambda① の作成
S3への書き込み権限も必要になるため、Lambda①に設定するロールについては今回S3FullAccess
のポリシーをアタッチする。以下のコードで、【バケット1⇒バケット2】のコピーを実現している。
import boto3
print('Loading function')
s3 = boto3.client('s3')
def lambda_handler(event, context):
# コピー元バケット・プレフィックスの指定
from_bucket = 'バケット1の名前'
from_prefix = 'test_folder1/'
# コピー先のバケット・フォルダを指定
to_bucket = 'バケット2の名前'
to_folder = 'test_folder2/'
# 各変数を出力
print('コピー元バケット', from_bucket)
print('コピー元プレフィックス', from_prefix)
print('コピー先バケット', to_bucket)
print('コピー先フォルダ', to_folder)
print('****************************************************')
try:
# バケット1のプレフィックス内のオブジェクトの一覧を取得する。
response = s3.list_objects_v2(Bucket=from_bucket, Prefix=from_prefix)
for content in response['Contents']:
if content['Key'] != from_prefix:
print('コピー元ファイルパス:', content['Key'])
# コピー先パスを生成
to_filepath = to_folder + content['Key'].split('/')[1]
print('コピー先ファイルパス:', to_filepath)
# バケット2へのコピー
s3.copy_object(Bucket=to_bucket, Key=to_filepath, CopySource={'Bucket': from_bucket, 'Key': content['Key']})
return "moving csvfile Successsfully!"
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
※取得できるオブジェクト数や実行時間には制限があるため、本格的使う時にはそこら辺もしっかり確認した方が良さそう。
Lambda② の作成
REST API経由より読み取り専用のLambdaとなるため、S3の読み取り権限のみのポリシーをアタッチする。(Lambda①とは別のロールを作成)
以下のコードが、バケット2内のcsvファイルを読み取り~出力。
# import pandas as pd
import json
import boto3
import csv
print('Loading function')
s3 = boto3.client('s3')
def lambda_handler(event, context):
# バケット2のバケット名・フォルダ名を指定
bucket = 'バケット2の名前'
prefix = 'test_folder2/'
# 各変数を出力
print('コピー元バケット', bucket)
print('コピー元プレフィックス', prefix)
print('****************************************************')
json_data = []
# csv抽出レコードの一致条件
filter = {
"age_max" : "",
"age_min" : "",
"sex" : ""
}
# REST APIよりGET時のパラメータ要素
# filter["age_max"] = 25
# filter["age_min"] = 20
# filter["sex"] = "male"
try:
# バケット2のプレフィックス内のオブジェクトの一覧を取得する。
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
# オブジェクトの数だけ繰り返し実行
for content in response['Contents']:
if content['Key'] != prefix:
# print('コピー元ファイルパス:', content['Key'])
# csvファイルのみ実行
if '.csv' in content['Key']:
# csvファイルの読み込み
csvfile = s3.get_object(Bucket=bucket, Key=content['Key'])
# csvファイルの中身を取得
csvcontent = csvfile['Body'].read().decode('utf-8')
# csvファイルを1行ずつ、json_dataに格納。
for row in csv.DictReader(csvcontent.split('\n')):
# フィルタ条件に一致するものみ、jsonデータとして追加
if param_judge(filter, row):
json_data.append(row)
#return json.dumps(json_data)
return json_data
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
# フィルター条件より、csvファイルの各行が該当レコードかどうかチェックする。
def param_judge(filter, csv_row):
# [age_max] フィルタリング
if filter["age_max"] != "" :
if int(csv_row["age"]) > filter["age_max"]:
return False
# [age_min] フィルタリング
if filter["age_min"] != "" :
if int(csv_row["age"]) < filter["age_min"]:
return False
# [sex] フィルタリング
if filter["sex"] != "" :
if csv_row["sex"] != filter["sex"]:
return False
return True
3."API Gateway" の作成
API Gatewayを作成する場合、まずはどのタイプのAPIを作成するのかを決めることになるが、今回はREST APIとする。
lambda①を実行するためのAPIを、以下の様に作成。 ※Lambda②の作成も基本は同じ!
リソースの作成は今回スキップ
リソースは、簡単に言うとAPIのパスみたいなものと認識している。
デフォルトは「/」となっていて、APIのエンドポイント(呼び出しURL)がhttps://api.example.amazonaws.com
だとすると、この直下にアクセスすることになる。
この配下に、さらにリソースを追加してパスを深くすることもできるが、今回は1つのLambdaの実行をAPI経由で外部からキックしたいだけなので、デフォルトの状態からリソースは追加しない。
メソッドの作成
アクションから『メソッドの作成』を選び、"GET" を選択。
※本来これらAPIは、直接データ場所にアクセスさせることなく、API経由で外部からデータの取得や登録を可能にする目的が強いため、この様なLambdaを叩くだけの使い方は一般的ではないと思うが、今回はAPI経由でLambdaを実行できるかのお試しなので一旦そういうのは気にしない。。
『API Gateway に、Lambda 関数を呼び出す権限を与えようとしています:』 と聞かれるので『OK』する。
※そうすると、Lambdaの関数側でもトリガーが追加されている事が確認できる。
メソッドが作成できたら早速テスト実行をしてみる。
上記の様にステータス:200
なっていて、レスポンス本文も意図した文字列(or数値など)が返ってきてれば、ひとまずは動いている。
テストが問題なければ、アクションから『APIのデプロイ』を選択してデプロイを実施。
外部からAPIをキックしてみる。
今回は "GET" メソッドで作成しているので、自分のブラウザから下記の赤枠で記載されているURLにアクセスして、Lambdaでreturn
に記載した内容が返されれば一旦の動作確認は完了。
Lambda②のAPI呼び出しURLにアクセスした場合(同様に作成してみた)
※試しに "PUT" メソッドで作成したAPIをデプロイし、ブラウザからURLを呼び出したが、{"message":"Missing Authentication Token"}
というレスポンスが返ってきて動作しない。
ブラウザ経由でURLにアクセスする場合、それは "GET" メソッドでアクセスしているので、その結果がこの様なレスポンスとして返ってきている。
POSTメソッド、PUTメソッド、DELETEメソッドで送信する方法はこちらのサイトが参考になった。
https://kapiecii.hatenablog.com/entry/2019/11/23/100000#%E3%83%96%E3%83%A9%E3%82%A6%E3%82%B6%E6%8B%A1%E5%BC%B5