4
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

python の multiprocessing で作った複数プロセスから、同じファイルに安全に書き込む

Last updated at Posted at 2021-02-05

これは何か

  • python で multiprocessing モジュールを使って作った複数の子プロセスから、同じファイルに安全に書き込む方法を調べたのでまとめました。
  • 動作環境は Linux です。

(コメントをもらい、間違いに気づき、理解が深まってきたので加筆・修正しました)

動機

  • SSD (PCIe 4) の読み書きが、かなり早いです
    • 5 GB/s を超すものもあります
    • DDR4 メモリと比べても 1/10 程度です
  • CPU もコア数がべらぼうに増えて来ています
  • 小さなデータ (数百MB) を個別にたくさん処理して、最終的に合体し、数十GBのファイルを書き出す、という事を普段やっているのですが、マルチプロセスで個別にデータ処理をさせて、さらにファイルに直接書き出せればメモリを節約できるかもしれません

まとめ

  • Linux の場合、基本的には複数プロセスから同一ファイルを操作する事は可能
  • ただし、ストリームバッファーがブロックサイズ毎に読み書きを行うので、ブロック毎の排他制御が必要
    • ブロックサイズの最小は 1 byte (バッファー無し) です
    • 各プロセスがどれくらいの頻度で、どれくらいのサイズをどれくらい離れたアドレスに書き込むのか、に応じて、I/O パフォーマンスが最大になるようなブロックサイズを探究する必要がありそうです
  • 上記の方法で I/O パフォーマンスに満足行かない場合、メモリを使わざるをえなさそうです
    • 全データをメモリに展開して、解析が完了してから書き出す、もしくは、
    • mmap を使って部分的にメモリに読み出す
      • ファイル全てをメモリに展開しない場合、どのような頻度でメモリ<-->ディスクでやりとりさせるのが、パフォーマンスを出せるかを、調べる必要がありそうです

となりました。

以下、詳細です。

下準備

書き込みに使うファイルを生成します。

  • 4 byte 整数を 10,000 個含むバイナリファイルを生成しました
    • ファイルサイズは 40 KB です
    • 今回は 100 x 100 の 2 次元アレイとして numpy で生成しました
import numpy

d = numpy.zeros([100, 100], dtype='int32')
open('test.dat', 'wb').write(d.tobytes())
# --> 40000

失敗編 : python 組み込み関数の open を使う

まずは、組み込み関数 open を使って実装してみます。

実装

add1 関数の仕様は次のとおりです:

  • test.dat を読み書きモードで開く
  • 10,000 個のデータのうち、
    • 偶数番目 (offset=0)、もしくは、奇数番目 (offset=1) のデータを読み取り、
    • value を加えて、
    • 元の位置に戻します
def add1(value, offset):
    # python 組み込み関数 open() を使います
    f = open('test.dat', 'r+b')
    
    # 書き込む対象のアドレス一覧を生成しておきます
    target_addr = range(offset, 10000, 2) 
    
    t0 = time.time()
    while time.time() - t0 < 2:  # 2 秒間繰り返します
        for i in target_addr:
            # 対象アドレスの現在の値を取得します
            f.seek(i*4, 0)
            d = numpy.frombuffer(f.read(4), dtype='int32') 
            
            # 値を加算します
            d = d + value
            
            # 加算した値を元のアドレスに戻します
            f.seek(-4, 1)
            f.write(d.tobytes())
            continue
        continue
    return

シングルプロセスだけから書き込む

シングルプロセスで実行してみます。

# 実行します
add1(1, 0)  # 偶数番目に +1 を 2 秒間加算します

# 結果を確認します
numpy.fromfile('test.dat', dtype='int32').reshape([100, 100])
# array([[121,   0, 121, ...,   0, 121,   0],
#       [121,   0, 121, ...,   0, 121,   0],
#       [121,   0, 121, ...,   0, 121,   0],
#       ...,
#       [121,   0, 121, ...,   0, 121,   0],
#       [121,   0, 121, ...,   0, 121,   0],
#       [121,   0, 121, ...,   0, 121,   0]], dtype=int32)

結果

  • 意図した通り、偶数番目のアドレスにだけ加算されています。
  • 2 秒間で 121 回加算されたようです。

マルチプロセスで同時に書き込む

次に、マルチプロセス化します。

# 偶数番目は +1 します
p1 = multiprocessing.Process(target=add1, args=(1, 0))
p1.start()

# 奇数番目は -1 します
p2 = multiprocessing.Process(target=add1, args=(-1, 1))
p2.start()

# 待ちます
[p.join() for p in [p1, p2]]

# 結果を確認します
numpy.fromfile('test.dat', dtype='int32').reshape([100, 100])
# array([[  96, -116,   79, ..., -116,   79, -116],
#       [  79, -116,   79, ..., -116,   79, -116],
#       [  79, -116,   79, ..., -116,   79, -116],
#       ...,
#       [  90, -115,   90, ..., -115,   90, -115],
#       [  90, -115,   90, ..., -115,   90, -115],
#       [  90, -115,   90, ..., -115,   96, -124]], dtype=int32)

