63
47

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Boto3でS3のリスト出力をするときは、list_objects_v2ではなくBucket().objects.filterを使おう

Last updated at Posted at 2019-11-07

低レベルAPIと高レベルAPI

awsのpythonライブラリであるboto3ですが、ナイーブなAPIである低レベルAPIと、それをラップしたオブジェクト志向の高レベルAPIがあります

Boto3 で S3 のオブジェクトを操作する(高レベルAPIと低レベルAPI) - Qiita
https://qiita.com/sokutou-metsu/items/5ba7531117224ee5e8af

S3のリスト出力をする際、今までは低レベルAPIであるclient.list_objects_v2を使っていたのですが、対応する高レベルAPIとしてresouce.Bucket().objects.filterが存在します
(あまりにs3の資料が膨大で自分が見つけられていませんでした)

高レベルAPIを使ったほうが記述量も減るし、速度も上がったので高レベルAPIを使っていきましょうという記事です

低レベルAPI

S3 ListObjects APIの新バージョン ListObjectsV2を使おう | Developers.IO
https://dev.classmethod.jp/cloud/aws/s3-new-api-list-object-v2/

list_objects_v2では1000件ずつ取得します。ページネーション処理が必要になるため、例としてこのような記述になります
(再帰でこの記述を呼んでいます)

        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

速度

78733オブジェクト→46秒
Executed <function test at 0x10c0743b0> in 46.35232996940613 seconds

高レベルAPI

Bucket().objectsはObjectSummary型で、こちらにfilter, all, limit, page_sizeなどをchainさせることで属性を指定します
戻り値もObjectSummaryです
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary

ObjectSummary自体はイテレータとなっており、イテレータの呼び出しタイミングで実際にデータを取得します
filterの引数にKeyMarkerを指定すると途中から検索できたり、RequestPayerを指定できたりとlist_objects_v2でできることはだいたいできそうです

        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
#        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix).limit(count=2000)
        b = [k.key for k in a]

速度

78733オブジェクト→33秒
Executed <function test at 0x10191f200> in 33.14992713928223 seconds

ソースコード全体

書き捨てコードなので若干適当な部分があります

import os
from pathlib import Path
from typing import Optional

import boto3
from dataclasses import dataclass
from lauda import stopwatch


@dataclass
class S3Manager:
    source_bucket: str
    source_prefix: str
    profile: Optional[str] = None

    def _session(self):
        s = boto3.session.Session(
            profile_name=self.profile
        )
        return s

    def _list_source(self, *, accumulated=None, next_token=None, func=None):
        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

        if func:
            return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)

    def _accumulate(self, *, response, keys, func, next_token, accumulated):
        got_keys = (accumulated or []) + keys
        if next_token:
            print(f'searching... current fetch keys are :{len(got_keys)}')
            return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
        else:
            return got_keys

    def list_all(self) -> list:
        return self._list_source(func=self._accumulate)

    def _delete(self, *, response, keys, func, next_token, accumulated):
        if keys:
            print(f'deleting: {self.source_bucket}/{self.source_prefix}')
            s3client = boto3.Session().client('s3')
            s3client.delete_objects(
                Bucket=self.source_bucket,
                Delete={
                    'Objects': [{'Key': key} for key in keys],
                    'Quiet': False
                },
            )

        if next_token:
            return self._list_source(next_token=next_token, func=func)

    def delete_all(self) -> None:
        self._list_source(func=self._delete)

    def list_all_test(self):
        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
        b = [k.key for k in a]
        print(len(b))


if __name__ == '__main__':
    os.chdir(Path(__file__).parents[1])

    @stopwatch
    def test():
        s3 = S3Manager(
            source_bucket='ばけっと',
            source_prefix='検索するパス',
        )
        # s3.list_all()
        s3.list_all_test()

    test()

まとめ

低レベルAPIの方は拡張性のために関数を渡しており、若干のオーバーヘッドはあるかもしれないですが
高レベルAPIが遅いということは無いですし、記述も簡易になるので高レベルAPIを使っていきましょう

63
47
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
63
47

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?