Java
Python
GoogleCloudPlatform
dataflow
WACULDay 1

Google Cloud Dataflow ってなんだ?

More than 1 year has passed since last update.

株式会社WACUL、CTOの包です。 会社のみんなでパンを作ったりカレーを作ったり映画をみたりと楽しい日々を過ごしてます。

ここ最近、Google Cloud Dataflow をいじってみているので、入門したメモとして、整理してみました。
これから触ってみようとする人が最初に読んで、ざっくり概要を理解する助けになればと思います。
ストリーミングやバッチ処理に深い専門知識があるわけではないので、間違ったことを言っていたら教えていただけるとうれしいです。

この記事を読んでわかること

  • Google Cloud Dataflow のだいたいの概要と目指している世界感
  • だいたいのコードの雰囲気
  • 概要を知った上で次に読むべき参考資料

この記事を読んでもわからないこと

  • Dataflow 上でコードを書いていくときの具体的な知見
  • 運用上のの知見

まずは参考資料

これまでに読んだ Google Cloud Dataflow 関連の参考資料です

Google Cloud Dataflow とは?

Cloud Dataflow は、大規模データの処理エンジンと、そのマネージド・サービスです。
大枠では、 Hadoop, Spark とかの仲間だと思ったら良さそうです。
主な特徴は、新しいプログラミングモデルと、フルマネージドな実行環境の提供です。

バッチ処理とストリーミング処理を統合するプログラミングモデル

バッチ処理と、ストリーミング処理の違いは、扱うデータが、有限(bounded)か、無限(unbounded)か、です。
バッチ処理の基盤は、ストリーミング処理基盤に比べて歴史が長く安定しているので、今まで大規模なデータ処理はバッチ処理を基本に設計されてきました。
一方で、ビジネス上の意思決定を速くおこなったり、ユーザーに素早くデータを届ける需要は高まっていて、終わりのないデータを継続的に処理する、ストリーミング処理エンジンへの需要が高まってきました。

そこで、バッチ処理とストリーミング処理を組み合わせて最終的な結果を提供する、ラムダアーキテクチャ などが登場しました。
ただし、ラムダアーキテクチャで構築されたシステムをメンテナンスするのは大変です。違うプログラミングモデルを使いつつ、でロジックレベルでは整合性をとらないといけない、想像するとつらいですね。。。

Cloud Dataflow が提供するプログラミングモデルが目指すのは、ストリーミング処理をベースに、有限のデータの処理と、無限のデータ処理を統合することのようです。すごく雑に言うと、新しく入ってくるデータを正確に処理していくモデルと、現実的な時間でできるエンジンがあるのならば、過去のデータを全部再生して入力したら、同じ結果になるからそれでいいよね!ということです。

既存のストリーミングエンジンの問題は、主に時間の概念の扱いの難しさにより、データの完全性をコントロールすることが困難な点にありました。
時間の概念の扱いの難しさ、というのは、実際に処理するときの実行時時間と、本当にイベントが起こった時間が違う場合が現実では多いことに起因します。ネットワークや処理の遅延や、極端な例だとモバイルアプリのログを、オンラインになったタイミングでサーバーに送信するなど。
これに対応するために、バッファリングの処理だったり、実際の到着順とは違うデータをうまく処理する必要があります。Cloud Dataflowは、ここを幾つかの新しい概念を登場させることで、うまく処理できるようにしているようです。(完全に理解できてないので、参考資料を見てください https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 , https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102 )

このプログラミングモデルは、もともとは、GCP上で動作する専用のものでしたが、 Apache Beam プロジェクトとしてGoogleによって2016年5月ごろにオープンソースになっています(ちなみに Beam は、BatchとStream の掛け合わせ)。

フルマネージドな実行エンジン

後述する、パイプラインの各ステップを最適化して効率よく分散処理するApache Beamの実行エンジンとして、Google Cloud Dataflow, Apache Spark, Apache Flink, Apache Apex などが選べます(Cloud Dataflow 以外は未検証)
オンプレミスだと、 Flink がいちばんいけてるみたいです。

なので正確には Cloud Dataflow は、フルマネージドなサービスとして、GCP上で動作する Apache Beam の実行エンジン、という位置づけです。動かすと裏でGCEインスタンスが立ち上がります。
インフラのメンテナンスコストなどを考えると、特にスタートアップにとっては魅力的な選択肢になりうると思います。

Java or Python

Cloud Dataflow の、実装言語としては、 Java か Python が選べます。
ただし 2016年11月時点では、Python 版はベータでいくつかできないことがあります

  • ストリーミング処理できない (!)
  • Cloud Datastore にアクセスできない

など。これから充実していくことでしょう。

コードを見てみよう

Java のコードを見てみます。
サンプルコードは、Googleのドキュメント からリンクされている githubリポジトリ にまとまっています。

Java8 版の、入力内の単語をカウントする WordCount を例にみてみます。

main 部分のみ抜粋
(CHANGE のところは、自分のGCP上のプロジェクトIDや、GCSのバケット名を入力する必要があります)

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);

    options.setRunner(BlockingDataflowPipelineRunner.class);

    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
    options.setProject("SET_YOUR_PROJECT_ID_HERE");

    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {}))
     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
         .withOutputType(new TypeDescriptor<String>() {}))

     // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run();
}

Pipeline

