TL;DR
Serverless Frameworkを多用している弊社にて、スモールプロダクトが増えてきそうだったので、何かLambdaで共通で使える処理なり、処理の系統なりなりを統一できないか模索して、Lambdaに使えるフレームワークなるものを実装した話
背景
Moff AdventCalender第二週にさしかかってきました。弊社のアプリでいくつかマイクロサービス化してアプリもリリースしていた時期があったんですが、少人数のスタートアップには人員的なリソースが厳しいかつ、当時Serverlessを先進的にする人もそこまで多くはなかったので、Serverless(API Gateway + Lambda + DynamoDB、ところによりKinesis Data Stream)にてどのプロダクトも統一的な記法でプログラミングしてバックエンドロジックをかけないか、テストがかけないかを模索していました。そこでAPI Gateweay + Lambda用に使える社内フレームワークを開発したのでそれを紹介しようと思います。
フレームワークの紹介
Sample Sourceはこちら
SampleはTodolist(ListとTodoのみのデータ構造でのCRUD)を作ることを想定した時のフレームワークになります。
ぜひAWS上で立ち上げてみてください。
年内に公開します。すいません。
Clean Architecture
Clean Architectureの概念に関してはすでに@naninunenoyが下記記事をまとめているので、
詳細はこちらを参考にしていただけるとありがたいです。
https://qiita.com/naninunenoy/items/2b05bef337f89bdbb7c3
構成
Flaskベースで構成しています。(FlaskなのでPythonです。)
ディレクトリ構成はこんな感じ。
(TODOアプリで簡単にTODOを残すアプリで想定されるAPIのCRUDを例としています。)
├── README.md
├── package-lock.json
├── package.json
├── pytest.ini
├── requirements.txt
├── serverless.yml
├── src(API本体のソースコード)
│   ├── __init__.py
│   ├── adapters
│   │   ├── __init__.py
│   │   ├── controllers
│   │   │   ├── __init__.py
│   │   │   ├── error_handler_restful_api.py
│   │   │   └── todos.py
│   │   └── gateways
│   │       ├── __init__.py
│   │       └── repositories
│   │           ├── __init__.py
│   │           └── dynamo_todo_repository.py
│   ├── app.py
│   ├── domains
│   │   ├── __init__.py
│   │   ├── exceptions
│   │   │   ├── FieldEmptyException.py
│   │   │   └── IllegalArgumentException.py
│   │   ├── shared
│   │   │   ├── __init__.py
│   │   │   └── domain_model.py
│   │   └── todo.py
│   └── usecases
│       ├── __init__.py
│       ├── create_todo.py
│       ├── exceptions
│       │   ├── __init__.py
│       │   ├── duplicate_exception.py
│       │   └── not_found_exception.py
│       ├── get_todo.py
│       ├── get_todos.py
│       └── repositories
│           └── todo_repository.py
├── tests(単体テスト)
│   ├── adapters
│   │   ├── controllers
│   │   │   └── test_todos.py
│   │   └── gateways
│   │       └── repositories
│   │           └── test_dynamo_todo_repository.py
│   ├── domains
│   │   └── test_todo.py
│   ├── envs
│   ├── integration
│   │   └── resources
│   │       ├── test_todos_get.py
│   │       ├── test_todos_get_list.py
│   │       └── test_todos_post.py
│   ├── shared
│   │   └── todo_parameter.py
│   └── usecases
│       ├── test_create_todo.py
│       ├── test_get_todo.py
│       └── test_get_todos.py
└── tests-deployed-server(APIテスト)
    ├── base.py
    ├── test.py
    └── test.sh
Serverless.yml/app.py
serverless-wsgiを使っています。
まずはPluginの設定から。ローカルテストが実行できるようにdynamodb-localも入れております。
plugins:
  - serverless-python-requirements
  - serverless-wsgi
  - serverless-dynamodb-local
wsgiに関する記述も下記のようにしています。
  wsgi:
    app: src/app.app
    packRequirements: false
  pythonRequirements:
    dockerizePip: non-linux
