LoginSignup
5
0

More than 1 year has passed since last update.

BigQuery Storage Write APIでストリーミング挿入するPythonスクリプト

Posted at

概要

BigQuery Storage Write APIのPythonのサンプルが公開されたので、使ってみました。チュートリアルとして、YoutubeのライブストリーミングのチャットをリアルタイムでBigQueryに取り込みました。

BigQuery Storage Write APIとは

BigQuery Storage Write API は、高スループットでデータを BigQuery に取り込むための gRPC ベースのストリーミング API です。

詳細はこちら→ https://cloud.google.com/bigquery/docs/write-api

※ 2022年1月時点ではプレビュー段階の機能となっており、テスト環境での使用を想定されているものです。

Python Client

2022-01-29時点では日本語ドキュメントには反映されていませんが、英語ドキュメントには追加されています。
https://cloud.google.com/bigquery/docs/write-api#write_data_using_the_python_client

手順

BigQueryのテーブルを作成する

宛先となるBigQueryのデータセット、テーブルを、BigQueryのWebコンソールなどで作成します。

CREATE TABLE tutorial.youtube_comments
(video_url string, time timestamp,  author string, message string)
CLUSTER BY video_url, time;

認証情報を用意する

BigQuery Storage Write API

BigQuery Storage Write APIをPythonで扱う際には、環境変数GOOGLE_APPLICATION_CREDENTIALSに認証情報のJSONのパスを渡す必要があります。

  1. サービスアカウント からサービスアカウントを作成します
  2. 作成したサービスアカウントの鍵を作成し、保存します。
  3. IAMと管理 から、作成したサービスアカウントに対して「BigQueryデータ編集者」以上の権限を付与します。

Youtube API Key

Youtube Data API v3の鍵を取得します(こちら)。

こちらの記事などで詳細に解説されているので省略します。

各種ツールをインストールする

# Bigquery Storage API の Pythonライブラリ
pip install --upgrade google-cloud-bigquery-storage

# Protocol Bufferのコンパイラ MacOS/aptの例
# 詳細は→ https://grpc.io/docs/protoc-installation/
brew install protobuf
apt install -y protobuf-compiler

Protocol Buffer定義ファイルの作成

次のようなProtocol Bufferの定義ファイルを作成します

youtube_comments.proto
syntax = "proto2";

message YoutubeComments {
  required string video_url = 1;
  required int64 time = 2;
  required string author = 3;
  required string message = 4;
}

そして、その形式のProtocol BufferをPythonで扱うラッパーを作成します。下のコマンドを実行すると、youtube_comments_pb2.pyというファイルが作成されます。

protoc --python_out=. youtube_comments.proto

Pythonスクリプトの作成

BigQuery Storage Write APIのサンプルコードに、YoutubeLiveのチャットを取得するスクリプトをくっつけるだけです。

ソースコードはこちら
main.py
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
from apiclient.discovery import build

import datetime
import os
import time

# Protocol Buffer 
import youtube_comments_pb2

# 認証情報
YOUTUBE_API_KEY = 'YOUTUBE APIキー'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'BigQueryにアクセス可能なサービスアカウントの認証鍵'


def create_row_data(video_url: str, time: str, author: str, message: str, elapsedTime: str = "" ):
    row = youtube_comments_pb2.YoutubeComments()
    row.video_url = video_url
    row.time = int(datetime.datetime.fromisoformat(time).timestamp() * 1000000)
    row.author = author
    row.message = message
    return row.SerializeToString()


def insert_youtube_comments(project_id: str, dataset_id: str, table_id: str, video_id: str):

    # 準備
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    # COMMITTED: データを挿入するとすぐにデータが取得可能になる
    # PENDING: データを挿入後に、コミットをしないとデータが取得可能にならない。
    # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
    write_stream.type_ = types.WriteStream.Type.COMMITTED
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    request_template = types.AppendRowsRequest()
    request_template.write_stream = stream_name

    # BigQueryに送りつける入れ物づくり
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    youtube_comments_pb2.YoutubeComments.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Youtube APIの準備
    youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)

    # ライブチャットのIDを取得
    request = youtube.videos().list(
        part="snippet,contentDetails,statistics,liveStreamingDetails",
        id=video_id
    )
    response = request.execute()
    active_live_chat_id = response['items'][0]['liveStreamingDetails']['activeLiveChatId']

    latest_chat_timestamp = None 

    try:
      request_cnt = 0
      while True:
        # ライブチャットから最新の70件を取得する
        request = youtube.liveChatMessages().list(
            liveChatId=active_live_chat_id,
            part="snippet,authorDetails",
        )
        chat_messages = request.execute()
        tmp_latest_chat_timestamp = latest_chat_timestamp

        proto_rows = types.ProtoRows()

        for c in chat_messages["items"]:
          published_at =  (c['snippet']['publishedAt'].split('+')[0] + '00000000')[0:26]+'+00:00'

          # 新しく追加された差分だけ切り出す
          if tmp_latest_chat_timestamp is None:
            tmp_latest_chat_timestamp = published_at
          if tmp_latest_chat_timestamp < published_at:
            tmp_latest_chat_timestamp =  published_at
          if latest_chat_timestamp is not None and published_at <= latest_chat_timestamp:
            continue

          # 取得したデータを配列に入れていく
          proto_rows.serialized_rows.append(create_row_data(video_id,  published_at, c['authorDetails']['displayName'], c['snippet']['displayMessage']))

        # 送信できるデータが有れば送信する
        if len(proto_rows.serialized_rows) > 0:
          request = types.AppendRowsRequest()
          request.offset = request_cnt
          request_cnt +=  len(proto_rows.serialized_rows)
          proto_data = types.AppendRowsRequest.ProtoData()
          proto_data.rows = proto_rows
          request.proto_rows = proto_data
          res = append_rows_stream.send(request)

        latest_chat_timestamp = tmp_latest_chat_timestamp

        # 配信が終了したら閉じる
        if 'offlineAt' in chat_messages:
          append_rows_stream.close()
          print("stream is finished at " + chat_messages['offlineAt'])
          break

        # 10秒おき
        time.sleep(10)

    except KeyboardInterrupt:
      print("C-c")
      append_rows_stream.close()

insert_youtube_comments('your-gcp-project', 'sample', 'youtube_comments', "SdAfHFvmEcQ")

おわりに

Protocol Bufferを使う必要がある分、コードや手順は煩雑に見えますが、定型作業が多いので見た目ほど利用は難しくはない印象です。BigQuery Storage Write APIのPythonライブラリは登場したばかりで、利用例がまだまだ少ないので、参考の一つになれば幸いです。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0