20
4

More than 3 years have passed since last update.

AWSのモックライブラリ moto にコントリビュートしてみよう!

Last updated at Posted at 2020-12-02

この記事は ABEJA Advent Calendar 2020 3日目の記事です。

この記事ではPythonでAWSのモックをするライブラリであるmotoにコントリビュートして、未実装のサービスやオペレーションを追加する方法を説明していきます。
ABEJAではAWSのサービスを使ってシステムを作る事がありますが、利用しているAWSのサービスがmotoでは実装されていない事があります。それらに対応するためにmotoにコントリビュートしており、コントリビュート方法をまとめた記事です。筆者が出している Pull Requestはこちら

この記事では以下の順番で説明を進めます。

  • AWSのサービスを使ったプログラムに対するテストの課題と対処
  • motoとは
  • motoがboto3をモックする仕組み
  • motoに未実装のサービス/オペレーションを追加してみる

なお、本記事ではAWSのサービスとはEC2などのサービスを、オペレーションとはcreate_instance()などのサービスに対するオペレーションの事を指します。

AWSのサービスを使ったプログラムに対するテスト

テスト時の課題

AWSのサービスを使っているプログラムのユニットテストを実行する場合、AWSとの通信部分に関して以下のような課題が発生します。

  • ユニットテストの実行毎に課金されてしまう
  • AWSのリソースの作成・削除に時間がかかるためテスト実行が非効率・不安定
  • ローカル環境・CI環境で利用するAWSのクレデンシャルの管理が必要

AWSサービスのモック手法

テスト時の課題について触れましたが、実際にはユニットテスト時に実際のAWSのサービスと通信をしている事は無いでしょう。
実際には、テストコードでHTTPリクエストやSDKのメソッドに対するモックを実装したり、ローカル環境で起動したモック用のサーバーと通信をする事でテストを実行しているのでは無いでしょうか?

本記事ではmotoの動作の仕組みと、motoで未実装のサービスやオペレーションの追加方法について説明します。

motoとは

motoとはAWSのPythonライブラリであるboto3をモックするライブラリです。
以下の例のように、boto3でのAWSのサービスに対する操作をモックしてテストする事ができます。使い方はこの記事などを参照してください。

import boto3

def add_servers(ami_id, count):
    client = boto3.client('ec2', region_name='us-west-1')
    client.run_instances(ImageId=ami_id, MinCount=count, MaxCount=count)

# Test
from . import add_servers
from moto import mock_ec2

@mock_ec2
def test_add_servers():
    # add_servers内でのboto3の処理はmotoにモックされる
    add_servers('ami-1234abcd', 2)

    client = boto3.client('ec2', region_name='us-west-1')
    instances = client.describe_instances()['Reservations'][0]['Instances']
    assert len(instances) == 2

motoがboto3をモックする仕組み

motoがどのようにしてAWSのサービスへのリクエストをモックしているかを、以下の順番で見ていきましょう。各節では、motoのコア部分の動作の仕組みを説明した上で、kinesisvideoの実装を例として示しながら進めます。実装例は簡単のために一部コードを省略しています。

  • motoのモックの初期化
  • ルーティング
  • リクエスト/レスポンスの処理
  • ステートの管理
  • エラーハンドリング

motoのモックの初期化

motoのモックは@mock_kinesisvideoのようなPythonのデコレータをテスト関数に設定するところからはじまります。

moto/tests/test_kinesisvideo/test_kinesisvideo.py
@mock_kinesisvideo
def test_create_stream():
    client = boto3.client("kinesisvideo", region_name="ap-northeast-1")
    ...

motoのデコレータが呼び出されると、Pythonのデコレータの仕組みBotocoreEventMockAWS.start()が呼ばれます。その中ではBotocoreEventMockAWS.enable_patching()が実行されており、AWSへのHTTPリクエストのモックを登録しています。HTTPリクエストのモックにはgetsentry/responsesが使われています。

ルーティング

