3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

GoとRustで学ぶCustom AWS Lambda Runtimes 〜よりLambdaらしいCustom Runtime実装編〜

Last updated at Posted at 2018-12-24

概要

目指すもののおさらい

Custom Runtimeでもこんな風に独自イベントやhandlerを書きたい!

bootstrap.go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/toshi0607/go-custom-runtime-sample/runtime"
)

type MyEvent struct {
	Name string `json:"name"`
}

func MyHandler(ctx runtime.Context, event MyEvent) ([]byte, error) {
	return fmt.Sprintf("Hello %s!", event.Name), nil
}

func main() {
	runtime.Start(MyHandler)
}

実装仕方で工夫できるものの、前の記事の実装ではbootstrap.goにイベント取得処理やイベントを待ち受けるループを実装していてこれじゃない感を醸成していました。

runtime-client

前記事でも紹介したRustのCustom Runtimeのおおよその構成を踏襲して実装して見ます。

ただし、ここではすべてruntimeというパッケージの中にbootstrap以外のコードを置きます。もちろん分割するのもありだと思います。

runtime-clientはCustom Runtimeで登場した4つのAPIを愚直にハンドルするためのAPIクライアントです。必要な型も定義しています。

client.go
package runtime

import (
	"bytes"
	"encoding/json"
	"io/ioutil"
	"net/http"
	"strconv"

	"github.com/pkg/errors"
)

const (
	RUNTIME_API_VERSION = "2018-06-01"
	CONTENT_TYPE        = "application/json"
)

type Client interface {
	NextInvocation() ([]byte, *EventContext, error)
	InvocationResponse(awsRequestId string, content []byte) error
	InvocationError(awsRequestId string, err error) error
	InitializationError(err error) error
}

// https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/runtimes-api.html#runtimes-api-next
type EventContext struct {
	InvokedFunctionArn string
	AwsRequestId       string
	RuntimeTraceId     string
	DeadlineMs         int64
	ClientContext      ClientContext
	CognitoIdentity    CognitoIdentity
}

type CognitoIdentity struct {
	identityId     string
	identityPoolId string
}

type ClientContext struct {
	client      ClientApplication
	custom      map[string]string
	environment map[string]string
}

type ClientApplication struct {
	installationId string
	appTitle       string
	appVersionName string
	appVersionCode string
	appPackageName string
}

type client struct {
	endpoint string
	client   *http.Client
}

func NewClient(endpoint string) Client {
	return &client{
		endpoint: endpoint,
		client:   http.DefaultClient,
	}
}

// http://${AWS_LAMBDA_RUNTIME_API}/2018-06-01/runtime/invocation/next

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-next
func (c *client) NextInvocation() ([]byte, *EventContext, error) {
	url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/invocation/next"
	resp, err := c.client.Get(url)
	if err != nil {
		return nil, nil, errors.New("failed to get a response")
	}
	defer resp.Body.Close()
	ec, err := getEventContext(resp.Header)
	if err != nil {
		return nil, nil, errors.New("failed to get an event context")
	}

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, nil, errors.New("failed to get an event")
	}

	return b, ec, nil
}

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-response
func (c *client) InvocationResponse(awsRequestId string, content []byte) error {
	// /runtime/invocation/AwsRequestId/response
	url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/invocation/" + awsRequestId + "/response"

	resp, err := c.client.Post(url, CONTENT_TYPE, bytes.NewBuffer(content))
	if err != nil {
		return errors.Wrap(err, "failed to post content")
	}
	defer resp.Body.Close()

	return nil
}

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-invokeerror
func (c *client) InvocationError(awsRequestId string, aerr error) error {
	// /runtime/invocation/AwsRequestId/error
	url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/invocation/" + awsRequestId + "/error"
	errs, err := json.Marshal(aerr)
	if err != nil {
		return errors.Wrap(err, "failed to post content")
	}

	resp, err := c.client.Post(url, CONTENT_TYPE, bytes.NewBuffer(errs))
	if err != nil {
		return errors.Wrap(err, "failed to post content")
	}
	defer resp.Body.Close()

	return nil
}

// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html#runtimes-api-initerror
func (c *client) InitializationError(aerr error) error {
	// /runtime/init/error
	url := c.endpoint + "/" + RUNTIME_API_VERSION + "/runtime/init/error"
	errs, err := json.Marshal(aerr)
	if err != nil {
		return errors.Wrap(err, "failed to post content")
	}

	resp, err := c.client.Post(url, CONTENT_TYPE, bytes.NewBuffer(errs))
	if err != nil {
		return errors.Wrap(err, "failed to post content")
	}
	defer resp.Body.Close()

	return nil
}

func getEventContext(header http.Header) (*EventContext, error) {
	invokedFunctionArn := header.Get("Lambda-Runtime-Invoked-Function-Arn")
	awsRequestId := header.Get("Lambda-Runtime-Aws-Request-Id")
	runtimeTraceId := header.Get("Lambda-Runtime-Trace-Id")

	runtimeDeadlineMs, err := strconv.ParseInt(header.Get("Lambda-Runtime-Deadline-Ms"), 10, 64)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse Lambda-Runtime-Deadline-Ms")
	}

	ev := &EventContext{
		InvokedFunctionArn: invokedFunctionArn,
		AwsRequestId:       awsRequestId,
		RuntimeTraceId:     runtimeTraceId,
		DeadlineMs:         runtimeDeadlineMs,
	}

	runtimeClientContext := header.Get("Lambda-Runtime-Client-Context")
	if runtimeClientContext != "" {
		var clientContext ClientContext
		if err := json.Unmarshal([]byte(runtimeClientContext), &clientContext); err != nil {
			return nil, errors.Wrap(err, "failed to unmarshal ClientContext")
		}

		ev.ClientContext = clientContext
	}

	runtimeCognitoIdentity := header.Get("Lambda-Runtime-Cognito-Identity") // 型
	if runtimeCognitoIdentity != "" {
		var cognitoIdentity CognitoIdentity
		if err := json.Unmarshal([]byte(runtimeClientContext), &cognitoIdentity); err != nil {
			return nil, errors.Wrap(err, "failed to unmarshal ClientContext")
		}

		ev.CognitoIdentity = cognitoIdentity
	}

	return ev, nil
}

runtime

runtime-clientを利用してイベントループを回すruntimeそのものです。

env.goは実行時に環境変数からハンドラーに渡すコンテキスト情報(APIで取得するもの以外)を集めるためのインターフェースとしてConfigProviderを定義しています。

Rust実装では同ファイルにテストで差し替える用のエンドポイントや仮の環境変数がセットできるようになっています。

env.go
package runtime

import (
	"os"
	"strconv"

	"github.com/pkg/errors"
)

type ConfigProvider interface {
	GetFunctionSettings() (*FunctionSettings, error)
	GetRuntimeApiEndpoint() string
}

type FunctionSettings struct {
	functionName string
	memorySize   int32
	version      string
	logStream    string
	logGroup     string
}

type configProvider struct{}

func NewConfigProvider() ConfigProvider {
	return &configProvider{}
}

func (c configProvider) GetFunctionSettings() (*FunctionSettings, error) {
	functionName := os.Getenv("AWS_LAMBDA_FUNCTION_NAME")
	version := os.Getenv("AWS_LAMBDA_FUNCTION_VERSION")
	logStream := os.Getenv("AWS_LAMBDA_LOG_STREAM_NAME")
	logGroup := os.Getenv("AWS_LAMBDA_LOG_GROUP_NAME")
	memoryStr := os.Getenv("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
	memorySize, err := strconv.Atoi(memoryStr)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to parse memoryStr: %s to int", memoryStr)
	}

	return &FunctionSettings{
		functionName: functionName,
		memorySize:   int32(memorySize),
		version:      version,
		logStream:    logStream,
		logGroup:     logGroup,
	}, nil
}

func (c configProvider) GetRuntimeApiEndpoint() string {
	return "http://" + os.Getenv("AWS_LAMBDA_RUNTIME_API")
}

runtime.goにメインの処理がまとまっています。ハンドラーを定義するファイルからruntime.start(MyHandler)のように呼び出すstart関数もここで定義しています。

runtime.go
package runtime

import (
	"encoding/json"
	"log"
)

type Handler interface {
	// ここでイベントの型に[]byteを指定してしまうと各ハンドラーでイベントをデシリアライズしないといけないので
	// interface{}型などにして型チェックを行いながら共通のデシリアライズ処理を実装すると
	// より柔軟になりそうです。
	Run(ctx Context, event []byte) ([]byte, error)
}

