LoginSignup
1
2

More than 1 year has passed since last update.

S3にCSVファイルをアップロードしたらDynamoDBに書き込んでくれるLambdaをGolang1.xで書いてSAMでデプロイする

Last updated at Posted at 2022-01-06

GoでLambdaを書いてる記事が意外と少なかったので、備忘録を兼ねてさくっとまとめます。
ハマりどころもメモ程度にいくつか併記したので、別途ググってみてください。

完成したコードは最後にあります。
また、DynamoDBのテーブルは「アプリ内のお知らせ機能で新しいお知らせを打つために、データのCSVをバッチ処理でDynamoDBに書き込む」という目的でつくったものです。
後半でそんな感じの記述が出てきますので、それぞれの用途に応じてよしなに読み替え書き換えてください。

諸々のバージョン

go.mod
module hoge

go 1.16

require (
    github.com/aws/aws-lambda-go v1.27.1
    github.com/aws/aws-sdk-go v1.42.25
)

Goは1.16である必要はなく、動かしたいものが動くバージョンであればOKです。
aws-sdk-goのREADMEには「Go1.5でgo getするならちょっと注意が必要だよ」みたいな注意書きがあるので、少なくとも1.5以上なら動くのでしょう。

(早速の余談ですが、Go1.16からパッケージはgo get ...ではなくgo install ... + go mod tidyで管理することが推奨されています。)

AWS SAM CLIはbrewでインストールしました。お好きな方法でどうぞ。

% brew tap aws/tap
% brew install aws-sam-cli

% sam --version
SAM CLI, version 1.36.0

DynamoDBには既に対応するテーブルが作成済みであるとします。

aws-sdk-goとaws-lambda-goでLambda関数を書く

セッションの作成

aws-sdk-goおきまりのセッションを作成するところから始めます。
データが保存されたS3と、書き込む先のDynamoDBの両方へのアクセスで必要です。

// セッション
sess := session.Must(session.NewSession(&aws.Config{
    Region: aws.String("ap-northeast-1"),
}))

// S3クライアント
svc := s3.New(sess)

// DynamoDBクライアント
ddb := dynamodb.New(sess, aws.NewConfig().WithRegion("ap-northeast-1"))

トリガーとなるイベント

Lambdaの発火条件であるイベントオブジェクトをhandler関数が受け取ることができます。
S3へのアップロードをはじめとするAWSリソース関連の各種イベントオブジェクトは、aws-sdk-go/eventsにそれぞれ構造体として定義されています。
READMEに超シンプルですがサンプルコードもあります。

いろんな情報が詰まってますが、今回使うのはバケット名とオブジェクトキーのみです。
このイベントオブジェクト自体にはアップロードされたデータは含まれていないので、s3Entity.Bucket.Nameバケットのs3Entity.Object.Keyキーに対してs3.New(sess).GetObject()を叩いて一度データを取りに行く必要があります。

record := s3Event.Records[0]
s3Entity := record.S3
bucket := s3Entity.Bucket.Name
key := s3Entity.Object.Key
obj, err := svc.GetObject(&s3.GetObjectInput{
    Bucket: aws.String(bucket),
    Key:    aws.String(key),
})

なお、events.S3Event構造体はS3EventRecord構造体の配列だけをフィールドに持ちます。
そしてS3EventRecord構造体は、リージョンやイベント時刻や対象のS3情報などを持っています。

一度に複数の(CSVなどの)ファイルをアップロードした場合には、各ファイルに対して同様の処理を行うイメージでこの配列をforで回す実装にする必要があります。
そのために[]S3EventRecordになってるんですね。
本記事では1つのファイルをアップロードするユースケースのみを想定し、s3Event.Records[0]であらかじめ取得することにしました。

CSV読み込み

↑で取ってきたCSVデータを読み込みます。

r := csv.NewReader(obj.Body)
defer func() { _ = obj.Body.Close() }()
recordList, err := r.ReadAll()
if err != nil {
    log.Println(err.Error())
    return
}

recordListにCSVの各行が入っているイメージで、これをループで回してDynamoDBに書き込むItemにしていきます。

