Help us understand the problem. What is going on with this article?

Spotify Scio - Google Dataflow の Scala言語版

More than 3 years have passed since last update.

Google GCP(Google版AWS)のDataflowサービスを使ってみました。
Dataflowとは、その名の通り、膨大なデータをGoogleご自慢の分散環境を使用して並列で爆速処理してくれる、ビッグクエリーと同じGCPのマネージドサービスの1つです。

使用できるオフィシャルなプログラミング言語は、JavaPythonのみですが、
Spotifyから、ScioというScalaのライブラリが提供されていますので今回使用してみました。

Dataflowでは、パイプラインを作成し、入力、変換1、変換2...変換N、出力の3ステップで構成します。関数型言語のScalaは、変換ロジックをラムダ式でコンパクトに記述できるため、Javaよりもコード量が少なくて済みます。

Spotifyでは、KafkaからDataflowへ移行する際に、Scalaで記述できるようにScioライブラリを開発したようです。1秒間に200万のデータを処理できるようです。
https://www.youtube.com/watch?v=4wDwVgODyAg&t=9s

Dataflow自体は、GoogleからApache Beamとして移管されるようです。

Scioのインストール

Scioにはお手軽に試せるScio REPLという対話型で実行できるツールがあります。

jarファイルを1つダウンロードするだけです。

GCPのクラウド環境にプロジェクトを前もって作成する必要があります。(ここでは説明しません)

$ wget https://github.com/spotify/scio/releases/download/v0.2.2/scio-repl-0.2.2.jar

$ java -jar scio-repl-0.2.2.jar

Welcome to
                 _____
    ________________(_)_____
    __  ___/  ___/_  /_  __ \
    _(__  )/ /__ _  / / /_/ /
    /____/ \___/ /_/  \____/   version 0.2.2

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)

Type in expressions to have them evaluated.
Type :help for more information.

[main] INFO com.google.cloud.dataflow.sdk.options.GcpOptions$DefaultProjectFactory - Inferred default GCP project 'futa-dev' from gcloud. If this is the incorrect project, please cancel this Pipeline and specify the command-line argument --project.
Using 'futa-dev' as your BigQuery project.
BigQuery client available as 'bq'
Scio context available as 'sc'

scio>

データフローを動かしてみる

クラウド上で実行する前に、ローカルのMac上でテストしてみましょう。

プレーンテキストをインプットにした例

単語の出現頻度をカウントする処理ロジックを組んでみます。
入力ファイルを作成します。(適当に単語を並べます。)

input.txt
abc def ghi
def ghi
jkl abc abc

Scio REPLで以下のステートメントを入力します。(scio>プロンプトのところ)

plain-text
scio> val c1 = sc.textFile("input.txt")
c: com.spotify.scio.values.SCollection[String] = com.spotify.scio.values.SCollectionImpl@77ad2767

scio> val c2 = c1.flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString)
c2: com.spotify.scio.values.SCollection[String] = com.spotify.scio.values.SCollectionImpl@9e4388d

scio> c2.saveAsTextFile("output1")
res1: scala.concurrent.Future[com.spotify.scio.io.Tap[String]] = List()

scio> sc close

出力されたファイルを見てみましょう。単語ごとに出現回数がカウントされ、タプルで出力されてます。

output1
(ghi,2)
(jkl,1)
(def,2)
(abc,3)

入力ファイル(input.txt)は、SCollectionというコレクションに、1要素=1行でセットされます。(val c1)

c1.flatMap(_.split())は、SCollection内の全行を単語単位に分割し、新しいSCollectionを返します。

.coutByValueは、同じ単語ごとにGroupByし、(単語, 出現回数)のタプルのSCollectionを返します。
((ghi, 2), (jkl, 2), (def, 2), (abc, 3))

.saveAsTextFile("output1")で、output1ディレクトリに結果をファイル出力します。

sc closeを実行すると、実際に変換処理が実行されます。

ワンライナーで書くこともできます。

one-liner
scio> sc.textFile("README.md").flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString).saveAsTextFile("output2")

scio> sc close

Scalaでは、メソッドのドットや括弧を省略できます。sc closeは、sc.close()と同じです。

Google Storage (Google版 S3)からインプットにした例

Googleがサンプルとして提供している、Google Storageのシェイクスピアのファイルを入力ファイルとして使用した例です。

