2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Moto Server × boto3 でローカルS3環境を構築し、差分同期まで実装してみた

2
Posted at

1. はじめに

よくお世話になっていたMinIOがいつの間にか終了していたことと、
AWSに接続出来ないけどS3を使った開発が必要だった
という場面に遭遇し(そんな特殊な状況はなかなかないと思いますが)
試行錯誤した記録として記事にしました。

やりたいこと

  • S3(boto3)を使った処理をローカルで検証したい
  • 実際の AWS に接続できない
  • 保存したデータをローカルで確認したい
  • 出来るだけ軽くしたい

本記事のポイント

  • Moto を使って ローカルでS3を再現
  • exporter により S3の中身をファイルとして可視化
  • manifest を使った 差分同期で軽量化
  • 作成・更新・削除の一連の動作をローカルで検証可能

この構成のメリット

  • AWSに接続せず安全に開発できる
  • 本番コードをほぼそのまま利用できる
  • データの中身を直接確認できるためデバッグしやすい
  • 低スペック環境でも動作する

2. 今回作るもの

今回の構成

ローカル環境
 ├─ moto_server(S3のモックAPI)
 └─ exporter(差分同期ツール)
 └─ 任意のアプリ(boto3で接続)

ポイント

  • Moto Server を使って S3 API をローカルで再現
  • boto3 からそのまま接続可能
  • exporter が S3の内容をローカルに同期
  • 差分のみ取得するので軽量

使用技術

  • Python
  • boto3
  • Moto(S3モック)
  • subprocess(プロセス管理)

3. 主要コンポーネントについて

3.1 Moto Server とは

Moto は、AWSサービスをモックするためのライブラリ

特に Server mode を使うと:

  • HTTPサーバとして起動
  • boto3 から普通に接続できる
  • 実際のS3 APIに近い挙動

「ほぼ本番と同じコード」で検証できる

3.2 Exporter を作った理由

Moto Server だけだと
データはメモリ上なのでサーバー停止で消え、中身が見えにくい

そこで、
S3の中身をローカルファイルとして保存する仕組み
を作りました。

4. ディレクトリ構成

project/
  moto_server/
    runner.py        # moto server と exporter を起動・管理する
    exporter.py      # S3の内容を差分でローカルに同期する
    config.py        # 各種設定値(ポート・バケット名など)
  moto_export/         # S3の内容をローカルに保存するディレクトリ
    manifest.json      # 前回状態を記録し、差分同期に使う
    test-bucket/       # S3バケットごとの保存先
      sample/          # S3のキー構造がそのまま再現される
        hello.txt

5. config.py(設定管理)

ここからファイル単位で記載していきます。
まずはプログラム内で使う設定値をconfig.pyとしてまとめました。
dotenvを使わず、シンプルに Python ファイルで管理しています。

config.py
# moto server のポート番号
# runner.py でこのポートで起動される
MOTO_PORT = 9876

# boto3 から接続するエンドポイント
# 本来の AWS ではなくローカルの moto を指す
MOTO_ENDPOINT = f"http://localhost:{MOTO_PORT}"

# 使用するリージョン
# バケット作成時の設定にも使用される
AWS_REGION = "ap-northeast-1"

# S3の内容をローカルに保存するディレクトリ
# exporter がこの配下にファイルを書き出す
EXPORT_DIR = "./moto_export"

# ポーリング間隔(秒)
# exporter が S3 の変更をチェックする周期
EXPORT_INTERVAL = 15

# 削除同期を有効にするかどうか
# True にすると、S3 から削除されたファイルをローカルでも削除する
DELETE_SYNC = False

# 必須バケット一覧
# exporter 起動時に存在しなければ自動作成される
REQUIRED_BUCKETS = ["test-bucket", "report-bucket"]

6. runner.py(プロセス起動・管理)

runner.py
import signal
import socket
import subprocess
import sys
import time

import config

# 起動した子プロセスを管理するリスト
# (name, process) のタプルで保持する
processes = []


def wait_for_port(host: str, port: int, timeout: int = 10) -> None:
    """
    指定したホスト・ポートが接続可能になるまで待機する
    moto_server の起動完了待ちに使用
    """
    start = time.time()
    while True:
        try:
            # ソケット接続できれば起動済みと判断
            with socket.create_connection((host, port), timeout=1):
                return
        except OSError:
            # タイムアウトを超えたらエラー
            if time.time() - start > timeout:
                raise RuntimeError(f"Port {host}:{port} did not open in time")
            # 少し待って再試行
            time.sleep(0.5)


