Spark も Ignite も素人です。
前提
- 1000万行オーダーの、とにかく縦に長いCSVファイルが手元にある
- ぱぱっとローカルでざっくり整形して、あとは Tableau とかに放り込みたい
- 前処理した後、ファイルに書き出しておきたい
- 前処理では特に、表っぽい操作をしたい。
- 具体的には SQL でいうところの集約関数やウィンドウ関数を使ったり、他のテーブルと JOIN したい
- ツールの使い方で四苦八苦したくない
こういう状況で、やはりインメモリでコア全部つかって極力高速に前処理したい。 Pandas もいいけれど、今は pandasで1000万件のデータの前処理を高速にするTips集 のような tips を追う気分ではない。 大昔はよく MySQL を使っていたのだが、重い。あとインデックス張るのが面倒だ。 かかる時間も想像つかない。
一方、 Apache Ignite はインメモリDBだからこういう一時的な高速処理に使えそう…と思いきや、CSV ファイルを読み込む COPY コマンドが不親切極まりない。また、Apache Ignite の SQL には制限が多い (例えばウィンドウ関数がない)。
ところで Ignite への CSV の読み込みには Spark が使えるそうだ (参考: Loading data into Apache Ignite …かなり overkill な気がしないでもない)。 また、Ignite と Spark を併用するのもそれなりに理にかなっているようだ (Apache IgniteとApache Sparkの統合による大規模データ処理における機能拡張や処理能力の向上).
そこで、実際に CSV ファイルを Spark 経由で Ignite に入れてみた。
わかったこと
- Spark はよく使われているので CSVファイルの読み込みで苦労することはあまりない
- (追記) そんなことはなかった。例えば日付文字列が NULL のときにうまく parse できない。
- parse に失敗したときのログが見れない。 私の環境 (2.3.0, homebrew) だけかもしれないが、 loglevel を適切に設定してもログが出ない。
- Undocumented な機能として、 _corrupted_record なる列をスキーマに追加しておけばそこに parse できなかった行の全体が入る…
- Ignite の SQL はウィンドウ関数など前処理に便利そうな関数が使えない (なあんだ)
- Spark のストリーム処理で代替できる。
- 単なる前処理なら Ignite 要らないのではないか
- バージョンの整合性に注意
- 例えば最新の Ignite 2.7.6 には Spark 2.3.0 が対応する。 新しすぎる Spark だと NoSuchMethodError が出た
- Maven の ignite-spark パッケージの依存関係 で、spark.version を見ればよい。
インストール
macOS の場合。
- 最新の Java SDK を入れる。
- Spark:
brew install apache-spark
- Ignite: https://ignite.apache.org/download.cgi からバイナリリリースを拾ってきて展開
設定と起動
Ignite のメモリを 12GB に増やす。 config/default-config.xml
を以下のようにした:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="name" value="Default_Region"/>
<property name="maxSize" value="#{12L * 1024 * 1024 * 1024}"/>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Ignite ノードを起動
export JAVA_HOME=`/usr/libexec/java_home`
./bin/ignite.sh
Spark を起動。 (rlwrap があると便利)
export IGNITE_HOME=/Users/keigoi/...
rlwrap pyspark \
--packages org.apache.ignite:ignite-spark:2.7.6 \
--repositories http://repo.maven.apache.org/maven2/org/apache/ignite
Python 3.6.1 (default, May 24 2018, 09:13:52)
[GCC 4.2.1 Compatible Apple LLVM 9.1.0 (clang-902.0.39.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
19/12/21 09:38:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Python version 3.6.1 (default, May 24 2018 09:13:52)
SparkSession available as 'spark'.
>>> from pyspark.sql import *
>>> from pyspark.sql import functions
Spark で CSV ファイルの読み込み
CSV ファイルを読み込む。その前にまず、スキーマを推論 (inferSchema=True) させる。 inferSchema=True にするとファイル全体を1度すべて読み込んでしまうようなので、でかいファイルには不向き。そこで、スキーマを推論させるためだけの小さいファイル(最初の1000行ていど) header.csv を作っておいた。
schema = spark.read.csv('header.csv', header=True, inferSchema=True).schema
テーブルの DF を作る。ここでファイルが2つに別れていたので union()
で結合
t1 = spark.read.csv('../table1.csv', header=True, schema=schema)
t2 = spark.read.csv('../table2.csv', header=True, schema=schema)
t = t1.union(t2)
例: グループごとの連番を振ってみる
SQL の ROW_NUMBER() OVER (PARTITION BY groupid ORDER BY date) AS rownum
みたいなことをやる。
連番を最後の列に追加する。
rownum = row_number().over(Window.partitionBy('groupid').orderBy('date')).alias('rownum')
t_with_rownum = t.select('*', rownum)
最初の10行ほど出力してみる。
集約計算なので結局ストリーム全体を舐めるため、インデックスを張ったDBほどには速くはない。そりゃそうだ。
>>> t_with_rownum.limit(10).collect()
[Stage 2:> (0 + 8) / 23]
(結果が出る)
Ignite に書き込む
書き込み先テーブル名、プライマリキー、書き込みモード (overwrite, append などがあるよう) を指定して save.
t_with_rownum.write.format('ignite') \
.option('config','config/default-config.xml') \
.option('table','tablename') \
.option('primaryKeyFields','groupid,rownum') \
.mode('overwrite') \
.save()
数分して、合計 3GB ほどの CSV ファイルが 9.5GB のフットプリントをもつ Ignite ノードに収まった。こんなもんだろうか。素直にどこかのクラウドサービス使っとけという気もする。
あとは Ignite 側で sqlline とかを使ってよしなにすると良いのだろう (あまり作り込まれていないようで、かなり不安だが)。
./bin/sqlline.sh -u jdbc:ignite:thin://127.0.0.1/
インデックス張ってないストリームに対して集約しているので遅いような気もするが、まあこんなもんだろうか…
Spark 側でも一旦全てのデータをインメモリに読み込んでいる気がするので、モヤモヤしますね…
Ignite がウィンドウ関数までサポートしてくれればこんなことはしなくて済むのだろうか。 そもそもターミナルから叩く使い方はあまり想定されていないようにも感じた。Spark でも SQL めいたことができるので Ignite に入れる必要がないような気もする。
(以上)