1
1

More than 3 years have passed since last update.

【実装例】Cython(Python)でファイルを行単位で末尾行から読み込み

Last updated at Posted at 2021-01-02

調べると案外例があまり出てこない。ファイルサイズがメモリに対して充分小さければreadで全読み込み+reverseでもいいが、巨大なファイルにも備えておきたいのでブロック単位で末尾読みを繰り返す。
実装は都合によりcythonなので、Pythonなら主にcdef周りを消せば使えると思う。

修正履歴

  • 2021.1.11 ファイル先頭に到達した場合に、既読部分を読まないように読み込みブロック長を調節する(小さくする)よう修正
  • 2021.1.11 strで読み込むとCRLFがLFと認識されてよくわからないことになっていたのでbytes読み込みに修正。
  • 2021.1.11 withに対応。
  • 2021.1.14 空ファイルopen時のエラーを解消。

テスト結果:

Line-length\Line Feed LF CRLF
<5 o o
=5 o o
>5 o o

備考:_block_size=5としてテストした。ファイルは全てASCII文字のみで構成。

cdef class ReversedFileReader(object):
    cdef:
        object file
        int block_size, pos
        bytes newline

    def __init__(self, str _path2file, int _block_size=4096):
        self.file = None
        self.block_size, self.pos = _block_size, 0
        self.newline = None
        if _path2file != '': self.open(_path2file)

    def open(self, str _path2file):
        cdef bytes s_tmp = b''
        if self.file is not None: self.close()
        self.file, self.newline = open(_path2file, 'rb'), None
        s_tmp = self.file.readline()
        if 0 != len(s_tmp):
            if 10 == s_tmp[len(s_tmp) - 1]:
                if 1 != len(s_tmp):
                    if 13 == s_tmp[len(s_tmp) - 2]: self.newline = b'\r\n'
                    else: self.newline = b'\n'
                else: self.newline = b'\n'
            self.file.seek(self.file.seek(-len(self.newline), 2))
            if self.file.read(len(self.newline)) == self.newline: self.pos = self.file.seek(-len(self.newline), 2)
            else: self.pos = self.file.seek(0, 2)
        return

    def close(self):
        if self.file is not None: self.file.close()
        self.file = None
        return

    def readlines(self):
        cdef:
            int tmp_pos = self.pos - self.block_size
            bytes block = b''

        lines = []
        if 0 == self.pos: return lines
        if tmp_pos < 0:
            self.file.seek(0)
            block = self.file.read(self.pos)
            lines = block.split(self.newline)
            self.pos = 0
            lines.reverse()
            return lines
        while True:
            self.file.seek(tmp_pos)
            block = self.file.read(self.block_size) + block
            lines = block.split(self.newline)
            if 0 == tmp_pos: break
            elif 1 < len(lines):
                if b'' != lines[0]: tmp_pos += len(lines[0])
                lines = lines[1:]
                break
            else:
                if 0 > tmp_pos - self.block_size:
                    self.file.seek(0)
                    block = self.file.read(tmp_pos) + block
                    lines = block.split(self.newline)
                    self.pos = 0
                    lines.reverse()
                    return lines
                else: tmp_pos = max(0, tmp_pos - self.block_size)
        self.pos = tmp_pos
        lines.reverse()
        return lines

    def __enter__(self):
        return self

    def __exit__(self, _exc_type, _exc_val, _exc_tb):
        self.close()
        return

参考

1
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1