LoginSignup
17
15

More than 5 years have passed since last update.

DynamoDBから大量レコードを並列に取得する方法

Last updated at Posted at 2015-06-11

概要

DynamoDB から大量レコードを高速で取得する必要があったため、並列で取得するよう実装してみました。

環境

  • Ruby 2.2
  • AWS SDK for Ruby V2

ポイント

内部動作が並列な API を並列に呼び出すことで高速化を図りました。

  • BatchGetItem
  • マルチスレッド

BatchGetItem

最大100アイテム (ただし、16MBを超えないアイテム数) を並列で取得してくれるありがたい API です。
DynamoDB 側で並列処理をしてくれているようです。

A single operation can retrieve up to 16 MB of data, which can contain as many as 100 items.
In order to minimize response latency, BatchGetItem retrieves items in parallel.

BatchGetItem - Amazon DynamoDB

マルチスレッド

BatchGetItem は上記の通り、1度の呼び出しで取得できるデータ容量に制限があるため、Ruby のスレッドを使って並列に呼び出すことにしました。
Ruby のスレッドに関する基本と、実装方法は以下の記事を参考にしました。

実装

プライマリキーとテーブル名を渡すと、BatchGetItem と Thread を使って並列に問い合わせを実行し、その結果を返すメソッドを作成しました。
ハッシュキーのみが PK のテーブルへも、ハッシュキーとレンジキーの複合が PK へのテーブルへも問い合わせが可能です。

def query_dynamo(many_keys, table_name)
  locks = Queue.new
  3.times { locks.push :lock }

  [].tap do |results|
    many_keys.each_slice(100).map do |keys|
      Thread.new do
        lock = locks.pop
        results << dynamodb.batch_get_item(
          request_items: { table_name => { keys: keys } }
        ).responses[table_name]
        locks.push lock
      end
    end.each(&:join) # 全スレッドに対し Thread#join を呼び出す
  end.flatten
end

上記の実装ですと、1アイテムの容量が 160 KB (= 52 MB/100) を超える場合はレコードの取得漏れが発生してしまうので注意が必要です。
DynamoDB の各レコードの最大容量は 400 KB と公式ドキュメントにあるので、各アイテムの容量の予想がつきづらい場合はその値を参考にクエリを分割すると良いかもしれません。

また、スレッドの同時実行数は3にしていますが、こちらは動かしながら適切な値を探すのが良いかもしれません。
私が試した環境ですと、3よりも大きくしても性能があがらなかったため、3にしました。

追記

取得しようとしていたアイテム数を調べたところ、300未満しかありませんでした。
すなわち、スレッド自体がそもそも3以下しか生成されていない状態でした。
取得するアイテム数を増やし、スレッドの同時実行数も増やしてみた結果、さらなる高速化が行えました。

Hash Type Primary Key への問い合わせ

簡単な問い合わせ例です。hash_keysはハッシュキーの配列となっています。

query_dynamo(
  hash_keys.map { |e| { HASH_KEY_NAME => e } },
  HASH_PK_TABLE_NAME
)

Hash and Range Type Primary Key への問い合わせ

こちらも簡単な問い合わせ例です。hash_and_range_keysは「ハッシュキーとレンジキーの配列」の配列となっています。

query_dynamo(
  hash_and_range_keys.map { |e| { HASH_KEY_NAME => e[0], RANGE_KEY_NAME => e[1] } },
  HASH_AND_RANGE_PK_TABLE_NAME
)

成果

正確な時間測定やデータ量の計算をしていないので参考になる数値ではありませんが、ざっくり以下のような成果がでました。

BatchGetItem を使ってざっくり2500レコード取得した場合、1.5秒位かかっていました。
そして BatchGetItem の呼び出しにスレッドを導入した結果、0.7秒位になりました。

数値としては全く当てにはならないと思いますが、なんと倍!当社比倍!
GetItem 単体での速度は測ってはいないです。

もっと正確に測定した方がいたら教えて下さい!笑

改善(追記)

より実用的なメソッドを目指して改善を行ってみました。
主な変更点は以下のとおりです。

  • タスクの共有を Queue で行うように変更
  • 取得漏れが生じた場合の処理を追加
  • スループット上限に達した場合の例外処理を追加
def find_by_keys!(keys)
  results = []
  queue = Queue.new
  keys.each_slice(100).each { |each_keys| queue.push each_keys }

  16.times.map do
    Thread.new do
      until queue.empty?
        each_keys = queue.pop
        retries = 0

        begin
          results.concat(
            dynamodb.batch_get_item(
              request_items: { TABLE_NAME => { keys: each_keys } }
            )
            .tap { |res| queue.push res.unprocessed_keys[TABLE_NAME].keys unless res.unprocessed_keys.empty? }
            .responses[TABLE_NAME]
          )
        rescue Aws::DynamoDB::Errors::ProvisionedThroughputExceededException => e
          raise if retries >= 3
          retries += 1
          puts "Retry count: #{retries}, Message: #{e.message}"
          sleep(3 ** retries)
          retry
        end
      end
    end
  end.each(&:join)
  results
end

これで取得漏れなく DynamoDB から並列に大量レコードを取得できるかと思います。

17
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
15