GoでLambdaを書いてる記事が意外と少なかったので、備忘録を兼ねてさくっとまとめます。
ハマりどころもメモ程度にいくつか併記したので、別途ググってみてください。
完成したコードは最後にあります。
また、DynamoDBのテーブルは「アプリ内のお知らせ機能で新しいお知らせを打つために、データのCSVをバッチ処理でDynamoDBに書き込む」という目的でつくったものです。
後半でそんな感じの記述が出てきますので、それぞれの用途に応じてよしなに読み替え書き換えてください。
諸々のバージョン
module hoge
go 1.16
require (
github.com/aws/aws-lambda-go v1.27.1
github.com/aws/aws-sdk-go v1.42.25
)
Goは1.16である必要はなく、動かしたいものが動くバージョンであればOKです。
aws-sdk-goのREADMEには「Go1.5でgo get
するならちょっと注意が必要だよ」みたいな注意書きがあるので、少なくとも1.5以上なら動くのでしょう。
(早速の余談ですが、Go1.16からパッケージはgo get ...
ではなくgo install ...
+ go mod tidy
で管理することが推奨されています。)
AWS SAM CLIはbrewでインストールしました。お好きな方法でどうぞ。
% brew tap aws/tap
% brew install aws-sam-cli
% sam --version
SAM CLI, version 1.36.0
DynamoDBには既に対応するテーブルが作成済みであるとします。
aws-sdk-goとaws-lambda-goでLambda関数を書く
セッションの作成
aws-sdk-goおきまりのセッションを作成するところから始めます。
データが保存されたS3と、書き込む先のDynamoDBの両方へのアクセスで必要です。
// セッション
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
}))
// S3クライアント
svc := s3.New(sess)
// DynamoDBクライアント
ddb := dynamodb.New(sess, aws.NewConfig().WithRegion("ap-northeast-1"))
トリガーとなるイベント
Lambdaの発火条件であるイベントオブジェクトをhandler関数が受け取ることができます。
S3へのアップロードをはじめとするAWSリソース関連の各種イベントオブジェクトは、aws-sdk-go/eventsにそれぞれ構造体として定義されています。
READMEに超シンプルですがサンプルコードもあります。
いろんな情報が詰まってますが、今回使うのはバケット名とオブジェクトキーのみです。
このイベントオブジェクト自体にはアップロードされたデータは含まれていないので、s3Entity.Bucket.Name
バケットのs3Entity.Object.Key
キーに対してs3.New(sess).GetObject()
を叩いて一度データを取りに行く必要があります。
record := s3Event.Records[0]
s3Entity := record.S3
bucket := s3Entity.Bucket.Name
key := s3Entity.Object.Key
obj, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
なお、events.S3Event
構造体はS3EventRecord
構造体の配列だけをフィールドに持ちます。
そしてS3EventRecord
構造体は、リージョンやイベント時刻や対象のS3情報などを持っています。
一度に複数の(CSVなどの)ファイルをアップロードした場合には、各ファイルに対して同様の処理を行うイメージでこの配列をfor
で回す実装にする必要があります。
そのために[]S3EventRecord
になってるんですね。
本記事では1つのファイルをアップロードするユースケースのみを想定し、s3Event.Records[0]
であらかじめ取得することにしました。
CSV読み込み
↑で取ってきたCSVデータを読み込みます。
r := csv.NewReader(obj.Body)
defer func() { _ = obj.Body.Close() }()
recordList, err := r.ReadAll()
if err != nil {
log.Println(err.Error())
return
}
recordList
にCSVの各行が入っているイメージで、これをループで回してDynamoDBに書き込むItemにしていきます。
またまた余談ですが、s3Event.Records
をループで回している(複数ファイルをアップロードしている)場合、defer
の書き方に注意が必要です。
ループ中にdefer
文を書くと、ループが全て終わってからLIFOでスタック的に一気に実行されてしまいます。
リソースのリークにつながるのでよろしくありません。
DynamoDBへの書き込み
ここからCSVデータの中身(recordList
)を1つずつ詰め替えていきます。
複数データをバッチでDynamoDBに書き込むGoの実装には少しクセがあり、まずはリクエストの配列を作る必要があります。
1つ1つのリクエストにはPutかDeleteのどちらか一方のみを持たせる仕様です。
type WriteRequest struct {
// A request to perform a DeleteItem operation.
DeleteRequest *DeleteRequest `type:"structure"`
// A request to perform a PutItem operation.
PutRequest *PutRequest `type:"structure"`
// contains filtered or unexported fields
}
Putリクエストを作るには、CSVとDynamoDBテーブルを対応させてmap[string]*dynamodb.AttributeValue
に詰め込んだものをItem
フィールドにセットします。
ぱっと見でなんとなくお分かりかと思いますが、Item
を作る際には少し特殊な書き方が必要です。
テーブルのキー名をstring
、対応する値を*dynamodb.AttributeValue
型で指定します。
*dynamodb.AttributeValue
型はテーブル定義の型をキーに、書き込みたい値をおなじみのaws.Xxx()
変換関数に渡してバリューにセットします。
1つ注意点として、テーブル定義通りの全フィールドを書いておく必要があります。
「書いてないのはデフォルト値で埋めておいて〜」みたいな感じにはいかないんですね。
var batchWriteItems []*dynamodb.WriteRequest
for i, r := range recordList {
// CSVファイルの1行目はヘッダなのでスキップ(rangeにrecordList[1:]を渡してもよさそう)
if i == 0 {
continue
}
req := &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"user_id": {S: aws.String(r[0])},
"title": {S: aws.String(r[1])},
"text": {S: aws.String(r[2])},
"category": {N: aws.String(r[3])},
"image_url": {S: aws.String(r[4])},
"is_read": {BOOL: aws.Bool(false)},
"display_begin_at": {S: aws.String(r[5])},
"display_end_at": {S: aws.String(r[6])},
},
}
batchWriteItems = append(batchWriteItems, &dynamodb.WriteRequest{
PutRequest: req,
})
}
またまた余談ですが、Python3の場合はCSVを読み込むときにcsv.DictReader
型が便利です。
ヘッダをよしなにキーにして辞書型を作ってくれます。
これで取得してきたS3オブジェクトをcodecs.getreader('utf-8')(obj)
で読んでfor
で1行ずつ処理していきます。
(余談ここまで)
書き込むItemを作り終わったら、書き込み処理を呼びます。
最初に定義したddb
のメンバ関数として書き込みや削除など各種操作を叩く関数が生えており、今回はBatchWriteItemWithContext()
を使用します。
データを1件だけ書き込む場合はPutItem()
という関数でよいのですが、複数のデータを書き込む時はバッチで行います。
続いてRequestItems
フィールドに先ほど作ったItemのリストを渡すのですが、インサートするテーブル名をキーにしたmap[string]*dynamodb.WriteRequest
型で書く必要があります。
reqItems := map[string][]*dynamodb.WriteRequest{
// <テーブル名: 書き込むデータのリスト> の形式で渡す
"notification_message_histories": batchWriteItems,
}
_, err = ddb.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: reqItems,
})
if err != nil {
log.Println(err.Error())
return
}
複数データにとどまらず、複数テーブルにも一気に書き込むこともできるんですね。
ここまで書いておいてなんですが、冒頭に記載のバージョン(2021年末現在)では、DynamoDBへの書き込み(ddb.BatchWriteItemWithContext()
の呼び出し)は一度に全テーブル合計で25件のデータまでしかできません。
トータル16MBまで、各Itemは400KBまで、という制約もあり、これはPython3やNode.jsでも同じ仕様のようです。
この制約を超える場合は、recordList
から25個ずつ取り出したりといった分割のためのループでさらに包む必要があります。
コード全体
説明が長くなってしまいました...。
ハンドラ関数がしれっとcontext.Context
を受け取っています。
今回の用途としてはDynamoDBへの書き込み関数にくっつけただけですが、実行環境や呼び出しリクエストに関するメタデータを取得することもできます。
(もちろんcontext.Context
を使わないLambda用関数も各種揃っています。)
完成形は↓こんな感じです。
package main
import (
"context"
"encoding/csv"
"log"
"math"
"os"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/s3"
)
const limitDynamoDBBatchWriteItem = 25
func handler(ctx context.Context, s3Event events.S3Event) {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
}))
svc := s3.New(sess)
ddb := dynamodb.New(sess, aws.NewConfig().WithRegion("ap-northeast-1"))
record := s3Event.Records[0]
s3Entity := record.S3
bucket := s3Entity.Bucket.Name
key := s3Entity.Object.Key
obj, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
log.Println(err.Error())
return
}
r := csv.NewReader(obj.Body)
defer func() { _ = obj.Body.Close() }()
recordList, err := r.ReadAll()
if err != nil {
log.Println(err.Error())
return
}
recordListWithoutHeader := recordList[1:]
length := len(recordListWithoutHeader)
for i := 0; i <= length/limitDynamoDBBatchWriteItem; i++ {
start := i * limitDynamoDBBatchWriteItem
end := int(math.Min(float64(i+1) * limitDynamoDBBatchWriteItem, float64(length)))
if start == length {
return
}
var batchWriteItems []*dynamodb.WriteRequest
for _, r := range recordListWithoutHeader[start:end]{
req := &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"user_id": {S: aws.String(r[0])},
"title": {S: aws.String(r[1])},
"text": {S: aws.String(r[2])},
"category": {N: aws.String(r[3])},
"image_url": {S: aws.String(r[4])},
"is_read": {BOOL: aws.Bool(false)},
"display_begin_at": {S: aws.String(r[5])},
"display_end_at": {S: aws.String(r[6])},
},
}
batchWriteItems = append(batchWriteItems, &dynamodb.WriteRequest{
PutRequest: req,
})
}
reqItems := map[string][]*dynamodb.WriteRequest{
os.Getenv("table"): batchWriteItems,
}
_, err = ddb.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: reqItems,
})
if err != nil {
log.Println(err.Error())
return
}
}
}
func main() {
lambda.Start(handler)
}
sam deploy
最後にこのファイルを置いている階層で
% sam init
% sam build
% sam deploy --guided --profile <使うクレデンシャル>
を順に実行し、よしなに答えていけばデプロイが完了します。
--profile
には.aws/credentials
の中でどのクレデンシャルを使うのかを名前で渡します。
初回デプロイ時には--guided
をつけて「デプロイ前に変更入るリソースの一覧確認します?」とか「config.ymlにこの設定なんて名前で残します?」とかを対話形式で設定していきます。
次回以降、同じディレクトリで同じ環境でデプロイする時には
% sam deploy --config-env <つけた名前> --profile <使うクレデンシャル>
でさくっとデプロイし直すことができます。
コードを変更した時はsam build
を忘れずに。
(SAMでデプロイする詳細についてはドキュメントや他記事をご参照ください。)
最後まで読んでいただきありがとうございました。
参考記事