またまた余談ですが、s3Event.Recordsをループで回している(複数ファイルをアップロードしている)場合、deferの書き方に注意が必要です。
ループ中にdefer文を書くと、ループが全て終わってからLIFOでスタック的に一気に実行されてしまいます。
リソースのリークにつながるのでよろしくありません。

DynamoDBへの書き込み

ここからCSVデータの中身(recordList)を1つずつ詰め替えていきます。

複数データをバッチでDynamoDBに書き込むGoの実装には少しクセがあり、まずはリクエストの配列を作る必要があります。
1つ1つのリクエストにはPutかDeleteのどちらか一方のみを持たせる仕様です。

WriteRequest型(下記ドキュメントより抜粋)
type WriteRequest struct {
    // A request to perform a DeleteItem operation.
    DeleteRequest *DeleteRequest `type:"structure"`

    // A request to perform a PutItem operation.
    PutRequest *PutRequest `type:"structure"`
    // contains filtered or unexported fields
}

Putリクエストを作るには、CSVとDynamoDBテーブルを対応させてmap[string]*dynamodb.AttributeValueに詰め込んだものをItemフィールドにセットします。

ぱっと見でなんとなくお分かりかと思いますが、Itemを作る際には少し特殊な書き方が必要です。
テーブルのキー名をstring、対応する値を*dynamodb.AttributeValue型で指定します。
*dynamodb.AttributeValue型はテーブル定義の型をキーに、書き込みたい値をおなじみのaws.Xxx()変換関数に渡してバリューにセットします。

1つ注意点として、テーブル定義通りの全フィールドを書いておく必要があります。
「書いてないのはデフォルト値で埋めておいて〜」みたいな感じにはいかないんですね。

リクエストを作成
var batchWriteItems []*dynamodb.WriteRequest
for i, r := range recordList {
    // CSVファイルの1行目はヘッダなのでスキップ(rangeにrecordList[1:]を渡してもよさそう)
    if i == 0 {
        continue
    }
    req := &dynamodb.PutRequest{
        Item: map[string]*dynamodb.AttributeValue{
            "user_id":          {S: aws.String(r[0])},
            "title":            {S: aws.String(r[1])},
            "text":             {S: aws.String(r[2])},
            "category":         {N: aws.String(r[3])},
            "image_url":        {S: aws.String(r[4])},
            "is_read":          {BOOL: aws.Bool(false)},
            "display_begin_at": {S: aws.String(r[5])},
            "display_end_at":   {S: aws.String(r[6])},
        },
    }
    batchWriteItems = append(batchWriteItems, &dynamodb.WriteRequest{
        PutRequest: req,
    })
}

またまた余談ですが、Python3の場合はCSVを読み込むときにcsv.DictReader型が便利です。
ヘッダをよしなにキーにして辞書型を作ってくれます。
これで取得してきたS3オブジェクトをcodecs.getreader('utf-8')(obj)で読んでforで1行ずつ処理していきます。

(余談ここまで)

書き込むItemを作り終わったら、書き込み処理を呼びます。
最初に定義したddbのメンバ関数として書き込みや削除など各種操作を叩く関数が生えており、今回はBatchWriteItemWithContext()を使用します。

データを1件だけ書き込む場合はPutItem()という関数でよいのですが、複数のデータを書き込む時はバッチで行います。

続いてRequestItemsフィールドに先ほど作ったItemのリストを渡すのですが、インサートするテーブル名をキーにしたmap[string]*dynamodb.WriteRequest型で書く必要があります。

DynamoDBに書き込み
reqItems := map[string][]*dynamodb.WriteRequest{
    // <テーブル名: 書き込むデータのリスト> の形式で渡す
    "notification_message_histories": batchWriteItems,
}
_, err = ddb.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
    RequestItems: reqItems,
})
if err != nil {
    log.Println(err.Error())
    return
}

複数データにとどまらず、複数テーブルにも一気に書き込むこともできるんですね。

ここまで書いておいてなんですが、冒頭に記載のバージョン(2021年末現在)では、DynamoDBへの書き込み(ddb.BatchWriteItemWithContext()の呼び出し)は一度に全テーブル合計で25件のデータまでしかできません。

