search
LoginSignup
3

More than 1 year has passed since last update.

posted at

updated at

Organization

AWSのモックライブラリ moto にコントリビュートしてみよう!

この記事は ABEJA Advent Calendar 2020 3日目の記事です。

この記事ではPythonでAWSのモックをするライブラリであるmotoにコントリビュートして、未実装のサービスやオペレーションを追加する方法を説明していきます。
ABEJAではAWSのサービスを使ってシステムを作る事がありますが、利用しているAWSのサービスがmotoでは実装されていない事があります。それらに対応するためにmotoにコントリビュートしており、コントリビュート方法をまとめた記事です。筆者が出している Pull Requestはこちら

この記事では以下の順番で説明を進めます。

  • AWSのサービスを使ったプログラムに対するテストの課題と対処
  • motoとは
  • motoがboto3をモックする仕組み
  • motoに未実装のサービス/オペレーションを追加してみる

なお、本記事ではAWSのサービスとはEC2などのサービスを、オペレーションとはcreate_instance()などのサービスに対するオペレーションの事を指します。

AWSのサービスを使ったプログラムに対するテスト

テスト時の課題

AWSのサービスを使っているプログラムのユニットテストを実行する場合、AWSとの通信部分に関して以下のような課題が発生します。

  • ユニットテストの実行毎に課金されてしまう
  • AWSのリソースの作成・削除に時間がかかるためテスト実行が非効率・不安定
  • ローカル環境・CI環境で利用するAWSのクレデンシャルの管理が必要

AWSサービスのモック手法

テスト時の課題について触れましたが、実際にはユニットテスト時に実際のAWSのサービスと通信をしている事は無いでしょう。
実際には、テストコードでHTTPリクエストやSDKのメソッドに対するモックを実装したり、ローカル環境で起動したモック用のサーバーと通信をする事でテストを実行しているのでは無いでしょうか?

本記事ではmotoの動作の仕組みと、motoで未実装のサービスやオペレーションの追加方法について説明します。

motoとは

motoとはAWSのPythonライブラリであるboto3をモックするライブラリです。
以下の例のように、boto3でのAWSのサービスに対する操作をモックしてテストする事ができます。使い方はこの記事などを参照してください。

import boto3

def add_servers(ami_id, count):
    client = boto3.client('ec2', region_name='us-west-1')
    client.run_instances(ImageId=ami_id, MinCount=count, MaxCount=count)

# Test
from . import add_servers
from moto import mock_ec2

@mock_ec2
def test_add_servers():
    # add_servers内でのboto3の処理はmotoにモックされる
    add_servers('ami-1234abcd', 2)

    client = boto3.client('ec2', region_name='us-west-1')
    instances = client.describe_instances()['Reservations'][0]['Instances']
    assert len(instances) == 2

motoがboto3をモックする仕組み

motoがどのようにしてAWSのサービスへのリクエストをモックしているかを、以下の順番で見ていきましょう。各節では、motoのコア部分の動作の仕組みを説明した上で、kinesisvideoの実装を例として示しながら進めます。実装例は簡単のために一部コードを省略しています。

  • motoのモックの初期化
  • ルーティング
  • リクエスト/レスポンスの処理
  • ステートの管理
  • エラーハンドリング

motoのモックの初期化

motoのモックは@mock_kinesisvideoのようなPythonのデコレータをテスト関数に設定するところからはじまります。

moto/tests/test_kinesisvideo/test_kinesisvideo.py
@mock_kinesisvideo
def test_create_stream():
    client = boto3.client("kinesisvideo", region_name="ap-northeast-1")
    ...

motoのデコレータが呼び出されると、Pythonのデコレータの仕組みBotocoreEventMockAWS.start()が呼ばれます。その中ではBotocoreEventMockAWS.enable_patching()が実行されており、AWSへのHTTPリクエストのモックを登録しています。HTTPリクエストのモックにはgetsentry/responsesが使われています。

ルーティング

