Go
Rust
sam
lambda
CustomRuntime

GoとRustで学ぶCustom AWS Lambda Runtimes 〜よりLambdaらしいCustom Runtime実装編〜

概要

目指すもののおさらい

Custom Runtimeでもこんな風に独自イベントやhandlerを書きたい!

bootstrap.go
package main

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/toshi0607/go-custom-runtime-sample/runtime"
)

type MyEvent struct {
    Name string `json:"name"`
}

func MyHandler(ctx runtime.Context, event MyEvent) ([]byte, error) {
    return fmt.Sprintf("Hello %s!", event.Name), nil
}

func main() {
    runtime.Start(MyHandler)
}

実装仕方で工夫できるものの、前の記事の実装ではbootstrap.goにイベント取得処理やイベントを待ち受けるループを実装していてこれじゃない感を醸成していました。

runtime-client

前記事でも紹介したRustのCustom Runtimeのおおよその構成を踏襲して実装して見ます。

ただし、ここではすべてruntimeというパッケージの中にbootstrap以外のコードを置きます。もちろん分割するのもありだと思います。

runtime-clientはCustom Runtimeで登場した4つのAPIを愚直にハンドルするためのAPIクライアントです。必要な型も定義しています。

client.go
package runtime

import (
    "bytes"
    "encoding/json"
    "io/ioutil"
    "net/http"
    "strconv"

    "github.com/pkg/errors"
)

const (
    RUNTIME_API_VERSION = "2018-06-01"
    CONTENT_TYPE        = "application/json"
)

type Client interface {
    NextInvocation() ([]byte, *EventContext, error)
    InvocationResponse(awsRequestId string, content []byte) error
    InvocationError(awsRequestId string, err error) error
    InitializationError(err error) error
}

// https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/runtimes-api.html#runtimes-api-next
type EventContext struct {
    InvokedFunctionArn string
    AwsRequestId       string
    RuntimeTraceId     string
    DeadlineMs         int64
    ClientContext      ClientContext
    CognitoIdentity    CognitoIdentity
}

type CognitoIdentity struct {
    identityId     string
    identityPoolId string
}

type ClientContext struct {
    client      ClientApplication
    custom      map[string]string
    environment map[string]string
}

type ClientApplication struct {
    installationId string
    appTitle       string
    appVersionName string
    appVersionCode string
    appPackageName string
}

type client struct {
    endpoint string
    client   *http.Client
}

func NewClient(endpoint string) Client {
    return &client{
        endpoint: endpoint,
        client:   http.DefaultClient,
    }
}

// http://${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/next

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next
func (c *client) NextInvocation() ([]byte, *EventContext, error) {
    url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/invocation/next"
    resp, err := c.client.Get(url)
    if err != nil {
        return nil, nil, errors.New("failed to get a response")
    }
    defer resp.Body.Close()
    ec, err := getEventContext(resp.Header)
    if err != nil {
        return nil, nil, errors.New("failed to get an event context")
    }

    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, nil, errors.New("failed to get an event")
    }

    return b, ec, nil
}

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-response
func (c *client) InvocationResponse(awsRequestId string, content []byte) error {
    // /runtime/invocation/AwsRequestId/response
    url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/invocation/" + awsRequestId + "/response"

    resp, err := c.client.Post(url, CONTENT_TYPE, bytes.NewBuffer(content))
    if err != nil {
        return errors.Wrap(err, "failed to post content")
    }
    defer resp.Body.Close()

    return nil
}

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-invokeerror
func (c *client) InvocationError(awsRequestId string, aerr error) error {
    // /runtime/invocation/AwsRequestId/error
    url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/invocation/" + awsRequestId + "/error"
    errs, err := json.Marshal(aerr)
    if err != nil {
        return errors.Wrap(err, "failed to post content")
    }

    resp, err := c.client.Post(url, CONTENT_TYPE, bytes.NewBuffer(errs))
    if err != nil {
        return errors.Wrap(err, "failed to post content")
    }
    defer resp.Body.Close()

    return nil
}

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-initerror
func (c *client) InitializationError(aerr error) error {
    // /runtime/init/error
    url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/init/error"
    errs, err := json.Marshal(aerr)
    if err != nil {
        return errors.Wrap(err, "failed to post content")
    }

    resp, err := c.client.Post(url, CONTENT_TYPE, bytes.NewBuffer(errs))
    if err != nil {
        return errors.Wrap(err, "failed to post content")
    }
    defer resp.Body.Close()

    return nil
}

