0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

BigQueryで取得した時系列データをPub/Subに流す

Posted at

Pub/Sub に対して "Hello World" といったメッセージをメッセージを流すサンプルは公式からも出ているが、もう少し凝った時系列データを流したい時のメモ。
ローカル環境で BigQuery に対してクエリを実行し、クエリ結果をレコードごとに Pub/Sub に流すコードを Python で実装していく。

実現したいこと

検証などで BigQuery 上の TIMESTAMP 型のカラムの値によって Pub/Sub にレコードを流すタイミングを制御したい場合がある。
具体的には TIMESTAMP 型のカラムの値が 2023-11-01 HH:MM:SS レコードが複数あったとして、
2023-11-01 00:15:00 までのデータを Pub/Sub に流し、それ以降のデータはまだ流さない」といったように、擬似的に時系列データの投入タイミングを制御していく。

2024_07_22_1.png

使用するデータ

bigquery-public-data.chicago_taxi_trips.taxi_trips のデータを取得する。
こちらのテーブルに対して以下のようなクエリを実行して 2023-11-01 分のデータを取得する。

SELECT
    unique_key,
    DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst,
    trip_seconds,
    trip_miles,
    trip_total,
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01"
ORDER BY trip_start_timestamp_jst

trip_start_timestamp の値によって Pub/Sub へのデータ投入タイミングを制御する。
trip_start_timestamp の値は 15 分ごとに丸め込まれているので、実行の流れを以下のようにしていく。

  • trip_start_timestamp の値が 2023-11-01 00:00:00 のデータを Pub/Sub に流し1分間スリープ
  • trip_start_timestamp の値が 2023-11-01 00:15:00 のデータを Pub/Sub に流し1分間スリープ
  • trip_start_timestamp の値が 2023-11-01 00:30:00 のデータを Pub/Sub に流し1分間スリープ
  • ...

事前準備

pip で google-cloud-bigquerygoogle-cloud-pubsub をインストール。

$ pip install google-cloud-bigquery
$ pip install google-cloud-pubsub

Pub/Sub のトピックを作成(トピック名: test-pokoyakazan-topic)

  • Add a default subscriptio にチェックを入れる(${トピック名}-subというサブスクリプションが自動でできる)

2024_07_22_2.png

全体のコード

引数として、project_id と、投入先となる Pub/Sub の topic_id を受け取る。

bq_to_pubsub.py

import argparse
from datetime import datetime
import json
import time

from google.cloud import bigquery, pubsub_v1

SLEEP_SECOND = 30


def run_bq_query(project_id, query):
    bq_client = bigquery.Client(project=project_id)
    query_job = bq_client.query(query=query)
    query_results = list(query_job.result())
    return query_results


def json_serial(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))


def msg_publish(project_id, topic_id, msg):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    publish_msg = msg.encode("utf-8")
    future = publisher.publish(topic_path, publish_msg)
    message_id = future.result(timeout=300)
    print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}")


def publish_query_results_to_pubsub(project_id, topic_id, query_results):
    index = 0
    for hour in range(0, 24):
        for minute in range(0, 60, 15):
            pseudo_now = datetime(
                2023, 11, 1, hour, minute
            )
            while True:
                row = query_results[index]
                trip_start_timestamp_jst = row['trip_start_timestamp_jst']
                if trip_start_timestamp_jst != pseudo_now:
                    break
                row_dict = dict(row.items())
                msg = json.dumps(row_dict, default=json_serial)
                msg_publish(project_id, topic_id, msg)
                index += 1

            print(f'Sleep {SLEEP_SECOND} seconds')
            print(f'Now: {pseudo_now}')
            time.sleep(SLEEP_SECOND)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("project_id", help="Google Cloud project ID")
    parser.add_argument("topic_id", help="Pub/Sub topic ID")
    args = parser.parse_args()

    with open("sql/get_taxi_trips.sql", mode='r') as rf:
        query = rf.read()
    query_results = run_bq_query(args.project_id, query)

    publish_query_results_to_pubsub(args.project_id, args.topic_id, query_results)

sql/get_taxi_trips.sql の中身

SELECT
    unique_key,
    DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst,
    trip_seconds,
    trip_miles,
    trip_total,
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01"
ORDER BY trip_start_timestamp_jst

BigQuery 実行部分

BigQuery のクエリを query として受け取り BigQuery クライアント経由でクエリを実行している。

def run_bq_query(project_id, query):
    bq_client = bigquery.Client(project=project_id)
    query_job = bq_client.query(query=query)
    query_results = list(query_job.result())
    return query_results

Pub/Sub 投入部分

投入順序の制御

15分間隔で擬似的な時計を進めて、15分間分投入すると SLEEP_SECOND 秒スリープする。

def publish_query_results_to_pubsub(project_id, topic_id, query_results):
    index = 0
    for hour in range(0, 24):
        for minute in range(0, 60, 15):
            pseudo_now = datetime(
                2023, 11, 1, hour, minute
            )
            while True:
                row = query_results[index]
                trip_start_timestamp_jst = row['trip_start_timestamp_jst']
                if trip_start_timestamp_jst != pseudo_now:
                    break
                row_dict = dict(row.items())
                msg = json.dumps(row_dict, default=json_serial)
                msg_publish(project_id, topic_id, msg)
                index += 1

            print(f'Sleep {SLEEP_SECOND} seconds')
            print(f'Now: {pseudo_now}')
            time.sleep(SLEEP_SECOND)

注意点として、datetime 型の trip_start_timestamp_jst を JSON にダンプするために以下の関数で STRING に変換している。

def json_serial(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))

実際の投入

受け取ったメッセージをバイト変換して Pub/Sub トピックに投げる。

def msg_publish(project_id, topic_id, msg):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    publish_msg = msg.encode("utf-8")
    future = publisher.publish(topic_path, publish_msg)
    message_id = future.result(timeout=300)
    print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}")

実行

Pub/Sub に投入されたデータを PULL するためのサブスクライバーを用意する。

sub.py

import argparse

from google.cloud import pubsub_v1


def msg_pull(project_id, subscription_id):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message):
        print(f"Received {message}.")
        message.ack()
        print(f"Acknowledged {message.message_id}.")

    future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print(f"Listening for messages on {subscription_path}..\n")

    try:
        future.result(timeout=300)
    except:
        future.cancel()
        future.result()

    subscriber.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "project_id", help="Google Cloud project ID"
    )
    parser.add_argument(
        "subscription_id", help="Pub/Sub subscription ID"
    )
    args = parser.parse_args()
    msg_pull(args.project_id, args.subscription_id)

サブスクライバー(sub.py)の実行

$ python3 sub.py ${project_id} test-pokoyakazan-topic-sub

パブリッシャー(bq_to_pubsub.py)の実行

$ python3 bq_to_pubsub.py ${project_id} test-pokoyakazan-topic
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?