def stop_process(name: str, proc: subprocess.Popen) -> None:
    """
    子プロセスを安全に停止する
    terminate → wait → kill の順で確実に終了させる
    """
    # すでに終了している場合は何もしない
    if proc.poll() is not None:
        return

    print(f"Stopping {name}...")

    # まずは穏やかに終了を要求
    proc.terminate()

    try:
        # 一定時間待つ
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # 停止しない場合は強制終了
        print(f"{name} did not stop with terminate(); killing...")
        proc.kill()
        proc.wait(timeout=5)


def stop_all():
    """
    すべての子プロセスを停止する
    reversed にすることで、起動した順の逆で終了させる
    (依存関係を意識)
    """
    for name, proc in reversed(processes):
        try:
            stop_process(name, proc)
        except Exception as e:
            print(f"[warn] failed to stop {name}: {e}")


def handle_signal(signum, frame):
    """
    Ctrl+C(SIGINT)や SIGTERM を受けた時の処理
    すべての子プロセスを停止して終了する
    """
    print(f"Received signal {signum}, shutting down...")
    stop_all()
    sys.exit(0)


def main():
    # Ctrl+C や kill コマンドに対応
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    # --- moto server 起動 ---
    moto_proc = subprocess.Popen(
        ["moto_server", "-p", str(config.MOTO_PORT)],
    )
    processes.append(("moto", moto_proc))

    # moto server の起動完了を待つ(ポートが開くまで)
    wait_for_port("localhost", config.MOTO_PORT, timeout=10)

    # --- exporter 起動 ---
    export_proc = subprocess.Popen(
        ["python", "moto_server/exporter.py"],
    )
    processes.append(("exporter", export_proc))

    print("moto + exporter started. Press Ctrl+C to stop.")

    try:
        # 親プロセスは常駐して監視する
        while True:
            # 子プロセスが落ちていないか軽く監視
            for name, proc in processes:
                code = proc.poll()
                if code is not None:
                    # 子プロセスが異常終了した場合は例外を投げる
                    raise RuntimeError(f"{name} exited unexpectedly with code {code}")

            # CPU負荷を下げるため少し待つ
            time.sleep(1)

    finally:
        # 例外・終了時に必ず全プロセスを停止
        stop_all()


if __name__ == "__main__":
    main()

runner.py はこの3つをやっています。

  1. moto server を起動
  2. exporter を起動
  3. プロセスを安全に停止

7. runner.py のポイント

7.1 moto server をバックグラウンド起動

moto_proc = subprocess.Popen(
    ["moto_server", "-p", str(config.MOTO_PORT)],
)

7.2 起動待ち(重要)

sleepではなくポート確認

wait_for_port("localhost", config.MOTO_PORT)

7.3 exporter 起動

この後記述するローカルへ保存するexporterの起動

export_proc = subprocess.Popen(
    ["python", "dev_support/moto_server/exporter.py"],
)

7.4 親プロセス待機

while True:
    time.sleep(1)

wait() ではなくループで待つ理由

  • moto_server は終了しない
  • exporter も終了しない
  • そのため親が制御を持つ必要がある

7.5 安全な終了処理

proc.terminate()
proc.wait()

さらに:

proc.kill()

terminate → wait → kill の順で確実に止める

8. exporter.py

exporter.py
import json
import signal
import time
from pathlib import Path

import boto3
from botocore.exceptions import BotoCoreError, ClientError

import config

# config.py から各種設定値を読み込む
MOTO_ENDPOINT = config.MOTO_ENDPOINT
AWS_REGION = config.AWS_REGION
EXPORT_DIR = Path(config.EXPORT_DIR)

# 差分同期用の状態ファイル
# 前回取得したオブジェクト一覧(etag / size)を保存する
MANIFEST_PATH = EXPORT_DIR / "manifest.json"

# S3をチェックする間隔(秒)
POLL_INTERVAL = config.EXPORT_INTERVAL

# 削除同期を有効にするかどうか
# True の場合、S3から消えたファイルをローカル側でも削除する
DELETE_SYNC = config.DELETE_SYNC

# 必須バケット一覧
# exporter 起動時に存在しなければ自動作成する
REQUIRED_BUCKETS = [name.strip() for name in config.REQUIRED_BUCKETS if name.strip()]

# メインループ継続フラグ
# SIGINT / SIGTERM を受けたら False にして安全に停止する
running = True