func getEventContext(header http.Header) (*EventContext, error) {
    invokedFunctionArn := header.Get("Lambda-Runtime-Invoked-Function-Arn")
    awsRequestId := header.Get("Lambda-Runtime-Aws-Request-Id")
    runtimeTraceId := header.Get("Lambda-Runtime-Trace-Id")

    runtimeDeadlineMs, err := strconv.ParseInt(header.Get("Lambda-Runtime-Deadline-Ms"), 10, 64)
    if err != nil {
        return nil, errors.Wrap(err, "failed to parse Lambda-Runtime-Deadline-Ms")
    }

    ev := &EventContext{
        InvokedFunctionArn: invokedFunctionArn,
        AwsRequestId:       awsRequestId,
        RuntimeTraceId:     runtimeTraceId,
        DeadlineMs:         runtimeDeadlineMs,
    }

    runtimeClientContext := header.Get("Lambda-Runtime-Client-Context")
    if runtimeClientContext != "" {
        var clientContext ClientContext
        if err := json.Unmarshal([]byte(runtimeClientContext), &clientContext); err != nil {
            return nil, errors.Wrap(err, "failed to unmarshal ClientContext")
        }

        ev.ClientContext = clientContext
    }

    runtimeCognitoIdentity := header.Get("Lambda-Runtime-Cognito-Identity") // 型
    if runtimeCognitoIdentity != "" {
        var cognitoIdentity CognitoIdentity
        if err := json.Unmarshal([]byte(runtimeClientContext), &cognitoIdentity); err != nil {
            return nil, errors.Wrap(err, "failed to unmarshal ClientContext")
        }

        ev.CognitoIdentity = cognitoIdentity
    }

    return ev, nil
}

runtime

runtime-clientを利用してイベントループを回すruntimeそのものです。

env.goは実行時に環境変数からハンドラーに渡すコンテキスト情報(APIで取得するもの以外)を集めるためのインターフェースとしてConfigProviderを定義しています。

Rust実装では同ファイルにテストで差し替える用のエンドポイントや仮の環境変数がセットできるようになっています。

env.go
package runtime

import (
    "os"
    "strconv"

    "github.com/pkg/errors"
)

type ConfigProvider interface {
    GetFunctionSettings() (*FunctionSettings, error)
    GetRuntimeApiEndpoint() string
}

type FunctionSettings struct {
    functionName string
    memorySize   int32
    version      string
    logStream    string
    logGroup     string
}

type configProvider struct{}

func NewConfigProvider() ConfigProvider {
    return &configProvider{}
}

func (c configProvider) GetFunctionSettings() (*FunctionSettings, error) {
    functionName := os.Getenv("AWS_LAMBDA_FUNCTION_NAME")
    version := os.Getenv("AWS_LAMBDA_FUNCTION_VERSION")
    logStream := os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME")
    logGroup := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME")
    memoryStr := os.Getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
    memorySize, err := strconv.Atoi(memoryStr)
    if err != nil {
        return nil, errors.Wrapf(err, "failed to parse memoryStr: %s to int", memoryStr)
    }

    return &FunctionSettings{
        functionName: functionName,
        memorySize:   int32(memorySize),
        version:      version,
        logStream:    logStream,
        logGroup:     logGroup,
    }, nil
}

func (c configProvider) GetRuntimeApiEndpoint() string {
    return "http://" + os.Getenv("AWS_LAMBDA_RUNTIME_API")
}

runtime.goにメインの処理がまとまっています。ハンドラーを定義するファイルからruntime.start(MyHandler)のように呼び出すstart関数もここで定義しています。

runtime.go
package runtime

import (
    "encoding/json"
    "log"
)

type Handler interface {
    // ここでイベントの型に[]byteを指定してしまうと各ハンドラーでイベントをデシリアライズしないといけないので
    // interface{}型などにして型チェックを行いながら共通のデシリアライズ処理を実装すると
    // より柔軟になりそうです。
    Run(ctx Context, event []byte) ([]byte, error)
}

type handler func(ctx Context, event []byte) ([]byte, error)

