概要
BigQuery Storage Write APIのPythonのサンプルが公開されたので、使ってみました。チュートリアルとして、YoutubeのライブストリーミングのチャットをリアルタイムでBigQueryに取り込みました。
BigQuery Storage Write APIとは
BigQuery Storage Write API は、高スループットでデータを BigQuery に取り込むための gRPC ベースのストリーミング API です。
詳細はこちら→ https://cloud.google.com/bigquery/docs/write-api
※ 2022年1月時点ではプレビュー段階の機能となっており、テスト環境での使用を想定されているものです。
Python Client
2022-01-29時点では日本語ドキュメントには反映されていませんが、英語ドキュメントには追加されています。
https://cloud.google.com/bigquery/docs/write-api#write_data_using_the_python_client
手順
BigQueryのテーブルを作成する
宛先となるBigQueryのデータセット、テーブルを、BigQueryのWebコンソールなどで作成します。
CREATE TABLE tutorial.youtube_comments
(video_url string, time timestamp, author string, message string)
CLUSTER BY video_url, time;
認証情報を用意する
BigQuery Storage Write API
BigQuery Storage Write APIをPythonで扱う際には、環境変数GOOGLE_APPLICATION_CREDENTIALS
に認証情報のJSONのパスを渡す必要があります。
- サービスアカウント からサービスアカウントを作成します
- 作成したサービスアカウントの鍵を作成し、保存します。
- IAMと管理 から、作成したサービスアカウントに対して「BigQueryデータ編集者」以上の権限を付与します。
Youtube API Key
Youtube Data API v3の鍵を取得します(こちら)。
こちらの記事などで詳細に解説されているので省略します。
各種ツールをインストールする
# Bigquery Storage API の Pythonライブラリ
pip install --upgrade google-cloud-bigquery-storage
# Protocol Bufferのコンパイラ MacOS/aptの例
# 詳細は→ https://grpc.io/docs/protoc-installation/
brew install protobuf
apt install -y protobuf-compiler
Protocol Buffer定義ファイルの作成
次のようなProtocol Bufferの定義ファイルを作成します
syntax = "proto2";
message YoutubeComments {
required string video_url = 1;
required int64 time = 2;
required string author = 3;
required string message = 4;
}
そして、その形式のProtocol BufferをPythonで扱うラッパーを作成します。下のコマンドを実行すると、youtube_comments_pb2.py
というファイルが作成されます。
protoc --python_out=. youtube_comments.proto
Pythonスクリプトの作成
BigQuery Storage Write APIのサンプルコードに、YoutubeLiveのチャットを取得するスクリプトをくっつけるだけです。
**ソースコードはこちら**
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
from apiclient.discovery import build
import datetime
import os
import time
# Protocol Buffer
import youtube_comments_pb2
# 認証情報
YOUTUBE_API_KEY = 'YOUTUBE APIキー'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'BigQueryにアクセス可能なサービスアカウントの認証鍵'
def create_row_data(video_url: str, time: str, author: str, message: str, elapsedTime: str = "" ):
row = youtube_comments_pb2.YoutubeComments()
row.video_url = video_url
row.time = int(datetime.datetime.fromisoformat(time).timestamp() * 1000000)
row.author = author
row.message = message
return row.SerializeToString()
def insert_youtube_comments(project_id: str, dataset_id: str, table_id: str, video_id: str):
# 準備
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
write_stream = types.WriteStream()
# COMMITTED: データを挿入するとすぐにデータが取得可能になる
# PENDING: データを挿入後に、コミットをしないとデータが取得可能にならない。
# https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
write_stream.type_ = types.WriteStream.Type.COMMITTED
write_stream = write_client.create_write_stream(
parent=parent, write_stream=write_stream
)
stream_name = write_stream.name
request_template = types.AppendRowsRequest()
request_template.write_stream = stream_name
# BigQueryに送りつける入れ物づくり
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
youtube_comments_pb2.YoutubeComments.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Youtube APIの準備
youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
# ライブチャットのIDを取得
request = youtube.videos().list(
part="snippet,contentDetails,statistics,liveStreamingDetails",
id=video_id
)
response = request.execute()
active_live_chat_id = response['items'][0]['liveStreamingDetails']['activeLiveChatId']
latest_chat_timestamp = None
try:
request_cnt = 0
while True:
# ライブチャットから最新の70件を取得する
request = youtube.liveChatMessages().list(
liveChatId=active_live_chat_id,
part="snippet,authorDetails",
)
chat_messages = request.execute()
tmp_latest_chat_timestamp = latest_chat_timestamp
proto_rows = types.ProtoRows()
for c in chat_messages["items"]:
published_at = (c['snippet']['publishedAt'].split('+')[0] + '00000000')[0:26]+'+00:00'
# 新しく追加された差分だけ切り出す
if tmp_latest_chat_timestamp is None:
tmp_latest_chat_timestamp = published_at
if tmp_latest_chat_timestamp < published_at:
tmp_latest_chat_timestamp = published_at
if latest_chat_timestamp is not None and published_at <= latest_chat_timestamp:
continue
# 取得したデータを配列に入れていく
proto_rows.serialized_rows.append(create_row_data(video_id, published_at, c['authorDetails']['displayName'], c['snippet']['displayMessage']))
# 送信できるデータが有れば送信する
if len(proto_rows.serialized_rows) > 0:
request = types.AppendRowsRequest()
request.offset = request_cnt
request_cnt += len(proto_rows.serialized_rows)
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
res = append_rows_stream.send(request)
latest_chat_timestamp = tmp_latest_chat_timestamp
# 配信が終了したら閉じる
if 'offlineAt' in chat_messages:
append_rows_stream.close()
print("stream is finished at " + chat_messages['offlineAt'])
break
# 10秒おき
time.sleep(10)
except KeyboardInterrupt:
print("C-c")
append_rows_stream.close()
insert_youtube_comments('your-gcp-project', 'sample', 'youtube_comments', "SdAfHFvmEcQ")
おわりに
Protocol Bufferを使う必要がある分、コードや手順は煩雑に見えますが、定型作業が多いので見た目ほど利用は難しくはない印象です。BigQuery Storage Write APIのPythonライブラリは登場したばかりで、利用例がまだまだ少ないので、参考の一つになれば幸いです。