今回の検証内容
- Big Queryは外部からAPIリクエストに対して、毎秒何件程度処理できるのか
→リアルタイム処理を実装するにあたって、APIリクエストの場合の限界を検証
前提
- 本検証は仮に他のサービスを用いずにAPIリクエストでリアルタイム連携をする場合は、どれぐらいの限界があるのかという検証
- BigQueryでリアルタイム処理をするならPub/SubやDataflowを利用しましょう!
検証
実行環境
- Big Query
- Google Colab(Python)
検証の流れ
1. 事前準備:テーブル作成とGoogleColabからのBigQuery接続
2. 検証実施
2-1. DML INSERTで検証(失敗)
2-2. ストリーミングAPIで検証
1. 事前準備
BigQueryでテスト用テーブルを作成
カラムは適当に3列設定(おそらくカラムが増えてデータ処理量が増えるとクエリ成功率も下がるが今回は考慮しない)
GoogleColabからBigQueryに接続
コード
from google.colab import auth
from google.cloud import bigquery
# auth認証
auth.authenticate_user()
# BigQueryに接続
project_id = [projectID] #ご自身のGoogleCloudプロジェクトIDを代入
client = bigquery.Client(project=project_id)
2. 検証実施
2-1. DML INSERTで検証
まずは1回のクエリ実行でINSERTができることを検証
コード
# insertクエリの実行
# uuidは1,nameは固定.timestampは関数から取得
# 1回BigQueryにInsertする.
query = """
INSERT
`[projectID].test.test_continuous`
(uuid,name,timestamp)
VALUES
(1,'name',current_timestamp)
"""
query_job = client.query(query) # API request
では、10回のINSERT実行で検証
(この後も以下のコードを流用するため、この後のコードは割愛)
コード
# uuidだけ取得
query = """
SELECT
uuid
FROM
`[projectID].test.test_continuous`
WHERE
TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) = TIMESTAMP("2024-03-10")
LIMIT 1000
"""
df = client.query(query).to_dataframe()
max_uuid = df.max()['uuid'] # 取得したデータセットから最新のuuidを取得
# uuidだけ加算.nameは固定.timestampは関数から取得
# 10回BigQueryにInsertする.
for uuid in range(max_uuid+1,max_uuid+2):
query = """
INSERT
`[projectID].test.test_continuous`
(uuid,name,timestamp)
VALUES
("""+ str(uuid) + """,'name',current_timestamp)
"""
query_job = client.query(query) # API request
10回も成功. 処理時間は5秒だったため、約2件/秒の処理.
次に50回で検証.
40件しか追加されていない. つまり、20%のクエリが失敗.
処理時間は13秒のため、約3~4件/秒の処理.
流石に失敗率が高すぎるため、BigQueryのコンソールからジョブ履歴を見てエラー調査.
どうやらDML Insertが良くないとのこと.
以下サイトも参考にしつつ、ストリーミングAPIを利用する処理を変更.
(参考)https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery?hl=ja
2-2. ストリーミングAPIで検証
ストリーミングAPIを利用して、先ほど同様50回インサート.
コード
query = """
SELECT
uuid
FROM
`[projectID].test.test_continuous`
WHERE
TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) = TIMESTAMP("2024-03-10")
LIMIT 1000
"""
df_official = client.query(query).to_dataframe()
max_uuid = df_official.max()['uuid']
# 以下ストリーミングAPI利用
import datetime
table_id = "[projectID].test.test_continuous"
for uuid in range(max_uuid+1,max_uuid+51):
dt_now = datetime.datetime.now()
rows_to_insert = [
{u"uuid":uuid, u"name": u"name", u"timestamp": datetime.datetime.timestamp(dt_now)},
]
#print(rows_to_insert)
errors = client.insert_rows_json(table_id, rows_to_insert) # Make an API request.
if errors == []:
print("New rows have been added.")
else:
print("Encountered errors while inserting rows: {}".format(errors))
結果的に50回のインサート成功(1回多いのは試しで実施したため).
処理時間は11秒程度であった.
あまり多いと課金されてしまうため、最後に1000回,10000回で検証.
[1000回]
[10000回]
どちらも無事指定回数のインサートが成功.
それぞれの処理時間は、1000回:3分18秒、10000回:27分12秒であった.
10000回の検証では約6回/秒のAPIリクエストの処理ができている.
結果
- ストリーミングAPIを利用すると少なくとも約6回/秒の処理程度は問題なく処理可能
(今回は処理方式が良くなかったが、BigQueryはもっと多くの処理をこなせる) - DML INSERTはリアルタイム処理には向いていない
- そもそもGoogleColab ⇄ BigQueryのAPIリクエスト処理がそんなに早くない・・・
(処理の書き方が悪かった、、、?)
次回はDataflowを利用したリアルタイム連携を実施してみます.
もっとこういう風にしたほうが検証としては良いよなどのご意見あれば教えてください.