// func型に名前を付けてRunメソッドを定義することでHandler interfaceを実装し、
// 自分で定義したハンドラーを渡せるようにしているところがポイントです。
// 本家aws-lambda-goでもそのように実装してあり参考にしました。
func (h handler) Run(ctx Context, event []byte) ([]byte, error) {
    response, err := h(ctx, event)
    if err != nil {
        return nil, err
    }

    responseBytes, err := json.Marshal(response)
    if err != nil {
        return nil, err
    }

    return responseBytes, nil
}

func Start(h handler) {
    startWithConfig(h, NewConfigProvider())
}

func startWithConfig(h handler, config ConfigProvider) {
    endpoint := config.GetRuntimeApiEndpoint()
    settings, err := config.GetFunctionSettings()
    if endpoint == "" || err != nil {
        log.Fatal("failed to init lambda")
    }

    startWithRuntimeClient(h, *settings, NewClient(endpoint))

}

func startWithRuntimeClient(h handler, s FunctionSettings, c Client) {
    NewRuntime(c, h, s).Start()
}

type Runtime struct {
    client   Client
    handler  Handler
    settings FunctionSettings
}

type Context struct {
    MemorySize         int32
    FunctionName       string
    FunctionVersion    string
    InvokedFunctionArn string
    AwsRequestId       string
    XrayTraceId        string
    LogStreamName      string
    LogGroupName       string
    ClientContext      ClientContext
    Identity           CognitoIdentity
    Deadline           int64
}

func NewRuntime(client Client, handler Handler, settings FunctionSettings) *Runtime {
    return &Runtime{
        client:   client,
        handler:  handler,
        settings: settings,
    }
}

func (r *Runtime) Start() {
    for {
        ctx, ev := r.getNextEvent()
        requestId := ctx.AwsRequestId
        result, err := r.handler.Run(ctx, ev)
        if err != nil {
            err := r.client.InvocationError(requestId, err)
            if err != nil {
                log.Fatal("failed to invoke an error")
            }
        }
        err = r.client.InvocationResponse(requestId, result)
        if err != nil {
            err := r.client.InvocationError(requestId, err)
            if err != nil {
                log.Fatal("failed to invoke an error")
            }
        }
    }
}

func (r *Runtime) getNextEvent() (Context, []byte) {
    ev, ctx, err := r.client.NextInvocation()
    if err != nil {
        r.client.InitializationError(err)
    }
    return Context{
        MemorySize:         r.settings.memorySize,
        FunctionName:       r.settings.functionName,
        FunctionVersion:    r.settings.version,
        LogStreamName:      r.settings.logStream,
        LogGroupName:       r.settings.logGroup,
        InvokedFunctionArn: ctx.InvokedFunctionArn,
        AwsRequestId:       ctx.AwsRequestId,
        XrayTraceId:        ctx.RuntimeTraceId,
        ClientContext:      ctx.ClientContext,
        Identity:           ctx.CognitoIdentity,
        Deadline:           ctx.DeadlineMs,
    }, ev
}

bootstrap

以上のようにruntime(とruntime-client)を実装すると、bootstrapを次のように実装できるようになります。

bootstrap.go
package main

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/toshi0607/go-custom-runtime-sample/runtime"
)

type MyEvent struct {
    Name string `json:"name"`
}

func MyHandler(ctx runtime.Context, event []byte) ([]byte, error) {
    // runtime.Startの引数を[]byteとして実装しているので`event MyEvent`とは書けません…
    log.Printf("request id: %s\n", ctx.AwsRequestId)

    var me MyEvent
    json.Unmarshal(event, &me)

    str := fmt.Sprintf("Hello %s!", me.Name)
    log.Println(str)
    return []byte(str), nil
}

func main() {
    runtime.Start(MyHandler)
}

これでいつものLambda(に前の記事のよりは近いが不完全なもの)を実装することができました。

Rust版ではリトライ処理が入っていたり、aws-lambda-go(Custom Runtimeでなく公式サポートされているGo向けのライブラリ)ではハンドラーの型チェックが入っていたり、実装を色々比較しながら自分で実装してみると気づきがあって面白かったです。

ただ冒頭でも書いた通りいろいろと省略してしまっているので改善したい気持ちが強いです。