基本的にアクセスする場所にルーティングなど諸々を固めておいておきます。
from flasgger import Swagger
from flask import Flask
from flask_cors import CORS
from src.adapters import controllers
from src.adapters.controllers.error_handler_restful_api import ErrorHandlerRestfulApi
app = Flask(__name__)
cors = CORS(app)
app.config['SWAGGER'] = {
    'uiversion': 3
}
swagger = Swagger(app, template_file='./swagger-template.yml')
api = ErrorHandlerRestfulApi(app)
api.add_resource(controllers.todos.Todos, '/todos')
api.add_resource(controllers.todos.Todo, '/todos/<todo_id>')
def create_app():
    return app, api
関数の記述はwsgiベースになります。
functions:
  create:
    handler: wsgi.handler
    events:
      - http:
          path: todos
          method: post
          cors: true
          private: true
  getTodos:
    handler: wsgi.handler
    events:
      - http:
          path: todos
          method: get
          cors: true
          private: true
  getTodo:
    handler: wsgi.handler
    events:
      - http:
          path: todos/{todo_id}
          method: get
          cors: true
          private: true
Domain
Clean ArchitectureのDomainに相当するので、ここでオブジェクトのロジックをまとめておきます。
import uuid
from src.domains.shared.domain_model import DomainModel
class Todo(DomainModel):
    def __init__(self, fields):
        DomainModel.__init__(self, fields)
        if not self.has_value('todo_id'):
            self.todo_id = str(uuid.uuid4())
        self.check_fields()
    def numeric_field(self):
        return ["story_point"]
    def required_field(self):
        return [
            "todo_id",
            "title"
        ]
Adapter/Gateway
Clean ArchitectureのFramework Driverの役目を担うところで(厳密には違う様に感じている)
一般的なServerless構成なので、ここではDynamoDBを例にした記述を掲載します。DynamoDBへのアクセス方法をインターフェースとして用意する想定です。(なのでDynamoDBで提供しているものを加工するようなことはここではあまりやらないようにしているつもりです。) IS_OFFLINEはちなみにDynamoDB-Local実行してテストをCircleCIで動かす用のフラグ用の変数です。
import os
import decimal
import boto3
from boto3.dynamodb.conditions import Key
from flask import json
from src.domains.todo import Todo
from src.usecases.exceptions.duplicate_exception import DuplicateException
from src.usecases.exceptions.not_found_exception import NotFoundException
STAGE = os.environ['STAGE']
IS_OFFLINE = os.environ.get('IS_OFFLINE')
dynamodb = boto3.resource(
    'dynamodb',
    region_name='localhost',
    endpoint_url='http://localhost:8000') if IS_OFFLINE \
    else boto3.resource('dynamodb')
table_name = STAGE + "-newtemplate-Todos"
print(table_name)
table_todos = dynamodb.Table(table_name)
class DynamoTodoRepository:
    def __init__(self):
        pass
    def save(self, todo):
        try:
            self.find_by_id(todo.todo_id)
        except NotFoundException:
            pass
        else:
            raise DuplicateException("Duplicated primary key")
        s = json.dumps(todo.__dict__)
        param = json.loads(s, parse_float=decimal.Decimal)
        table_todos.put_item(Item=param)
    def find_all(self):
        response = table_todos.scan()
        todos = [self.convert_to_model(x) for x in response.get('Items')]
        return todos
    def find_by_id(self, todo_id):
        response = table_todos.get_item(Key={'todo_id': todo_id})
        if not response.get('Item'):
            raise NotFoundException('%s:%s is not found' % (
                table_name,
                todo_id,))
        todo = self.convert_to_model(response.get('Item'))
        return todo
    def convert_to_model(self, item):
        def convert_decimal(obj):
            try:
                return int(obj)
            except TypeError:
                pass
            return float(obj)
        param = json.dumps(item, default=convert_decimal)
        todo = Todo(json.loads(param))
        return todo
UseCase
Clean ArchitecureのUseCaseに相当します。UseCaseでメソッドのロジックを固めます。
TODOの例では、「Todoを作る」「Todoを単体取得する」「Todoを複数取得する」で切り分けています。
※UseCaseが誰に対してってのが結構色々別れるポイントかなーとか思いつつですが、あくまで上記は例で、こっちの方がわかりやすいのかも知んないなーくらいに思っています。
import copy
from src.domains.todo import Todo
class CreateTodoUseCase(object):
    def __init__(self, repo):
        self.repo = repo
    def execute(self, todo_field_dict):
        param = copy.deepcopy(todo_field_dict)
        todo = Todo(param)
        self.repo.save(todo)
        return todo