コードの雰囲気的に、Dataflow は、値を処理するのではなくて、処理の枠組みを定義していく関数型っぽいスタイルだというのがわかると思います。 Rx(Reactive Extension) とか、 TensorFlow とかに似てますね。

パイプラインは、処理の流れを司るオブジェクトで、Pipeline.create にオプションを渡して作成します。
パイプラインの実行に必要な設定をオプションで指定します。

バッチモードと、ストリーミングモードの切り替えは

options.setRunner(BlockingDataflowPipelineRunner.class);

の部分を

options.setRunner(DataflowPipelineRunner.class);

にします。つまり両方とも Pipeline オブジェクト上に構築するフローで処理することになります。

PCollection, PTransform

Pipelineのapplyに渡しているのは、 PTransform オブジェクトです。FlatMapElements.via や、 Filter.byPredicate でjava8のラムダ式と一緒に生成されています。

PTransform は、 PInput を受け取って、 POutput を返す処理を定義するインターフェイスです。
この PTransform に対して、パイプラインがよしなに、次の値を渡して処理を実行してくれます。
始点の処理は、 PInputインターフェイス, 終点の処理は POutputインターフェイスになりますが、中間の処理は、その両方を実装した、 PCollection クラスのインスタンスを扱うことになります。
PCollection こそがパイプラインに渡ってくるデータを表すオブジェクトで、これに対して、データの変換や分岐、結合処理をPipeline 上に構築していく形になっています。

例:

// 前段の文字列のコレクションを、単語に分割する (配列にしたものを最後にフラットにする)
input.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {})) ...

// 前段の文字列のコレクションが空でないものだけにフィルタする
input.apply(Filter.byPredicate((String word) -> !word.isEmpty())) ...

// 前段の文字列のコレクションを集計して、値ごとのカウントのマップのコレクションに変換する
input.apply(Count.<String>perElement()) ...

入出力

現在対応している入出力は、

  • ファイル(ローカルモード)
  • BigQuery
  • Bigtable
  • Google Cloud Storage
  • Google Datastore
  • Google Cloud Pubsub (ストリーム,もしくは固定数のバッチ) 現在 java のみ
  • Avro ファイル

です、 (python 版は、テキストファイルと、BigQuery のみ)

入出力も、PTransform の一種なので、apply に渡して処理します

例:

// Google Cloud Storage からテキストファイルを読み込んで、行毎の文字列のコレクションに変換する
pipeline.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) ...

// Cloud Pubsub からのストリーム入力を、タイムスタンプ付きのコレクションに変換する
pipeline.apply(PubsubIO.Read
          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
          .subscription(options.getPubsubSubscription())) ...

// コレクションを受け取って、BigQuery に出力する
// tableRef はテーブルの場所などの情報、schema テーブルのスキーマ情報
... .apply(BigQueryIO.Write.to(tableRef)
            .withSchema(schema)

実行してみよう

サンプルコードに、自分のプロジェクト、ストレージの情報をいれて実行すると、Cloud Dataflow の管理画面から、実行状態をモニタリングできます。

Cloud_Dataflow_-_enoshima_and_Grafana_-_Mesos_Tasks.png

Pipeline に定義された処理が、クラウド上で、処理されました。
裏では、 Compute Engine のインスタンスが立ち上がって処理されます。
このサンプルの出力は自分で作ったCloud Storage のバケット上に、ファイルが複数に分割されて出力されます。

ちなみにこのサンプルの入力のテキストファイルは、

GLOUCESTER      A poor unfortunate beggar.

EDGAR   As I stood here below, methought his eyes
        Were two full moons; he had a thousand noses,
        Horns whelk'd and waved like the enridged sea:
        It was some fiend; therefore, thou happy father,
        Think that the clearest gods, who make them honours
        Of men's impossibilities, have preserved thee.

GLOUCESTER      I do remember now: henceforth I'll bear
        Affliction till it do cry out itself
        'Enough, enough,' and die. That thing you speak of,
        I took it for a man; often 'twould say
        'The fiend, the fiend:' he led me to that place.

みたいな感じで、シェイクスピアの台本ですね。
出力された、単語別のカウントは

decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6

のような形式のファイルになっています。

感想

シンプルなサンプルを動かしてみて、 Google Cloud Dataflow のざっくりとした雰囲気を理解しました。
感じたこととしては、

  • 概念としてはとてもきれいに見える。特にこれからストリーミング処理に手を出したいと思っている人にはいいスタート地点になりそう
  • やりたい処理を、パイプラインの形にするには、慣れが必要そう。パラダイムを変えて考える必要がある。
  • 流行るかどうかわからないけど、 Google が結構力を入れているように見えるので、すぐには廃れなそう
  • ドキュメントがまだ少ない!少し複雑なことやろうとした時に、苦戦しそうな情報量

わかっていないこと / これから試したいこと

  • 参考資料の、 The world beyond batch: Streaming 101, 102 で解説している、 Watermark, Trigger らへんの概念についての理解がまだたりていない
    • 本当はここがキモっぽいので、ちゃんと理解して次に進みたい。
  • 実際にWebの行動ログを処理するなどしてみたい。特にログをセッション単位に分割する、などはかなりきれいに書けそう。
  • コスト感覚。どのくらいの処理量で、他の選択肢と比べてコストがどうなるのか、予測がついいていない