調べると案外例があまり出てこない。ファイルサイズがメモリに対して充分小さければreadで全読み込み+reverseでもいいが、巨大なファイルにも備えておきたいのでブロック単位で末尾読みを繰り返す。
実装は都合によりcythonなので、Pythonなら主にcdef周りを消せば使えると思う。
修正履歴
- 2021.1.11 ファイル先頭に到達した場合に、既読部分を読まないように読み込みブロック長を調節する(小さくする)よう修正
- 2021.1.11 strで読み込むとCRLFがLFと認識されてよくわからないことになっていたのでbytes読み込みに修正。
- 2021.1.11 withに対応。
- 2021.1.14 空ファイルopen時のエラーを解消。
テスト結果:
Line-length\Line Feed | LF | CRLF |
---|---|---|
<5 | o | o |
=5 | o | o |
>5 | o | o |
備考:_block_size=5としてテストした。ファイルは全てASCII文字のみで構成。
cdef class ReversedFileReader(object):
cdef:
object file
int block_size, pos
bytes newline
def __init__(self, str _path2file, int _block_size=4096):
self.file = None
self.block_size, self.pos = _block_size, 0
self.newline = None
if _path2file != '': self.open(_path2file)
def open(self, str _path2file):
cdef bytes s_tmp = b''
if self.file is not None: self.close()
self.file, self.newline = open(_path2file, 'rb'), None
s_tmp = self.file.readline()
if 0 != len(s_tmp):
if 10 == s_tmp[len(s_tmp) - 1]:
if 1 != len(s_tmp):
if 13 == s_tmp[len(s_tmp) - 2]: self.newline = b'\r\n'
else: self.newline = b'\n'
else: self.newline = b'\n'
self.file.seek(self.file.seek(-len(self.newline), 2))
if self.file.read(len(self.newline)) == self.newline: self.pos = self.file.seek(-len(self.newline), 2)
else: self.pos = self.file.seek(0, 2)
return
def close(self):
if self.file is not None: self.file.close()
self.file = None
return
def readlines(self):
cdef:
int tmp_pos = self.pos - self.block_size
bytes block = b''
lines = []
if 0 == self.pos: return lines
if tmp_pos < 0:
self.file.seek(0)
block = self.file.read(self.pos)
lines = block.split(self.newline)
self.pos = 0
lines.reverse()
return lines
while True:
self.file.seek(tmp_pos)
block = self.file.read(self.block_size) + block
lines = block.split(self.newline)
if 0 == tmp_pos: break
elif 1 < len(lines):
if b'' != lines[0]: tmp_pos += len(lines[0])
lines = lines[1:]
break
else:
if 0 > tmp_pos - self.block_size:
self.file.seek(0)
block = self.file.read(tmp_pos) + block
lines = block.split(self.newline)
self.pos = 0
lines.reverse()
return lines
else: tmp_pos = max(0, tmp_pos - self.block_size)
self.pos = tmp_pos
lines.reverse()
return lines
def __enter__(self):
return self
def __exit__(self, _exc_type, _exc_val, _exc_tb):
self.close()
return