LoginSignup
5

More than 1 year has passed since last update.

posted at

updated at

GCPで0からデータ分析基盤を構築する

この記事は ミクシィグループ Advent Calendar 2020 の 17日目の記事です.

はじめに

GCPでゲーム分析基盤を0から構築する機会があったので, 技術選定どうしたかや直面した問題と対処法についてお話したいなと思います.
これからGCPでデータ分析基盤を構築するエンジニアや私と同じようにデータエンジニアに入門したての方のお役に立てれば幸いです.

GCPでデータ分析基盤を構築する

単にGCPでデータ分析基盤を構築すると言っても扱うデータの種類やドメインによって最適解は変わってくるのかなと思います. この記事では, 以下のやりたいことを実現するために採用した技術スタックとTipsを紹介します。

やりたいこと(要件)

  • デイリーでデータベース(Cloud Spanner)のスナップショットを取得する
  • 上記スナップショットとGKEなどから出力されたアプリケーションログ, リクエストログをBigQueryに取り込み分析可能な状態にする

採用した技術スタック

やりたいことを実現するために採用した技術スタックは以下の通りです

etl.png

本番サーバーから出力されたアプリケーションログや本番DB(Spanner)から取得したスナップショットはすべてGoogle Cloud Storageに保存しています. これをワークフローエンジンであるAirflowが依存関係に基づいてデイリーでBigQueryに取り込むというアーキテクチャを採用しました.

技術選定どうしたか

技術選定する際の方針として, BigQueryでETLを完結させることを重視して上記のようなアーキテクチャになりました.
GCPでETLに利用できるサービスとしては, Dataflow (Apache Beam)や, Dataproc(Hive, Spark等), GUIでワークフローを構築できるDataFusionなどが存在しますが, BQでETLできるような構造化されたデータであれば, BQでETLを完結させたほうが実装コストを抑えることができると考えたため, Airflowでは依存関係の制御に徹底し生データの加工処理はすべてBigQueryで行うようにしています.

技術選定1. Spanner to BigQuery

SpannerでのスナップショットにはDataflowを利用しました.
Dataflowの場合Spannerのスナップショットを取るためのGoogle提供のバッチテンプレート(Cloud Spanner to Cloud Storage Avro)が存在するためノーコーディングでGCSまでデータをExportすることができます.
( なお, Spannerの管理画面からGUIでExportした場合も内部ではこのDataflowTemplateが利用されているようです )

Cloud Spanner to Cloud Storage Avroテンプレートのメリット/デメリット
メリット
  • ジョブを起動するだけでスナップショットが取れる
  • Avroファイルで出力されるため, BigQueryのFederatedQueryでそのまま読み込むことができる.

ドキュメントにある通り, 以下のようなコマンド一つでデータベースのフルスナップショットが取得できます.

gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location='gs://dataflow-templates/[VERSION]/Cloud_Spanner_to_GCS_Avro' \
    --region=[DATAFLOW_REGION] \
    --parameters='instanceId=[YOUR_INSTANCE_ID],databaseId=[YOUR_DATABASE_ID],outputDir=[YOUR_GCS_DIRECTORY]
デメリット
  • Exportするテーブル/カラムを絞ることができない
    • データベース全体のフルスナップショットしか取れないため, テーブル単位でデータを取得したいケースでは利用できない
  • 全件取得しかできない
    • 前日分に追加/更新があったデータのみを差分取得したいなどのユースケースではテンプレートにテコ入れが必要 
  • データ量が多いと本番Spannerインスタンスへのインパクトがなかなかある
    • リアルタイム性が要求されるゲームなどではこの影響が問題になる可能性があるため, Exportする時間をピークの時間帯とずらす, DBのBackupを別なSpannerインスタンスへ復元(Restore)し復元されたDBにスナップショットするなどの工夫が必要

差分取得やテーブル/カラムを指定しての取得を行いたい場合は, Dataflow コネクタを利用してフルスクラッチするか, 上記Dataflow Templateをカスタマイズして利用するのが良さそうです.

