Help us understand the problem. What is going on with this article?

Cloud Dataflow + BigQueryによる大規模データのバッチ処理

はじめに

BigQueryのテーブルデータを加工して外部のストレージに置いてほしい、そんなことを言われたことはないでしょうか? そんなときにCloud Dataflowは大変便利です。本記事では定期的に行われるバッチ処理に対してCloud Dataflowを使った所感について書きたいと思います。

対象の読者

  • Cloud Composerを使うほどでもなく、予算も少ないけどCloud Dataflowを使いたいときの構成を知りたい人
  • BigQuery上のTBを越えるテーブルデータの加工をフルマネージドにしたい人

対象でない読者

  • Cloud Composerを使ったイケてる構成を知りたい人
  • Cloud Dataflowのworkerを高効率に使うノウハウを求めてる人

本記事で使用する主なサービス

  • Terraform
  • Github
  • CircleCI
  • Cloud Storage
  • Cloud Source Repository
  • Cloud Build
  • Cloud Scheduler
  • Cloud Pub/Sub
  • Cloud Functions
  • Cloud Dataflow
  • S3

構成図

Untitled Diagram.png

大規模データのバッチ処理

動機

BigQuery上にあるデータを別のデータソースや分析基盤にも必要な分だけ加工して欲しいといった要望をされることがあります。データ量がTBを越えるとデータ加工用のプロジェクトを作るのも大変で、なるべくGCPのサービスを活用していい感じにしたいと思います。

BigQuery

BigQueryとはGCPで提供されているデータウェアハウスです。他のデータウェアハウスに比べて使い勝手が良いところが多いので、BigQuery以外を選択することは考えづらいです。GCPはAWSに比べてデータウェアハウスにデータを流し込む手段が多いですが、BigQueryから外部のストレージへデータを出す手段はそれほど多くないのが現状です。

Cloud Dataflow

Cloud DataflowとはGCP上でフルマネージドなApache Beamを動かすサービスです。Apache BeamなのでJavaやPythonでパイプラインの処理を記述することができ、バックエンドではGCEのリソースを適当に割り当てて分散処理を行ってくれます。BigQueryに対してデータの入出力を細かく制御できる手段の一つです。

Cloud DataflowをJavaから使う

Cloud DataflowはJava/Pythonに対応しています。Pythonから使ったことがないため正しくない可能性がありますが、Pythonの対応はJavaの後追いのため、Apache Beamの機能を完全には使えないと経験者から聞きました。個人的にScalaで書きたい気持ちもありJavaを利用しています。ちなみにScalaだとScioというSpotifyが開発しているOSSのwrapperがあります。

Javaを使う場合、プロジェクト管理ツールであるApache Mavenを使用します。

テンプレートの作成

Cloud Dataflowのテンプレートを作成しておくことで、テンプレートからjobをkickすることができます。Cloud Dataflowのチュートリアルでは以下のようにWordCountのjavaのプロジェクトをコンパイルし実行しています

$ mvn -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=<PROJECT_ID> \
      --stagingLocation=gs://<STORAGE_BUCKET>/staging/ \
      --output=gs://<STORAGE_BUCKET>/output \
      --runner=DataflowRunner"

compile時に--templateLocationを引数として指定することで実行ではなく、テンプレートの作成のみ行ないます。

次に、日々の定期実行時テンプレートに動的にパラメータを与えたいと思います。Apache BeamではValueProviderを利用することにより、パイプラインでランタイムのパラメータを受け入れることができます。S3OptionsPipelineOptionsを継承しているため、get<パラメータ名>set<パラメータ名>からパイプラインのオプションを追加します。

BigQueryToGCS.java
public interface BigQueryToGCSOptions extends S3Options {
    ValueProvider<String> getAWSAccessKey();
    void setAWSAccessKey(ValueProvider<String> value);

    ValueProvider<String> getAWSSecretKey();
    void setAWSSecretKey(ValueProvider<String> value);

    ValueProvider<String> getDataset();
    void setDataset(ValueProvider<String> value);

    ValueProvider<String> getOutput();
    void setOutput(ValueProvider<String> value);
}

S3OptionsAwsOptionsも継承しているため、setAwsCredentialsProviderからAWSのCredentialsの設定をします。

    BigQueryToGCSOptions options = PipelineOptionsFactory.fromArgs(args)
                                                         .withValidation()
                                                         .as(BigQueryToGCSOptions.class);

    options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
            options.getAWSAccessKey().get(), options.getAWSSecretKey().get())));

このValueProviderというのがわかりづらいのですが、Cloud Dataflowのdocを見ると三種類あることがわかります。

  • RuntimeValueProvider
  • StaticValueProvider
  • NestedValueProvider

