Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

Boto3でS3のリスト出力をするときは、list_objects_v2ではなくBucket().objects.filterを使おう

More than 1 year has passed since last update.

低レベルAPIと高レベルAPI

awsのpythonライブラリであるboto3ですが、ナイーブなAPIである低レベルAPIと、それをラップしたオブジェクト志向の高レベルAPIがあります

Boto3 で S3 のオブジェクトを操作する(高レベルAPIと低レベルAPI) - Qiita
https://qiita.com/sokutou-metsu/items/5ba7531117224ee5e8af

S3のリスト出力をする際、今までは低レベルAPIであるclient.list_objects_v2を使っていたのですが、対応する高レベルAPIとしてresouce.Bucket().objects.filterが存在します
(あまりにs3の資料が膨大で自分が見つけられていませんでした)

高レベルAPIを使ったほうが記述量も減るし、速度も上がったので高レベルAPIを使っていきましょうという記事です

低レベルAPI

S3 ListObjects APIの新バージョン ListObjectsV2を使おう | Developers.IO
https://dev.classmethod.jp/cloud/aws/s3-new-api-list-object-v2/

list_objects_v2では1000件ずつ取得します。ページネーション処理が必要になるため、例としてこのような記述になります
(再帰でこの記述を呼んでいます)

        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

速度

78733オブジェクト→46秒
Executed <function test at 0x10c0743b0> in 46.35232996940613 seconds

高レベルAPI

Bucket().objectsはObjectSummary型で、こちらにfilter, all, limit, page_sizeなどをchainさせることで属性を指定します
戻り値もObjectSummaryです
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary

ObjectSummary自体はイテレータとなっており、イテレータの呼び出しタイミングで実際にデータを取得します
filterの引数にKeyMarkerを指定すると途中から検索できたり、RequestPayerを指定できたりとlist_objects_v2でできることはだいたいできそうです

        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
#        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix).limit(count=2000)
        b = [k.key for k in a]

速度

78733オブジェクト→33秒
Executed <function test at 0x10191f200> in 33.14992713928223 seconds

ソースコード全体

書き捨てコードなので若干適当な部分があります

import os
from pathlib import Path
from typing import Optional

import boto3
from dataclasses import dataclass
from lauda import stopwatch


@dataclass
class S3Manager:
    source_bucket: str
    source_prefix: str
    profile: Optional[str] = None

    def _session(self):
        s = boto3.session.Session(
            profile_name=self.profile
        )
        return s

    def _list_source(self, *, accumulated=None, next_token=None, func=None):
        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

        if func:
            return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)

    def _accumulate(self, *, response, keys, func, next_token, accumulated):
        got_keys = (accumulated or []) + keys
        if next_token:
            print(f'searching... current fetch keys are :{len(got_keys)}')
            return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
        else:
            return got_keys

    def list_all(self) -> list:
        return self._list_source(func=self._accumulate)

    def _delete(self, *, response, keys, func, next_token, accumulated):
        if keys:
            print(f'deleting: {self.source_bucket}/{self.source_prefix}')
            s3client = boto3.Session().client('s3')
            s3client.delete_objects(
                Bucket=self.source_bucket,
                Delete={
                    'Objects': [{'Key': key} for key in keys],
                    'Quiet': False
                },
            )

        if next_token:
            return self._list_source(next_token=next_token, func=func)

    def delete_all(self) -> None:
        self._list_source(func=self._delete)

    def list_all_test(self):
        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
        b = [k.key for k in a]
        print(len(b))


if __name__ == '__main__':
    os.chdir(Path(__file__).parents[1])

    @stopwatch
    def test():
        s3 = S3Manager(
            source_bucket='ばけっと',
            source_prefix='検索するパス',
        )
        # s3.list_all()
        s3.list_all_test()

    test()

まとめ

低レベルAPIの方は拡張性のために関数を渡しており、若干のオーバーヘッドはあるかもしれないですが
高レベルAPIが遅いということは無いですし、記述も簡易になるので高レベルAPIを使っていきましょう

lifull
日本最大級の不動産・住宅情報サイト「LIFULL HOME'S」を始め、人々の生活に寄り添う様々な情報サービス事業を展開しています。
https://lifull.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away