HTTPリクエストのモックには、URLのパスとパスに対応するハンドラ関数が登録されます。URLのパスと対応するハンドラ関数は各サービスのurls.pyで定義されます。
各ハンドラ関数はAWSのAPIに対するHTTPリクエストを受けて、AWSのAPIが定義するフォーマットでレスポンスを返すように実装します。
各サービスのurls.pyでは、URLのパスに対応するハンドラ関数を実装すればいいのですが、大半のサービスの実装ではBaseResponseクラスを継承したResponseクラスを作ってresponse.dispatchをハンドラ関数として渡しています。

moto/moto/kinesisvideo/urls.py
url_bases = [
    "https?://kinesisvideo.(.+).amazonaws.com",
]
response = KinesisVideoResponse()
url_paths = {
    "{0}/createStream$": response.dispatch,
    "{0}/deleteStream$": response.dispatch,
    "{0}/listStreams$": response.dispatch,
}

BaseResponse.dispatch()の中ではBaseResponse.call_action()が呼ばれていて、クエリストリングやURLからリクエストされたオペレーション名を特定し、各サービス毎のResponseクラスに実装されているはずの、オペレーションに対応するインスタンスメソッドを呼びます。

リクエスト/レスポンスの処理

各サービスのResponseクラスは、クエリ・ストリングやメッセージ・ボディなどを渡されて処理を開始し、最終的にXMLやJSON等のサービス毎に決められたフォーマットでレスポンスを返す必要があります。
AWSではサービス毎にAPIプロトコルが様々なのでサービス毎にリクエスト・レスポンスの処理をする必要があります。フォーマットの詳細に関してはAPIドキュメントを参照して実装する必要があります。
オペレーションに対するインプットパラメータの取得はBaseReponseにメソッドが定義されているので、それのメソッドを使いパラメータをパースします。

moto/moto/kinesisvideo/responses.py
class KinesisVideoResponse(BaseResponse):
    SERVICE_NAME = "kinesisvideo"

    @property
    def kinesisvideo_backend(self):
        return kinesisvideo_backends[self.region]

    def create_stream(self):
        stream_name = self._get_param("StreamName")
        stream_arn = self.kinesisvideo_backend.create_stream(
            device_name=device_name,
        )
        return json.dumps(dict(StreamARN=stream_arn))

    def list_streams(self):
        stream_info_list = self.kinesisvideo_backend.list_streams()
        return json.dumps(dict(StreamInfoList=stream_info_list))

    def delete_stream(self):
        stream_arn = self._get_param("StreamARN")
        self.kinesisvideo_backend.delete_stream(
            stream_arn=stream_arn
        )
        return json.dumps(dict())

ステートの管理

motoはboto3のモックとしてリクエストされたのリソースのステートを管理する必要があります。そのため、各サービス・リージョン毎にBackendクラスを作ってサービスのステートを管理します。同様に、サービス内のリソースのステートの管理も必要なので、BaseModelを継承したModelを作って管理します。Modelは、S3でのバケットやEC2でのInstance等に対応します。

moto/moto/kinesisvideo/models.py
class Stream(BaseModel):
    def __init__(
        self,
        region_name,
        stream_name,
        media_type,
    ):
        self.region_name = region_name
        self.stream_name = stream_name
        self.media_type = media_type
        self.creation_time = datetime.utcnow()
        stream_arn = "arn:aws:kinesisvideo:{}:{}:stream/{}/1598784211076".format(
            self.region_name, ACCOUNT_ID, self.stream_name
        )
        self.arn = stream_arn

    def to_dict(self):
        return {
            "StreamName": self.stream_name,
            "StreamARN": self.arn,
            "MediaType": self.media_type,
        }

class KinesisVideoBackend(BaseBackend):
    def __init__(self, region_name=None):
        super(KinesisVideoBackend, self).__init__()
        self.region_name = region_name
        self.streams = {}

    def reset(self):
        region_name = self.region_name
        self.__dict__ = {}
        self.__init__(region_name)

    def create_stream(
        self,
        stream_name,
        media_type,
    ):
        streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
        if len(streams) > 0:
            raise ResourceInUseException(
                "The stream {} already exists.".format(stream_name)
            )
        stream = Stream(
            self.region_name,
            stream_name,
            media_type,
        )
        self.streams[stream.arn] = stream
        return stream.arn

    def _get_stream(self, stream_name, stream_arn):
        if stream_name:
            streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
            if len(streams) == 0:
                raise ResourceNotFoundException()
            stream = streams[0]
        elif stream_arn:
            stream = self.streams.get(stream_arn)
            if stream is None:
                raise ResourceNotFoundException()
        return stream

    def list_streams(self):
        stream_info_list = [_.to_dict() for _ in self.streams.values()]
        return stream_info_list

    def delete_stream(self, stream_arn, current_version):
        stream = self.streams.get(stream_arn)
        if stream is None:
            raise ResourceNotFoundException()
        del self.streams[stream_arn]

