Google GCP
(Google版AWS)のDataflow
サービスを使ってみました。
Dataflow
とは、その名の通り、膨大なデータをGoogleご自慢の分散環境を使用して並列で爆速処理してくれる、ビッグクエリーと同じGCPのマネージドサービスの1つです。
使用できるオフィシャルなプログラミング言語は、Java
とPython
のみですが、
Spotify
から、ScioというScala
のライブラリが提供されていますので今回使用してみました。
Dataflow
では、パイプラインを作成し、入力、変換1、変換2...変換N、出力
の3ステップで構成します。関数型言語のScalaは、変換ロジックをラムダ式でコンパクトに記述できるため、Javaよりもコード量が少なくて済みます。
Spotify
では、Kafka
からDataflow
へ移行する際に、Scala
で記述できるようにScioライブラリを開発したようです。1秒間に200万のデータを処理できるようです。
https://www.youtube.com/watch?v=4wDwVgODyAg&t=9s
Dataflow自体は、GoogleからApache Beamとして移管されるようです。
Scioのインストール
Scioにはお手軽に試せるScio REPLという対話型で実行できるツールがあります。
jarファイルを1つダウンロードするだけです。
GCPのクラウド環境にプロジェクトを前もって作成する必要があります。(ここでは説明しません)
$ wget https://github.com/spotify/scio/releases/download/v0.2.2/scio-repl-0.2.2.jar
$ java -jar scio-repl-0.2.2.jar
Welcome to
_____
________________(_)_____
__ ___/ ___/_ /_ __ \
_(__ )/ /__ _ / / /_/ /
/____/ \___/ /_/ \____/ version 0.2.2
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.
[main] INFO com.google.cloud.dataflow.sdk.options.GcpOptions$DefaultProjectFactory - Inferred default GCP project 'futa-dev' from gcloud. If this is the incorrect project, please cancel this Pipeline and specify the command-line argument --project.
Using 'futa-dev' as your BigQuery project.
BigQuery client available as 'bq'
Scio context available as 'sc'
scio>
データフローを動かしてみる
クラウド上で実行する前に、ローカルのMac上でテストしてみましょう。
プレーンテキストをインプットにした例
単語の出現頻度をカウントする処理ロジックを組んでみます。
入力ファイルを作成します。(適当に単語を並べます。)
abc def ghi
def ghi
jkl abc abc
Scio REPLで以下のステートメントを入力します。(scio>プロンプトのところ)
scio> val c1 = sc.textFile("input.txt")
c: com.spotify.scio.values.SCollection[String] = com.spotify.scio.values.SCollectionImpl@77ad2767
scio> val c2 = c1.flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString)
c2: com.spotify.scio.values.SCollection[String] = com.spotify.scio.values.SCollectionImpl@9e4388d
scio> c2.saveAsTextFile("output1")
res1: scala.concurrent.Future[com.spotify.scio.io.Tap[String]] = List()
scio> sc close
出力されたファイルを見てみましょう。単語ごとに出現回数がカウントされ、タプルで出力されてます。
(ghi,2)
(jkl,1)
(def,2)
(abc,3)
入力ファイル(input.txt)は、SCollection
というコレクションに、1要素=1行
でセットされます。(val c1)
c1.flatMap(_.split())
は、SCollection
内の全行を単語単位に分割し、新しいSCollection
を返します。
.coutByValue
は、同じ単語ごとにGroupBy
し、(単語, 出現回数)
のタプルのSCollection
を返します。
((ghi, 2), (jkl, 2), (def, 2), (abc, 3))
.saveAsTextFile("output1")
で、output1
ディレクトリに結果をファイル出力します。
sc close
を実行すると、実際に変換処理が実行されます。
ワンライナーで書くこともできます。
scio> sc.textFile("README.md").flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString).saveAsTextFile("output2")
scio> sc close
Scalaでは、メソッドのドットや括弧を省略できます。
sc close
は、sc.close()
と同じです。
Google Storage (Google版 S3)からインプットにした例
Googleがサンプルとして提供している、Google Storage
のシェイクスピアのファイルを入力ファイルとして使用した例です。
scio> :newScio
Scio context available as 'sc'
scio> val c1 = sc.textFile("gs://dataflow-samples/shakespeare/*")
scio> val c2 = c1.flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString)
scio> c2.saveAsTextFile("output2")
scio> sc close
一度sc close
した後で、変換処理を再度実行する場合は、:newScio
で新しいコンテキストを作成する必要があります。
ワイルドカードでshakespear
配下の複数のファイルを読み込みます。(val c1)
後は前回と同じ処理になります。単語ごとにGroupByしてカウントして、結果をoutput2
に出力します。
(bottomless,3)
(Shore's,2)
(oh,8)
(inclinest,1)
<以下、省略>
BigQueryのテーブルをインプットにした例
Googleがサンプルとして提供しているBigQueryの気象データからデータを入力する例です。
トルネード(竜巻)の月ごとの発生回数をカウントするプログラムです。
scio> val c1 = sc.bigQuerySelect("SELECT tornado, month FROM [clouddataflow-readonly:samples.weather_stations]")
scio> val c2 = c1.flatMap(r => if (r.getBoolean("tornado")) Seq(r.getLong("month")) else Nil).countByValue.map(kv => TableRow("month" -> kv._1, "tornado_count" -> kv._2))
scio> c2.saveAsTextFile("output3")
scio> sc close
sc.bigQuerySelect()
でSELECTの結果セットをSCollection
で返します。1要素に1レコードがセットされます。(c1)
tornado
カラムがtrue
なら、month
カラムの値(月)をSeq(Scalaのリスト)に追加します。
.coutByValue
で月ごとの回数を合計し、(月、出現回数)のタプルのコレクションを返します。
.map(kv => Table())
でタプルから、BigQueryのレコードに変換します。(kv._1
は、タプルの1つ目の要素)
{month=4, tornado_count=5}
{month=3, tornado_count=6}
{month=8, tornado_count=4}
{month=7, tornado_count=8}
{month=10, tornado_count=10}
{month=5, tornado_count=6}
{month=12, tornado_count=10}
{month=11, tornado_count=9}
{month=9, tornado_count=7}
{month=1, tornado_count=16}
{month=6, tornado_count=5}
{month=2, tornado_count=7}
クラウド上(GCP Dataflow)で実行してみる
それではクラウド上で実行してみましょう。
Scio REPLの起動オプションで、GCPのプロジェクト名
と使用するランナー名
、コンパイルしたScalaのJarファイルをアップロードするGCSの場所
を指定します。
java -jar scio-repl-0.2.2.jar \ 18:24:31
--project=<プロジェクト名> \
--stagingLocation=gs://<コンパイルしたJarファイルのアップロード先> \
--runner=BlockingDataflowPipelineRunner
処理結果の格納先を、ローカルからGCSに変更します。(ローカルファイルシステムは使用できないので)
scio> val c1 = sc.textFile("gs://dataflow-samples/shakespeare/*")
scio> val c2 = c1.flatMap(_.split("[^a-zA-Z']+")).countByValue.map(_.toString)
scio> c2.saveAsTextFile("gs://my-bucket-futa1/output")
scio> sc close
マルチライン
:paste
を使用すると、コピペで複数のステートメントをまとめて実行できます。
scio> :paste
// Entering paste mode (ctrl-D to finish)
def double(x: Int) = x * 2
def sum(x: Int, y: Int) = x + y
val c1 = sc.parallelize(1 to 10).map(double).reduce(sum)
// Exiting paste mode, now interpreting.
double: (x: Int)Int
sum: (x: Int, y: Int)Int
c1: com.spotify.scio.values.SCollection[Int] = com.spotify.scio.values.SCollectionImpl@29345a91
scio> c1.saveAsTextFile("out1")
scio> sc close
defで定義した値はsc close後も有効なので、使いまわすことができます。
scio> :newScio
scio> val c1 = sc.parallelize(1 to 20).map(double).reduce(sum)
scio> c1.saveAsTextFile("out2")
scio> sc close
マテリアライズとFuture
materializeを使用すると、結果の出力先をテンポラリファイルに吐き出せます。
非同期で実行されるので、戻り値はFutureになります。
waitForResult().value
で結果を取り出します。
scio> :newScio
Scio context available as 'sc'
scio> val c1 = sc.parallelize(1 to 10).map(double).reduce(sum)
c1: com.spotify.scio.values.SCollection[Int] = com.spotify.scio.values.SCollectionImpl@47f0d8db
scio> val c2 = c1.materialize
c2: scala.concurrent.Future[com.spotify.scio.io.Tap[Int]] = List()
scio> c2.waitForResult().value.foreach(println)
110
bq
bqを使用して、テーブルの中身を見ることができます。
scio> bq.getTableRows("clouddataflow-readonly:samples.weather_stations")
res22: Iterator[com.spotify.scio.bigquery.TableRow] = non-empty iterator
scio> res22.next
res24: com.spotify.scio.bigquery.TableRow = {"station_number":"360380","wban_number":"99999","year":"1977","month":"5","day":"11","mean_temp":45.20000076293945,"num_mean_temp_samples":"6","mean_dew_point":19.700000762939453,"num_mean_dew_point_samples":"6","mean_sealevel_pressure":1026.800048828125,"num_mean_sealevel_pressure_samples":"6","mean_visibility":28.0,"num_mean_visibility_samples":"6","mean_wind_speed":6.199999809265137,"num_mean_wind_speed_samples":"5","max_sustained_wind_speed":9.699999809265137,"max_temperature":30.200000762939453,"max_temperature_explicit":true,"total_precipitation":0.0,"fog":false,"rain":false,"snow":false,"hail":false,"thunder":false,"tornado":false}
上で説明したことがこちらのデモで見ることができます。
https://youtu.be/4wDwVgODyAg?t=1165
サンプルプログラム
サンプルプログラムが、Scioのライブラリ本体についてます。
GitHubからダウンロードします。
$ git clone https://github.com/spotify/scio.git
$ cd scio
サンプルの実行方法は各サンプルファイルの先頭にコメントで書かれています。
基本的にはsbt
を使用して実行します。
sbt "scio-examples/runMain
com.spotify.scio.examples.cookbook.BigQueryTornadoes <必要なオプション>
以下、サンプルをいくつか簡単に紹介します。
MaxPerKeyExamples
先ほど使用したBigQueryの気象データを使用して、月ごとの最高気温を見つけ、BigQueryテーブルに出力します。
.maxByKey
で、month
ごとに最大のmean_temp
を見つけます。
sc
.bigQueryTable(args.getOrElse("input", ExampleData.WEATHER_SAMPLES_TABLE))
.map(row => (row.getLong("month"), row.getDouble("mean_temp")))
.maxByKey
.map(kv => TableRow("month" -> kv._1, "max_mean_temp" -> kv._2))
.saveAsBigQuery(args("output"), schema, WRITE_TRUNCATE, CREATE_IF_NEEDED)
sc.close()
DatastoreWordCount
GoogleのキーバリューデータストアのCloud Datastore
を入出力先に使用するサンプルです。
また、このサンプルでは、2つのパイプラインを逐次実行するために、1つ目のパイプラインの終了を待つようにブロック版のPipelineRunner
を使用します。パイプラインは非同期に処理されるため。
opts.setRunner(classOf[BlockingDataflowPipelineRunner])
JoinExamples
通常は入力元は1つですが、この例では2つの入力をマージしています。
サンプルの2つのコレクション、イベントデータeventsInfo
と国マスタcountryInfo
、を国コードを使用してジョインします。
SQLで言えばこんなイメージです。
SELECT * FROM event_log JOIN ON country_master ON country_code
実際のコードはこちらです。
leftOuterJoin
で2つのコレクションがマージされます。外部結合なので国マスタに見つからないとNoneが帰ります。
val eventsInfo = sc.bigQueryTable(ExampleData.EVENT_TABLE).flatMap(extractEventInfo)
val countryInfo = sc.bigQueryTable(ExampleData.COUNTRY_TABLE).map(extractCountryInfo)
eventsInfo
.leftOuterJoin(countryInfo) // regular left outer join
.map { t =>
val (countryCode, (eventInfo, countryNameOpt)) = t
val countryName = countryNameOpt.getOrElse("none")
formatOutput(countryCode, countryName, eventInfo)
}
.saveAsTextFile(args("output"))
sc.close()