GCSにExportされたデータ(Avro)をBigQueryへ取り込む

Google提供のバッチテンプレート を利用してSpannerのスナップショットをとった場合, データはAvro形式のファイルとしてGCSにエクスポートされます.

Cloud Storage からの Avro データの読み込み  |  BigQuery  |  Google Cloud にあるように, bq loadコマンドなどを用いることで自前でプログラムを用意しなくてもコマンド一つでBigQueryへロードすることはできます.

また, Dataprocをすでに利用している場合は, Bigqueryコネクタを利用してSparkなどからBigQueryへETLするのも良さそうです.

# bqコマンドで AvroファイルをBigQueryへロードする
bq --location=LOCATION load \
--autodetect \
--source_format=AVRO \
DATASET.TABLE \
PATH_TO_SOURCE
Tips. Spark on Dataproc で GCS Avro to BigQuery

Dataprocは, フルマネージドなHadoop & Sparkクラスターです.
今回はBQでETLを完結させる方針だったためDataprocは利用しませんでしたが, Avroの読み込み, BQの書き出しともにConnectorが用意されているため, Dataprocにおいてもpackageをimportするだけで簡単にETLができました.

avro2bq.py
#!/usr/bin/env python

from pyspark.sql import SparkSession

# Avroファイルを読み込み, BQにExportするサンプル
# usersテーブル(仮)をbqにロードしてみる
spark = SparkSession.builder.master('yarn').appName('test').getOrCreate()
spark.conf.set('temporaryGcsBucket', "temp-bucket-name")

# Extract
# ref: Avro Data Source Guide https://spark.apache.org/docs/latest/sql-data-sources-avro.html
df = spark.read.format("avro") \
    .load("gs://bucket-name/path/to/users.avro-00000-of-00001")

# Transform
# データの加工, フィルタリングなど. 

# Load
# ref: BQ Connector https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example?hl=ja#pyspark
df.select("user_id", "exp", "level", "registered_at", "updated_at").write.format('bigquery') \
    .option('table', 'bq_dataset_name.table_name') \
    .save()
gcloud beta dataproc jobs submit pyspark \
--cluster=${CLUSTER} \
--region=${REGION} \
--properties=^#^spark.jars.packages=org.apache.spark:spark-avro_2.12:2.4.5,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta \
avro2bq.py
Tips. BQにおけるISO8601なSTRINGのキャスト

Google提供のバッチテンプレート で Spannerのスナップショットをとった場合, SpannerにおけるTimestamp型のカラムはAvroファイルでは2020-12-25T09:30:00.914032000Z のようなISO8601のフォーマットに沿ったSTRING型として記述されています. BQにてISO8601なSTRINGをTIMESTAMP型にキャストするためには, PARSE_TIMESTAMP を用いてSTRINGをTIMESTAMPにキャストできます.

SELECT
    created_at, # 2020-12-29T09:32:17.914032000Z
    PARSE_TIMESTAMP("%FT%R:%E*SZ", created_at) # 2020-12-29 09:32:17.914032 UTC
FROM
    dataset.table

技術選定2. Cloud Logging to BigQuery

そもそも, アプリケーションログをCloud Loggingにためるところから始まりますが, アプリケーションサーバーにGKEを利用しているのであれば, 標準出力でjson stringを吐き出すだけでロギングエージェントがよしなにCloud Loggingまで運んでくれて, 構造化ログとしてQueryまで出せる状態になります.

Cloud Loggingにあるログをエクスポートする手段は, ドキュメント: Overview of logs exports にある通りです.

  1. Cloud Storage: Cloud Storage バケットに保存される JSON ファイル。
  2. BigQuery: BigQuery データセットに作成されるテーブル。
  3. Pub/Sub: Pub/Sub トピックに配信される JSON メッセージ。
2.1. Cloud StorageへSink

