LoginSignup
4
1

More than 3 years have passed since last update.

Apache Ignite と Spark で巨大 CSV ファイルの前処理

Last updated at Posted at 2019-12-20

Spark も Ignite も素人です。

前提

  • 1000万行オーダーの、とにかく縦に長いCSVファイルが手元にある
  • ぱぱっとローカルでざっくり整形して、あとは Tableau とかに放り込みたい
    • 前処理した後、ファイルに書き出しておきたい
  • 前処理では特に、表っぽい操作をしたい。
    • 具体的には SQL でいうところの集約関数やウィンドウ関数を使ったり、他のテーブルと JOIN したい
  • ツールの使い方で四苦八苦したくない

こういう状況で、やはりインメモリでコア全部つかって極力高速に前処理したい。 Pandas もいいけれど、今は pandasで1000万件のデータの前処理を高速にするTips集 のような tips を追う気分ではない。 大昔はよく MySQL を使っていたのだが、重い。あとインデックス張るのが面倒だ。 かかる時間も想像つかない。

一方、 Apache Ignite はインメモリDBだからこういう一時的な高速処理に使えそう…と思いきや、CSV ファイルを読み込む COPY コマンドが不親切極まりない。また、Apache Ignite の SQL には制限が多い (例えばウィンドウ関数がない)。

ところで Ignite への CSV の読み込みには Spark が使えるそうだ (参考: Loading data into Apache Ignite …かなり overkill な気がしないでもない)。 また、Ignite と Spark を併用するのもそれなりに理にかなっているようだ (Apache IgniteとApache Sparkの統合による大規模データ処理における機能拡張や処理能力の向上).

そこで、実際に CSV ファイルを Spark 経由で Ignite に入れてみた。

わかったこと

  • Spark はよく使われているので CSVファイルの読み込みで苦労することはあまりない
    • (追記) そんなことはなかった。例えば日付文字列が NULL のときにうまく parse できない。
    • parse に失敗したときのログが見れない。 私の環境 (2.3.0, homebrew) だけかもしれないが、 loglevel を適切に設定してもログが出ない。
    • Undocumented な機能として、 _corrupted_record なる列をスキーマに追加しておけばそこに parse できなかった行の全体が入る…
  • Ignite の SQL はウィンドウ関数など前処理に便利そうな関数が使えない (なあんだ)
    • Spark のストリーム処理で代替できる。
  • 単なる前処理なら Ignite 要らないのではないか
  • バージョンの整合性に注意

インストール

macOS の場合。

設定と起動

Ignite のメモリを 12GB に増やす。 config/default-config.xml を以下のようにした:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
      <property name="dataStorageConfiguration">
    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
      <property name="defaultDataRegionConfiguration">
        <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
              <property name="name" value="Default_Region"/>
              <property name="maxSize" value="#{12L * 1024 * 1024 * 1024}"/>
        </bean>
      </property>
    </bean>
      </property>
    </bean>
</beans>

Ignite ノードを起動

export JAVA_HOME=`/usr/libexec/java_home`
./bin/ignite.sh

Spark を起動。 (rlwrap があると便利)

export IGNITE_HOME=/Users/keigoi/...
rlwrap pyspark \
  --packages org.apache.ignite:ignite-spark:2.7.6 \
  --repositories http://repo.maven.apache.org/maven2/org/apache/ignite
Python 3.6.1 (default, May 24 2018, 09:13:52) 
[GCC 4.2.1 Compatible Apple LLVM 9.1.0 (clang-902.0.39.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
19/12/21 09:38:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 3.6.1 (default, May 24 2018 09:13:52)
SparkSession available as 'spark'.
>>> from pyspark.sql import *
>>> from pyspark.sql import functions

Spark で CSV ファイルの読み込み

CSV ファイルを読み込む。その前にまず、スキーマを推論 (inferSchema=True) させる。 inferSchema=True にするとファイル全体を1度すべて読み込んでしまうようなので、でかいファイルには不向き。そこで、スキーマを推論させるためだけの小さいファイル(最初の1000行ていど) header.csv を作っておいた。

schema = spark.read.csv('header.csv', header=True, inferSchema=True).schema

テーブルの DF を作る。ここでファイルが2つに別れていたので union() で結合

t1 = spark.read.csv('../table1.csv', header=True, schema=schema)
t2 = spark.read.csv('../table2.csv', header=True, schema=schema)
t = t1.union(t2)

例: グループごとの連番を振ってみる

SQL の ROW_NUMBER() OVER (PARTITION BY groupid ORDER BY date) AS rownum みたいなことをやる。
連番を最後の列に追加する。

rownum = row_number().over(Window.partitionBy('groupid').orderBy('date')).alias('rownum')
t_with_rownum = t.select('*', rownum)

最初の10行ほど出力してみる。
集約計算なので結局ストリーム全体を舐めるため、インデックスを張ったDBほどには速くはない。そりゃそうだ。

>>> t_with_rownum.limit(10).collect()
[Stage 2:>                                                         (0 + 8) / 23]
(結果が出る)

Ignite に書き込む

書き込み先テーブル名、プライマリキー、書き込みモード (overwrite, append などがあるよう) を指定して save.

t_with_rownum.write.format('ignite') \
  .option('config','config/default-config.xml') \
  .option('table','tablename') \
  .option('primaryKeyFields','groupid,rownum') \ 
  .mode('overwrite') \
  .save()

数分して、合計 3GB ほどの CSV ファイルが 9.5GB のフットプリントをもつ Ignite ノードに収まった。こんなもんだろうか。素直にどこかのクラウドサービス使っとけという気もする。
あとは Ignite 側で sqlline とかを使ってよしなにすると良いのだろう (あまり作り込まれていないようで、かなり不安だが)。

./bin/sqlline.sh -u jdbc:ignite:thin://127.0.0.1/

インデックス張ってないストリームに対して集約しているので遅いような気もするが、まあこんなもんだろうか…
Spark 側でも一旦全てのデータをインメモリに読み込んでいる気がするので、モヤモヤしますね…
Ignite がウィンドウ関数までサポートしてくれればこんなことはしなくて済むのだろうか。 そもそもターミナルから叩く使い方はあまり想定されていないようにも感じた。Spark でも SQL めいたことができるので Ignite に入れる必要がないような気もする。

(以上)

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1