def handle_signal(signum, frame):
    """
    Ctrl+C(SIGINT)や SIGTERM を受けた時に呼ばれる
    running を False にして、ループを安全に抜ける
    """
    global running
    running = False


# シグナル受信時の処理を登録
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)


def create_s3_client():
    """
    moto server に接続するための S3 クライアントを作成する
    endpoint_url を差し替えることで、AWS本番ではなく moto に接続する
    """
    return boto3.client(
        "s3",
        region_name=AWS_REGION,
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url=MOTO_ENDPOINT,
    )


def load_manifest() -> dict:
    """
    manifest.json を読み込む
    ファイルが存在しない場合や読み込みに失敗した場合は空 dict を返す
    """
    if not MANIFEST_PATH.exists():
        return {}

    try:
        return json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
    except Exception:
        return {}


def save_manifest(manifest: dict) -> None:
    """
    manifest.json を保存する

    いきなり本番ファイルへ書かず、一時ファイルに書いてから置き換えることで、
    書き込み途中の破損を避けやすくしている
    """
    EXPORT_DIR.mkdir(parents=True, exist_ok=True)

    tmp_path = MANIFEST_PATH.with_suffix(".tmp")
    tmp_path.write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    tmp_path.replace(MANIFEST_PATH)


def local_path(bucket: str, key: str) -> Path:
    """
    S3 の bucket / key に対応するローカル保存先パスを返す
    例:
      bucket = "test-bucket"
      key = "sample/hello.txt"
      -> moto_export/test-bucket/sample/hello.txt
    """
    return EXPORT_DIR / bucket / key


def ensure_required_buckets(s3) -> None:
    """
    必須バケットが存在するか確認し、なければ作成する
    今回は別アプリ側でバケット作成しない前提のため、exporter 側で保証する
    """
    response = s3.list_buckets()
    existing = {bucket["Name"] for bucket in response.get("Buckets", [])}

    for bucket_name in REQUIRED_BUCKETS:
        if bucket_name in existing:
            continue

        # us-east-1 は CreateBucketConfiguration 不要
        if AWS_REGION == "us-east-1":
            s3.create_bucket(Bucket=bucket_name)
        else:
            s3.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
            )

        print(f"[created bucket] {bucket_name}")


def list_all_objects(s3) -> dict:
    """
    S3上の全バケット・全オブジェクトの一覧を取得する

    返り値のイメージ:
    {
      "test-bucket": {
        "sample/hello.txt": {"etag": "...", "size": 123},
        "sample/data.json": {"etag": "...", "size": 456}
      }
    }

    本文はまだ取得せず、差分判定に必要な最小限の情報だけ集める
    """
    current = {}
    buckets = s3.list_buckets().get("Buckets", [])

    for bucket in buckets:
        bucket_name = bucket["Name"]
        current[bucket_name] = {}

        paginator = s3.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=bucket_name)

        for page in pages:
            for obj in page.get("Contents", []):
                key = obj["Key"]
                current[bucket_name][key] = {
                    "etag": obj.get("ETag"),
                    "size": obj.get("Size"),
                }

    return current


def download_if_needed(s3, previous: dict, current: dict) -> None:
    """
    前回状態(manifest)と今回状態を比較し、
    新規または更新されたオブジェクトだけダウンロードする

    毎回全件 get_object() しないことで、負荷を抑えている
    """
    for bucket_name, objects in current.items():
        prev_bucket = previous.get(bucket_name, {})

        for key, meta in objects.items():
            prev_meta = prev_bucket.get(key)

            changed = (
                prev_meta is None
                or prev_meta.get("etag") != meta.get("etag")
                or prev_meta.get("size") != meta.get("size")
            )

            # 差分がなければダウンロードしない
            if not changed:
                continue

            obj = s3.get_object(Bucket=bucket_name, Key=key)
            body = obj["Body"].read()

            path = local_path(bucket_name, key)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_bytes(body)

            print(f"[downloaded] s3://{bucket_name}/{key}")


def delete_removed_files(previous: dict, current: dict) -> None:
    """
    DELETE_SYNC=True の場合のみ、
    S3 から削除されたファイルをローカル側でも削除する
    """
    if not DELETE_SYNC:
        return

    for bucket_name, prev_objects in previous.items():
        current_objects = current.get(bucket_name, {})

        for key in prev_objects.keys():
            if key in current_objects:
                continue

            path = local_path(bucket_name, key)
            if path.exists():
                path.unlink()
                print(f"[deleted] {path}")

        # ファイル削除後に空ディレクトリが残るため、掃除する
        bucket_dir = EXPORT_DIR / bucket_name
        remove_empty_dirs(bucket_dir)


