まず Spark SQL
Apache Sparkは何かを一言でいうのは難しいのですが、この記事ではSparkの(おそらく最重要機能の)Spark SQLについて説明します。これがわかってないとSparkの他のパートは理解できないでしょうし、またSpark SQLだけでも実業務のアプリに応用できる範囲はめっちゃ広いです。
どんなときに使うのか
Spark SQLの用途は一言でいうとこれです。
複数の表形式のデータを材料に、加工と集計をして別の表形式のデータを出力する
たとえば
- お店の売り上げデータだったら→
- 日付ごとや商品ごとに売り上げがどう違うかをグラフにするとか、
- Webサイトの運営だったら→
- 訪問者が何の検索キーワードで入って、それぞれがどういうページ遷移をして、購入に至ったのか/至らなかったのか
- ソフトウェア開発なら→
- テストの実行件数と失敗数がどう変化したかを集計するとか、
いろいろありますよね。どの企業も手法の巧拙はあるにしてもやっていることじゃないかと思います。
別にまあ、データがちっちゃいときはExcelやらGoogle Spreadsheetで十分いけるんですが、サイズが大きくなると徐々に手に負えなくなってきます。
古く巨大なシステムであれば「バッチ処理」と言われるカテゴリーの処理になりますが、この手のやつはデータ処理の切り口を柔軟に変えたくなるものです。単に日付だけ見ていたものを、時間帯別にしてみようとか天候も考慮に入れてみようとかはよくあるストーリーだと思います。
また、処理が多段になるというのもよくあるでしょう。細かい売り上げデータから日次データを作り、さらに月次データへ加工し、最終的に前年同月比のグラフになる、とかのパターンです。
イメージ図ですがデータの加工が多段になっているようなやつです。
Sparkのいいところ
僕は今や、こういう系統のデータ処理が必要で、かつデータサイズがExcelでは間に合わないくらい大きいときは、Spark一択と信奉するまでになりました。とりわけ、現状の処理が次に当てはまる場合はぜひ考えるべきです。
- 無理やりSQLでやっている場合
- SQLでデータを取ってきたあと普通のプログラムで集計している場合
次にもうちょっと詳しく説明します。
無理やりSQLでやっている場合
SQLのVIEW, JOIN, GROUP BY
や、いわゆるストアドプロシージャを駆使して目的のデータを出しているケースです。このケースでのメンテナンス性の悪さはみんな心当たりあるんじゃないかと思います。
SELECT
C.CustomerID, C.CustomerName,
C.CustomerType, C.Address1, C.City,
C.State, S.TotalSales
FROM
Customers C
INNER JOIN
(SELECT CustomerID, SUM(Sales) as TotalSales FROM Sales GROUP BY CustomerID) S
ON
C.CustomerID = S.CustomerID
この程度は簡単にWebで見つかりますが、もっとおぞましいSQLはきっと世の中には無数にあるんじゃないかと思います。しかも、データの切り口を変えたいのでそういう変なSQLをしょっちゅうメンテする羽目になるケースも多いでしょう。これはもうメンテ無理、というようなのあったらコメントしてくれると嬉しいです。
SQLと比較するとSparkにはこういういいところがあります。
プログラム言語のパワーが使える
SQLだけでできることは限りがあるが、集計と加工にモダンな言語のライブラリをいくらでも使えるのは圧倒的に優位だし、加工の部分をUnitTestに切り出せるのもオイシイ。SQLはDBに渡すまで単純な構文エラー( 括弧の対応誤りなど )すら検出されないが、Sparkならコンパイラに構文と型のチェックをかなり任せることができる
データ型が柔軟
GROUP BYした後などの集計過程の中間データにおいて、フィールドに配列や構造体を入れることもできて柔軟。DBはデータの保存 ( SQLの INSERT, UPDATE ) に専念してもらい、分析と加工をSpark側に分離できる。
複雑なクエリ実行中のためパフォーマンスが落ちる、とかを気にしなくていい。DBのインデックス一発で取ってこれるデータならいいんですが、SQLのLIKE文とかがでてきたらもうオワリです。Sparkの出番です。
SQLでデータを取ってきたあと普通のプログラムで集計している場合
これもよくあるパターンと思います。ごく単純なSQLでデータをごそっと取ってきたあと、ループを回してデータを取り、Array
やHashMap
等を駆使してゴリゴリ集計するタイプです。
単純な集計ならいいのですが、このやり方も、データ量が増えるにつれてメモリ消費量や実行時間がバカスカ増えますし、複雑な分析フロー(上の図で中間データがいくつもあり、フローの分岐があって最終データも複数あるようなやつ)だとどうしてもコードが複雑になります。
Sparkの場合、かなりお手軽にクラスタを組むことができるので負荷分散は簡単です。もちろん、本質的に全データを見なくてはいけないような集計だと無理ですが、癖を把握すればシングルノードでの開発とテストのときと同じようにクラスタ対応ができるのは大きなポイントです。
イメージとしては、
- ユーザの書いたSparkアプリのJARファイルが各ノードに配布される
- 処理前のデータがノード数に等分されて渡される
- 処理後のデータを回収して連結する
という流れです。テーブルのJOINとかに威力を発揮します。
使える言語
Sparkの実装としてはScalaがオリジナルらしいですが、
- Java
- Python
- R
- .NET ( ただし今はまだ実験的な位置づけ? )
が利用可能になってます。一般のアプリからの親和性という点ではJavaかPythonでしょうか。
クエリーの例
Spark SQL では、SQLのキーワードや関数に寄せた感じで、気持ちよく集計コードが書けます。以下はJavaでの「SparkSQLらしさのにじみ出た」ところをさくっと切り出したサンプルです。どういうコーディングスタイルになるのか想像しながら読んでみてください。
withColumn
withColumn
は、既存のカラムのデータを使って新しいカラムを動的に作ります。これは、timestamp
カラムに入ったミリ秒単位の整数の時刻をyyyy-MM-dd HH:mm:ss
フォーマットで文字列化した新しいカラムdatetime
を作ります。
data = data.withColumn("datetime", date_format(
col("timestamp")
.divide(1000)
.cast(DataTypes.TimestampType), "yyyy-MM-dd HH:mm:ss")
));
groupBy
groupBy
は、SQLでのそれと同様、対象レコードを同一のキー値をもったグループに分割し、そのグループ内で集計して新しい表を作るものです。
これは、name
カラムが同一であるもので全レコードをグループ化し、それぞれで
-
score
の合計 :totalScore
-
duration
の平均 :durationAvg
-
duration
の標準偏差 :durationStddev
- 件数 :
count
として新しい表を作るものです。
Dataset<Row> data = data.groupBy("name").agg(
sum("score").as("totalScore"),
avg("duration").as("durationAvg"),
stddev("duration").as("durationStddev"),
count("*").as("count"));
こうやって、よくある処理はSparkで提供されているものでできますし、それで間に合わないものはJavaのパワーを駆使して処理を書くことができます。
苦しいSQLのメンテを頑張るより絶対いいと思いませんか?
こうやってJavaで書けば、ビルド後のJARがクラスタにデプロイされて分散処理ができるんですよ! あるいは、AWS Lambdaとかに乗せて集計処理を実施するときのみ料金を払うようなことももちろんできます。
RDD と Dataset
Spark SQLを学んでいくときにつまづきやすいのはここです。
どちらも巨大なテーブル"風"データを扱うためのクラスでどこがどう違うのか最初のうちはよくわからない(僕もそうでした)のですが、要点だけ押さえると:
- 実現目標の機能はどちらも同じ。
- RDDはSpark初期からあり、そこでわかった欠陥を修正して新版として整えたのがDatasetである
- 基本はDatasetが推奨されるが、アプリの書き方によってはRDDのほうがやりやすいケースもあり、使い分けが肝要
- 後発だけあってAPIはDatasetのほうがわかりやすく柔軟である。
- 特にJavaのRDDはGenericsを使いまくってて慣れないとびっくり。
- メモリ効率もDatasetのほうが大差でよいという情報もあるが、細かいところまでは検証不十分。裏方のコードとしてはどっちもかなりの割合同じものを使っているような気もしなくない。
なお、Spark関連情報を調べるとDataFrame
というものもよく登場しますが、これはDataset<Row>のエイリアスです。Rowは特に型を指定しない汎用の行データに使えるクラスです。
Hadoopとの関係
この手の分散処理フレームワークとしてはSparkよりHadoopが元祖で、Sparkも裏側ではHadoop File Systemを使っています。
ただ、Hadoopは一連の処理ごとに結果をディスクに保存しながら進む一方、Sparkの場合明示的に保存しない限りデータはメモリ上にあるだけなので、加工中の中間データは本当に大事なものだけ保存するとするだけで大幅に速度が稼げます。
もっとも、クラスタ中の全ノードのメモリを合計しても乗り切れないほどの膨大なデータを分析するのはSparkでは無理かもしれません。
だめなところ
これまでSparkをほめてばかりでしたが、もちろんダメなところもあります。一番厄介なのは、エラーメッセージが不親切で、特に型のエラー(intとして読みだそうとしたが実際のデータはstringだった、等)に弱いことです。こういう場合には、ちょっと常識では考えられないような怪しげなエラーメッセージが出るので要注意です。
これはちょっと型を間違えたときのエラーメッセージの実例です。generated.java
というソースコードを生成してそれをコンパイルしてるんですね...
そんな複雑なテクニックが必要な処理を書いているつもりはないんですが、何か理由があるんでしょうね。
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 562, Column 35:
failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 562, Column 35:
A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1376) at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1373) at
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at
慣れないうちは「まともなエラーメッセージも出せないSparkはクソ」と考えがちですが、まあ慣れです。こういう欠点を補ってなお余りある価値を持っているのがSparkです。バージョンアップが進めばもうちょっと何とかなるかもしれませんが、場合によっては気合を入れて自分で直してコントリビュートしてしまおうかと考えているくらいです。
複雑なSQLクエリにうんざりしているそこの君! Sparkはいいですよ!