この記事は ABEJA Advent Calendar 2020 3日目の記事です。
この記事ではPythonでAWSのモックをするライブラリであるmotoにコントリビュートして、未実装のサービスやオペレーションを追加する方法を説明していきます。
ABEJAではAWSのサービスを使ってシステムを作る事がありますが、利用しているAWSのサービスがmotoでは実装されていない事があります。それらに対応するためにmotoにコントリビュートしており、コントリビュート方法をまとめた記事です。筆者が出している Pull Requestはこちら。
この記事では以下の順番で説明を進めます。
- AWSのサービスを使ったプログラムに対するテストの課題と対処
- motoとは
- motoがboto3をモックする仕組み
- motoに未実装のサービス/オペレーションを追加してみる
なお、本記事ではAWSのサービスとはEC2
などのサービスを、オペレーションとはcreate_instance()
などのサービスに対するオペレーションの事を指します。
AWSのサービスを使ったプログラムに対するテスト
テスト時の課題
AWSのサービスを使っているプログラムのユニットテストを実行する場合、AWSとの通信部分に関して以下のような課題が発生します。
- ユニットテストの実行毎に課金されてしまう
- AWSのリソースの作成・削除に時間がかかるためテスト実行が非効率・不安定
- ローカル環境・CI環境で利用するAWSのクレデンシャルの管理が必要
AWSサービスのモック手法
テスト時の課題について触れましたが、実際にはユニットテスト時に実際のAWSのサービスと通信をしている事は無いでしょう。
実際には、テストコードでHTTPリクエストやSDKのメソッドに対するモックを実装したり、ローカル環境で起動したモック用のサーバーと通信をする事でテストを実行しているのでは無いでしょうか?
- モック用のローカルAPIサーバ
- SDKのモック/スタブ
- Ruby: AWS SDK for Ruby
- Javascript: aws-sdk-mock
- Python: moto ←今日はこの説明
- etc...
- HTTPリクエストのモック
- Ruby: webmock, [VCR]
(https://github.com/vcr/vcr) - Python: responses
- etc...
- Ruby: webmock, [VCR]
本記事ではmotoの動作の仕組みと、motoで未実装のサービスやオペレーションの追加方法について説明します。
motoとは
motoとはAWSのPythonライブラリであるboto3をモックするライブラリです。
以下の例のように、boto3でのAWSのサービスに対する操作をモックしてテストする事ができます。使い方はこの記事などを参照してください。
import boto3
def add_servers(ami_id, count):
client = boto3.client('ec2', region_name='us-west-1')
client.run_instances(ImageId=ami_id, MinCount=count, MaxCount=count)
# Test
from . import add_servers
from moto import mock_ec2
@mock_ec2
def test_add_servers():
# add_servers内でのboto3の処理はmotoにモックされる
add_servers('ami-1234abcd', 2)
client = boto3.client('ec2', region_name='us-west-1')
instances = client.describe_instances()['Reservations'][0]['Instances']
assert len(instances) == 2
motoがboto3をモックする仕組み
motoがどのようにしてAWSのサービスへのリクエストをモックしているかを、以下の順番で見ていきましょう。各節では、motoのコア部分の動作の仕組みを説明した上で、kinesisvideoの実装を例として示しながら進めます。実装例は簡単のために一部コードを省略しています。
- motoのモックの初期化
- ルーティング
- リクエスト/レスポンスの処理
- ステートの管理
- エラーハンドリング
motoのモックの初期化
motoのモックは@mock_kinesisvideo
のようなPythonのデコレータをテスト関数に設定するところからはじまります。
@mock_kinesisvideo
def test_create_stream():
client = boto3.client("kinesisvideo", region_name="ap-northeast-1")
...
motoのデコレータが呼び出されると、Pythonのデコレータの仕組みでBotocoreEventMockAWS.start()
が呼ばれます。その中ではBotocoreEventMockAWS.enable_patching()
が実行されており、AWSへのHTTPリクエストのモックを登録しています。HTTPリクエストのモックにはgetsentry/responsesが使われています。
ルーティング
HTTPリクエストのモックには、URLのパスとパスに対応するハンドラ関数が登録されます。URLのパスと対応するハンドラ関数は各サービスのurls.py
で定義されます。
各ハンドラ関数はAWSのAPIに対するHTTPリクエストを受けて、AWSのAPIが定義するフォーマットでレスポンスを返すように実装します。
各サービスのurls.py
では、URLのパスに対応するハンドラ関数を実装すればいいのですが、大半のサービスの実装ではBaseResponse
クラスを継承したResponse
クラスを作ってresponse.dispatch
をハンドラ関数として渡しています。
url_bases = [
"https?://kinesisvideo.(.+).amazonaws.com",
]
response = KinesisVideoResponse()
url_paths = {
"{0}/createStream$": response.dispatch,
"{0}/deleteStream$": response.dispatch,
"{0}/listStreams$": response.dispatch,
}
BaseResponse.dispatch()
の中ではBaseResponse.call_action()
が呼ばれていて、クエリストリングやURLからリクエストされたオペレーション名を特定し、各サービス毎のResponse
クラスに実装されているはずの、オペレーションに対応するインスタンスメソッドを呼びます。
リクエスト/レスポンスの処理
各サービスのResponseクラスは、クエリ・ストリングやメッセージ・ボディなどを渡されて処理を開始し、最終的にXMLやJSON等のサービス毎に決められたフォーマットでレスポンスを返す必要があります。
AWSではサービス毎にAPIプロトコルが様々なのでサービス毎にリクエスト・レスポンスの処理をする必要があります。フォーマットの詳細に関してはAPIドキュメントを参照して実装する必要があります。
オペレーションに対するインプットパラメータの取得はBaseReponseにメソッドが定義されているので、それのメソッドを使いパラメータをパースします。
class KinesisVideoResponse(BaseResponse):
SERVICE_NAME = "kinesisvideo"
@property
def kinesisvideo_backend(self):
return kinesisvideo_backends[self.region]
def create_stream(self):
stream_name = self._get_param("StreamName")
stream_arn = self.kinesisvideo_backend.create_stream(
device_name=device_name,
)
return json.dumps(dict(StreamARN=stream_arn))
def list_streams(self):
stream_info_list = self.kinesisvideo_backend.list_streams()
return json.dumps(dict(StreamInfoList=stream_info_list))
def delete_stream(self):
stream_arn = self._get_param("StreamARN")
self.kinesisvideo_backend.delete_stream(
stream_arn=stream_arn
)
return json.dumps(dict())
ステートの管理
motoはboto3のモックとしてリクエストされたのリソースのステートを管理する必要があります。そのため、各サービス・リージョン毎にBackend
クラスを作ってサービスのステートを管理します。同様に、サービス内のリソースのステートの管理も必要なので、BaseModel
を継承したModel
を作って管理します。Model
は、S3でのバケットやEC2でのInstance等に対応します。
class Stream(BaseModel):
def __init__(
self,
region_name,
stream_name,
media_type,
):
self.region_name = region_name
self.stream_name = stream_name
self.media_type = media_type
self.creation_time = datetime.utcnow()
stream_arn = "arn:aws:kinesisvideo:{}:{}:stream/{}/1598784211076".format(
self.region_name, ACCOUNT_ID, self.stream_name
)
self.arn = stream_arn
def to_dict(self):
return {
"StreamName": self.stream_name,
"StreamARN": self.arn,
"MediaType": self.media_type,
}
class KinesisVideoBackend(BaseBackend):
def __init__(self, region_name=None):
super(KinesisVideoBackend, self).__init__()
self.region_name = region_name
self.streams = {}
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_stream(
self,
stream_name,
media_type,
):
streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
if len(streams) > 0:
raise ResourceInUseException(
"The stream {} already exists.".format(stream_name)
)
stream = Stream(
self.region_name,
stream_name,
media_type,
)
self.streams[stream.arn] = stream
return stream.arn
def _get_stream(self, stream_name, stream_arn):
if stream_name:
streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
if len(streams) == 0:
raise ResourceNotFoundException()
stream = streams[0]
elif stream_arn:
stream = self.streams.get(stream_arn)
if stream is None:
raise ResourceNotFoundException()
return stream
def list_streams(self):
stream_info_list = [_.to_dict() for _ in self.streams.values()]
return stream_info_list
def delete_stream(self, stream_arn, current_version):
stream = self.streams.get(stream_arn)
if stream is None:
raise ResourceNotFoundException()
del self.streams[stream_arn]
エラーハンドリング
ユーザからリクエストされたリソースが見つからない場合等のエラー時には、moto.core.exceptions
のクラスを継承した例外を発生させることで、boto3のエラーとして例外が発生します。
from moto.core.exceptions import RESTError
class KinesisvideoClientError(RESTError):
code = 400
class ResourceNotFoundException(KinesisvideoClientError):
def __init__(self):
self.code = 404
super(ResourceNotFoundException, self).__init__(
"ResourceNotFoundException",
"The requested stream is not found or not active.",
)
motoの動作の仕組みまとめ
ここまで見てきたように、motoは以下の仕組みによってboto3をモックしています。
- motoのモックの初期化
- ルーティング
- リクエスト/レスポンスの処理
- ステートの管理
- エラーハンドリング
各サービスに対応するパッケージは以下の構成で、オペレーションを追加する場合は各ファイルにオペレーションに対応するコードを実装していきます。
kinesisvideo/
├── __init__.py
├── exceptions.py
├── models.py
├── responses.py
└── urls.py
motoに未実装のサービス/オペレーションを追加してみる
コントリビュートガイドにもあるようにサービス追加のためのテンプレート生成スクリプトが用意されています。スクリプトを実行すると、サービス名とオペレーション名の選択ができるので、それぞれ選択するとテンプレートが生成されます。
$ git clone git@github.com:spulec/moto.git
$ cd moto
$ make scaffold
Select service: medialive
==Current Implementation Status==
[ ] batch_update_schedule
[ ] create_channel
...
[ ] update_reservation
=================================
Select Operation: create_channel
Initializing service medialive
creating moto/medialive/models.py
creating moto/medialive/responses.py
creating moto/medialive/urls.py
...
追加したいサービスが全く実装されていない場合は、追加サービスに必要となるファイル(urls.py
, responses.py
, models.py
, exceptions.py
等)のテンプレートがはじめに生成されます。サービスが既に実装されている場合には追加オペレーションのメソッドがResponse
やBackend
に追加されます。
各メソッドのオペレーション名、リクエストパラメータ、レスポンスパラメータは前述したbotocoreのservice_modelを使って自動生成されます。
class MediaLiveResponse(BaseResponse):
SERVICE_NAME = 'medialive'
@property
def medialive_backend(self):
return medialive_backends[self.region]
def create_channel(self):
channel_class = self._get_param("ChannelClass")
...
tags = self._get_param("Tags")
channel = self.medialive_backend.create_channel(
channel_class=channel_class,
...
tags=tags,
)
# TODO: adjust response
return json.dumps(dict(channel=channel))
後は、生成されたテンプレートの中身を実装していく事でmotoで未実装のサービス/オペレーションを追加する事ができます。
ここまで例としてきたkinesisvideo
を追加するPull Requestは以下のものなので、実装時の参考になるかもしれません。
https://github.com/spulec/moto/pull/3271
まとめ
本記事では、以下の項目について説明しました。
- AWSのサービスを使ったプログラムに対するテストの課題と対処
- motoとは
- motoがboto3をモックする仕組み
- motoに未実装のサービス/オペレーションを追加してみる
ここまで見てきたようにmotoの実装はコントリビュートの力技努力によってできており、まだまだ未実装のサービスやオペレーションが沢山あります。OSSコントリビュートチャンスなので、一度トライしてはいかがでしょうか?
-
ちなみにLocalStackの内部では一部motoが使われています。 ↩