結果

  • 全体的には、偶数番目は加算され、奇数番目は減算されています
  • しかし、数字にばらつきがあり、意図した通りに実行できていません
  • python 組み込み関数の open() は、内部で、オープンファイルテーブルのエントリを共有しているのだと推察されます。そのため、f.seek() と f.read() や f.write() の間で処理が割り込まれて競合しているのでしょう。意図した通りに動かすためには、オープンファイルテーブルのエントリを分けるか、seek() から read() までをアトミックに処理させないといけません。
  • python 組み込み関数の open() は、デフォルトでバッファーが有効で、ブロック毎に読み書きが行われます。
    • 1 つ目のプロセスでブロックを読み込んだ後で、ブロックの中の一部を変更し、変更を含むブロック (大半は読み出したときのままの状態) を書き込みます。
    • 2 つ目のプロセスが、その書き込み前にブロックを読んだ場合、1 つ目のプロセスの変更箇所が、上書きされて消えてしまいます
    • バッファーサイズのデフォルトは、os.stat() で取得される st_blksize です
  • ですので、バッファーサイズを 0 にするか、ストリームを使わずに直接 read(), write() のシステムコールを呼ぶ必要があります

解決編 1 : open() をバッファー無しで使う

  • python 組み込み関数 open() の buffering 引数に 0 を与えると、ストリームバッファー無しでストリームを扱えます

実装

def add2(value, offset):
    # open() に buffering=0 を指定します
    f = open('test.dat', 'r+b', buffering=0)
    
    # 書き込む対象のアドレス一覧を生成しておきます
    target_addr = range(offset, 10000, 2) 

    t0 = time.time()
    while time.time() - t0 < 2: # 2 秒間繰り返します
        for i in target_addr:
            # 対象アドレスの現在の値を取得します
            f.seek(i*4, 0)
            d = numpy.frombuffer(f.read(4), dtype='int32')

            # 値を加算します
            d = d + value

            # 加算した値を元のアドレスに戻します
            f.seek(-4, 1)
            f.write(d.tobytes())
            continue
        continue
    return

結果

# データを初期化します
d = numpy.zeros([100, 100], dtype='int32')
open('test.dat', 'wb').write(d.tobytes())

# マルチプロセスで実行します
p3 = multiprocessing.Process(target=add2, args=(-1, 0))
p3.start()

p4 = multiprocessing.Process(target=add2, args=(1, 1))
p4.start()

# 待ちます
[p.join() for p in [p3, p4]]

# 結果を確認します
numpy.fromfile('test.dat', dtype='int32').reshape([100, 100])
# array([[-46,  46, -46, ...,  46, -46,  46],
#        [-46,  46, -46, ...,  46, -46,  46],
#        [-46,  46, -46, ...,  46, -46,  46],
#        ...,
#        [-46,  46, -46, ...,  46, -46,  46],
#        [-46,  46, -46, ...,  46, -46,  46],
#        [-46,  46, -46, ...,  46, -46,  46]], dtype=int32)
  • 数字が揃っています。意図した通りに動きました
  • しかし、実行速度は、シングルスレッドでバッファーありで実行した時より遅そうです。2 秒間に 46 回しか実行できませんでした。

解決編 2 : os.open() を使う

  • ストリームを使わなければ、ストリームバッファーもありません
  • システムコールの open(), read(), write() を直接呼んでみましょう
  • python では、os モジュールで、システムコールを直接呼べるようです

実装

def add3(value, offset):
    # os.open() を使って Linux システムコールを直接実行します
    fd = os.open('test.dat', os.O_RDWR)
    
    # 書き込む対象のアドレス一覧を生成しておきます
    target_addr = range(offset, 10000, 2) 
    
    t0 = time.time()
    while time.time() - t0 < 2: # 2 秒間繰り返します
        for i in target_addr:
            # 対象アドレスの現在の値を取得します
            os.lseek(fd, i*4, 0)
            d = numpy.frombuffer(os.read(fd, 4), dtype='int32')
            
            # 値を加算します
            d = d + value
            
            # 加算した値を元のアドレスに戻します
            os.lseek(fd, -4, os.SEEK_CUR)
            os.write(fd, d.tobytes())
            continue
        continue
    return

結果

# データを初期化します
d = numpy.zeros([100, 100], dtype='int32')
open('test.dat', 'wb').write(d.tobytes())

# マルチプロセスで実行します
p5 = multiprocessing.Process(target=add2, args=(1, 0)) # 偶数アドレスは加算
p5.start()

p6 = multiprocessing.Process(target=add2, args=(-1, 1)) # 奇数アドレスは減算
p6.start()

# 待ちます
[p.join() for p in [p5, p6]]

# 結果を確認します
numpy.fromfile('test.dat', dtype='int32').reshape([100, 100])
# array([[ 46, -46,  46, ..., -46,  46, -46],
#        [ 46, -46,  46, ..., -46,  46, -46],
#        [ 46, -46,  46, ..., -46,  46, -46],
#        ...,
#        [ 46, -46,  46, ..., -46,  46, -46],
#        [ 46, -46,  46, ..., -46,  46, -46],
#        [ 46, -46,  46, ..., -46,  46, -46]], dtype=int32)
  • 数字が揃っています。こちらも、意図した通りに動きました。
  • 実行速度は、バッファー無しで開いた組み込み open() と同程度です