Cloud StorageにJSON(NDJSON)としてログを保存した場合も, BQのバッチ読み込みや, 外部データソースとしてそのまま読み込むことができます.

この方法は, 直接BigQueryにSinkする方法と比べてひと手間かかるものの, 1. いかなるログでもGCSに保存されるためログ欠損のリスクがStreamingに比べて低い, 2. バッチでロードできるためStreamingInsertに比べて料金を抑えることができるといったメリットが存在します.

GCSにExportされたログ(NDJSON)を外部データソース経由でBigQueryへ取り込む.

Avro形式のファイルをBigQueryへ取り込む際と同じく, NDJSON形式のファイルも bq loadコマンドや, 外部テーブル経由でBigQueryへ簡単にロードできます.

Cloud Storage からの JSON データの読み込み  |  BigQuery  |  Google Cloud

Tips. Apache Beam Python SDK で GCS JSON to BigQuery

前述の通り, bq load 等のコマンドでGCSに保存されたNDJSON形式のファイル(ログ)をBigQueryへロードはできますが, Apache Beamなどを用いて自前でBigQueryへのETL処理を記述することができます.
Apache Beamに関する詳しい説明は割愛させていただきますが, Apache BeamでもBigQueryやGCSへのIOが用意されているためローカルでの実行においても簡単にGCS to BQのETL処理を行うことができます.

Apache Beam Python SDKを用いて, 以下のようなjsonlinesなファイルをそのままBigQuqeryにInsertするサンプルコード

input.json
{"insertId":"793a8f21-679b-48c4-9872-99d5f69dbf08","logName":"activity"}
{"insertId":"fa20d1f9-fd07-4ff4-91b8-15c23bb124b6","logName":"activity"}
gcs_to_bq.py
# Apache Beam を用いた GCS to BigQuqery のETL サンプル
# ref: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py

from __future__ import absolute_import

import argparse
import logging
import json
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.gcp.internal.clients import BigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(
        SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)

    lines = p | 'read' >> ReadFromText(known_args.input)

    table_spec = BigQuery.TableReference(
        projectId='gcp_project_id',
        datasetId='bq_dataset_id',
        tableId='bq_table_id')

    table_schema = {
        'fields': [{
            'name': 'id', 'type': 'STRING', 'mode': 'REQUIRED'
        }, {
            'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'
        }]
    }

    output = (
        lines
        | 'parse' >> beam.Map(lambda line: json.loads(line))
        | 'transform' >> beam.Map(lambda line: {'id': line['insertId'], 'name': line['logName']})
    )

    output | 'write' >> beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service_account_credential.json
$ pip install "apache-beam[gcp]"
$ python gcs_to_bq.py --input gs://bucket_name/path/to/input.json
2.2. BigQueryへSink

Cloud LoggingではBigQueryに直接ログを送信する事ができますが, Cloud LoggingのSink先としてBigQueryを設定した場合, リアルタイムでログが送信される = BQのStreaming Insertが行われるため, 別途ストリーミング料金*1がかかるというデメリットも存在します.
*1 2020年12月時点, asia-northeast1で $0.012 per 200 MB. 詳しくは ストリーミング料金  |  BigQuery  |  Google Cloud

2.3. Pub/SubへSink

Pub/SubからBigQueryへデータを流す手段としては, Apache Beamなどでデータを流すプログラムを作る他にも, Pub/Sub Subscription to BigQuery などのGoogle提供Dataflowテンプレートが存在するため, これを活用することで工数をかけずにPub/Sub経由でBQにデータを流すことができます.
また, このテンプレートを利用した場合, BigQueryへデータを読み込む際にJavascriptでUDF(user-defined functions)としてデータを加工処理を挟むことができます.
Using UDFs | Google Cloud Dataflow Template Pipelines

おわりに

GCPでデータ分析基盤を構築する際に調べたこと/試したことをまとめるだけの記事になってしまいましたが, これから同じような技術選定をする際になにか一つでも参考になることがあれば幸いです.

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
5