GCS
scio> :newScio
Scio context available as 'sc'

scio> val c1 = sc.textFile("gs://dataflow-samples/shakespeare/*")
scio> val c2 = c1.flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString)
scio> c2.saveAsTextFile("output2")
scio> sc close

一度sc closeした後で、変換処理を再度実行する場合は、:newScioで新しいコンテキストを作成する必要があります。

ワイルドカードでshakespear配下の複数のファイルを読み込みます。(val c1)
後は前回と同じ処理になります。単語ごとにGroupByしてカウントして、結果をoutput2に出力します。

output2
(bottomless,3)
(Shore's,2)
(oh,8)
(inclinest,1)
<以下、省略>

BigQueryのテーブルをインプットにした例

Googleがサンプルとして提供しているBigQueryの気象データからデータを入力する例です。
トルネード(竜巻)の月ごとの発生回数をカウントするプログラムです。

scio> val c1 = sc.bigQuerySelect("SELECT tornado, month FROM [clouddataflow-readonly:samples.weather_stations]")
scio> val c2 = c1.flatMap(r => if (r.getBoolean("tornado")) Seq(r.getLong("month")) else Nil).countByValue.map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2))
scio> c2.saveAsTextFile("output3")
scio> sc close

sc.bigQuerySelect()でSELECTの結果セットをSCollectionで返します。1要素に1レコードがセットされます。(c1)
tornadoカラムがtrueなら、monthカラムの値(月)をSeq(Scalaのリスト)に追加します。

.coutByValueで月ごとの回数を合計し、(月、出現回数)のタプルのコレクションを返します。
.map(kv => Table())でタプルから、BigQueryのレコードに変換します。(kv._1は、タプルの1つ目の要素)

output3
{month=4, tornado_count=5}
{month=3, tornado_count=6}
{month=8, tornado_count=4}
{month=7, tornado_count=8}
{month=10, tornado_count=10}
{month=5, tornado_count=6}
{month=12, tornado_count=10}
{month=11, tornado_count=9}
{month=9, tornado_count=7}
{month=1, tornado_count=16}
{month=6, tornado_count=5}
{month=2, tornado_count=7}

クラウド上(GCP Dataflow)で実行してみる

それではクラウド上で実行してみましょう。

Scio REPLの起動オプションで、GCPのプロジェクト名と使用するランナー名コンパイルしたScalaのJarファイルをアップロードするGCSの場所を指定します。

java -jar scio-repl-0.2.2.jar \                                                                                                                          18:24:31
 --project=<プロジェクト名> \
 --stagingLocation=gs://<コンパイルしたJarファイルのアップロード先> \
 --runner=BlockingDataflowPipelineRunner

処理結果の格納先を、ローカルからGCSに変更します。(ローカルファイルシステムは使用できないので)

cloud
scio> val c1 = sc.textFile("gs://dataflow-samples/shakespeare/*")
scio> val c2 = c1.flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString)
scio> c2.saveAsTextFile("gs://my-bucket-futa1/output")
scio> sc close

マルチライン

:pasteを使用すると、コピペで複数のステートメントをまとめて実行できます。

multi-lines
scio> :paste
// Entering paste mode (ctrl-D to finish)

def double(x: Int) = x * 2
def sum(x: Int, y: Int) = x + y
val c1 = sc.parallelize(1 to 10).map(double).reduce(sum)

// Exiting paste mode, now interpreting.

double: (x: Int)Int
sum: (x: Int, y: Int)Int
c1: com.spotify.scio.values.SCollection[Int] = com.spotify.scio.values.SCollectionImpl@29345a91
scio> c1.saveAsTextFile("out1")
scio> sc close

defで定義した値はsc close後も有効なので、使いまわすことができます。

scio> :newScio
scio> val c1 = sc.parallelize(1 to 20).map(double).reduce(sum)
scio> c1.saveAsTextFile("out2")
scio> sc close

マテリアライズとFuture

materializeを使用すると、結果の出力先をテンポラリファイルに吐き出せます。
非同期で実行されるので、戻り値はFutureになります。
waitForResult().valueで結果を取り出します。

scio> :newScio
Scio context available as 'sc'

scio> val c1 = sc.parallelize(1 to 10).map(double).reduce(sum)
c1: com.spotify.scio.values.SCollection[Int] = com.spotify.scio.values.SCollectionImpl@47f0d8db