エラーハンドリング

ユーザからリクエストされたリソースが見つからない場合等のエラー時には、moto.core.exceptionsのクラスを継承した例外を発生させることで、boto3のエラーとして例外が発生します。

moto/moto/kinesisvideo/exceptions.py

from moto.core.exceptions import RESTError

class KinesisvideoClientError(RESTError):
    code = 400

class ResourceNotFoundException(KinesisvideoClientError):
    def __init__(self):
        self.code = 404
        super(ResourceNotFoundException, self).__init__(
            "ResourceNotFoundException",
            "The requested stream is not found or not active.",
        )

motoの動作の仕組みまとめ

ここまで見てきたように、motoは以下の仕組みによってboto3をモックしています。

  • motoのモックの初期化
  • ルーティング
  • リクエスト/レスポンスの処理
  • ステートの管理
  • エラーハンドリング

各サービスに対応するパッケージは以下の構成で、オペレーションを追加する場合は各ファイルにオペレーションに対応するコードを実装していきます。

kinesisvideo/
├── __init__.py
├── exceptions.py
├── models.py
├── responses.py
└── urls.py

moto/kinesisvideo/

motoに未実装のサービス/オペレーションを追加してみる

コントリビュートガイドにもあるようにサービス追加のためのテンプレート生成スクリプトが用意されています。スクリプトを実行すると、サービス名とオペレーション名の選択ができるので、それぞれ選択するとテンプレートが生成されます。


$ git clone git@github.com:spulec/moto.git
$ cd moto
$ make scaffold

Select service: medialive
==Current Implementation Status==
[ ] batch_update_schedule
[ ] create_channel
...
[ ] update_reservation
=================================
Select Operation: create_channel
    Initializing service    medialive
    creating    moto/medialive/models.py
    creating    moto/medialive/responses.py
    creating    moto/medialive/urls.py
   ...

追加したいサービスが全く実装されていない場合は、追加サービスに必要となるファイル(urls.py, responses.py, models.py, exceptions.py等)のテンプレートがはじめに生成されます。サービスが既に実装されている場合には追加オペレーションのメソッドがResponseBackendに追加されます。
各メソッドのオペレーション名、リクエストパラメータ、レスポンスパラメータは前述したbotocoreのservice_modelを使って自動生成されます。

moto/medialive/responses.py
class MediaLiveResponse(BaseResponse):
    SERVICE_NAME = 'medialive'
    @property
    def medialive_backend(self):
        return medialive_backends[self.region]

    def create_channel(self):
        channel_class = self._get_param("ChannelClass")
        ...
        tags = self._get_param("Tags")
        channel = self.medialive_backend.create_channel(
            channel_class=channel_class,
            ...
            tags=tags,
        )
        # TODO: adjust response
        return json.dumps(dict(channel=channel))

後は、生成されたテンプレートの中身を実装していく事でmotoで未実装のサービス/オペレーションを追加する事ができます。

ここまで例としてきたkinesisvideoを追加するPull Requestは以下のものなので、実装時の参考になるかもしれません。
https://github.com/spulec/moto/pull/3271

まとめ

本記事では、以下の項目について説明しました。

  • AWSのサービスを使ったプログラムに対するテストの課題と対処
  • motoとは
  • motoがboto3をモックする仕組み
  • motoに未実装のサービス/オペレーションを追加してみる

ここまで見てきたようにmotoの実装はコントリビュートの力技努力によってできており、まだまだ未実装のサービスやオペレーションが沢山あります。OSSコントリビュートチャンスなので、一度トライしてはいかがでしょうか?


  1. ちなみにLocalStackの内部では一部motoが使われています。 

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
3