はじめに
この記事はApache Spark Advent Calendar 2015の7日目の記事です。
Spark 1.5から系列パターンマイニングアルゴリズムの1つ PrefixSpanがmllibに実装されました。
我々は文書や検索クエリー内のキーワードの並びから有益な情報を抽出する目的でPrefixSpanアルゴリズムに以前より興味を持っていました。
OSSで公開されているPrefixSpan実装はオンメモリー上でデータを処理するため、大規模データを処理することが難しかったのですが、Sparkを使った分散PrefixSpan実装が登場したことで、大規模データに対して系列パターン抽出が可能になりました。
今回は、このSpark mllibの分散PrefixSpan実装が、どのくらいのサイズのデータを、どのくらいの処理時間で処理できるのかを確認するために、日本語ngramコーパス [2] 1000万件から頻出形態素パターンを抽出する処理を実行させた。
動作確認、ベンチマーク
確認方法
Spark上で分散処理可能なPrefixSpan実装がどのくらい大規模なデータを処理できるかを確認するために、日本語ngram web corpusから頻出形態素列を抽出する処理を実行させた。
使用したngram corpusは[2]で公開されているもので
- 7gram
- 1000万件
のデータをcassandraにinsertし、それをsparkからアクセスしてPrefixSpanで頻出形態素パターンを抽出することにした。
gm | partition | content | freq
----+-----------+---------------------------------------+------
7 | 0 | ( は ち おうじ みなみ の え | 18
7 | 0 | ) から 桓武 天皇 の 時代 ( | 26
7 | 0 | ) について 】 当 キャンペーン 終了 後 | 15
7 | 0 | ( まゆ ) の 間 を 、 | 11
7 | 0 | 3 分 ブン の 2 以上 イジョウ | 11
7 | 0 | ( PDF ファイル ) ( ダウンロード し | 46
7 | 0 | ( 偶然 かも しれ ない けど ) | 38
7 | 0 | 1 ) 予定 価格 調書 案 の | 20
7 | 0 | 1 号 店 3 階 の 販売 | 11
7 | 0 | 22 日 、 東京 地裁 八王子 支部 | 32
クラスター設定
使用したSparkクラスタ構成は以下である。
今回使用したクラスタは評価テスト用であり、処理時間性能の絶対性能を測定できるものではない。
あくまで、どの程度のデータを処理できるかを評価するためである。
- Spark master 1台, Spark worker 7台構成
- cassandra server 3台
マシンスペックは全台ほぼ同じで
- CPU: 4コア
- mem: 32GB
のマシンを使用した。
Sparkの設定は
- standalone mode
- worker memory: 4GB
- driver memory: 10GB
とした。
処理時間
PrefixSpanのパラメータ
- minSupportを 0.1〜 0.0001まで3段階に変化
- maxPatternLength = 3に固定
とした。
またデータはcassandraからRDDとして読み込んだ後cacheさせ、cassandraへのアクセスオーバーヘッドは最小限になるようにした。
処理時間は総処理時間をlinuxのtime commandで測定した。
minsup | min count | time |
---|---|---|
0.1 | 1,000,000 | 1m34.274s |
0.01 | 100,000 | 4m26.014s |
0.001 | 10,000 | 4m18.359s |
0.0001 | 1,000 | 30m35.717s |
結果
minsupが小さくなればなるほど頻出itemの異なり数が大きくなり、それに伴い指数関数的に処理時間を大きくなることが予想され、実測値でもそうなった。1000万件の大規模データからcount >= 1000の頻出パターンをOOMなしに30分で抽出することができ、分散PrefixSpan実装の有効性を確認できた。
Spark mllib PrefixSpan実装概説
mllib PrefixSpan実装
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
の処理フローを以下で概説します。
頻出itemの抽出とid化
PrefixSpan.rum methodで
- 頻出itemの抽出
- minCount以上の頻出itemのid化
- 系列パターンの内部表現化
が
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala#L139
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala#L154
あたりで行われています。
頻出itemの抽出はword countと同じで、結果はdriverにcollectされています。
PrefixSpan本体処理 = getFreqPatterns
がhttps://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala#L179
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala#L236
になります。
このmethod内で、分散PrefixSpanアルゴリズムが実装されています。
itemによる射影
が↑で実行されています。
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala#L271
で
prefixとpostfixを2重のflatMapでjoinさせて頻出prefixとその頻度を求めています。
この結果はこれもdriverにcollectされています。
ここが、分散PrefixSpanアルゴリズムのコアのように思います。
この処理の中で射影dbのサイズがサイズが小さいものは、
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala#L289
smallPrefixesとして保持されて、次のlocalPrefixSpan objectにより再帰的に処理されるようです。
各worker上でのlocalPrefixSpan.runの実行
に実行されています。
最終的な頻出パターンの抽出結果
でdriver上で保持されている頻出パターンと、最後に、各worker上のlocalPrefixSpan objectが抽出した
頻出パターンをRDDとしてunionして、返しています。
おわりに
以前より興味を持っていた分散PrefixSpanのSpark mllib実装を1000件の大規模データで動作確認を実施した。
動作確認前は、実際に処理できる不安な面を持っていたが、minsupがある一定以上の場合は十分高速に処理できることを確認できた。
大規模コーパスや検索クエリーから、頻出キーワードパターン、形態素列パターンの抽出にPrefixSpanを適用する場合
- 大規模データに対するPrefixSpan適用の困難
- 無意味な頻出パターンの抽出
- 情報が重複する、内包される頻出パターンの抽出
の3つの欠点があった。
- はSpark mllib PrefixSpan実装で解決できそうである。2,3に関しては、処理させる大規模データへの前処理、後処理で解決することが多かったが、PrefixSpanの拡張アルゴリズムにより、2., 3. を解決したいと考えている。
例えば
- itemへの重要度ウェイトを付与して、そのウェイトにより抽出するパターンを制御する。
- PrefixSpanをClosed系列パターンマイニングにまで拡張する。
などである。これらをSpark mllib PrefixSpanアルゴリズムに適用できればと考えている。
参考情報
[1] PrefixSpan: http://chasen.org/~taku/software/prefixspan/
[2] 日本語ウェブコーパス 2010: http://s-yata.jp/corpus/nwc2010/