トータル16MBまで、各Itemは400KBまで、という制約もあり、これはPython3やNode.jsでも同じ仕様のようです。

この制約を超える場合は、recordListから25個ずつ取り出したりといった分割のためのループでさらに包む必要があります。

コード全体

説明が長くなってしまいました...。

ハンドラ関数がしれっとcontext.Contextを受け取っています。
今回の用途としてはDynamoDBへの書き込み関数にくっつけただけですが、実行環境や呼び出しリクエストに関するメタデータを取得することもできます。
(もちろんcontext.Contextを使わないLambda用関数も各種揃っています。)

完成形は↓こんな感じです。

main.go
package main

import (
    "context"
    "encoding/csv"
    "log"
    "math"
    "os"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/s3"
)

const limitDynamoDBBatchWriteItem = 25

func handler(ctx context.Context, s3Event events.S3Event) {
    sess := session.Must(session.NewSession(&aws.Config{
        Region: aws.String("ap-northeast-1"),
    }))
    svc := s3.New(sess)
    ddb := dynamodb.New(sess, aws.NewConfig().WithRegion("ap-northeast-1"))

    record := s3Event.Records[0]
    s3Entity := record.S3
    bucket := s3Entity.Bucket.Name
    key := s3Entity.Object.Key
    obj, err := svc.GetObject(&s3.GetObjectInput{
        Bucket: aws.String(bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        log.Println(err.Error())
        return
    }

    r := csv.NewReader(obj.Body)
    defer func() { _ = obj.Body.Close() }()
    recordList, err := r.ReadAll()
    if err != nil {
        log.Println(err.Error())
        return
    }
    recordListWithoutHeader := recordList[1:]

    length := len(recordListWithoutHeader)
    for i := 0; i <= length/limitDynamoDBBatchWriteItem; i++ {
        start := i * limitDynamoDBBatchWriteItem
        end := int(math.Min(float64(i+1) * limitDynamoDBBatchWriteItem, float64(length)))
        if start == length {
            return
        }

        var batchWriteItems []*dynamodb.WriteRequest
        for _, r := range recordListWithoutHeader[start:end]{
            req := &dynamodb.PutRequest{
                Item: map[string]*dynamodb.AttributeValue{
                    "user_id":          {S: aws.String(r[0])},
                    "title":            {S: aws.String(r[1])},
                    "text":             {S: aws.String(r[2])},
                    "category":         {N: aws.String(r[3])},
                    "image_url":        {S: aws.String(r[4])},
                    "is_read":          {BOOL: aws.Bool(false)},
                    "display_begin_at": {S: aws.String(r[5])},
                    "display_end_at":   {S: aws.String(r[6])},
                },
            }
            batchWriteItems = append(batchWriteItems, &dynamodb.WriteRequest{
                PutRequest: req,
            })
        }

        reqItems := map[string][]*dynamodb.WriteRequest{
            os.Getenv("table"): batchWriteItems,
        }
        _, err = ddb.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
            RequestItems: reqItems,
        })
        if err != nil {
            log.Println(err.Error())
            return
        }
    }
}

func main() {
    lambda.Start(handler)
}

sam deploy

最後にこのファイルを置いている階層で

SAMでデプロイ
% sam init
% sam build
% sam deploy --guided --profile <使うクレデンシャル>

を順に実行し、よしなに答えていけばデプロイが完了します。
--profileには.aws/credentialsの中でどのクレデンシャルを使うのかを名前で渡します。

初回デプロイ時には--guidedをつけて「デプロイ前に変更入るリソースの一覧確認します?」とか「config.ymlにこの設定なんて名前で残します?」とかを対話形式で設定していきます。
次回以降、同じディレクトリで同じ環境でデプロイする時には

% sam deploy --config-env <つけた名前> --profile <使うクレデンシャル>

でさくっとデプロイし直すことができます。
コードを変更した時はsam buildを忘れずに。

(SAMでデプロイする詳細についてはドキュメントや他記事をご参照ください。)

最後まで読んでいただきありがとうございました。

参考記事

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2