解決編 3 : os.pread(), os.pwrite() を使う

  • Linux では、lseek と read/write をアトミックに実行する pread/pwrite システムコールが用意されています。
  • こちらの方が、lseek が不要な分、システムコールが少なく、早くなるはずです
  • python からは、os.pread(), os.pwrite() で利用可能です

実装

def add4(value, offset):
    # os.open() を使って Linux システムコールを直接実行します
    fd = os.open('test.dat', os.O_RDWR)
    
    # 書き込む対象のアドレス一覧を生成しておきます
    target_addr = range(offset, 10000, 2) 
    
    t0 = time.time()
    while time.time() - t0 < 2: # 2 秒間繰り返します
        for i in target_addr:
            # 対象アドレスの現在の値を取得します
            d = numpy.frombuffer(os.pread(fd, 4, i*4), dtype='int32')
            
            # 値を加算します
            d = d + value
            
            # 加算した値を元のアドレスに戻します
            os.pwrite(fd, d.tobytes(), i*4)
            continue
        continue
    return

結果

# データを初期化します
d = numpy.zeros([100, 100], dtype='int32')
open('test.dat', 'wb').write(d.tobytes())

# マルチプロセスで実行します
p7 = multiprocessing.Process(target=add3, args=(1, 0)) # 偶数アドレスは加算
p7.start()

p8 = multiprocessing.Process(target=add3, args=(-1, 1)) # 奇数アドレスは減算
p8.start()

# 待ちます
[p.join() for p in [p7, p8]]

# 結果を確認します
numpy.fromfile('test.dat', dtype='int32').reshape([100, 100])
# array([[ 62, -62,  62, ..., -62,  62, -62],
#        [ 62, -62,  62, ..., -62,  62, -62],
#        [ 62, -62,  62, ..., -62,  62, -62],
#        ...,
#        [ 62, -62,  62, ..., -62,  62, -62],
#        [ 62, -62,  62, ..., -62,  62, -62],
#        [ 62, -62,  62, ..., -62,  62, -62]], dtype=int32)
  • こちらも意図した通りに動きました。
  • 2 秒間に 62 回実行できており、lseek() + read()/write() よりは早いですが、流石にバッファーありのストリームには勝てていません。

解決編 4 : mmap を使う

  • mmap システムコールを使うと、ファイルをメモリ上にマッピングします
    • プロセスはメモリを操作するだけです。後ろでカーネルがメモリとファイルの同期を行ってくれます
    • さらに、MAP_SHARED フラグを立てれば、プロセスをまたいで、同じマッピングされたメモリを参照できます
  • メモリ上で操作できるので早いはずです (メモリは消費してしまいますが...)
  • python からは、mmap モジュールを使って利用できます

実装

import mmap

def add4(value, offset):
    # os.open() を使って開きます
    fd = os.open('test.dat', os.O_RDWR)
    
    # mmap にして、メモリに読み込みます
    mm = mmap.mmap(fd, 0)  # デフォルトでは、flags=mmap.MAP_SHARED となり、
                           # プロセス間でメモリマッピングが共有されます

    # 書き込む対象のアドレス一覧を生成しておきます
    target_addr = range(offset, 10000, 2) 

    t0 = time.time()
    while time.time() - t0 < 2: # 2 秒間繰り返します
        for i in target_addr:
            # 対象アドレスの現在の値を取得します
            mm.seek(i*4, os.SEEK_SET)
            d = numpy.frombuffer(mm.read(4), dtype='int32')

            # 値を加算します
            d = d + value

            # 加算した値を元のアドレスに戻します
            mm.seek(-4, os.SEEK_CUR)
            mm.write(d.tobytes())
            continue
        continue
    return

結果

# データを初期化します
d = numpy.zeros([100, 100], dtype='int32')
open('test.dat', 'wb').write(d.tobytes())

# マルチプロセスで実行します
p9 = multiprocessing.Process(target=add4, args=(-1, 0))
p9.start()

p10 = multiprocessing.Process(target=add4, args=(1, 1))
p10.start()

# 待ちます
[p.join() for p in [p9, p10]]

# 結果を確認します
numpy.fromfile('test.dat', dtype='int32').reshape([100, 100])
# array([[ 108, -103,  108, ..., -103,  108, -103],
#        [ 108, -103,  108, ..., -103,  108, -103],
#        [ 108, -103,  108, ..., -103,  108, -103],
#        ...,
#        [ 108, -103,  108, ..., -103,  108, -103],
#        [ 108, -103,  108, ..., -103,  108, -103],
#        [ 108, -103,  108, ..., -103,  108, -103]], dtype=int32)
  • こちらも意図した通りに動きました。
  • 2 秒間に 100 回ほど実行できており、シングルスレッドのバッファーありストリームに迫る勢いです。

終わりに

  • 処理したいデータの構造、処理の内容、使える計算機のリソースを考慮して、最適解を探る必要がありそうです
4
7
5

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?