はじめに
pythonを使用して、異なる環境に存在するS3から別のS3へ、差分のオブジェクトのみをコピーする方法をご紹介します。
移行元、移行先のバケットのみ設定すれば、勝手に比較して差分をコピーしてくれるイメージです。
また、余計な情報が混ざってしまうのもあれなので、あえてエラーハンドリングなどは割愛しています。ご了承ください。
前提
移行元と移行先、どちらにもアクセスできる環境(EC2など)で動作させる前提です。
もしアクセス権限がない場合は、一時的な認証トークンなどを使用すれば可能ですが、本記事は割愛します。
その他のバージョン等は下記です。
python -V
>> Python 3.10.9
全体の流れ
全体の流れは下記です。
- 両者のS3からオブジェクトキーの一覧を取得
- それぞれを比較し、移行元にのみ存在するキーを抽出
- 抽出したキーを用いてコピー
それぞれ見ていきます。
1. 両者のS3からオブジェクトキーの一覧を取得
Amazonの各種サービスを、pythonから操作することのできるboto3というライブラリを使用します。
pip install boto3
S3に対して様々な操作を行うS3Bucketというクラスを定義し、オブジェクトキーを取得するメソッドを追加していきます。
import boto3
from typing import List
class S3Bucket:
def __init__(
self
) -> None:
# s3クライアントの作成
self.client = boto3.client('s3')
def ls(
self,
bucket: str,
prefix: str,
recursive: bool = False
) -> List[str]:
"""S3上のファイルリスト取得
Args:
bucket (str): バケット名
prefix (str): バケット以降のパス
recursive (bool): 再帰的にパスを取得するかどうか
"""
paths: List[str] = []
paths = self.__get_all_keys(
bucket, prefix, recursive=recursive)
return paths
def __get_all_keys(
self,
bucket: str,
prefix: str,
keys: List = None,
marker: str = '',
recursive: bool = False
) -> List[str]:
"""指定した prefix のすべての key の配列を返す
Args:
bucket (str): バケット名
prefix (str): バケット以降のパス
keys (List): 全パス取得用に用いる
marker (str): 全パス取得用に用いる
recursive (bool): 再帰的にパスを取得するかどうか
"""
s3 = boto3.client('s3')
if recursive:
response = s3.list_objects(
Bucket=bucket, Prefix=prefix, Marker=marker)
else:
response = s3.list_objects(
Bucket=bucket, Prefix=prefix, Marker=marker, Delimiter='/')
# keyがNoneのときは初期化
if keys is None:
keys = []
if 'CommonPrefixes' in response:
# Delimiterが'/'のときはフォルダがKeyに含まれない
keys.extend([content['Prefix']
for content in response['CommonPrefixes']])
if 'Contents' in response: # 該当する key がないと response に 'Contents' が含まれない
keys.extend([content['Key'] for content in response['Contents']])
if 'IsTruncated' in response:
return self.__get_all_keys(bucket=bucket, prefix=prefix, keys=keys, marker=keys[-1], recursive=recursive)
return keys
こちらの記事を参考にさせていただきました。ありがとうございます。
2. それぞれを比較し、移行元にのみ存在するキーを抽出
先ほど作成したクラスを使用し、オブジェクトキーを取得・比較を行います。
# バケット名
BUCKET_FROM = '' # 移行元
BUCKET_TO = '' # 移行先
def main() -> None:
# 先ほど作成したS3Bucketオブジェクトのインスタンス化
s3 = s3_ex.S3Bucket()
# それぞれのS3のオブジェクトキー一覧取得
paths_from = s3.ls(BUCKET_FROM, '', recursive=True)
paths_to = s3.ls(BUCKET_TO, '', recursive=True)
# 移行元にのみ存在するパスを抽出
paths = list(filter(lambda x: x not in paths_to, paths_from)
コピーする必要があるパス(キー)の用意まで完了しました。
3. 抽出したキーを用いてコピー
まず、S3のオブジェクトをコピーするメソッドを、S3Bucketクラスに追加します。
import botocore
class S3Bucket:
# =============
# ...略 ...
# =============
def copy_object_from_s3_to_s3(
self,
bucket_from: str,
bucket_to: str,
key: str
) -> None:
"""s3からs3へオブジェクトのコピー
Args:
bucket_from (str): 移行元のバケット名
bucket_to (str): 移行先のバケット以降のパス
key (str): 移行するオブジェクトのパス
"""
self.client.copy_object(
Bucket=bucket_to,
Key=key,
CopySource={
'Bucket': bucket_from,
'Key': key
},
ACL='bucket-owner-full-control'
)
このメソッドを使用して、順次コピーしていきます。
# バケット名
BUCKET_FROM = '' # 移行元
BUCKET_TO = '' # 移行先
def main() -> None:
# 先ほど作成したS3Bucketオブジェクトのインスタンス化
s3 = s3_ex.S3Bucket()
# それぞれのS3のオブジェクトキー一覧取得
paths_from = s3.ls(BUCKET_FROM, '', recursive=True)
paths_to = s3.ls(BUCKET_TO, '', recursive=True)
# 移行元にのみ存在するパスを抽出
paths = list(filter(lambda x: x not in paths_to, paths_from)
# オブジェクトコピー
# コピーできるファイルサイズの上限等があるので、ここはエラーハンドリングした方が無難だと思います。
for key in paths:
try:
s3.copy_object_from_s3_to_s3(bucket_from, bucket_to, key)
except botocore.exceptions.ClientError as e:
print(e)
以上です。
全体をまとめたのが下記です。
サンプルコード
import boto3
import botocore
# バケット名
BUCKET_FROM = '' # 移行元
BUCKET_TO = '' # 移行先
class S3Bucket:
def __init__(
self
) -> None:
# s3クライアントの作成
self.client = boto3.client('s3')
def ls(
self,
bucket: str,
prefix: str,
recursive: bool = False
) -> List[str]:
"""S3上のファイルリスト取得
Args:
bucket (str): バケット名
prefix (str): バケット以降のパス
recursive (bool): 再帰的にパスを取得するかどうか
"""
paths: List[str] = []
paths = self.__get_all_keys(
bucket, prefix, recursive=recursive)
return paths
def __get_all_keys(
self,
bucket: str,
prefix: str,
keys: List = None,
marker: str = '',
recursive: bool = False
) -> List[str]:
"""指定した prefix のすべての key の配列を返す
Args:
bucket (str): バケット名
prefix (str): バケット以降のパス
keys (List): 全パス取得用に用いる
marker (str): 全パス取得用に用いる
recursive (bool): 再帰的にパスを取得するかどうか
"""
s3 = boto3.client('s3')
if recursive:
response = s3.list_objects(
Bucket=bucket, Prefix=prefix, Marker=marker)
else:
response = s3.list_objects(
Bucket=bucket, Prefix=prefix, Marker=marker, Delimiter='/')
# keyがNoneのときは初期化
if keys is None:
keys = []
if 'CommonPrefixes' in response:
# Delimiterが'/'のときはフォルダがKeyに含まれない
keys.extend([content['Prefix']
for content in response['CommonPrefixes']])
if 'Contents' in response: # 該当する key がないと response に 'Contents' が含まれない
keys.extend([content['Key'] for content in response['Contents']])
if 'IsTruncated' in response:
return self.__get_all_keys(bucket=bucket, prefix=prefix, keys=keys, marker=keys[-1], recursive=recursive)
return keys
def copy_object_from_s3_to_s3(
self,
bucket_from: str,
bucket_to: str,
key: str
) -> None:
"""s3からs3へオブジェクトのコピー
Args:
bucket_from (str): 移行元のバケット名
bucket_to (str): 移行先のバケット以降のパス
key (str): 移行するオブジェクトのパス
"""
self.client.copy_object(
Bucket=bucket_to,
Key=key,
CopySource={
'Bucket': bucket_from,
'Key': key
},
ACL='bucket-owner-full-control'
)
def main() -> None:
# 作成したS3Bucketオブジェクトのインスタンス化
s3 = s3_ex.S3Bucket()
# それぞれのS3のオブジェクトキー一覧取得
paths_from = s3.ls(BUCKET_FROM, '', recursive=True)
paths_to = s3.ls(BUCKET_TO, '', recursive=True)
# 移行元にのみ存在するパスを抽出
paths = list(filter(lambda x: x not in paths_to, paths_from)
# オブジェクトコピー
# コピーできるファイルサイズの上限等があるので、ここはエラーハンドリングした方が無難だと思います。
for key in paths:
try:
s3.copy_object_from_s3_to_s3(bucket_from, bucket_to, key)
except botocore.exceptions.ClientError as e:
print(e)
オブジェクトが5GBを超える場合
1つ留意点として、コピーしたいオブジェクトが5GBを超える場合、copy_objectメソッドでは対応することができません。(煩雑になってしまうため、本記事では詳細は割愛します。)
5GBを超えるオブジェクトをコピーしたい場合はcopyメソッドを使用しまします。
copyメソッドは、オブジェクトを小さく分割してコピーするマルチパートアップロードに対応しています。
公式ドキュメントを張っておきます。
最後に
個々で見れば何番煎じか分かりませんが、「S3間のデータを移行する」という目的に沿って記述しました。
実際に使用する際は、ログの設定やエラーハンドリングやリトライ処理を行うことで、より堅牢なプログラムにした方がいいのかなあと思います。
ご覧いただきありがとうございました。
株式会社ジールでは、初期費用が不要で運用・保守の手間もかからず、ノーコード・ローコードですぐに手元データを分析可能なオールインワン型データ活用プラットフォーム「ZEUSCloud」を提供しております。
ご興味がある方は是非下記のリンクをご覧ください:
https://www.zdh.co.jp/products-services/cloud-data/zeuscloud/