こんにちは。会話 AI ロボット Romi のサーバーサイドを作ってきた halhorn です。
年末なので(そして個人的に Romi でのお仕事が一区切りつくので) Romi の仕組みを0から作ってきた中での知見を投稿していこうと思います。
Romi のサーバーサイドのコードでは、データソース(RDS, DynamoDB その他のデータベースや、 API 呼び出しなど)にアクセスする際にはサードパーティ製のライブラリを直接使うのではなく、かならず独自のクラスを使ってラップしています。
この記事では、データソース層をラップする理由と、ラップすることで実際にどのようなメリットがあるのかを紹介します。
(記事のなかに出てくるコードは Romi で実際に使っているコードではありません。)
データソース層をラップするとは??
データソース層をラップする、というのは「アプリケーションコードからはサードパーティライブラリを直接触らず、必ず自分たちのデータソースアクセス用クラスを経由する」というルールを徹底する、という意味です。
例えば DynamoDB (KVS) からデータを取ってくるシンプルなコードは以下のようになります。
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('User')
response = table.get_item(
Key={
'user_id': 'hal',
}
)
user_name = response['Item']['name']
Romi ではこのように直接 boto3 を使うことはせず、 KVSBase というベースクラスを継承したクラス経由でデータにアクセスします。
kvs = UserKVS.create(partition_key='hal')
kvs.get('name')
上記は DynamoDB ですが、 Romi では他にも以下のように各種データソースに対してラッパーが作られています
- DynamoDB
- RDS
- Elasticsearch / Opensearch
- Elasticache (memcached)
- S3
- SQS
- SES
- GoogleDrive
- Spreadsheet
- RestAPI 呼び出し
- 外部 API や、独自 AI をホストする GPU サーバーなど
データソースをラップするメリット
データソースへのアクセスを専用クラスでラップすることで、そのデータソースにアクセスする時に必ず特定のコードを実行できます。
これによって、メトリクス取得・キャッシュ・環境切替・安全なテストといった制御をアプリ全体で統一できます。
Romi では例えば具体的には以下のようなことを行っています。
A. データソースへのアクセス回数・時間を記録する
多くのアプリケーションにおいて、サーバーの処理の多くの時間を占めているのはデータソースへのアクセスです。
会話 AI ロボットである Romi の場合でも(AI 部分は GPU サーバーに切り出しているため)アプリケーションサーバーのボトルネックは各種データソース呼び出しとなります。
したがって、 Romi の会話の高速化を考えるときにまず大事になるのは、1回の会話で何回どの種類のデータソースにアクセスを行い、その結果データソースアクセスでどれだけの時間がかかったかを計測することです。
(テンポ感よく会話するために - Romi の応答高速化の技術 でもこの仕組みも使ってボトルネック調査をしています。)
Romi では PerformanceCounter というクラスを作っており、このクラスをデータソースラッパーの中で呼び出すことで、1回のアプリケーションサーバーリクエスト(e.g.会話リクエスト)の中でどのデータソースへのアクセスが何回・何秒かかったかを記録できるようにしています。
(「1回のアプリケーションサーバーリクエスト」単位で処理を行う基盤もあるのですがそちらは気が向いたら別の記事で書きます。)
以下は擬似的なコードです。
class KVSBase:
def get(self, key: str) -> dict:
# この with 文を通った回数や、 with 文の中でかかった時間を記録
with PerformanceCounter('kvs.get_item'):
return self._get_from_dynamodb(key)
B. データソースにアクセスするクライアントやコネクションをキャッシュする
一部のライブラリはインスタンスを作るのに時間がかかることがあります。
例えば DynamoDB の場合 boto3.resource('dynamodb') 自体は10ms程度でできるのですが、初回の get_item 時などに100msほど時間がかかったりします。
これが resource をキャッシュしておけば5-10msで行えます。
(具体的数値は環境によって変わり得ます)
というわけで、 resource をデータソースラッパーでキャッシュしておけば高速に動作します。
class KVSBase:
_resource_cache = None
def get(self, key: str) -> dict:
# リソースをキャッシュ
if self._resource_cache is None:
self._resource_cache = boto3.resource('dynamodb')
table = self._resource_cache.Table('User')
...
C. データソースのアクセス先を環境によって切り替える
開発環境と本番環境、また CI でのテスト時など、データソースの向き先を切り替えたいことがよくあります。
このような場合に、毎度アプリケーションレイヤーのコードで接続先を書くのは、接続先情報があちこちに散らばってしまってよくありません。
接続先の選択をデータソースラッパーの中に隠蔽することで、アプリケーションレイヤーはその DB をどの環境で使うのかを気にすることなくビジネスロジックの実装に集中できます。
ちなみに Romi では接続先ホスト情報の責務を集約したクラス(& 設定ファイル)を作っており、環境変数から自身の環境(DEV, PRD, TEST)を判定して自動的につなぎ先を切り替えるようになっています。
class DBBase:
@classmethod
async def create_instance() -> Self:
# ここで内部的に環境に応じたホスト設定を読み出す
host = Host.RDS
# 秘匿すべき情報とかも隠蔽できる
secret = secret_loader.load('rds')
loop = asyncio.get_event_loop()
engine = await create_engine(
host=host.host,
port=host.port,
user=secret['user'],
password=secret['password'],
db=self.db_name,
charset='utf8',
autocommit=True,
loop=loop,
)
...
このあたりは DI (Dependency Injection) でやるべきという説もありますが、個人的には毎度依存を引き回すのも面倒なので、ある程度依存そのものが固定な場合は上記のような仕組みでも良いのではと思っていたりします。(個人の感想)
D. テストが安全にかける
データソースにアクセスするテストには以下のリスクがあります。
- a. 本番データにテストからアクセスしてしまい、本番のデータを変更してしまう
- b. データソースが保持するデータが「状態」になってしまい、複数のテストが状態を介して依存してしまう
a -> テストでの本番データソースアクセスを防ぐ
a の対策は C の「データソースのアクセス先を環境によって切り替える」によって行えます。
テスト環境では、テスト専用のデータソースをローカルに立ててそちらに向き先を変えます。
Romi の場合は個人開発環境や CI でのテストでは、 docker compose などでテスト用の mysql 用サーバーなどデータソースサーバーをローカルにたて、そちらにアクセス先を向けています。
データソースごとに以下の docker コンテナを使っています。
| データソース | コンテナイメージ |
|---|---|
| mysql | mysql |
| Elasticsearch | elasticsearch (を一部変えたもの) |
| Opensearch | opensearch (を一部変えたもの) |
| Elasticache (memcached) | memcached |
| AWS 各種 | localstack/localstack |
なかにはテスト用にデータソースのサーバーを立てるのが難しいものもあります。
そのような場合、そのデータソースをテストから呼び出した時に例外を発するようにしておけば、誤ってテスト中に本番のデータにアクセスすることを防げます。
※ 少し話がずれますが、上で書いた本番は開発環境です。ユーザーがアクセスする本当の本番環境はそもそもネットワーク的に分離して、個人開発環境や CI からアクセスできないようにしましょう。
b -> テストが他のテストに影響されるのを防ぐ
ローカルのデータソースサーバーを作ったとしても、以下のような問題が起きます。
- テスト A が、ユーザー X をローカル DB に作る
- テスト B が、ユーザー X をローカル DB に作る
- ここですでにユーザー X のデータが DB にあるので unique 制約に違反して例外発生!
このような複数のテストがお互いに影響を与えてしまう問題はとても厄介です。
「落ちたテストを単体で実行してもテストが通るのに(膨大にある)すべてのテストを通しで実行するとなぜか落ちる・・」が起きたときにはデバッグにかかる時間は計り知れません。
というわけで、各テストは可能な限り独立であるべきです。
Romi では fixture という仕組みを作っています。
(pytest では fixture の機能が標準搭載されていますが、 Romi では unittest を使っており独自に fixture 的な仕組みを作っています)
class BaseFixture:
@classmethod
async def initialize_globally(cls) -> None:
'''
テスト全体の前に一度だけ実行される処理を実装します。
'''
pass
async def setup(self) -> None:
'''
各テストの最初に実行される処理を実装します。
'''
pass
async def teardown(self) -> None:
'''
各テストの最後に実行される処理を実装します。
'''
pass
def monitor(self, patch_cls: type, patch_method: str, func: Callable) -> Any:
'''
与えられたクラスの与えられたメソッドの挙動を変えることなく、そのメソッドが呼ばれたときに指定された処理を行います。
SYNOPSIS:
def _monitor_get_table(kvs_self):
kvs_call_count += 1
monitor = self.monitor(KVSBase, '_get_table', _monitor_get_table)
with monitor:
HogeKVS.create(...)
:param patch_cls: モニタ対象のクラス
:param patch_method: モニタ対象のメソッド名
:param func: モニタ処理
:return: patch.object の結果
'''
original = getattr(patch_cls, patch_method)
def _wrapper(patch_self, *args, **kwargs) -> Any:
func(patch_self, *args, **kwargs)
return original(patch_self, *args, **kwargs)
return patch.object(patch_cls, patch_method, _wrapper, spec=patch_cls)
この BaseFixture を継承した各データソースの種類ごとのクラスを作っておけば、自動的に各テストの最初に setup() が呼ばれ、各テストのおわりに teardown() が呼ばれます。
したがって、基本的には
- setup でデータソースの準備をする(テーブルを作るなど)
- teardown でデータソースの中身を消す
をすれば良いです。
ただ、膨大にあるテストで毎度それをやるとテストの時間がとてもかかってしまいます。
そこで上記コードの monitor をデータソースのラッパーに仕込んでおきます。
「データソースにアクセスするときには必ずラッパーを介する」というルールが守られているのであれば、これによってそのデータソースにアクセスがあったかを確実に知ることができます。
あとは、アクセスがあったデータソースにだけ teardown でデータのクリア処理を行えば OK です。
これによって、必要なときのみデータソースのクリアが走るようになり、テストを安全かつ高速に回すことができます。
おわりに
本記事では、私が Romi でのテストの依存の問題や高速化といった様々な課題を解決していく中で作ってきた仕組みの中からデータソースのラッパーに関するものを紹介させていただきました。
このようにデータソースのラッパークラスを作ることは安全でハイパフォーマンスなコードを書くうえでとても効果的です。
上記の他にもデータソースにアクセスする口を押さえておけば、パフォーマンスやセキュリティなどの様々なアイディアを統一的に適用することが可能になります。
ただし、上で挙げたメリットは「データソースにアクセスする場合は必ずラッパーを介する」というルールが守られてはじめて価値を生みます。逆に言うと、途中から一部だけ生ライブラリへのアクセスが混ざり始めると、一気にメリットが薄れてしまいます。
そのため、新しいプロジェクトを始める場合にはなるべく早い段階でこの仕組みを導入してしまうこと、一度入れたらレビューなどを通じてルールを徹底していくことが大事ですね。