HTTPリクエストのモックには、URLのパスとパスに対応するハンドラ関数が登録されます。URLのパスと対応するハンドラ関数は各サービスのurls.pyで定義されます。
各ハンドラ関数はAWSのAPIに対するHTTPリクエストを受けて、AWSのAPIが定義するフォーマットでレスポンスを返すように実装します。
各サービスのurls.pyでは、URLのパスに対応するハンドラ関数を実装すればいいのですが、大半のサービスの実装ではBaseResponseクラスを継承したResponseクラスを作ってresponse.dispatchをハンドラ関数として渡しています。

moto/moto/kinesisvideo/urls.py
url_bases = [
    "https?://kinesisvideo.(.+).amazonaws.com",
]
response = KinesisVideoResponse()
url_paths = {
    "{0}/createStream$": response.dispatch,
    "{0}/deleteStream$": response.dispatch,
    "{0}/listStreams$": response.dispatch,
}

BaseResponse.dispatch()の中ではBaseResponse.call_action()が呼ばれていて、クエリストリングやURLからリクエストされたオペレーション名を特定し、各サービス毎のResponseクラスに実装されているはずの、オペレーションに対応するインスタンスメソッドを呼びます。

リクエスト/レスポンスの処理

各サービスのResponseクラスは、クエリ・ストリングやメッセージ・ボディなどを渡されて処理を開始し、最終的にXMLやJSON等のサービス毎に決められたフォーマットでレスポンスを返す必要があります。
AWSではサービス毎にAPIプロトコルが様々なのでサービス毎にリクエスト・レスポンスの処理をする必要があります。フォーマットの詳細に関してはAPIドキュメントを参照して実装する必要があります。
オペレーションに対するインプットパラメータの取得はBaseReponseにメソッドが定義されているので、それのメソッドを使いパラメータをパースします。

moto/moto/kinesisvideo/responses.py
class KinesisVideoResponse(BaseResponse):
    SERVICE_NAME = "kinesisvideo"

    @property
    def kinesisvideo_backend(self):
        return kinesisvideo_backends[self.region]

    def create_stream(self):
        stream_name = self._get_param("StreamName")
        stream_arn = self.kinesisvideo_backend.create_stream(
            device_name=device_name,
        )
        return json.dumps(dict(StreamARN=stream_arn))

    def list_streams(self):
        stream_info_list = self.kinesisvideo_backend.list_streams()
        return json.dumps(dict(StreamInfoList=stream_info_list))

    def delete_stream(self):
        stream_arn = self._get_param("StreamARN")
        self.kinesisvideo_backend.delete_stream(
            stream_arn=stream_arn
        )
        return json.dumps(dict())

ステートの管理

motoはboto3のモックとしてリクエストされたのリソースのステートを管理する必要があります。そのため、各サービス・リージョン毎にBackendクラスを作ってサービスのステートを管理します。同様に、サービス内のリソースのステートの管理も必要なので、BaseModelを継承したModelを作って管理します。Modelは、S3でのバケットやEC2でのInstance等に対応します。

moto/moto/kinesisvideo/models.py
class Stream(BaseModel):
    def __init__(
        self,
        region_name,
        stream_name,
        media_type,
    ):
        self.region_name = region_name
        self.stream_name = stream_name
        self.media_type = media_type
        self.creation_time = datetime.utcnow()
        stream_arn = "arn:aws:kinesisvideo:{}:{}:stream/{}/1598784211076".format(
            self.region_name, ACCOUNT_ID, self.stream_name
        )
        self.arn = stream_arn

    def to_dict(self):
        return {
            "StreamName": self.stream_name,
            "StreamARN": self.arn,
            "MediaType": self.media_type,
        }

class KinesisVideoBackend(BaseBackend):
    def __init__(self, region_name=None):
        super(KinesisVideoBackend, self).__init__()
        self.region_name = region_name
        self.streams = {}

    def reset(self):
        region_name = self.region_name
        self.__dict__ = {}
        self.__init__(region_name)

    def create_stream(
        self,
        stream_name,
        media_type,
    ):
        streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
        if len(streams) > 0:
            raise ResourceInUseException(
                "The stream {} already exists.".format(stream_name)
            )
        stream = Stream(
            self.region_name,
            stream_name,
            media_type,
        )
        self.streams[stream.arn] = stream
        return stream.arn

    def _get_stream(self, stream_name, stream_arn):
        if stream_name:
            streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
            if len(streams) == 0:
                raise ResourceNotFoundException()
            stream = streams[0]
        elif stream_arn:
            stream = self.streams.get(stream_arn)
            if stream is None:
                raise ResourceNotFoundException()
        return stream

    def list_streams(self):
        stream_info_list = [_.to_dict() for _ in self.streams.values()]
        return stream_info_list

    def delete_stream(self, stream_arn, current_version):
        stream = self.streams.get(stream_arn)
        if stream is None:
            raise ResourceNotFoundException()
        del self.streams[stream_arn]