```py:src/usecases/get_todo.py
class GetTodoUseCase(object):
    def __init__(self, repo):
        self.repo = repo
    def execute(self, todo_id):
        todo = self.repo.find_by_id(todo_id)
        return todo
class GetTodosUseCase(object):
    def __init__(self, repo):
        self.repo = repo
    def execute(self):
        todos = self.repo.find_all()
        return todos
Adapter/Controller
Clean ArchitectureのViewに相当するイメージですが、実質ここでビジネスロジックを編成してしまいます。一番ソースのボリュームが増える箇所がこのあたりとう想定です。DynamoDBでリストを引っ張ってくることを想定するとして、ソーティングや件数を制御するのがUseCaseで実際にビジネスロジックを書きまくるのがControllerってところでしょうか。Todoのサンプルでは複数とるケースと単独で取得するケースはクラスを分けて記述しています。
from flask_restful import Resource, request
from src.adapters.gateways.repositories.dynamo_todo_repository import DynamoTodoRepository
from src.usecases.create_todo import CreateTodoUseCase
from src.usecases.get_todo import GetTodoUseCase
from src.usecases.get_todos import GetTodosUseCase
class Todos(Resource):
    def __init__(self, **kwargs):
        pass
    def post(self):
        """
        Create Todo
        ---
        # Flasgger が requestBody に対応するまで swagger-template.yml 併用
        # requestBody:
        #     $ref: '#/components/requestBodies/Todo'
        # responses:
        #     201:
        #         description: Created
        """
        todo = CreateTodoUseCase(DynamoTodoRepository()).execute(request.json)
        return todo.__dict__, 201
    def get(self):
        """
            Find Todos
        ---
        responses:
            200:
                description: Success
                schemas:
                    $ref: '#/components/requestBodies/Todos'
        """
        todos = GetTodosUseCase(DynamoTodoRepository()).execute()
        res = {
            'total': len(todos)
        }
        return res, 200
class Todo(Resource):
    def get(self, todo_id):
        """
        Find Todo
        ---
        parameters:
            - in: path
              name: todo_id
              required: true
              schema:
                type: string
        responses:
            200:
                description: Success
                schemas:
                    $ref: '#/components/requestBodies/Todo'
            404:
                description: Not Found
        """
        todo = GetTodoUseCase(DynamoTodoRepository()).execute(todo_id)
        return todo.__dict__, 200
テスト
テストコードも用意しています。
各層に応じてテストを実施する形もできる様にしています。
これでDomain->Gateway->UseCase->Controllerの順番で実装していけば、出戻って開発することもなくすことを想定しています。例はDomainにおける単体テストコードの例です。
import copy
import os
import sys
import pytest
from pytest import fail
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/../../")
from src.domains.exceptions.FieldEmptyException import FieldEmptyException
from src.domains.exceptions.IllegalArgumentException import IllegalArgumentException
from tests.shared.todo_parameter import todo_dict
from src.domains.todo import Todo
def test_initialize_new_success():
    param = copy.deepcopy(todo_dict)
    todo = Todo(param)
    for field in param:
        assert todo.__getattribute__(field) == param[field]
    assert todo.todo_id is not None
    assert todo.created_at is not None
    assert todo.updated_at is not None
def test_initialize_success():
    all_dict = copy.deepcopy(todo_dict)
    all_dict['created_at'] = 'testcreated_at'
    all_dict['updated_at'] = 'testupdated_at'
    todo = Todo(all_dict)
    for field in all_dict:
        assert todo.__getattribute__(field) == all_dict[field]
@pytest.mark.parametrize('field_name', ["title"])
def test_initialize_fail_empty(field_name):
    param = copy.deepcopy(todo_dict)
    del param[field_name]
    try:
        Todo(param)
    except FieldEmptyException as e:
        e.message() is None
    else:
        fail("should raise FieldEmptyException for " + field_name)
@pytest.mark.parametrize('field_name', ["story_point"])
def test_initialize_fail_numeric(field_name):
    param = copy.deepcopy(todo_dict)
    param[field_name] = "str" + str(param[field_name])
    try:
        Todo(param)
    except IllegalArgumentException as e:
        assert e.message() is not None
    else:
        fail("should raise IllegalArgumentException " + field_name)
テストコマンドはpackage.jsonに登録しておきます。
ローカルでテストできるように、Dynamodb-Localでローカル環境状に仮想サーバーを立ててテストできるようにしています。
  "scripts": {
    "test": "npm run test:all",
    "start": "npm run start:all",
    "start:init": "sls dynamodb install && sls dynamodb start -s local &",
    "start:server": ". tests/envs; sls wsgi serve -s local",
    "start:all": "npm run start:init & npm run start:server",
    "stop:dynamodb": "sls dynamodb remove",
    "test:init": "npm run start:init",
    "test:all": "npm run test:init & sls wsgi serve -s local & sleep 10s && . tests/envs; pytest -sv tests --cov=src && coverage html",
    "test:controllers": ". tests/envs; pytest -sv tests/adapters/controllers --cov=src/adapters/controllers && coverage html",
    "test:repos": ". tests/envs; pytest -sv tests/adapters/gateways/repositories --cov=src/adapters/gateways/repositories && coverage html",
    "test:domains": ". tests/envs; pytest -sv tests/domains --cov=src/domains && coverage html",
    "test:usecases": ". tests/envs; pytest -sv tests/usecases --cov=src/usecases && coverage html",
    "test:integration": ". tests/envs; pytest -sv tests/integration --cov=src && coverage html"
  },
CircleCIと連携しておけばテストも自動でなされる様にしています。
(ちなみに勝手にAWSにテスト用にデプロイしてテストする処理もあります)
よかった?
いいと思う。(実際これで設計周りは安定して記述できているので、プロトタイピングなら1週間程度でAPIは実装できると思う)
Lambdaの中身はなんでPythonなの?
Serverlessで自前で用意した当初がPythonだったってだけなのでそうしているだけです。
(社内でNode.jsで実装している内容のものもありますが、ノウハウため中)
Layer使わないの?
実装当初はLambda Layerはなかったので、もし作っていくならLayer化でしょうかね。
Flaskにしたのは?
端的に使いやすいものを選んだだけです。
~~まずは使ってみてフィードバックをください!~~公開予定です
割とサービスをたくさんServerlessで運用しているので、割と同じ枠組みで実装されているのでフレームワークの最初のインプットさえ乗り越えば、他のサービスも等価にコードリーディングできるんじゃないかなーと思っていますし、それを期待しています。
Lambdaのコード量は確かに多くはなるんですが、エラーなりなんなりは共通のパーツが使えるのでいいかなとか。
課題はあるの?
* test:allができればレイヤー順に実行してほしいが、pytestで使う際どうしても英字昇順で実行してしまうのでどうしてもcontrollerから実行されてしまう。
- DynamoDB-Localたまに落ちるんだよなあ。
- 実装が増えてくるとLambdaのコードがどんどんでかくなる問題(これはこれで、スリムなAPIを作れって制約になってほしいが、、)
- どうせならNode.jsで全て完結させた方が一言語だけでできて学習コスト少なくできるんですけどねえ。。
- 一つの機能を作るのに新規構築だとDomainから全て揃えるのはテストコードを含めると結構しんどいのでRubyのScaffold的なものがあってもいいなあとか思っている。
まとめ・展望
- 弊社でAPI Gateway + Lambda用のバックエンドロジックを作るためのフレームワークを紹介しました。
- Clean ArchitectureをベースにFlaskにて実装しました。
- ぜひ使ってみてフィードバックください。
できることならOSS化していろんな人に使ってもらいたいなあとか思ったりしています。(ちなみにNode.js版も作っている最中)
ありがとうございました。(繰り返しですが、サンプルソースは今年中には公開する予定です。)
Special Thanks
設計・実装から何から何までお世話になったのでこちらに掲載させていただきます。
HiromiShikata