def remove_empty_dirs(root: Path) -> None:
    """
    配下の空ディレクトリを削除する
    ファイル削除後に不要なディレクトリだけ残るのを防ぐ
    """
    if not root.exists():
        return

    # 深い階層から順に削除していく
    for path in sorted(root.rglob("*"), reverse=True):
        if path.is_dir():
            try:
                path.rmdir()
            except OSError:
                pass

    try:
        root.rmdir()
    except OSError:
        pass


def sync_once():
    """
    1回分の同期処理を行う

    1. S3クライアント作成
    2. 必須バケット作成
    3. 前回状態を読み込み
    4. 現在状態を取得
    5. 差分だけダウンロード
    6. 必要なら削除同期
    7. 最新状態を manifest に保存
    """
    s3 = create_s3_client()

    # 必須バケットを先に作成
    ensure_required_buckets(s3)

    previous = load_manifest()
    current = list_all_objects(s3)

    download_if_needed(s3, previous, current)
    delete_removed_files(previous, current)
    save_manifest(current)


def main():
    """
    exporter のメインループ
    一定間隔で sync_once() を繰り返す
    """
    EXPORT_DIR.mkdir(parents=True, exist_ok=True)

    print(
        f"Exporter started: endpoint={MOTO_ENDPOINT}, "
        f"region={AWS_REGION}, interval={POLL_INTERVAL}s, "
        f"delete_sync={DELETE_SYNC}, required_buckets={REQUIRED_BUCKETS}"
    )

    while running:
        try:
            sync_once()
        except (ClientError, BotoCoreError, OSError) as e:
            print(f"[warn] sync failed: {e}")
        except Exception as e:
            print(f"[warn] unexpected error: {e}")

        # 1秒ずつ待つことで、停止要求が来た時にすぐ反応しやすくする
        for _ in range(POLL_INTERVAL):
            if not running:
                break
            time.sleep(1)

    print("Exporter stopped cleanly.")


if __name__ == "__main__":
    main()

exporter.py は今回の構成の中で、最も重要なコンポーネントです。

役割はシンプルで、

Moto上のS3の中身を、ローカルファイルとして同期すること

9. exporter.pyのポイント

まずは全体の流れを見ておきます。

① 必須バケットを作成
② 前回状態(manifest.json)を読み込む
③ 現在のS3状態を取得
④ 差分を検知
⑤ 変更があるものだけダウンロード
⑥ 必要なら削除同期
⑦ 最新状態を保存

9−1. 差分同期について

「全部ダウンロード」だと

毎回 get_object() 全件

となってしまい下記懸念がありました。

  • オブジェクト数が増えると重い
  • 無駄なI/Oが多い
  • 低スペックPCで辛い

そこで ETag + Size を使った差分同期 を採用しました。

差分同期の仕組み

前回状態(manifest.json)
   ↓
今回のlist_objects結果
   ↓
比較(ETag, Size)
   ↓
変更ありだけ get_object()

9-2. manifest.json の役割

差分同期を実現するために、前回の状態をファイルとして保持しています。

moto_export/
  manifest.json

中身のイメージ:

manifest.json
{
  "test-bucket": {
    "sample/hello.txt": {
      "etag": "\"abc...\"",
      "size": 123
    }
  }
}

9-3. バケットの自動作成

ensure_required_buckets(s3)

今回はアプリ側ですでにあるバケットへの処理となっており作成済想定のため、
moto_serverではexporter 側で起動毎に作成しています。

9-4. 一覧取得(軽量化のポイント)

list_objects_v2

ここでは本文は取得していません。

取得しているのは:

  • Key
  • ETag
  • Size

のみでこれが「軽量」のポイント

9-5. 差分ダウンロード

changed = (
    prev_meta is None
    or prev_meta["etag"] != meta["etag"]
    or prev_meta["size"] != meta["size"]
)

条件はシンプルです。

  • 新規
  • ETag変更
  • サイズ変更

該当した場合のみ

get_object()

を実行します。

9-6. 削除同期

if DELETE_SYNC:

このフラグで切り替えています。

ON の場合

  • S3から消えたもの → ローカルも削除

OFF の場合

  • ローカルに残る(バックアップ的)

9-7. 空ディレクトリの掃除

