はじめに
DynamoDBでデータを取得する際scanを使用するとテーブルの全てのレコードに対して逐次処理をしていきます。
そのため基本的にはscan
ではなく、query
を使うというのがベストプラクティスです。
しかし、どうしてもスキャンを使いたいときに効率よくスキャンできそうな、並列スキャンについてboto3で試してみた備忘録です。
並列スキャンについてのドキュメントはこちらです。
環境
確認に使用した環境は以下です。
- python: v3.9.7
- boto3: v1.21.11
検証
検証用データの準備
今回は下記のようなテーブルに対して10,000件のデータを入れました。
その結果、テーブルのサイズは300KBになりました。
テーブル名: parallel-scan-sample
項目名 | データ型 | |
---|---|---|
パーティションキー | productId | number |
name | string | |
rate | number |
普通のスキャン
コード
まずは単純に全件取得のスキャンしてみます。全件取得して経過時間とレスポンスを表示しているだけです。
import boto3
import time
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('parallel-scan-sample')
start_time = time.time()
response = table.scan(
ReturnConsumedCapacity='TOTAL' #キャパシティユニットを表示するため
)
elapsed_time = time.time() - start_time
print('ElapsedTime:', elapsed_time)
print('ConsumedCapacity:', response['ConsumedCapacity'])
出力はこんな感じです。
ElapsedTime: 7.379684925079346
ConsumedCapacity: {'TableName': 'parallel-scan-sample', 'CapacityUnits': 36.5}
結果
実行ごとに処理時間に誤差があるので5回ほどためしてその平均を出しました。
消費キャパシティユニット: 36.5
1回目: 7.379684925079346秒
2回目: 6.913349151611328秒
3回目: 7.883518934249878秒
4回目: 7.087058305740356秒
5回目: 7.384438037872314秒
平均: 7.329609870910644秒
並列スキャン
次に並列スキャンを試してみます。
今回は分割するセグメントを4にしています。
ドキュメントには基準となるような記載はなく、いくつがいいのか試す必要があるとだけありました。。。
Segment および TotalSegments の値は、個々の Scan リクエストに適用されるため、いつでも異なる値を使用できます。
アプリケーションが最高のパフォーマンスを達成するまで、これらの値および使用するワーカーの数を試さなければならない場合があります。
コード
実際に試してみたコードは下記になります。
import boto3
import time
from concurrent import futures
def parallel_scan(segment):
response = table.scan(
TotalSegments=4,
Segment=segment,
ReturnConsumedCapacity='TOTAL' #キャパシティユニットを表示するため
)
return {
'Count': response['Count'],
'ScannedCount': response['ScannedCount'],
'ConsumedCapacity': response['ConsumedCapacity']
}
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('parallel-scan-sample')
futures_list = []
start_time = time.time()
with futures.ThreadPoolExecutor() as executor:
for segment in range(0, 4):
future = executor.submit(parallel_scan, segment)
futures_list.append(future)
elapsed_time = time.time() - start_time
print('ElapsedTime:', elapsed_time)
print('Segment0:', futures_list[0].result())
print('Segment1:', futures_list[1].result())
print('Segment2:', futures_list[2].result())
print('Segment3:', futures_list[3].result())
出力はこんな感じです。
ElapsedTime: 3.1862332820892334
Segment0: {'Count': 2414, 'ScannedCount': 2414, 'ConsumedCapacity': {'TableName': 'parallel-scan-sample', 'CapacityUnits': 9.0}}
Segment1: {'Count': 2532, 'ScannedCount': 2532, 'ConsumedCapacity': {'TableName': 'parallel-scan-sample', 'CapacityUnits': 9.5}}
Segment2: {'Count': 2516, 'ScannedCount': 2516, 'ConsumedCapacity': {'TableName': 'parallel-scan-sample', 'CapacityUnits': 9.5}}
Segment3: {'Count': 2538, 'ScannedCount': 2538, 'ConsumedCapacity': {'TableName': 'parallel-scan-sample', 'CapacityUnits': 9.5}}
結果
並列スキャンも実行ごとに誤差があるので5回の平均を出してます。
消費した合計のキャパシティユニットはすべての実行で同じでした。
合計の消費キャパシティユニット: 37.5
1回目: 3.18623328208923秒
2回目: 3.18821120262146秒
3回目: 3.18266010284423秒
4回目: 3.24029588699340秒
5回目: 3.18203210830688秒
平均: 3.19588651657104秒
まとめ
それぞれ、5回試してみた処理時間の平均を比べてみると
普通のスキャン(平均) | 並列スキャン(平均) |
---|---|
7.329609870910644秒 | 3.19588651657104秒 |
となり、今回の検証では並列スキャンを使うと処理時間が短縮されることがわかりました。
テーブルのサイズが300KBと非常に小さい中での検証でしたが、テーブルのサイズが大きく集計等でどうしてもスキャンが必要なときは選択肢になるのかなと感じました。
並列スキャンの場合いくつのセグメントで分割すべきかを考えるのが大変な気がしますが。。。
ちなみに公式のベストプラクティスによると
並列スキャンは有益ですが、プロビジョニングされたスループットに大量のリクエストが発生する可能性があります
とのことなので注意が必要そうです。
また、以下のケースでは正しい選択肢となるとのことです。
- テーブルのサイズが 20 GB 以上である。
- テーブルのプロビジョニングされた読み込みスループットが完全に使用されていない。
- シーケンシャル Scan オペレーションが遅すぎる。