type handler func(ctx Context, event []byte) ([]byte, error)

// func型に名前を付けてRunメソッドを定義することでHandler interfaceを実装し、
// 自分で定義したハンドラーを渡せるようにしているところがポイントです。
// 本家aws-lambda-goでもそのように実装してあり参考にしました。
func (h handler) Run(ctx Context, event []byte) ([]byte, error) {
	response, err := h(ctx, event)
	if err != nil {
		return nil, err
	}

	responseBytes, err := json.Marshal(response)
	if err != nil {
		return nil, err
	}

	return responseBytes, nil
}

func Start(h handler) {
	startWithConfig(h, NewConfigProvider())
}

func startWithConfig(h handler, config ConfigProvider) {
	endpoint := config.GetRuntimeApiEndpoint()
	settings, err := config.GetFunctionSettings()
	if endpoint == "" || err != nil {
		log.Fatal("failed to init lambda")
	}

	startWithRuntimeClient(h, *settings, NewClient(endpoint))

}

func startWithRuntimeClient(h handler, s FunctionSettings, c Client) {
	NewRuntime(c, h, s).Start()
}

type Runtime struct {
	client   Client
	handler  Handler
	settings FunctionSettings
}

type Context struct {
	MemorySize         int32
	FunctionName       string
	FunctionVersion    string
	InvokedFunctionArn string
	AwsRequestId       string
	XrayTraceId        string
	LogStreamName      string
	LogGroupName       string
	ClientContext      ClientContext
	Identity           CognitoIdentity
	Deadline           int64
}

func NewRuntime(client Client, handler Handler, settings FunctionSettings) *Runtime {
	return &Runtime{
		client:   client,
		handler:  handler,
		settings: settings,
	}
}

func (r *Runtime) Start() {
	for {
		ctx, ev := r.getNextEvent()
		requestId := ctx.AwsRequestId
		result, err := r.handler.Run(ctx, ev)
		if err != nil {
			err := r.client.InvocationError(requestId, err)
			if err != nil {
				log.Fatal("failed to invoke an error")
			}
		}
		err = r.client.InvocationResponse(requestId, result)
		if err != nil {
			err := r.client.InvocationError(requestId, err)
			if err != nil {
				log.Fatal("failed to invoke an error")
			}
		}
	}
}

func (r *Runtime) getNextEvent() (Context, []byte) {
	ev, ctx, err := r.client.NextInvocation()
	if err != nil {
		r.client.InitializationError(err)
	}
	return Context{
		MemorySize:         r.settings.memorySize,
		FunctionName:       r.settings.functionName,
		FunctionVersion:    r.settings.version,
		LogStreamName:      r.settings.logStream,
		LogGroupName:       r.settings.logGroup,
		InvokedFunctionArn: ctx.InvokedFunctionArn,
		AwsRequestId:       ctx.AwsRequestId,
		XrayTraceId:        ctx.RuntimeTraceId,
		ClientContext:      ctx.ClientContext,
		Identity:           ctx.CognitoIdentity,
		Deadline:           ctx.DeadlineMs,
	}, ev
}

bootstrap

以上のようにruntime(とruntime-client)を実装すると、bootstrapを次のように実装できるようになります。

bootstrap.go
package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/toshi0607/go-custom-runtime-sample/runtime"
)

type MyEvent struct {
	Name string `json:"name"`
}

func MyHandler(ctx runtime.Context, event []byte) ([]byte, error) {
	// runtime.Startの引数を[]byteとして実装しているので`event MyEvent`とは書けません…
	log.Printf("request id: %s\n", ctx.AwsRequestId)

	var me MyEvent
	json.Unmarshal(event, &me)

	str := fmt.Sprintf("Hello %s!", me.Name)
	log.Println(str)
	return []byte(str), nil
}

func main() {
	runtime.Start(MyHandler)
}

これでいつものLambda(に前の記事のよりは近いが不完全なもの)を実装することができました。

Rust版ではリトライ処理が入っていたり、aws-lambda-go(Custom Runtimeでなく公式サポートされているGo向けのライブラリ)ではハンドラーの型チェックが入っていたり、実装を色々比較しながら自分で実装してみると気づきがあって面白かったです。

ただ冒頭でも書いた通りいろいろと省略してしまっているので改善したい気持ちが強いです。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?