RuntimeValueProvider

RuntimeValueProviderValueProviderのデフォルトのタイプです。パイプラインの実行中にのみ使用可能な値をパイプラインで受け入れることができます。WordCountの例では、.from(options.getInputFile())で動的に読み取るファイルを指定し、.from(options.getOutputFile())で動的に出力先を指定しています

p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
    .apply(new CountWords())
    .apply(MapElements.via(new FormatAsTextFn()))
    .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));

StaticValueProvider

StaticValueProviderはパイプラインで静的な値を指定することができます。

.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

NestedValueProvider

NestedValueProviderValueProviderをラップしたもので、ラップされたValueProviderのタイプによってパイプラインの作成中に値にアクセスできるかどうかが決定します。

例えばパイプラインの実行時に動的にクエリを生成するとき、以下のようにNestedValueProvider内で動的に渡したパラメータからクエリを生成します。

pipeline.apply("Read from BigQuery", BigQueryIO.readTableRows()
        .fromQuery(
            NestedValueProvider.of(options.getDataset(), new SerializableFunction<String,String>() {
                @Override
                public String apply(String dataset) {
                    String sql = "SELECT hoge, fuga FROM <project>." + dataset;
                    return sql;
                }
        }))
        .usingStandardSql()
        .withTemplateCompatibility()
        .withoutValidation())

これはPythonユーザーから聞いたのですが、PythonではNestedValueProviderというタイプはなく、上記のように動的にパラメータを使用することが可能だそうです。

S3への出力

出力先の指定ですが、S3の場合はあらかじめCredentialをセットしてTextIO.write().to()にS3のパスをしてあげるだけで出力できます。便利です。

.apply(TextIO.write().withHeader(header).to(<S3のパス>));

BigQuery

Cloud Dataflowで行うデータ処理はなるべく減らしたいので、可能な限りBigQueryを叩くクエリを工夫してデータをあらかじめ加工しておきます。

このときBigQueryのテーブルのスキーマに多重配列が含まれていると厄介で、例えば二重配列ならば以下のように展開します。UNNEST時に展開されるカラム名と元のカラム名が被っている場合はColumn name xx is ambiguous at []とエラーが出るのでUNNESTする前に別名のテーブルソースを追加しておきます。

SELECT 
    orig_hogehoge.hogehuga,
    (SELECT hogehogehoge FROM UNNEST(h.hogehoge)) AS hoge3
    (SELECT hugahugahuga FROM UNNEST(h.hugahuga)) AS huga3
FROM 
    table,
    <table>.hogehoge AS orig_hogehoge,
    UNNEST(hoge) AS h

上記の手法ではBigQueryIO.readTableRows().fromQuery()を利用してBigQueryで直接クエリの実行をしています。注意しなければならない点はCloud Dataflowのworker(GCEのリソース)を起動させながらBigQueryのクエリ(slot)を消費している点です。

どの手段を使うかはケースバイケースになるので、各々で判断するしかなさそうです。別の手段として以下の二つが考えられます。

BigQueryからデータを取得する別の方法①

BigQueryIO.readTableRows().from(TableReference) を指定します。BigQueryのテーブルデータを一度GCSにエクスポートしたものをCloud Dataflowから読み取ります。クエリを叩かないためslotを消費することはありませんが、エクスポートを待つ必要があります。

参考
https://beam.apache.org/documentation/io/built-in/google-bigquery/#reading-from-a-table

BigQueryからデータを取得する別の方法②

BigQueryIO.readTableRows().from(TableReference).withMethod(Method.DIRECT_READ) を指定します。こちらは①とは異なり、BigQuery Storage APIを使用して直接BigQueryのデータを読み取ります。また、クエリを叩く場合と比べてスキャン範囲あたり以下の値段のメリットがあります。

Queries(on-demand) BigQuery API
$5.00/TB $1.10/TB

参考
https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-api

一方でクエリは毎月1TBまで無料なので、使用用途によって使い分けるのが良さそうです。

参考
https://cloud.google.com/bigquery/pricing

定期実行の仕組み

Cloud DataflowのjobをkickするためCloud FunctionからAPIを叩きます。Cloud Composerからワークフローとしてスケジューリングすることもできますが、GKEを立てる必要があるためお金がかかります。

requirement.txt
oauth2client
google-api-python-client
main.py
from googleapiclient.discovery import build

