Pub/Sub に対して "Hello World" といったメッセージをメッセージを流すサンプルは公式からも出ているが、もう少し凝った時系列データを流したい時のメモ。
ローカル環境で BigQuery に対してクエリを実行し、クエリ結果をレコードごとに Pub/Sub に流すコードを Python で実装していく。
実現したいこと
検証などで BigQuery 上の TIMESTAMP 型のカラムの値によって Pub/Sub にレコードを流すタイミングを制御したい場合がある。
具体的には TIMESTAMP 型のカラムの値が 2023-11-01 HH:MM:SS
レコードが複数あったとして、
「2023-11-01 00:15:00
までのデータを Pub/Sub に流し、それ以降のデータはまだ流さない」といったように、擬似的に時系列データの投入タイミングを制御していく。
使用するデータ
bigquery-public-data.chicago_taxi_trips.taxi_trips のデータを取得する。
こちらのテーブルに対して以下のようなクエリを実行して 2023-11-01 分のデータを取得する。
SELECT
unique_key,
DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst,
trip_seconds,
trip_miles,
trip_total,
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01"
ORDER BY trip_start_timestamp_jst
trip_start_timestamp
の値によって Pub/Sub へのデータ投入タイミングを制御する。
trip_start_timestamp
の値は 15 分ごとに丸め込まれているので、実行の流れを以下のようにしていく。
-
trip_start_timestamp
の値が2023-11-01 00:00:00
のデータを Pub/Sub に流し1分間スリープ -
trip_start_timestamp
の値が2023-11-01 00:15:00
のデータを Pub/Sub に流し1分間スリープ -
trip_start_timestamp
の値が2023-11-01 00:30:00
のデータを Pub/Sub に流し1分間スリープ - ...
事前準備
pip で google-cloud-bigquery
と google-cloud-pubsub
をインストール。
$ pip install google-cloud-bigquery
$ pip install google-cloud-pubsub
Pub/Sub のトピックを作成(トピック名: test-pokoyakazan-topic
)
-
Add a default subscriptio
にチェックを入れる(${トピック名}-sub
というサブスクリプションが自動でできる)
全体のコード
引数として、project_id
と、投入先となる Pub/Sub の topic_id
を受け取る。
bq_to_pubsub.py
import argparse
from datetime import datetime
import json
import time
from google.cloud import bigquery, pubsub_v1
SLEEP_SECOND = 30
def run_bq_query(project_id, query):
bq_client = bigquery.Client(project=project_id)
query_job = bq_client.query(query=query)
query_results = list(query_job.result())
return query_results
def json_serial(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError("Type %s not serializable" % type(obj))
def msg_publish(project_id, topic_id, msg):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_msg = msg.encode("utf-8")
future = publisher.publish(topic_path, publish_msg)
message_id = future.result(timeout=300)
print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}")
def publish_query_results_to_pubsub(project_id, topic_id, query_results):
index = 0
for hour in range(0, 24):
for minute in range(0, 60, 15):
pseudo_now = datetime(
2023, 11, 1, hour, minute
)
while True:
row = query_results[index]
trip_start_timestamp_jst = row['trip_start_timestamp_jst']
if trip_start_timestamp_jst != pseudo_now:
break
row_dict = dict(row.items())
msg = json.dumps(row_dict, default=json_serial)
msg_publish(project_id, topic_id, msg)
index += 1
print(f'Sleep {SLEEP_SECOND} seconds')
print(f'Now: {pseudo_now}')
time.sleep(SLEEP_SECOND)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("project_id", help="Google Cloud project ID")
parser.add_argument("topic_id", help="Pub/Sub topic ID")
args = parser.parse_args()
with open("sql/get_taxi_trips.sql", mode='r') as rf:
query = rf.read()
query_results = run_bq_query(args.project_id, query)
publish_query_results_to_pubsub(args.project_id, args.topic_id, query_results)
sql/get_taxi_trips.sql
の中身
SELECT
unique_key,
DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst,
trip_seconds,
trip_miles,
trip_total,
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01"
ORDER BY trip_start_timestamp_jst
BigQuery 実行部分
BigQuery のクエリを query
として受け取り BigQuery クライアント経由でクエリを実行している。
def run_bq_query(project_id, query):
bq_client = bigquery.Client(project=project_id)
query_job = bq_client.query(query=query)
query_results = list(query_job.result())
return query_results
Pub/Sub 投入部分
投入順序の制御
15分間隔で擬似的な時計を進めて、15分間分投入すると SLEEP_SECOND
秒スリープする。
def publish_query_results_to_pubsub(project_id, topic_id, query_results):
index = 0
for hour in range(0, 24):
for minute in range(0, 60, 15):
pseudo_now = datetime(
2023, 11, 1, hour, minute
)
while True:
row = query_results[index]
trip_start_timestamp_jst = row['trip_start_timestamp_jst']
if trip_start_timestamp_jst != pseudo_now:
break
row_dict = dict(row.items())
msg = json.dumps(row_dict, default=json_serial)
msg_publish(project_id, topic_id, msg)
index += 1
print(f'Sleep {SLEEP_SECOND} seconds')
print(f'Now: {pseudo_now}')
time.sleep(SLEEP_SECOND)
注意点として、datetime
型の trip_start_timestamp_jst
を JSON にダンプするために以下の関数で STRING に変換している。
def json_serial(obj):
if isinstance(obj, datetime):
return obj.isoformat()
raise TypeError("Type %s not serializable" % type(obj))
実際の投入
受け取ったメッセージをバイト変換して Pub/Sub トピックに投げる。
def msg_publish(project_id, topic_id, msg):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_msg = msg.encode("utf-8")
future = publisher.publish(topic_path, publish_msg)
message_id = future.result(timeout=300)
print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}")
実行
Pub/Sub に投入されたデータを PULL するためのサブスクライバーを用意する。
sub.py
import argparse
from google.cloud import pubsub_v1
def msg_pull(project_id, subscription_id):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message}.")
message.ack()
print(f"Acknowledged {message.message_id}.")
future = subscriber.subscribe(
subscription_path, callback=callback
)
print(f"Listening for messages on {subscription_path}..\n")
try:
future.result(timeout=300)
except:
future.cancel()
future.result()
subscriber.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"project_id", help="Google Cloud project ID"
)
parser.add_argument(
"subscription_id", help="Pub/Sub subscription ID"
)
args = parser.parse_args()
msg_pull(args.project_id, args.subscription_id)
サブスクライバー(sub.py
)の実行
$ python3 sub.py ${project_id} test-pokoyakazan-topic-sub
パブリッシャー(bq_to_pubsub.py
)の実行
$ python3 bq_to_pubsub.py ${project_id} test-pokoyakazan-topic