はじめに
Twitterで一時期流行していた 100 Days Of Code なるものを先日知りました。本記事は、初学者である私が100日の学習を通してどの程度成長できるか記録を残すこと、アウトプットすることを目的とします。誤っている点、読みにくい点多々あると思います。ご指摘いただけると幸いです!
今回学習する教材
-
- 8章構成
- 本章216ページ
今日の進捗
- 第5章:並行性と並列性
- 本日学んだことの中で、よく忘れるところ、知らなかったところを書いていきます。
並行性と並列性
並行性は、見かけ上同時に実行しているように見えますが、実際は複数のプログラムを同時に実行するわけではなく、プログラムを素早く切り替えながら1つ1つ処理しています。
一方、並列性は、複数のプログラムを同時に実行することであり、並列の文字通り、並列にプログラムを実行しています。
この章からPython での並行プログラムと並列プログラムの書き方を学んでいきます。
subprocess を使って子プロセスを管理する
Python ではサブプロセスを実行するのに、
- popen
- popen2
- os.exec*
- subprocess
などを含めて多数の方法が存在します。
現在のPythonで子プロセスを管理する最良の方法は、組み込みモジュールのsubprocessを使うことのようです。
subprocessのPopenを使うことでサブプロセスを実行することができます。
Popen には多くの引数が存在するのですが、今回扱う引数は以下になります。
- args (第一引数)
- すべての呼び出しに必要で、文字列あるいはプログラム引数のシーケンスでなければいけない
- 0番目の要素に、'echo', 'sleep' (Unix系), 'timeout' (Windows環境) のようなコマンドを入れ、1番目の要素にそのコマンドにあった要素を入れる
- 1つの要素のみで使う場合はshell=Trueにしなければならない
- stdout
- 標準的な出力先
- 有効な値
- PIPE
- 新しいパイプが子プロセスに向けて作られる
- DEVNULL
- 特殊ファイルos.devnull が使用される
- None
- リダイレクトが起こらない
- PIPE
- shell
- 有効な値
- True
- 指定されたコマンドはシェルによって実行される
- False
- True
- 有効な値
続いて、Popenクラス内の本記事で扱うメソッドについて説明します。
- communicate()
- 子プロセスの出力を読み、終了するまで待つメソッド
- poll()
- 子プロセスが終了したかどうかを確認するメソッド
- 子プロセスが完了したらreturncode 属性を返す
- それ以外の場合は、Noneを 返す
- 子プロセスが終了したかどうかを確認するメソッド
- wait()
- 子プロセスが終了するまで待つメソッド
- terminate()
- 子プロセスを止めるメソッドです。
より詳しく知りたい場合は、こちらのドキュメントが参考になります。
https://docs.python.org/ja/3/library/subprocess.html
import subprocess
import time
proc = subprocess.Popen(
['echo', 'Hello from the child'], # args (第一引数)
stdout=subprocess.PIPE, # 新しいパイプが子プロセスに向けて作られる
shell=True) # Windows環境でこのコードを実行する場合に必要、それ以外の場合はshell=Trueは不要
out, err = proc.communicate() # 子プロセスの出力を読み終了まで待つ
print(out.decode('utf-8'))
# "Hello from the child"
子プロセスは、親プロセスのPythonインタプリタとは独立に実行されます。その状態は、Pythongaが他の動作をしている間にもポーリング(イベントが発生していないか定期的にチェックすること)することができます。
import subprocess
import time
proc = subprocess.Popen(
['timeout', '1'],
# ['sleep', '1'], # 本ではsleep だったが、Windows 環境だと動かず
shell=True)
while proc.poll() is None:
print('\nWorking...')
time.sleep(0.3)
実行結果
Working...
0 秒待っています。続行するには何かキーを押してください ...
Working...
はじめから、すべての子プロセスを実行することもできます。
def run_sleep(period):
proc = subprocess.Popen(
['timeout', str(period)],
# ['sleep', str(period)], # 本ではsleep だったが、Windows 環境だと動かず
shell=True)
return proc
start = time.time()
procs = []
# 子プロセスを10個作る
for _ in range(10):
proc = run_sleep(1)
procs.append(proc)
# 子プロセスの出力を読む
for proc in procs:
proc.communicate()
end = time.time()
# 子プロセスの作成から終了までの時間を出力
print('Finished in %.3f seconds' %(end - start))
実行結果
1 秒待っています。続行するには何かキーを押してください ...
#... 10回表示される
Finished in 1.166 seconds
sleep ではなく timeout を使ったため、余計なモノが出力されています。
これらのプロセスが順番に実行されていたら10秒ほどかかるはずですが、同時に実行されたため1秒ちょいで終わっています。
communicateメソッドにtimeout引数を渡すことで、子プロセスが指定した時間内に応答しなければ、例外が引き起こされ、うまく動作しない子プロセスを停止することができます。
proc = run_sleep(10)
try:
proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
proc.terminate()
proc.wait()
print('Exit status', proc.poll())
ただし、timeout引数はPython3.3以降でないと使えないようです。
スレッドはブロッキングI/Oに使い、並列性に使うのは避ける
Pythonの標準実装は、CPythonです。
CPythonでは、次の2ステップでPythonプログラムを実行しています。
- ソーステキストをパースして、バイトコードにコンパイルする
- スタックベースのインタプリタでバイトコードを実行する
Pythonでは、プログラムが実行される間、プログラムに悪影響がでないように、実行中のスレッドに割りこんで制御を奪うような処理を相互排他ロックすることで防いでいます。この仕組みをグローバルインタプリタロック (global interpreter lock, GIL) と呼びます。
GILのお陰で、すべてのバイトコード命令が、CPython実装とC拡張モジュールで正しく動作することを保証しています。
しかし、相互排他ロックがかかっているために、マルチスレッドで実行しても1つ1つ実行されてしまうのです。
例として、素因数分解するプログラムを逐次実行とマルチスレッドで実行してみます。
# 逐次実行
def factorize(number):
for i in range(1, number + 1):
if number % i == 0:
pass
numbers = [10003234, 3425932, 1835723, 2342812]
start = time.time()
for number in numbers:
factorize(number)
end = time.time()
print('Sequential:%.3f seconds' % (end - start))
# マルチスレッドで実行
class FactorizeThread(Thread):
def __init__(self, number):
super().__init__()
self.number = number
def run(self):
self.factors = factorize(self.number)
start = time.time()
threads = []
for number in numbers:
thread =FactorizeThread(number)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time.time()
print('Multi Thread:%.3f seconds' % (end - start))
実行結果
Sequential:1.082 seconds
Multi Thread:1.264 seconds
早くなるどころか、マルチスレッドにした分の処理時間が伸びてしまいました。
マルチスレッドをブロッキングI/Oで扱う場合はどうでしょう。
実は、GILはPythonのコードを並列に実行していることを禁止していますが、システムコールについては並列化を禁止していません。
そのため、システムコールを含むコマンドの並列化は、高速になるのです。
単純な例として、sleep関数を逐次と並列で比較します。
def sleep_func():
time.sleep(0.1)
# 逐次処理
start = time.time()
for _ in range(5):
sleep_func()
end = time.time()
print(end - start)
# マルチスレッド
class SleepThread(Thread):
def __init__(self):
super().__init__()
def run(self):
sleep_func()
start = time.time()
threads = []
for _ in range(5):
thread = SleepThread()
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time.time()
print('Multi Thread: %.3f' % (end - start))
実行結果
Sequential:0.504 seconds
Multi Thread:0.102 seconds
マルチスレッドの方が5倍近く早く処理を完了することが確認できました。