4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【AWS】pythonで、異なるS3間の差分をコピーする

Last updated at Posted at 2023-07-14

はじめに

pythonを使用して、異なる環境に存在するS3から別のS3へ、差分のオブジェクトのみをコピーする方法をご紹介します。

移行元、移行先のバケットのみ設定すれば、勝手に比較して差分をコピーしてくれるイメージです。

前後.png

また、余計な情報が混ざってしまうのもあれなので、あえてエラーハンドリングなどは割愛しています。ご了承ください。

前提

移行元と移行先、どちらにもアクセスできる環境(EC2など)で動作させる前提です。

もしアクセス権限がない場合は、一時的な認証トークンなどを使用すれば可能ですが、本記事は割愛します。

その他のバージョン等は下記です。

python -V 
>> Python 3.10.9

全体の流れ

全体の流れは下記です。

  1. 両者のS3からオブジェクトキーの一覧を取得
  2. それぞれを比較し、移行元にのみ存在するキーを抽出
  3. 抽出したキーを用いてコピー

それぞれ見ていきます。

1. 両者のS3からオブジェクトキーの一覧を取得

Amazonの各種サービスを、pythonから操作することのできるboto3というライブラリを使用します。

pip install boto3

S3に対して様々な操作を行うS3Bucketというクラスを定義し、オブジェクトキーを取得するメソッドを追加していきます。

import boto3
from typing import List

class S3Bucket:
    def __init__(
        self
    ) -> None:
        # s3クライアントの作成
        self.client = boto3.client('s3')

    def ls(
        self, 
        bucket: str, 
        prefix: str, 
        recursive: bool = False
    ) -> List[str]:
        """S3上のファイルリスト取得

        Args:
            bucket (str): バケット名
            prefix (str): バケット以降のパス
            recursive (bool): 再帰的にパスを取得するかどうか

        """
        paths: List[str] = []
        paths = self.__get_all_keys(
            bucket, prefix, recursive=recursive)
        return paths

    def __get_all_keys(
        self, 
        bucket: str, 
        prefix: str, 
        keys: List = None, 
        marker: str = '', 
        recursive: bool = False
    ) -> List[str]:
        """指定した prefix のすべての key の配列を返す

        Args:
            bucket (str): バケット名
            prefix (str): バケット以降のパス
            keys (List): 全パス取得用に用いる
            marker (str): 全パス取得用に用いる
            recursive (bool): 再帰的にパスを取得するかどうか

        """
        s3 = boto3.client('s3')
        if recursive:
            response = s3.list_objects(
                Bucket=bucket, Prefix=prefix, Marker=marker)
        else:
            response = s3.list_objects(
                Bucket=bucket, Prefix=prefix, Marker=marker, Delimiter='/')

        # keyがNoneのときは初期化
        if keys is None:
            keys = []

        if 'CommonPrefixes' in response:
            # Delimiterが'/'のときはフォルダがKeyに含まれない
            keys.extend([content['Prefix']
                        for content in response['CommonPrefixes']])
        if 'Contents' in response:  # 該当する key がないと response に 'Contents' が含まれない
            keys.extend([content['Key'] for content in response['Contents']])
            if 'IsTruncated' in response:
                return self.__get_all_keys(bucket=bucket, prefix=prefix, keys=keys, marker=keys[-1], recursive=recursive)
        return keys

こちらの記事を参考にさせていただきました。ありがとうございます。

2. それぞれを比較し、移行元にのみ存在するキーを抽出

先ほど作成したクラスを使用し、オブジェクトキーを取得・比較を行います。

# バケット名
BUCKET_FROM = ''    # 移行元
BUCKET_TO = ''    # 移行先

