こういうデータ読みたい時ってありませんか?
aaaa
bbbb
ccc
ddd
eee
fff
ggg
Sparkでテキストデータを扱うといえば、1行1レコードが基本なわけですが、今回は空行で区切られた塊を1レコードにしたいです、と。
今回だったら「aaaa\nbbbb\nccc」と「ddd\neee\nfff」と「ggg」の3レコードにしたいってわけですね。
場合によっては「----」みたいな特定の行で区切られているということもあると思います。
newAPIHadoopFileを使ってみよう
テキストデータを読み込むといえば、普通はSparkContext.textFileとかDataFrameReaderを使うと思います。これだと行単位の読み込みになりますが、SparkContext.newAPIHadoopFileというのを使うと、改行に限らず好きな文字列を区切りに指定できるようです。
例えば、こんな感じで使えます。sc
はSparkContextです。
(以下、Pythonの例です。Scala使いの方は若干書き方が違います。参考ページのフォーラムをご覧ください)
>>> rdd = sc.newAPIHadoopFile("/foo/bar/input.txt", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text")
>>> rdd.take(10)
[(0, u'aaaa'), (6, u'bbbb'), (12, u'ccc'), (17, u''), (19, u'ddd'), (24, u'eee'), (29, u'fff'), (34, u''), (36, u'ggg'), (41, u'')]
この時点では単に行単位で読んでいるだけで、何のありがたみもない?のですが、オプション引数を付けると区切り文字列を変更できます。
>>> rdd = sc.newAPIHadoopFile("/foo/bar/input.txt", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf={"textinputformat.record.delimiter": "\n\n"})
>>> rdd.take(3)
[(0, u'aaaa\nbbbb\nccc'), (15, u'ddd\neee\nfff'), (28, u'ggg')]
ここでRDDがKey-Valueのペアになっていますが、Keyはそのレコードの開始位置(バイト数)を表すようです。多くの場合はValueだけを使うことになると思うので
>>> value_rdd = rdd.values()
>>> value_rdd.take(3)
[u'aaaa\nbbbb\nccc', u'ddd\neee\nfff', u'ggg']
こんな感じでValueだけのRDDを作るか、もしくは
>>> df = rdd.toDF("key: long, value: string")
>>> df.select("value").show()
+-------------+
| value|
+-------------+
|aaaa
bbbb
ccc|
| ddd
eee
fff|
| ggg|
+-------------+
こんな感じでDataFrameに変換してからvalueだけを使って処理すると良いと思います。本命は後者ですかね。
(改行が入るとshowの結果が崩れるようですが、まあいいでしょう)
DataFrameを使うとすれば、こんな感じで塊ごとに行のリストを作るとかもできます。
>>> df.select(pyspark.sql.functions.split("value", "\n").alias("values")).withColumn("numvalues", pyspark.sql.functions.size("values")).show()
+-----------------+---------+
| values|numvalues|
+-----------------+---------+
|[aaaa, bbbb, ccc]| 3|
| [ddd, eee, fff]| 3|
| [ggg]| 1|
+-----------------+---------+
注意
"textinputformat.record.delimiter": "\n\n"
のところに区切り文字を入れますが、入力ファイルの改行コードを意識しないといけない場合があります。
例えば今回の例だと、入力ファイルの改行はLF(\n)である必要があります。CRLFのファイルが入力される場合は "\r\n\r\n"
にしないといけません。
謎の?クラス名指定について
内部ではHadoopのTextInputFormatクラスが入力フォーマット(つまり上のKey-Valueの中身)を定義しているのですが、そこでtextinputformat.record.delimiter
の設定が読み込まれています。
その次の org.apache.hadoop.io.LongWritable
とorg.apache.hadoop.io.Text
がKey, Valueの型を定義しているのですが、TextInputFormat自体の成り立ちが
java.lang.Object
└ org.apache.hadoop.mapreduce.InputFormat<K,V>
└ org.apache.hadoop.mapreduce.lib.input.FileInputFormat<LongWritable,Text>
└ org.apache.hadoop.mapreduce.lib.input.TextInputFormat
であるため、自動的にこの2つを指定することになるはずです。
(ただ、両方ともorg.apache.hadoop.io.LongWritable
にしても動きましたが)
TextInputFormat以外のクラスを指定すればいろいろできるはずですが、せっかくSparkを使っているので、まずは素直にレコードを読み込んで、その後でDataFrameベースで加工するのが吉でしょう。
そういえばNew APIって何よ
newAPIHadoopFileという名前にあるように「new API」と「old API」があるみたいですが、「old」に相当するところのhadoopFileって何が違うんでしょうね?
そう思って、上の関数名を単純にhadoopFileに置き換えて呼び出してみたところ
>>> rdd = sc.hadoopFile("/foo/bar/input.txt", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf={"textinputformat.record.delimiter": "\n\n"})
Traceback (most recent call last):
:
:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
: java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TextInputFormat cannot be cast to org.apache.hadoop.mapred.InputFormat
ということなので、org.apache.hadoop.mapred
パッケージがベースになっているようです。
この違いをまとめたスライドなどもありますが、Sparkから(特にPythonから)使うんだったらおとなしくnewAPIHadoopFileでいいんじゃないでしょうかね。