Linuxではファイルの後ろからn
行取得することのできるtail
というコマンドがある. 結構便利なのでPythonでも同じことができるようにしたい.
tail(file_name, n)
でファイルの後ろからn行取得する関数を, いくつかのアプローチで作っていきたいと思う.
最後のアプローチに関してはit-swarm.devというサイトのテキストファイルの最後の行を効率的に見つけるというページを参考にしている.
使用するファイル
読み込むファイルはテキストファイルでもなんでも良かったのだが, 今回はcsv
ファイルを使う.
ファイル名はtest.csv
. 内容は, ビットコインの価格を一秒ずつ86400行(一日分)まとめたもの.
date,price,size
1588258800,933239.0,3.91528007
1588258801,933103.0,3.91169431
1588258802,932838.0,2.91
1588258803,933217.0,0.5089811
(中略)
1588345195,955028.0,0.0
1588345196,954959.0,0.05553
1588345197,954984.0,1.85356
1588345198,955389.0,10.91445135
1588345199,955224.0,3.61106
本題とは関係ないが, それぞれの項目を一応説明するとdate, price, sizeの単位は, UnixTime, YEN, BTC.
最初の行は, 時刻1588258800
, つまり5月1日0時0分0秒に933239.0
円で3.91528007
枚のビットコインの売買があったという意味である.
素直に先頭から読む
まずは組み込み関数open()
を使ってファイルオブジェクトを取得し, 先頭からすべての行を読んで最後のn行だけ出力する方法.
nが0や負の整数だとおかしな結果になるので, 本当は自然数のみに限定する処理を行う必要があるが, 見やすさ重視ということで.
def tail(fn, n):
# ファイルを開いてすべての行をリストで取得する
with open(fn, 'r') as f:
# 一行読む. 一行目はヘッダーだから結果は捨てる
f.readline()
# 全行読む
lines = f.readlines()
# 後ろからn行だけ返す
return lines[-n:]
# 結果
file_name = 'test.csv'
tail(file_name, 3)
# ['1588345197,954984.0,1.85356\n',
# '1588345198,955389.0,10.91445135\n',
# '1588345199,955224.0,3.61106\n']
テキストファイルであればこのままでも良いが, csvファイル用にもう少し使いやすくする.
def tail(fn, n):
# ファイルを開いてすべての行をリストで取得する
with open(fn, 'r') as f:
f.readline()
lines = f.readlines()
# 文字列を配列にしてから返す. ついでにstr->floatに型変換する
return [list(map(float ,line.strip().split(','))) for line in lines[-n:]]
# 結果
tail(file_name, 3)
# [[1588345197.0, 954984.0, 1.85356],
# [1588345198.0, 955389.0, 10.91445135],
# [1588345199.0, 955224.0, 3.61106]]
変わったのはreturn
の行だけだが, 関数が混みあっていてわかりづらいので, 噛み砕いて説明する.
それぞれの行に関して以下の処理を行っている.
-
strip()
で改行コードを削除
'1588345197,954984.0,1.85356\n'
->'1588345197,954984.0,1.85356'
-
split()
で文字列をカンマ区切りで配列に変換
'1588345197,954984.0,1.85356'
->['1588345197', '954984.0', '1.85356']
-
map()
で配列のそれぞれの要素を文字列からfloat型に変換
['1588345197', '954984.0', '1.85356']
->[1588345197.0, 954984.0, 1.85356]
csvモジュールを使う
csvモジュールは行ごとに自動で配列に変換してくれるので, 若干処理が遅くはなるが, より簡潔に記述できる.
import csv
def tail_csv(fn, n):
with open(fn) as f:
# ファイルオブジェクトをcsvリーダーに変換
reader = csv.reader(f)
# ヘッダーを捨てる
next(reader)
# 全行読む
rows = [row for row in reader]
# 最後のn行だけfloatにして返す
return [list(map(float, row)) for row in rows[-n:]]
pandasモジュールを使う
pandasにはtail関数があるので驚くほど簡単に記述できる.
import pandas as pd
def tail_pd(fn, n):
df = pd.read_csv(fn)
return df.tail(n).values.tolist()
pandasはnumpy配列を扱っているので, tolist()
で最後にリストに変換している. numpy配列のままで良いなら必要はない.
それぞれのパターンで実行時間を計測
ipython
にはtimeit
という便利なコマンドがあるので, ループ回数を100として比較してみる.
timeit -n100 tail('test.csv', 3)
18.8 ms ± 175 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
timeit -n100 tail_csv('test.csv', 3)
67 ms ± 822 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
timeit -n100 tail_pd('test.csv', 3)
30.4 ms ± 2.45 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
特に何のモジュールも使わずそのまま読むのが早いことがわかった.
pandasはコードの簡潔さとそこそこのスピードなのでコスパは一番良さそう.
csvモジュールは使わない行までわざわざ文字列から配列に変換しているから, そのせいで成績は断トツで悪くなっている.
ファイルを後ろから読めば一瞬
ここまでのアプローチは結局どれもすべての行を読み込んでいる. しかし, 欲しいのは後ろの数行なのだから, 後ろからファイルを読む方法があれば一瞬で読み込みが完了するはずだ.
テキストファイルの最後の行を効率的に見つけるというページを参考にした.
後ろから100バイトくらいずつ順に読んでいき, 改行コードが見つかればそれ以降の文字列が最後の行である. ページの中では最終行のみを見つけているが, tail
コマンドを実現するには後ろからn
行見つける必要があるので, そこだけ調整する.
まず予備知識として, ファイルポインタの操作方法について説明する.
使う関数はf.tell()
, f.read(size)
, f.seek(offset, whence)
の3つ.
f.tell()
は現在ポインタが指す位置を返す.
f.read(size)
は現在の位置からsize
バイト読んだ内容を返す. ポインタは読んだ位置まで移動する. 正の方向にしか進めない.
f.seek(offset, whence)
はポインタの位置を移動させる関数である.
引数のwhence
は位置を表す. 0, 1, 2
のいずれかの値が入る. 0
はファイルの先頭, 1
は現在のポインタの位置, 2
はファイルの末尾を意味する.
offset
には整数を入力する. read
と異なり負の値も渡せるので, 例えばf.seek(-15, 1)
は現在のポインタの位置を15個先頭側に戻す.
これらを踏まえて実装していく.
# 正規表現が使えるsplitを使う
import re
def tail_b(fn, n=None):
# nを与えないときは最後の行だけ単体で返す
if n is None:
n = 1
is_list = False
# nは自然数
elif type(n) != int or n < 1:
raise ValueError('n has to be a positive integer')
# nを与えたときはn行をリストにまとめて返す
else:
is_list = True
# 128 * n バイトずつ読む
chunk_size = 64 * n
# seek()はバイナリモード以外だと予期せぬ挙動を見せるので'rb'を指定する
with open(fn, 'rb') as f:
# ヘッダーを除いた左端の位置を探すために最初の一行(ヘッダーの行)を読む
f.readline()
# 一番最初の改行コードを左端(ファイルの末尾から読んでいったときの終端)とする
# -1は'\n'の1バイト分
left_end = f.tell() - 1
# ファイルの末尾(2)から1バイト戻る. read(1)で読むため
f.seek(-1, 2)
# ファイル末尾には空行や空白などがあることも多いから
# それらを除いたファイルの最後の文字の位置(右端)を探す
while True:
if f.read(1).strip() != b'':
# 右端
right_end = f.tell()
break
# 1歩進んだから2歩下がる
f.seek(-2, 1)
# 左端までのまだ読んでいない残りのバイト数
unread = right_end - left_end
# 読んだ行数. これがn以上になればn行読み取れたことになる
num_lines = 0
# 読んだバイト列をつなげていくための変数
line = b''
while True:
# 未読のバイト数がchunk_sizeより小さくなったら, 端数をchunk_sizeとする
if unread < chunk_size:
chunk_size = f.tell() - left_end
# 現在地からchunk_sizeだけファイルの先頭側に移動する
f.seek(-chunk_size, 1)
# 移動した分だけ読む
chunk = f.read(chunk_size)
# つなげる
line = chunk + line
# readでまた進んでしまったのでまた先頭側にchunk_size移動する
f.seek(-chunk_size, 1)
# 未読バイト数を更新する
unread -= chunk_size
# 改行コードが含まれるなら
if b'\n' in chunk:
# 改行コードの数だけnum_linesをカウントアップ
num_lines += chunk.count(b'\n')
# 読んだ行数がn行以上, もしくは未読のバイト数が0になったら終了の合図
if num_lines >= n or not unread:
# 最後に見つけた改行コード
leftmost_blank = re.search(rb'\r?\n', line)
# 最後に見つけた改行コードより前の部分は不要
line = line[leftmost_blank.end():]
# バイト列を文字列に変換
line = line.decode()
# 改行コード'\r\n' または\n'で区切って配列に変換する
lines = re.split(r'\r?\n', line)
# 最後に後ろからn個取り出し, float型に変換して返す
result = [list(map(float, line.split(','))) for line in lines[-n:]]
# nを指定しなかったときは最後の一行を単体で返す
if not is_list:
return result[-1]
else:
return result
解説は注釈で行っている.
それではメインの時間測定を行う.
timeit -n100 tail_b(fn, 3)
87.8 µs ± 3.74 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
これまでのベストタイムは最初のアプローチで, 18.8 ms ± 175 µs
だった. 実行時間は0.5%
ほどになったということだ. つまり200
倍であるが, 86400行を最初から全部読むか後ろから数行読むかの違いなのだから大差がつくのは当然である.
おわりに
4つのパターンを紹介したが, 他にもsubprocess
モジュールを使ってシステムのtail
コマンドを実行するという方法もあるようだ. 環境に依存する方法であるため, 今回は省いた.
紹介した中での一番のオススメは, やはりpandas
を使った2行で書ける方法だ. Pythonとは, 他人のコードを利用して自分がいかに楽できるかを極める言語である.
ファイルの後ろから読んでいく方法に関しては, 早さが必要な場合や行数や文字数がとんでもなく多くて先頭からファイルを読んでいては時間がかかりすぎる場合などに使うと良いだろう.
また, chunk_size
を決めるのに64
を使ったのは特に意味はない. ファイルの一行の長さくらいに設定するのが一番早いだろうが, 行によって長さが大きく異なるファイルもあるため, 何とも言えない.
短い行は数文字だが, 長い行は1万文字といったようなファイルを扱うならば, chunk_sizeを動的に変更する必要があるだろう.
例えば一度の探索で見つかった行数がnに遠く及ばないときは次のchunk_sizeを2倍2倍と増やしていくなどである.
探索の終わっている行数や行の平均長から次のchunk_sizeを決定する方法なども有効だと思われる.