エラーハンドリング

ユーザからリクエストされたリソースが見つからない場合等のエラー時には、moto.core.exceptionsのクラスを継承した例外を発生させることで、boto3のエラーとして例外が発生します。

moto/moto/kinesisvideo/exceptions.py

from moto.core.exceptions import RESTError

class KinesisvideoClientError(RESTError):
    code = 400

class ResourceNotFoundException(KinesisvideoClientError):
    def __init__(self):
        self.code = 404
        super(ResourceNotFoundException, self).__init__(
            "ResourceNotFoundException",
            "The requested stream is not found or not active.",
        )

motoの動作の仕組みまとめ

ここまで見てきたように、motoは以下の仕組みによってboto3をモックしています。

  • motoのモックの初期化
  • ルーティング
  • リクエスト/レスポンスの処理
  • ステートの管理
  • エラーハンドリング

各サービスに対応するパッケージは以下の構成で、オペレーションを追加する場合は各ファイルにオペレーションに対応するコードを実装していきます。

kinesisvideo/
├── __init__.py
├── exceptions.py
├── models.py
├── responses.py
└── urls.py

moto/kinesisvideo/

motoに未実装のサービス/オペレーションを追加してみる

コントリビュートガイドにもあるようにサービス追加のためのテンプレート生成スクリプトが用意されています。スクリプトを実行すると、サービス名とオペレーション名の選択ができるので、それぞれ選択するとテンプレートが生成されます。


$ git clone git@github.com:spulec/moto.git
$ cd moto
$ make scaffold

Select service: medialive
==Current Implementation Status==
[ ] batch_update_schedule
[ ] create_channel
...
[ ] update_reservation
=================================
Select Operation: create_channel
    Initializing service    medialive
    creating    moto/medialive/models.py
    creating    moto/medialive/responses.py
    creating    moto/medialive/urls.py
   ...

追加したいサービスが全く実装されていない場合は、追加サービスに必要となるファイル(urls.py, responses.py, models.py, exceptions.py等)のテンプレートがはじめに生成されます。サービスが既に実装されている場合には追加オペレーションのメソッドがResponseBackendに追加されます。
各メソッドのオペレーション名、リクエストパラメータ、レスポンスパラメータは前述したbotocoreのservice_modelを使って自動生成されます。

moto/medialive/responses.py
class MediaLiveResponse(BaseResponse):
    SERVICE_NAME = 'medialive'
    @property
    def medialive_backend(self):
        return medialive_backends[self.region]

    def create_channel(self):
        channel_class = self._get_param("ChannelClass")
        ...
        tags = self._get_param("Tags")
        channel = self.medialive_backend.create_channel(
            channel_class=channel_class,
            ...
            tags=tags,
        )
        # TODO: adjust response
        return json.dumps(dict(channel=channel))

後は、生成されたテンプレートの中身を実装していく事でmotoで未実装のサービス/オペレーションを追加する事ができます。

ここまで例としてきたkinesisvideoを追加するPull Requestは以下のものなので、実装時の参考になるかもしれません。
https://github.com/spulec/moto/pull/3271

まとめ

本記事では、以下の項目について説明しました。

  • AWSのサービスを使ったプログラムに対するテストの課題と対処
  • motoとは
  • motoがboto3をモックする仕組み
  • motoに未実装のサービス/オペレーションを追加してみる

ここまで見てきたようにmotoの実装はコントリビュートの力技努力によってできており、まだまだ未実装のサービスやオペレーションが沢山あります。OSSコントリビュートチャンスなので、一度トライしてはいかがでしょうか?


  1. ちなみにLocalStackの内部では一部motoが使われています。 

20
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
4