この記事は ミクシィグループ Advent Calendar 2020 の 17日目の記事です.
はじめに
GCPでゲーム分析基盤を0から構築する機会があったので, 技術選定どうしたかや直面した問題と対処法についてお話したいなと思います.
これからGCPでデータ分析基盤を構築するエンジニアや私と同じようにデータエンジニアに入門したての方のお役に立てれば幸いです.
GCPでデータ分析基盤を構築する
単にGCPでデータ分析基盤を構築すると言っても扱うデータの種類やドメインによって最適解は変わってくるのかなと思います. この記事では, 以下のやりたいことを実現するために採用した技術スタックとTipsを紹介します。
やりたいこと(要件)
- デイリーでデータベース(Cloud Spanner)のスナップショットを取得する
- 上記スナップショットとGKEなどから出力されたアプリケーションログ, リクエストログをBigQueryに取り込み分析可能な状態にする
採用した技術スタック
やりたいことを実現するために採用した技術スタックは以下の通りです
- DWH(データウェアハウス)はBigQuery
- ワークフローエンジンには, Apache Airflow on Cloud Composer
- データベースのスナップショットはDataflow
本番サーバーから出力されたアプリケーションログや本番DB(Spanner)から取得したスナップショットはすべてGoogle Cloud Storageに保存しています. これをワークフローエンジンであるAirflowが依存関係に基づいてデイリーでBigQueryに取り込むというアーキテクチャを採用しました.
技術選定どうしたか
技術選定する際の方針として, BigQueryでETLを完結させることを重視して上記のようなアーキテクチャになりました.
GCPでETLに利用できるサービスとしては, Dataflow (Apache Beam)や, Dataproc(Hive, Spark等), GUIでワークフローを構築できるDataFusionなどが存在しますが, BQでETLできるような構造化されたデータであれば, BQでETLを完結させたほうが実装コストを抑えることができると考えたため, Airflowでは依存関係の制御に徹底し生データの加工処理はすべてBigQueryで行うようにしています.
技術選定1. Spanner to BigQuery
SpannerでのスナップショットにはDataflowを利用しました.
Dataflowの場合Spannerのスナップショットを取るためのGoogle提供のバッチテンプレート(Cloud Spanner to Cloud Storage Avro)が存在するためノーコーディングでGCSまでデータをExportすることができます.
( なお, Spannerの管理画面からGUIでExportした場合も内部ではこのDataflowTemplateが利用されているようです )
Cloud Spanner to Cloud Storage Avroテンプレートのメリット/デメリット
メリット
- ジョブを起動するだけでスナップショットが取れる
- Avroファイルで出力されるため, BigQueryのFederatedQueryでそのまま読み込むことができる.
ドキュメントにある通り, 以下のようなコマンド一つでデータベースのフルスナップショットが取得できます.
gcloud dataflow jobs run [JOB_NAME] \
--gcs-location='gs://dataflow-templates/[VERSION]/Cloud_Spanner_to_GCS_Avro' \
--region=[DATAFLOW_REGION] \
--parameters='instanceId=[YOUR_INSTANCE_ID],databaseId=[YOUR_DATABASE_ID],outputDir=[YOUR_GCS_DIRECTORY]
デメリット
- Exportするテーブル/カラムを絞ることができない
- データベース全体のフルスナップショットしか取れないため, テーブル単位でデータを取得したいケースでは利用できない
- 全件取得しかできない
- 前日分に追加/更新があったデータのみを差分取得したいなどのユースケースではテンプレートにテコ入れが必要
- データ量が多いと本番Spannerインスタンスへのインパクトがなかなかある
- リアルタイム性が要求されるゲームなどではこの影響が問題になる可能性があるため, Exportする時間をピークの時間帯とずらす, DBのBackupを別なSpannerインスタンスへ復元(Restore)し復元されたDBにスナップショットするなどの工夫が必要
差分取得やテーブル/カラムを指定しての取得を行いたい場合は, Dataflow コネクタを利用してフルスクラッチするか, 上記Dataflow Templateをカスタマイズして利用するのが良さそうです.
GCSにExportされたデータ(Avro)をBigQueryへ取り込む
Google提供のバッチテンプレート を利用してSpannerのスナップショットをとった場合, データはAvro形式のファイルとしてGCSにエクスポートされます.
Cloud Storage からの Avro データの読み込み | BigQuery | Google Cloud にあるように, bq load
コマンドなどを用いることで自前でプログラムを用意しなくてもコマンド一つでBigQueryへロードすることはできます.
また, Dataprocをすでに利用している場合は, Bigqueryコネクタを利用してSparkなどからBigQueryへETLするのも良さそうです.
# bqコマンドで AvroファイルをBigQueryへロードする
bq --location=LOCATION load \
--autodetect \
--source_format=AVRO \
DATASET.TABLE \
PATH_TO_SOURCE
Tips. Spark on Dataproc で GCS Avro to BigQuery
Dataprocは, フルマネージドなHadoop & Sparkクラスターです.
今回はBQでETLを完結させる方針だったためDataprocは利用しませんでしたが, Avroの読み込み, BQの書き出しともにConnectorが用意されているため, Dataprocにおいてもpackageをimportするだけで簡単にETLができました.
#!/usr/bin/env python
from pyspark.sql import SparkSession
# Avroファイルを読み込み, BQにExportするサンプル
# usersテーブル(仮)をbqにロードしてみる
spark = SparkSession.builder.master('yarn').appName('test').getOrCreate()
spark.conf.set('temporaryGcsBucket', "temp-bucket-name")
# Extract
# ref: Avro Data Source Guide https://spark.apache.org/docs/latest/sql-data-sources-avro.html
df = spark.read.format("avro") \
.load("gs://bucket-name/path/to/users.avro-00000-of-00001")
# Transform
# データの加工, フィルタリングなど.
# Load
# ref: BQ Connector https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example?hl=ja#pyspark
df.select("user_id", "exp", "level", "registered_at", "updated_at").write.format('bigquery') \
.option('table', 'bq_dataset_name.table_name') \
.save()
gcloud beta dataproc jobs submit pyspark \
--cluster=${CLUSTER} \
--region=${REGION} \
--properties=^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:2.4.5,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta \
avro2bq.py
Tips. BQにおけるISO8601なSTRINGのキャスト
Google提供のバッチテンプレート で Spannerのスナップショットをとった場合, SpannerにおけるTimestamp型のカラムはAvroファイルでは2020-12-25T09:30:00.914032000Z
のようなISO8601のフォーマットに沿ったSTRING型として記述されています. BQにてISO8601なSTRINGをTIMESTAMP型にキャストするためには, PARSE_TIMESTAMP
を用いてSTRINGをTIMESTAMPにキャストできます.
SELECT
created_at, # 2020-12-29T09:32:17.914032000Z
PARSE_TIMESTAMP("%FT%R:%E*SZ", created_at) # 2020-12-29 09:32:17.914032 UTC
FROM
dataset.table
技術選定2. Cloud Logging to BigQuery
そもそも, アプリケーションログをCloud Loggingにためるところから始まりますが, アプリケーションサーバーにGKEを利用しているのであれば, 標準出力でjson stringを吐き出すだけでロギングエージェントがよしなにCloud Loggingまで運んでくれて, 構造化ログとしてQueryまで出せる状態になります.
Cloud Loggingにあるログをエクスポートする手段は, ドキュメント: Overview of logs exports にある通りです.
- Cloud Storage: Cloud Storage バケットに保存される JSON ファイル。
- BigQuery: BigQuery データセットに作成されるテーブル。
- Pub/Sub: Pub/Sub トピックに配信される JSON メッセージ。
2.1. Cloud StorageへSink
Cloud StorageにJSON(NDJSON)としてログを保存した場合も, BQのバッチ読み込みや, 外部データソースとしてそのまま読み込むことができます.
この方法は, 直接BigQueryにSinkする方法と比べてひと手間かかるものの, 1. いかなるログでもGCSに保存されるためログ欠損のリスクがStreamingに比べて低い, 2. バッチでロードできるためStreamingInsertに比べて料金を抑えることができるといったメリットが存在します.
GCSにExportされたログ(NDJSON)を外部データソース経由でBigQueryへ取り込む.
Avro形式のファイルをBigQueryへ取り込む際と同じく, NDJSON形式のファイルも bq load
コマンドや, 外部テーブル経由でBigQueryへ簡単にロードできます.
Cloud Storage からの JSON データの読み込み | BigQuery | Google Cloud
Tips. Apache Beam Python SDK で GCS JSON to BigQuery
前述の通り, bq load
等のコマンドでGCSに保存されたNDJSON形式のファイル(ログ)をBigQueryへロードはできますが, Apache Beamなどを用いて自前でBigQueryへのETL処理を記述することができます.
Apache Beamに関する詳しい説明は割愛させていただきますが, Apache BeamでもBigQueryやGCSへのIOが用意されているためローカルでの実行においても簡単にGCS to BQのETL処理を行うことができます.
Apache Beam Python SDKを用いて, 以下のようなjsonlinesなファイルをそのままBigQuqeryにInsertするサンプルコード
{"insertId":"793a8f21-679b-48c4-9872-99d5f69dbf08","logName":"activity"}
{"insertId":"fa20d1f9-fd07-4ff4-91b8-15c23bb124b6","logName":"activity"}
# Apache Beam を用いた GCS to BigQuqery のETL サンプル
# ref: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py
from __future__ import absolute_import
import argparse
import logging
import json
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.gcp.internal.clients import BigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
dest='input',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(
SetupOptions).save_main_session = save_main_session
p = beam.Pipeline(options=pipeline_options)
lines = p | 'read' >> ReadFromText(known_args.input)
table_spec = BigQuery.TableReference(
projectId='gcp_project_id',
datasetId='bq_dataset_id',
tableId='bq_table_id')
table_schema = {
'fields': [{
'name': 'id', 'type': 'STRING', 'mode': 'REQUIRED'
}, {
'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'
}]
}
output = (
lines
| 'parse' >> beam.Map(lambda line: json.loads(line))
| 'transform' >> beam.Map(lambda line: {'id': line['insertId'], 'name': line['logName']})
)
output | 'write' >> beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service_account_credential.json
$ pip install "apache-beam[gcp]"
$ python gcs_to_bq.py --input gs://bucket_name/path/to/input.json
2.2. BigQueryへSink
Cloud LoggingではBigQueryに直接ログを送信する事ができますが, Cloud LoggingのSink先としてBigQueryを設定した場合, リアルタイムでログが送信される = BQのStreaming Insertが行われるため, 別途ストリーミング料金*1がかかるというデメリットも存在します.
*1 2020年12月時点, asia-northeast1で $0.012 per 200 MB
. 詳しくは ストリーミング料金 | BigQuery | Google Cloud
2.3. Pub/SubへSink
Pub/SubからBigQueryへデータを流す手段としては, Apache Beamなどでデータを流すプログラムを作る他にも, Pub/Sub Subscription to BigQuery などのGoogle提供Dataflowテンプレートが存在するため, これを活用することで工数をかけずにPub/Sub経由でBQにデータを流すことができます.
また, このテンプレートを利用した場合, BigQueryへデータを読み込む際にJavascriptでUDF(user-defined functions)としてデータを加工処理を挟むことができます.
Using UDFs | Google Cloud Dataflow Template Pipelines
おわりに
GCPでデータ分析基盤を構築する際に調べたこと/試したことをまとめるだけの記事になってしまいましたが, これから同じような技術選定をする際になにか一つでも参考になることがあれば幸いです.