def invoke_cloud_dataflow(event, context):

    project = <project>
    job = <job>
    template = <templateが保存されているGCSのパス>
    parameters = {
        'dataset': <Dataset>,
        'output': <出力先のパス>,
    }

    service = build(
        'dataflow', 'v1b3',
        cache_discovery=False)
    request = service.projects().templates().launch(
        projectId=project,
        gcsPath=template,
        body={
            'jobName': job,
            'parameters': parameters,
        }
    )

    response = request.execute()

CI/CD

CircleCIではテストのみ行い、Cloud Buildでテンプレートの生成とプロジェクトのデプロイを行います。mavenからCloud Dataflowのテンプレートを生成するのと、Terraformによるプロジェクトのデプロイを行います。

Cloud Buildの環境変数には以下を指定します。

Name Value
_AWS_ACCESS_KEY 作成したAWSのアクセスキー
_AWS_SECRET_KEY 作成したAWSのシークレットアクセスキー
_TERRAFORM_BUCKET_NAME Terraformプロジェクトのtfstateを保存するS3バケット名
_TERRAFORM_WORKSPACE Terraformプロジェクトのworkspace
cloudbuild.yaml
steps:
- name: 'gcr.io/cloud-builders/mvn'
  entrypoint: 'sh'
  args:
  - '-c'
  - |
      cd dataflow-bigquerytos3
      mvn compile exec:java -Dexec.mainClass=<Javaのmainクラス> -Dexec.args="--project=<GCPのプロジェクト名> --runner=DataflowRunner --stagingLocation=<GCSのstaging用のパス> --templateLocation=<GCSのテンプレートを保存するパス> --awsRegion=<AWSのRegion> --AWSAccessKey=${_AWS_ACCESS_KEY} --AWSSecretKey=${_AWS_SECRET_KEY} " -Pdataflow-runner

- name: 'hashicorp/terraform:0.12.6'
  entrypoint: 'sh'
  args:
  - '-c'
  - |
      terraform init -backend-config "bucket=${_TERRAFORM_BUCKET_NAME}"
      terraform workspace new ${_TERRAFORM_WORKSPACE} 2>/dev/null || terraform workspace select ${_TERRAFORM_WORKSPACE}
      terraform validate
      terraform apply -auto-approve

補足

Q. データの加工はいらない、外部ストレージにデータをとにかく出したい

おとなしくbq extractでGCSに圧縮+出力して、gsutil rsyncでファイルを同期しましょう。同じリージョンであれば、bq extractではGCSへの保存料金とgsutil rsyncでのファイル転送にかかる通信費だけで済みます。

bq extract --destination_format=<ファイルフォーマット> --compression=<圧縮形式> <出力元> <出力先>
gsutil -m rsync -rd <同期元> <同期先>

BigQueryからGCSへのデータのエクスポート
https://cloud.google.com/bigquery/docs/exporting-data?hl=ja

GCSの料金
https://cloud.google.com/storage/pricing#storage-pricing

Q. お会計が心配で夜も眠れません

Cloud Dataflowのworker数をうまく操る方法は自分もスキームが出来上がってないので説明ができません...ですが、まず少ないデータから試してみてお会計がいくらになるか計算したらいいと思います。自分が試した事例では、データ量に対して請求額がだいたい線形になったので、そこから全体の金額を見積もりました。worker数は必要最大数を取りに行くのか実行時間が長くなると多くとりがちな気がしています。

お会計で気になるのがデータの圧縮です。Cloud DataflowというかApache Beamでは以下のようにメソッドチェインを追加するだけで圧縮することが可能です。

// Before
.apply(TextIO.write().withHeader(header).to(options.getOutput()));

// After
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO.write().withWindowedWrites().withHeader(header).to(options.getOutput()).withCompression(Compression.BZIP2));

データ転送量が減れば転送料金が減りますが、Cloud Dataflowでは圧縮している間も最大限確保されたGCEの料金が発生しているので、時間がかかっている割に節約になってなかったり、圧縮するほうがお金がかかったりします。このあたりは実際に動かしてみたり、要件を確認して適当な手段を選択する方がよさそうです。

感想

長くなってしまいましたが、読んでいただきありがとうございました。使い始めたときはCloud Dataflowを運用した知見ブログが全然見つからなくて、どう使っていったらいいのか手探り状態でした。フルマネージドな分散処理はworkerを勝手にスケールしてくれたり便利だなーと思う一方、処理が冗長になるとworker用のGCEを大量に確保されて請求が跳ねるのでハラハラします。今後はScalaで書き直したり、バッチ処理ではなくCloud Composerを使ってPub/Subからストリームで届くデータをリアルタイムに加工->BigQueryに突っ込んで分析->意思決定、みたいなかっこいい構成をやってみたいです。

参考文献

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした