Edited at

お手軽なDataflow処理をScioの「Type Safe BigQuery」を使って作ってみる

本記事はFringe81アドベントカレンダー2018の3日目の投稿です。

昨年に続きScioを使ったDataflowの処理について書きます!


Scio、Dataflowとは?

ScioはApacheBeamおよびDataflowのScala版APIです。

音楽配信サービスで有名なSpotifyが主導で開発しています。

DataflowはApacheBeamをGoogleCloudPlatform上で実行できるサービスです。

付加機能も充実していて本家のJava版より簡潔に実装、実行できるためオススメです。


なぜDataflow??

個人的には、ある程度〜大規模のデータ処理はGCPのBigQueryで処理するのが最強だと思っています。

しかし、SQL一発で欲しいデータを抽出するのが難しい時もあり、SQLで出すため中間テーブルをいくつも経由したり、

一旦中間ファイルに落としてから別の仕組みで処理しする場合があります。

そんな時、Dataflowを活用すると速く、シンプル、低コストで実現できます。


Type Safe BigQueryとは?

ScioでBigQuery処理を簡潔に実装できる仕組みです。

DataflowでBigQueryを中心としたデータ処理を実装する場合に大変便利です。

cf. https://github.com/spotify/scio/wiki/Type-Safe-BigQuery


作るモノ

BigQueryの一般公開データセットとしてアクセスできる

StackOverflowの質問データ(タイトル)に対し、word-countを実行して情報を付加して新規tableへ書き出す処理を書きます

(単純だけど、SQL表現が難しいデータ処理の一例として)


ScioREPLで実行(ローカルDataflow)

簡単に実行できるScioのREPLで実行します。

(Scioの準備等は昨年の記事を参照していだければ(^^))


起動

java -jar scio-repl-0.5.6.jar


サンプル実行


BigQueryデータを抽出する

@BigQueryType.fromQuery("SELECT id,title,0 AS wordcount FROM `bigquery-public-data.stackoverflow.posts_questions` limit 100")

class Row


  • gcpアカウントが適切に設定されてないとエラーで止まってしまう可能性があるので下記を実施します

    gcloud auth login
    gcloud config set project [project-id]
    gcloud auth application-default login



タイトルに対してワードカウントを実行して情報を付与、新規BigQueryテーブルへ書き出す

val questions=bq.getTypedRows[Row]()

val wcquestions=questions.map(r=> Row(r.id, r.title, r.title.map(_.split("[^a-zA-Z']+").length)))
bq.writeTypedRows("[project-id]:sample.wcquestions", wcquestions.toList)

([project-id]は各自の設定へ置き換えてください)


結果

サクッとtitleに対するwordcount処理が実行され新規tableへ書き出されます。

スクリーンショット 2018-12-03 6.59.06.png

コードは下記で全てです!! 

本当にこれだけのコードを書くだけで

BigQueryからデータ抽出->データ加工->新規BigQueryテーブル作成(スキーマはデータ型から自動作成)

が実行されて完成です!便利です!

@BigQueryType.fromQuery("SELECT id,title,0 AS wordcount FROM `bigquery-public-data.stackoverflow.posts_questions` limit 100")

class Row
val questions=bq.getTypedRows[Row]()
val wcquestions=questions.map(r=> Row(r.id, r.title, r.title.map(_.split("[^a-zA-Z']+").length)))
bq.writeTypedRows("[project-id]:sample.wcquestions", wcquestions.toList)


GCPのDataflowで本格実行

java -jar scio-repl-0.5.6.jar --runner=DataflowRunner

「--runner=DataflowRunner」をつけてREPL起動するだけで本格的な分散処理環境であるGCP上のDataflowで実行できます。

ある程度以上のデータ処理を行う場合はDataflowで実行しないと終わらないです(^^;)


最後に

データ処理を運用していると、ある程度以上のデータ規模になると急に厳しくなる(インフラ調整、コスト問題)問題がありました。

そこで、使い勝手がよいBigQueryとフルマネージドで分散処理を実現するDataflowを使うと、ほとんどの問題が解消される実感があります。

同じような問題を感じている方は是非使ってみてください〜