はじめに
大きなファイルを処理する時、全データを一度メモリ上に持つような実装は、Pythonがとてつもなく遅くなるので避ける必要があります。
Python(を含むインタプリタ型言語)は、内部で参照しているメモリを管理していて、不要になったメモリを自動的に開放する処理(Garbage Collection; GC)をしています。GCには各種の方法がありますが、もっとも典型的な方法には「ある変数から参照されている変数は利用中である」という規則で、全ての利用中の変数(メモリ)を走査するという方式(mark-and-sweep)です。
GC処理は、Pythonの実行中に定期的に呼び出されます。また、Pythonが管理している変数(メモリ)が増えると、GC処理にかかる時間も増えます(mark-and-sweep方式の場合、変数の数Nに対して線形に増加します)。そのため、計算サーバに128GBのメモリが搭載されているから大丈夫だろう、と安易に128GBのファイルをPythonで読み込むと、実行時間の大半はGCが呼び出されているだけで、本当に行いたい処理はまったく進んでいない、ということが頻繁に起きます。
一括処理 vs 逐次処理
大きなファイルを処理する時、本当に全てのデータを同時にメモリ上に保持する必要があるでしょうか?以下は、CSVファイルを全てメモリ上に読み込んでから処理しているコードの例です。
filename = "data.csv"
buf = []
with open(filename, "r", encoding="utf-8") as f:
for line in f:
buf.append(line.rstrip().split(","))
for x in buf:
# なにかxに対する処理
このような方式を、一括処理 と呼ぶことにしましょう。
しかし、通常の処理では、前後の行が必要ではなく、処理対象の行だけが必要なことが多いです。その場合、上のコードは以下のように書き換えることができます。
filename = "data.csv"
with open(filename, "r", encoding="utf-8") as f:
for line in f:
x = line.rstrip().split(",")
# なにかxに対する処理
このように、データを1行読み込んだら、その1行に対する処理を行う方式のことを、逐次処理 と呼ぶことにしましょう。
逐次処理は、一括処理に比べて、メモリ効率に優れています。しかし、単純に逐次処理のコードを書くと、forループのネストが深くなり、読込み処理や書込み処理のコードが散らばって配置されて、読みづらく保守性が低いコードになってしまいます。
そのため、保守性が高い逐次処理を書くノウハウが必要になります。保守性が高い逐次処理を書くためには、ジェネレータ(generator) という概念を身につけることが重要です。
ジェネレータ
ジェネレータは、実はPythonでは非常に頻繁に利用されています。例えば、range(3)は、0,1,2という数列を順番に返すジェネレータです。
for i in range(3):
print(i)
ただ、自分でジェネレータを定義する、という使い方は少ないかもしれません。
まず、上記のCSVファイルを読み込む例を、一括処理方式の関数で実現してみましょう。
def read_csv_batch(filename):
buf = []
with open(filename, "r", encoding="utf-8") as f:
for x in f:
buf.append(x.rstrip().split(","))
return buf
for x in read_csv_batch(filename):
# なにかxに対する処理
これを、逐次処理を行うジェネレータとして書き直すと、以下のようになります。
def read_csv(filename):
with open(filename, "r", encoding="utf-8") as f:
for x in f:
yield x.rstrip().split(",")
for x in read_csv(filename):
# なにかxに対する処理
ポイントは、
-
return文の代わりにyield文を使っていること、および - データをバッファに貯める代わりに逐次的に返していること
の2点です。返り値を返すためにyield文を使った関数は、Pythonによって自動的にジェネレータとして処理されます。
先頭の要素を読み飛ばしたい場合
CSVファイルでは、先頭1行がヘッダで読み飛ばしたい場合がよくあります。その場合、以下のようにenumerate()を使って、処理中の行番号を使うというコードをよく見かけます(ちなみに、range()と同じくenumerate()もジェネレータです)。
for i,x in enumerate(read_csv(filename)):
if i == 0:
continue
# なにかxに対する処理
next()関数を使うと、ジェネレータから値を1つだけ取り出すことができるので、次のように書くとヘッダを1行だけ読み飛ばすことができます。
generator = read_csv(filename)
_ = next(generator)
for x in generator:
# なにかxに対する処理
ジェネレータが返す値を全て一括して受け取りたい場合はlist()関数を使う必要があります。
buf = list(read_csv(filename))
次の要素を先読みしたい場合
次の行を先読みしながら処理したい場合があります。その場合、以下のように2つの変数を使う書き方を良く見かけます。
generator = read_csv(filename)
cur = next(generator)
for post in generator:
# post を参照しながら、cur に対する処理をする
cur = post
# cur に最後の要素が入っているはずなので、最後の要素に対する処理をする
しかし、この書き方は、
- for ループの末尾で、変数
curを更新すること、および - forループを抜けた後で最後の要素の処理を行うこと
の2点を忘れないようにしなければならない、という少しバグを作り込みやすい書き方になっています。例えば、処理の途中でcontinueを使う場合、必ず変数curを更新する必要がありますが、うっかり忘れてしまうことがあります。
そのため、先読みを隠蔽するジェネレータを用意するという書き方があります。
from itertools import pairwise
def sentinel(iterable):
for x in iterable:
yield x
yield None
generator = read_csv(filename)
for cur,post in pairwise(sentinel(generator)):
# post を参照しながら、cur に対する処理をする
sentinel()は、ジェネレータを引数として取り、各要素を順番に返して、最後に 番兵 としてNoneを付け加えて返すジェネレータです。
この番兵ジェネレータと、pairwise ジェネレータを組み合わせます。pairwise ジェネレータは、ジェネレータを引数として取り、各要素と次の要素からなるタプルを順に返すジェネレータです。
したがって、この2つを組み合わせると、各要素と次の要素からなるタプルを順に返す、ただし、最後の要素については最後の要素と番兵からなるタプルを返すジェネレータになっています。
JSON形式の場合
JSONファイルを読み込む時は、逐次的にJSONファイルを解析してくれるライブラリ ijson を使って、以下のように書くことができます。
import ijson
def read_json(infile):
with open(infile, "r", encoding="utf-8") as f:
for item in ijson.items(f, 'item'):
#itemに関する処理を行う
yield item
先ほど説明した通り、返り値を返すためにyieldを使っていますから、この関数はジェネレータです。そのため、以下のように使うことができます。
for x in read_json(filename):
print(x)
しかし、このままでは出力はJSON形式になりません。出力をJSON形式にしたい場合は、上述の番兵ジェネレータを使って、以下のように書くことができます。
import json
from itertools import pairwise
def sentinel(iterable):
for x in iterable:
yield x
yield None
def write_json(outfile, generator):
with open(outfile, "w", encoding="utf-8") as f:
print("[", file=f)
for cur,post in pairwise(sentinel(generator)):
text = json.dumps(cur, ensure_ascii=False, indent=2)
if post:
text += ","
for line in text.split("\n"):
print(" " + line, file=f)
print("]", file=f)
write_json(outfile, read_json(infile))