remove_empty_dirs()

削除後はディレクトリだけ残るため、後処理として掃除しています

9-8. メインループ

while running:

設定した頻度で定期的に同期を行います。

for _ in range(POLL_INTERVAL):
    time.sleep(1)

1秒単位で待つ理由

  • Ctrl+C にすぐ反応できる
  • graceful shutdown が可能

9-9. エラーハンドリング

except (ClientError, BotoCoreError, OSError)

S3通信系のエラーを想定

except Exception

想定外もログ出し

10. 動作テスト:作成

サンプルファイル作成用のスクリプト

sample_put_object.py
from datetime import UTC, datetime

import boto3

import config

# config.py から接続情報を取得
MOTO_ENDPOINT = config.MOTO_ENDPOINT
AWS_REGION = config.AWS_REGION
BUCKET_NAME = "test-bucket"


def create_s3_client():
    """
    moto server に接続する S3 クライアントを作成する

    endpoint_url を指定することで、
    AWS 本番ではなくローカルの moto に接続する
    """
    return boto3.client(
        "s3",
        region_name=AWS_REGION,
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url=MOTO_ENDPOINT,
    )


def main():
    """
    動作確認用のサンプルデータを S3(moto)に保存する

    exporter がこのデータを検知し、
    ローカルに同期されることを確認するために使用する
    """
    s3 = create_s3_client()

    # 現在時刻(UTC)を取得
    # 毎回内容が変わるため、差分更新の確認にも使える
    now = datetime.now(UTC).isoformat()

    # --- 1つ目: テキストファイル ---
    s3.put_object(
        Bucket=BUCKET_NAME,
        Key="sample/hello.txt",  # S3上のキー(パス)
        Body=f"hello moto server\ncreated_at={now}\n".encode(),
        ContentType="text/plain",
    )

    # --- 2つ目: JSONファイル ---
    json_body = f'{{\n  "message": "sample json",\n  "created_at": "{now}"\n}}\n'

    s3.put_object(
        Bucket=BUCKET_NAME,
        Key="sample/data.json",
        Body=json_body.encode("utf-8"),
        ContentType="application/json",
    )

    print(f"uploaded objects to s3://{BUCKET_NAME}/sample/")


if __name__ == "__main__":
    main()

まずはサンプルデータをS3(Moto)に登録します。
別ターミナルで作成用スクリプト実行

python sample_put_object.py

実行後、exporter により数十秒以内にローカルへ同期されます。

moto_export/
  test-bucket/
    sample/
      hello.txt
      data.json

S3の内容が、そのままローカルファイルとして確認できます。

11. 動作テスト:削除

サンプルファイル削除用のスクリプト

delete_sample_objects.py
import boto3

import config

# config.py から接続情報を取得
MOTO_ENDPOINT = config.MOTO_ENDPOINT
AWS_REGION = config.AWS_REGION
BUCKET_NAME = "test-bucket"


def create_s3_client():
    """
    moto server に接続する S3 クライアントを作成する
    """
    return boto3.client(
        "s3",
        region_name=AWS_REGION,
        aws_access_key_id="dummy",
        aws_secret_access_key="dummy",
        endpoint_url=MOTO_ENDPOINT,
    )


def main():
    """
    サンプルで作成したファイルを削除する

    exporter 側で DELETE_SYNC=True にしている場合、
    ローカルのファイルも削除されることを確認できる
    """
    s3 = create_s3_client()

    # 削除対象のキー一覧
    keys = [
        "sample/hello.txt",
        "sample/data.json",
    ]

    for key in keys:
        s3.delete_object(
            Bucket=BUCKET_NAME,
            Key=key,
        )
        print(f"[deleted] s3://{BUCKET_NAME}/{key}")

    print("delete completed")


if __name__ == "__main__":
    main()

次に、サンプルデータを削除します。

python delete_sample_objects.py

config.py で以下を有効にしている場合:

DELETE_SYNC = True

exporter が削除を検知し、ローカル側も削除されます。

moto_export/
  test-bucket/

S3の削除がローカルにも反映されることが確認できます。

おわりに

今回の記事では、Moto Server を使ってローカルにS3環境を構築し、さらにその中身をローカルファイルとして確認できる仕組みを実装しました。
Moto単体では「見えないS3」ですが、
exporter を組み合わせることで「見えるS3」として扱えるようにしてみました。

簡易的なので最低限ですが、誰かの開発の助けになれば嬉しいです。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?