6
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Pythonの公式multiprocessingを復習したらいつの間にかクラスタ書かされてた

Posted at

pythonは遅いので

遅いpythonを速くする方法にマルチプロセスがあります。
僕はPool().mapをよく使っているのですが、これはイテレータしか並列化出来ません。
もうちょっと柔軟性が欲しかったのでモジュールのドキュメントを復習してみました。

ありのまま起こった事を話すぜ!

         ,. -‐'''''""¨¨¨ヽ
         (.___,,,... -ァァフ|          
          |i i|    }! }} //|     『おれはpythonの公式のmultiprocessingを
         |l、{   j} /,,ィ//|       復習していると思ったらいつのまにか
        i|:!ヾ、_ノ/ u {:}//ヘ         コンピュータ・クラスタを書いていた』
        |リ u' }  ,ノ _,!V,ハ |
       /´fト、_{ル{,ィ'eラ , タ人      な… 何を言ってるのか わからねーと思うが
     /'   ヾ|宀| {´,)⌒`/ |<ヽトiゝ       おれも何をされたのかわからなかった…
    ,゙  / )ヽ iLレ  u' | | ヾlトハ〉
     |/_/  ハ !ニ⊇ '/:}  V:::::ヽ        頭がどうにかなりそうだった…
    // 二二二7'T'' /u' __ /:::::::/`ヽ
   /'´r -―一ァ‐゙T´ '"´ /::::/-‐  \     非同期処理だとかマルチスレッドだとか
   / //   广¨´  /'   /:::::/´ ̄`ヽ ⌒ヽ  そんなチャチなもんじゃあ 断じてねえ
  ノ ' /  ノ:::::`ー-、___/::::://       ヽ  }
_/`丶 /:::::::::::::::::::::::::: ̄`ー-{:::...       イ  もっと恐ろしいものの片鱗を味わったぜ…

一番簡単なPoolの復習

一台のコンピュータを使う場合、大抵はこれで片付きます。
ただし、イテレータとして処理できる物に限る。lambda式は動かないです。

結論から言うと、map_asyncかstarmap_asyncをwith文で使うのが手っ取り早いかな…

前提

暫く色々被るので、以下の前提のもと、変数を定義しておきます。
目的はmap関数の代替です。

dataというリストをtestという関数で処理することにします。
並列化の度合いはcoreという変数にいれることにします。

from multiprocessing import Pool
from os import cpu_count

data = [1, 2, 3]

def test(x): return x * 2

core = cpu_count()

一番単純なmap

map関数使える人は、直感的に分かりやすいやつです。

with Pool(core) as pool:
    result = pool.map(test, data)
print(result)

>>> [2, 4, 6]

imapというのもあって、こっちは遅延評価版らしいです。

少し賢いmap_async

map_asyncは非同期的にやってくれるから、
読み込む関数がでかい時はより良いです。
ただし、get()関数で取り出してあげる必要ありです。

with Pool(core) as pool:
    result = pool.map_async(test, data).get()
print(result)

>>> [2, 4, 6]

複数の引数をとるためのstarmap

複数の引数が欲しい場合はstarmapやstarmap_asyncを使います。
zip関数を一緒に使うと幸があるのです。
(test関数を複数の引数をとるものに変えておきます。)

data = list(zip([1, 2, 3], [4, 5, 6]))
print(data)
>>> [(1, 4), (2, 5), (3, 6)]

def test(x, y): return x * y

同期版と非同期版

with Pool(core) as pool:
    result = pool.starmap(test, data)
print(result)

>>> [4, 10, 18]

with Pool(core) as pool:
    result = pool.starmap(test, data)
print(result)

>>> [4, 10, 18]

うん、ここまでなんですよ、僕が普段遣いしていたのは…。

一寸詳しく書けるProcess

ProcessクラスとQueueクラスを使うとさっきのmap系関数と違って
どのタイミングで処理を終わらせたり待ったりするかを選べます。
表現力はイテレータなしで行けるぶん、高い。

from multiprocessing import Process, Queue

さっきと同じ条件でやってみる

data = list(zip([1, 2, 3], [4, 5, 6]))
print(data)
>>> [(1, 4), (2, 5), (3, 6)]

def test(x, y): return x * y

pythonの公式ではここでreturn文じゃなくてQueueクラスに結果を渡す関数を作る、としています。
だけど、既存のコードを並列化する場合、
return文の入ってる関数を一々書き換えるのは現実的じゃないでしょう。
pythonの強みは大量にある既存の資産でしょう?

そこで、僕ならwrap関数を噛ませたいと思いました。上記のコードに続けて…

def wrap(q, func, args): q.put(func(*args))

q = Queue()
q2 = Queue()

p = Process(target=wrap, args=(q, test, data[0]))
p2 = Process(target=wrap, args=(q2, test, data[1]))

p.start()
p2.start()
p.join()
p2.join()
result = q.get()
result2 = q2.get()

まぁ、動くんだけど、めんどいなぁ…と思いました。
他にPipeを使う方法もあるけど、まぁ、Queueさえあればどうにかなる感があります。

標準出力混ぜ混ぜ現象の回避

で、このままだとマルチプロセスした時に標準出力が混ぜ混ぜになるので、
lockを使って標準出力をロックするのだそうな。

そこを加味するとこうなるようです。

from multiprocessing import Process, Queue, Lock

lock = Lock()
data = (1, 2)

def test(x, y):
    return x / y

def wrap(q, l, func, args):
    l.acquire()
    result = func(*args)
    q.put(func(*args))
    l.release()

q = Queue()
p = Process(target=wrap, args=(q, lock, test, data))
p.start()
p.join()
q.get()

>>> 0.5

うん、出来るんですが、僕の脳みそは鶏レベルなので覚えられなさそう…

共有メモリ

もっと柔軟にやりたいなら共有メモリという方法もあります。
Queueは結果だけを格納する感じだけど、共有メモリなら途中で色々参照できます。
ちなみに、あんまり共有メモリとかじゃなくてキュー使ったほうが良いよっていうのが公式見解。

低レベルな共有メモリ

この場合、multiprocessingのValueとかArrayというオブジェクトを使うのだけれど、
型を引数とは別に入力しないといけなかったりして、
自分が書いているのがpythonなのか何なのかよくわかんなくなります。

from multiprocessing import Process, Value, Array

def test(num, arr):
    num.value = 0.5
    arr[3] = 100
    print(arr[8])

num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=test, args=(num, arr))
p.start()
p.join()

print(num.value)
print(list(arr))

型についてはここに詳しいことが書いてあるけど、大変そう…
ctypesだし、これ使うくらいなら別の言語…ってなりそうです。
https://docs.python.org/ja/3/library/array.html#module-array

高レベルな共有メモリ

こっちはちょっと遅いらしいです。
Managerというやつが鯖立てして、そこから読み書きするんだそうな。
どっちにしろProcessが出てくるから長いですね。

from multiprocessing import Process, Manager


def test(shared_list):
    print(shared_list)
    shared_list[4] = None


with Manager() as man:
    shared_list = man.list(range(10))
    p = Process(target=test, args=(shared_list,))
    p.start()
    p.join()

    print(shared_list)

きついので自前クラス作る

ここまでで、ちょっと普段遣いはしたくないレベルになってきたので一筆。
でも、いい子はこんなことせず標準のを使うんだろうか…

from multiprocessing import Process, Manager, Queue, Lock
from typing import Callable, Any, Union


class Multi:
    def __init__(self, lock: Union[Lock, None] = None,
                 queue: Union[Lock, None] = None) -> None:
        self.lock = Lock() if lock is None else lock
        self.queue: Queue = Queue() if queue is None else queue

    def _wrap(self, queue: Queue, lock: Lock, func: Callable, args: tuple) -> Any:
        lock.acquire()
        result = func(*args)
        queue.put(func(*args))
        lock.release()
        return result

    def process(self, func: Callable, *args: Any) -> 'Multi':
        self.p = Process(target=self._wrap,
                         args=(self.queue, self.lock, func, args))
        return self

    def start(self) -> 'Multi':
        self.p.start()
        return self

    def end(self) -> Any:
        self.p.join()
        return self.queue.get()

    @classmethod
    def Process(self, func: Callable, *args: Any) -> 'Multi':
        return self().process(func, *args).start()

一応、上記で

from hoge import Multi
def test(x, y): x * y
mp = Multi.Process(test, 2, 9)
result = mp.end()

とか

with Manager() as m:
    shared_list = m.list(range(10))
    Multi().process(list_test, shared_list, 3).start().end()
    Multi().process(list_test, shared_list, 8).start().end()
    print(shared_list)

みたいな感じで完結出来るようになるのですが、
こういう楽する方法は良い子は使っちゃダメなものなんでしょうか…

ネットワーク越しの並列化

読み進めるうちに、難易度が急激に上がっていきました。
何書いてあるのか始めは分からなかったのですが、読み進めるうちに気づきました。

いわゆるコンピュータ・クラスタが標準ライブラリで出来るらしいです。
ipythonのipyparallelしか知らんかったです。

他のコンピュータに処理を任せるには

  • とりあえず、鯖(実質鯖のほうがslave)立てさせる
  • ipとパスワード指定して接続する
  • 何か投げる

みたいな処理が必要なのだけれど、最低限

  • BaseManager
  • Queue
  • Process

の3つが必要っぽい?

さらに、当然ながら全く同じモジュールを読める環境にしておく必要がありますし、
モジュール読む時は投げる関数の中限定です。
制限多いですね。

一筆書いてみました。下記のこれは、

  • 鯖立てる
  • 鯖がキューを2つ作っておく。一つはタスクの順番、一つは引数。
  • 所有権と引数両方に何か入ったら計算開始。
  • 所有権は消しておいて、値だけ返す
  • クライアント側も2つのキューを作っておく
  • クライアント側から値を入れる
  • クライアント側で少し待って戻ってきた値を拾う
  • ソートしてlistにする

って感じです。
まずは鯖側

from multiprocessing.managers import BaseManager
from multiprocessing import Queue, Process
from time import sleep

def test(x): return x * 3

queue = Queue()
own = Queue()

def run_remote():
    while(True):
        if not own.empty() and not queue.empty():
            print('got data')
            own.get()
            x = queue.get()
            queue.put(x[0](x[1]))
        sleep(1)
        print('waiting')


class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue', callable=lambda: queue)
QueueManager.register('get_own', callable=lambda: own)

m = QueueManager(address=('', 8001),
                 authkey=b'hoge')
p = Process(target=run_remote)
p.start()
server = m.get_server()
server.serve_forever()

次にクライアント側

from multiprocessing.managers import BaseManager
from time import sleep
from socket import gethostname

def test(x): return x * 2

class QueueManager(BaseManager):
    pass

QueueManager.register('get_queue',
                      callable=lambda: queue)
QueueManager.register('get_own',
                      callable=lambda: own)
m = QueueManager(address=('127.0.0.1', 8001),
                 authkey=b'hoge')
print('connecting')
m.connect()
print('connected')
queue = m.get_queue()
own = m.get_own()

print('got own')
own.put(1)
queue.put((test, 4))
print('put')
sleep(1)
result = queue.get()
print(result)

まぁ、ちゃんと鯖側のtest関数が動くのは確認したんですが…書き方は超雑です。

これ自体はそうですね…大変です。
既存のパッケージを使いたいですね。ipyparallelとかさ。

まとめ

  • pythonのmultiprocessingモジュールの解説を読んだ
  • Poolクラスは使いやすい
  • Processクラスは面倒くさいけれど柔軟
  • クラスでwrapすると楽(是非は分からん)
  • 共有メモリは静的型付であり、苦しい
  • 最終的にはクラスタを記述できるが、極めて苦しい
6
8
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?