BigQueryのストリーミングインサートで、どれくらいのスループットが出るのか検証した。
(わりと雑にやっているので、結果は参考程度に捉えてください)
ログデータの収集基盤を構築予定であり、その設計にあたり、1つのPythonプロセスでどれくらいのスループットが見込めるのかを知りたかった。
検証方法
BigQueryのストリーミングインサートは複数レコードをまとめてインサートするAPIを持つため、1リクエストに含めるデータの行数を変えて、スループットの変化を調べた。
なお、検証を簡単にするため、並列処理/並行処理は行わなかった。
環境
- Python3.6
- google-cloud-bigquery (0.30.0)
テストデータ
- 10フィールドに、それぞれ50文字のデータが入っている
- 各行約640バイト
# 1行のサンプルデータ
{"field1": "F4rcPYBFhJdMZTv2KMYFqEmBJP2ssBUSi1Vo0UJnDu0ss8jwpO", "field2": "CW02u2YrZHdSHpYJfmjSh0iVYwSRjjkH681lIxQ6dY2XQ6nZhm", "field3": "KPgLursc0IkfefWhFqVNqjXPbfgd5uEgFYoBz4vyTuKo2i6NM4", "field4": "qSwOLvISE6znB41FuGk2zXOS52zwFEDWWVTDrz3RBHj66hNwNe", "field5": "5LCyjxQjTyUH82rl0oENCdqKQ7xuUvw7bGh9khfX2AL638fdrK", "field6": "kRNO3Tdrldsiu3WgBh2SLqUqyWPz9oHBoAXEVHaBVqYLEgyARo", "field7": "gWLYfieU0LKeNsOSJdMxEmLbedSoV2QhbqYWVw0RDvmWsVAIOJ", "field8": "4CqDhio4cW0kN29lMZCUbFkzBWRDppG5ad61kxIz0kq2sU7ohc", "field9": "o2BNeGxbD8CaDXYxU4f0gdmi8mm3tTd6a9LCcobknXdiYqmGgB", "field10": "E5z80HP9hsXbJRzZCruh4gUp4kvgj3szmrYyDJYShs32WSphbg"}
結果
※「所要時間の平均」は、3回の実行の平均。
1000行のテストデータを使用
1リクエストに含まれる行数 | 所要時間の平均(秒) | スループット(件/秒) |
---|---|---|
1 | 263.04 | 3.80 |
10 | 26.15 | 38.24 |
100 | 5.25 | 190.47 |
1000 | 3.57 | 280.11 |
10000行のテストデータを使用
1リクエストに含まれる行数 | 所要時間の平均(秒) | スループット(件/秒) |
---|---|---|
10 | 261.99 | 38.16 |
100 | 47.79 | 209.248 |
500 | 33.82 | 295.68 |
1000 | 34.36 | 288.35 |
5000 | 33.78 | 296.03 |
ボトルネック
CPU、メモリ、NW帯域は頭打ちになっていなかったため、ローカル環境とBigQuery間の通信がボトルネックになってたと思う。
まとめ
1リクエストに含める行数が多いほど、ある一定のレベルまではスループットは上昇する。今回の検証では500行が基準となり、それ以上増やしてもスループットの上昇は見られなかった
また、BigQueryの公式ドキュメントでも500行程度を推奨している
最大 500 行をおすすめします。一括処理することでパフォーマンスとスループットをある程度向上させることはできますが、リクエストごとのレイテンシは高くなります。リクエストごとの行数が少なく、各リクエストにオーバーヘッドがあると取り込みの効率が下がります。また、リクエストごとの行数が多すぎるとスループットが下がります。推奨される行数はリクエストごとに 500 行程度ですが、代表データ(スキーマとデータサイズ)を使用したテストを実施して適切なバッチサイズを判断することをおすすめします。
さらなる性能向上
並列・並行処理を行う。
参考にしたリンク
BigQuery
- BigQueryのPythonライブラリの使い方
- BigQueryのPythonライブラリのソースコード
- ストリーミングインサートの制限(クオータ)
- BigQueryの料金
Python
- プログラムの実行時間を計測する
- ランダムな文字列生成
個人的なメモ
- GCPのライブラリのドキュメントは情報量が少ないので、ソースコードを見たほうが早い
- "client.insert_rows_json()"の戻り値はエラーのリスト(もしあれば)
- "client.insert_rows_json()"の引数にinsert_id(BigQueryが重複排除に用いるユニークなIDのリスト)を渡すことができる
- そのリストの長さは、リクエストに含める行数と一致している必要がある
- "client.insert_rows_json()"は、1リクエストに含まれる複数行のうち一部がエラーになった場合の挙動を選択することができる
- 全部エラーにするか、エラー行を飛ばすか
- エラー行を飛ばした場合に、どの行がエラーになったかどうかはわからない
- 全部エラーにするか、エラー行を飛ばすか
- 「Kinesis + Lambda + BigQuery」の構成にするなら、Lambda(=シャード)1つあたりの最大スループットを秒間200件と想定し、必要な分だけKinesisのシャードを追加するのが良さそう。(設計と実装が楽なので)
検証プログラム(一部)
インサート
# ...省略
for line in open(test_data_file, 'r'):
loop_count += 1
json_dict = json.loads(line)
json_rows.append(json_dict)
if loop_count % lines_per_request == 0:
result = insert_rows_json(client, table, json_rows)
json_rows = []
if json_rows:
result = insert_rows_json(client, table, json_rows)
elapsed_time = time.time() - start
テストデータ生成
メモ:けっこうCPU使う。10万件のデータ生成はだいぶしんどい。
import random, string
import json
def create_random_string(num):
return ''.join([random.choice(string.ascii_letters + string.digits)
for i in range(num)])
line_num = 10000
str_counts = 50
test_data_file = "../test_data_" + str(line_num) + ".txt"
with open(test_data_file, 'w') as f:
for i in range(line_num):
dict = {}
for i in range(10):
dict["field1"] = create_random_string(str_counts)
dict["field2"] = create_random_string(str_counts)
dict["field3"] = create_random_string(str_counts)
dict["field4"] = create_random_string(str_counts)
dict["field5"] = create_random_string(str_counts)
dict["field6"] = create_random_string(str_counts)
dict["field7"] = create_random_string(str_counts)
dict["field8"] = create_random_string(str_counts)
dict["field9"] = create_random_string(str_counts)
dict["field10"] = create_random_string(str_counts)
f.write(json.dumps(dict))
f.write("\n")