def main() -> None:
    # 先ほど作成したS3Bucketオブジェクトのインスタンス化
    s3 = s3_ex.S3Bucket()

    # それぞれのS3のオブジェクトキー一覧取得
    paths_from = s3.ls(BUCKET_FROM, '', recursive=True)
    paths_to = s3.ls(BUCKET_TO, '', recursive=True)

    # 移行元にのみ存在するパスを抽出
    paths = list(filter(lambda x: x not in paths_to, paths_from)

コピーする必要があるパス(キー)の用意まで完了しました。

3. 抽出したキーを用いてコピー

まず、S3のオブジェクトをコピーするメソッドを、S3Bucketクラスに追加します。

import botocore

class S3Bucket:
    # =============
    # ...略 ...
    # =============

    def copy_object_from_s3_to_s3(
        self, 
        bucket_from: str, 
        bucket_to: str, 
        key: str
    ) -> None:
        """s3からs3へオブジェクトのコピー

        Args:
            bucket_from (str): 移行元のバケット名
            bucket_to (str): 移行先のバケット以降のパス
            key (str): 移行するオブジェクトのパス

        """
        self.client.copy_object(
            Bucket=bucket_to, 
            Key=key, 
            CopySource={
                'Bucket': bucket_from, 
                'Key': key
            }, 
            ACL='bucket-owner-full-control'
        )

このメソッドを使用して、順次コピーしていきます。

# バケット名
BUCKET_FROM = ''    # 移行元
BUCKET_TO = ''    # 移行先

def main() -> None:
    # 先ほど作成したS3Bucketオブジェクトのインスタンス化
    s3 = s3_ex.S3Bucket()

    # それぞれのS3のオブジェクトキー一覧取得
    paths_from = s3.ls(BUCKET_FROM, '', recursive=True)
    paths_to = s3.ls(BUCKET_TO, '', recursive=True)

    # 移行元にのみ存在するパスを抽出
    paths = list(filter(lambda x: x not in paths_to, paths_from)

    # オブジェクトコピー
    # コピーできるファイルサイズの上限等があるので、ここはエラーハンドリングした方が無難だと思います。
    for key in paths:
        try:
            s3.copy_object_from_s3_to_s3(bucket_from, bucket_to, key)
        except botocore.exceptions.ClientError as e:
            print(e)

以上です。

全体をまとめたのが下記です。

サンプルコード
import boto3
import botocore

# バケット名
BUCKET_FROM = ''    # 移行元
BUCKET_TO = ''    # 移行先


class S3Bucket:
    def __init__(
        self
    ) -> None:
        # s3クライアントの作成
        self.client = boto3.client('s3')

    def ls(
        self, 
        bucket: str, 
        prefix: str, 
        recursive: bool = False
    ) -> List[str]:
        """S3上のファイルリスト取得

        Args:
            bucket (str): バケット名
            prefix (str): バケット以降のパス
            recursive (bool): 再帰的にパスを取得するかどうか

        """
        paths: List[str] = []
        paths = self.__get_all_keys(
            bucket, prefix, recursive=recursive)
        return paths

    def __get_all_keys(
        self, 
        bucket: str, 
        prefix: str, 
        keys: List = None, 
        marker: str = '', 
        recursive: bool = False
    ) -> List[str]:
        """指定した prefix のすべての key の配列を返す

        Args:
            bucket (str): バケット名
            prefix (str): バケット以降のパス
            keys (List): 全パス取得用に用いる
            marker (str): 全パス取得用に用いる
            recursive (bool): 再帰的にパスを取得するかどうか

        """
        s3 = boto3.client('s3')
        if recursive:
            response = s3.list_objects(
                Bucket=bucket, Prefix=prefix, Marker=marker)
        else:
            response = s3.list_objects(
                Bucket=bucket, Prefix=prefix, Marker=marker, Delimiter='/')

        # keyがNoneのときは初期化
        if keys is None:
            keys = []

        if 'CommonPrefixes' in response:
            # Delimiterが'/'のときはフォルダがKeyに含まれない
            keys.extend([content['Prefix']
                        for content in response['CommonPrefixes']])
        if 'Contents' in response:  # 該当する key がないと response に 'Contents' が含まれない
            keys.extend([content['Key'] for content in response['Contents']])
            if 'IsTruncated' in response:
                return self.__get_all_keys(bucket=bucket, prefix=prefix, keys=keys, marker=keys[-1], recursive=recursive)
        return keys

    def copy_object_from_s3_to_s3(
        self, 
        bucket_from: str, 
        bucket_to: str, 
        key: str
    ) -> None:
        """s3からs3へオブジェクトのコピー

        Args:
            bucket_from (str): 移行元のバケット名
            bucket_to (str): 移行先のバケット以降のパス
            key (str): 移行するオブジェクトのパス

        """
        self.client.copy_object(
            Bucket=bucket_to, 
            Key=key, 
            CopySource={
                'Bucket': bucket_from, 
                'Key': key
            }, 
            ACL='bucket-owner-full-control'
        )

def main() -> None:
    # 作成したS3Bucketオブジェクトのインスタンス化
    s3 = s3_ex.S3Bucket()

    # それぞれのS3のオブジェクトキー一覧取得
    paths_from = s3.ls(BUCKET_FROM, '', recursive=True)
    paths_to = s3.ls(BUCKET_TO, '', recursive=True)

    # 移行元にのみ存在するパスを抽出
    paths = list(filter(lambda x: x not in paths_to, paths_from)

    # オブジェクトコピー
    # コピーできるファイルサイズの上限等があるので、ここはエラーハンドリングした方が無難だと思います。
    for key in paths:
        try:
            s3.copy_object_from_s3_to_s3(bucket_from, bucket_to, key)
        except botocore.exceptions.ClientError as e:
            print(e)

オブジェクトが5GBを超える場合

1つ留意点として、コピーしたいオブジェクトが5GBを超える場合、copy_objectメソッドでは対応することができません。(煩雑になってしまうため、本記事では詳細は割愛します。)

5GBを超えるオブジェクトをコピーしたい場合はcopyメソッドを使用しまします。

copyメソッドは、オブジェクトを小さく分割してコピーするマルチパートアップロードに対応しています。

公式ドキュメントを張っておきます。

copy_object

copy

最後に

個々で見れば何番煎じか分かりませんが、「S3間のデータを移行する」という目的に沿って記述しました。

実際に使用する際は、ログの設定やエラーハンドリングやリトライ処理を行うことで、より堅牢なプログラムにした方がいいのかなあと思います。

ご覧いただきありがとうございました。


株式会社ジールでは、初期費用が不要で運用・保守の手間もかからず、ノーコード・ローコードですぐに手元データを分析可能なオールインワン型データ活用プラットフォーム「ZEUSCloud」を提供しております。
ご興味がある方は是非下記のリンクをご覧ください:
https://www.zdh.co.jp/products-services/cloud-data/zeuscloud/

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?