scio> val c2 = c1.materialize
c2: scala.concurrent.Future[com.spotify.scio.io.Tap[Int]] = List()

scio> c2.waitForResult().value.foreach(println)
110

bq

bqを使用して、テーブルの中身を見ることができます。

bq
scio> bq.getTableRows("clouddataflow-readonly:samples.weather_stations")
res22: Iterator[com.spotify.scio.bigquery.TableRow] = non-empty iterator

scio> res22.next
res24: com.spotify.scio.bigquery.TableRow = {"station_number":"360380","wban_number":"99999","year":"1977","month":"5","day":"11","mean_temp":45.20000076293945,"num_mean_temp_samples":"6","mean_dew_point":19.700000762939453,"num_mean_dew_point_samples":"6","mean_sealevel_pressure":1026.800048828125,"num_mean_sealevel_pressure_samples":"6","mean_visibility":28.0,"num_mean_visibility_samples":"6","mean_wind_speed":6.199999809265137,"num_mean_wind_speed_samples":"5","max_sustained_wind_speed":9.699999809265137,"max_temperature":30.200000762939453,"max_temperature_explicit":true,"total_precipitation":0.0,"fog":false,"rain":false,"snow":false,"hail":false,"thunder":false,"tornado":false}

上で説明したことがこちらのデモで見ることができます。
https://youtu.be/4wDwVgODyAg?t=1165

サンプルプログラム

サンプルプログラムが、Scioのライブラリ本体についてます。

GitHubからダウンロードします。

$ git clone https://github.com/spotify/scio.git
$ cd scio

サンプルの実行方法は各サンプルファイルの先頭にコメントで書かれています。
基本的にはsbtを使用して実行します。

sbt "scio-examples/runMain
  com.spotify.scio.examples.cookbook.BigQueryTornadoes <必要なオプション>

以下、サンプルをいくつか簡単に紹介します。

MaxPerKeyExamples

先ほど使用したBigQueryの気象データを使用して、月ごとの最高気温を見つけ、BigQueryテーブルに出力します。
.maxByKeyで、monthごとに最大のmean_tempを見つけます。

MaxPerKeyExamples
    sc
      .bigQueryTable(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE))
      .map(row => (row.getLong("month"), row.getDouble("mean_temp")))
      .maxByKey
      .map(kv => TableRow("month" -> kv._1, "max_mean_temp" -> kv._2))
      .saveAsBigQuery(args("output"), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED)

    sc.close()

DatastoreWordCount

GoogleのキーバリューデータストアのCloud Datastoreを入出力先に使用するサンプルです。
また、このサンプルでは、2つのパイプラインを逐次実行するために、1つ目のパイプラインの終了を待つようにブロック版のPipelineRunnerを使用します。パイプラインは非同期に処理されるため。

DatastoreWordCount
    opts.setRunner(classOf[BlockingDataflowPipelineRunner])

JoinExamples

通常は入力元は1つですが、この例では2つの入力をマージしています。
サンプルの2つのコレクション、イベントデータeventsInfoと国マスタcountryInfo、を国コードを使用してジョインします。
SQLで言えばこんなイメージです。

SELECT * FROM event_log JOIN ON country_master ON country_code

実際のコードはこちらです。
leftOuterJoinで2つのコレクションがマージされます。外部結合なので国マスタに見つからないとNoneが帰ります。

JoinExamples
    val eventsInfo = sc.bigQueryTable(ExampleData.EVENT_TABLE).flatMap(extractEventInfo)
    val countryInfo = sc.bigQueryTable(ExampleData.COUNTRY_TABLE).map(extractCountryInfo)

    eventsInfo
      .leftOuterJoin(countryInfo)  // regular left outer join
      .map { t =>
        val (countryCode, (eventInfo, countryNameOpt)) = t
        val countryName = countryNameOpt.getOrElse("none")
        formatOutput(countryCode, countryName, eventInfo)
      }
      .saveAsTextFile(args("output"))

    sc.close()
tfutada
シリコンバレー、ニューヨーク(NTTデータUSA)、インド(マヒンドラサティヤム)、ベトナム(フリー)を経て、現在は東南アジアのオフショア開発のコンサルティング会社経営 Kubernetes, GCP, Go, Python, R, Swift, Kotlin, ReactJS 英語(TOEIC895)
https://note.mu/tafutafu
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away