概要
subprocessで起動したプロセスが入力待ちになったタイミングで、外部から入力を送りたい場面があります。
業務の中でPythonでこれを行うときになかなか苦戦したので記しておきます。
標準入出力
サブプロセスとして起動する以下のようなプログラムを想定します。
import time
if __name__ == "__main__":
while True:
time.sleep(5)
user_input = input("wait for input :")
print(f"User Input is {user_input}")
五秒ごとにユーザーからの入力を受け付けるいたってシンプルなプログラムです。
これをサブプロセスとしてメインプロセスから起動します。
from subprocess import Popen
import sys
import time
p = Popen(
args=["python", "sub.py"],
stdin=sys.stdin,
stdout=sys.stdout
)
while True:
time.sleep(1)
コンソールの標準出力と標準入力をサブプロセスに引き渡しているので、当然コンソール画面から入出力ができます。
>> python __main__.py
wait for input : abcdefg
User Input is abcdefg
IOインターフェースの引き渡し
標準入出力を使えれば世話はないですが、それ以外のioインターフェスを利用したい場合はどうすればよいでしょうか。
試しに以下のコードを実行してみます。
import io
stdin = io.BytesIO()
p = Popen(
args=["python", "sub.py"],
stdin=stdin,
stdout=sys.stdout
)
すると以下のエラーが発生します。
io.UnsupportedOperation: fileno
ストリームをプロセス間で共有する場合、file descriptorを利用する必要があります。
mainプロセスで宣言されたBytesIOは、あくまでPythonのメインプロセス内に限定されたioインターフェースで、OSレベルの利用を想定したものではありません。
pythonのIOインターフェースの読み込み
そこで、osモジュールのpipeメソッドで作ったfile descriptorを利用して、ストリームの共有を図ります。
pythonではIO.read(size=-1)
でストリームを読み込むことができます。
sizeを指定しない場合は、EOFまで読み込んでくれますが、
os.fdopenで開けたストリームはブロッキングで開かれているため、
指定したサイズのバッファが来るまで待ち続けます。
つまり普通にstdoutを読み込んでしまうと、サブプロセスが入力待ちになった時点で、ブロックされてしまいます。
よってIO.read(1)
として、1 byteずつ別スレッドで読み込むことで、サブプロセスが入力待ちになる直前までのアウトプットを取得するアプローチを取ります。
下記のコードはサブプロセスからのoutputを受けて、入力待ちになった段階で別プロセスから入力を送るプログラムです。
以下の二つの処理を別スレッドでwhile loopで回し続けています。
- stdoutの読み込み
- subprocessが入力待ちになった時点で入力の送信
from subprocess import Popen, PIPE
import time
import sys
import io
import os
import threading
from dataclasses import dataclass
@dataclass
class SubprocessStatus:
wait_for_input: bool
@property
def is_wait(self):
pass
@is_wait.getter
def is_wait(self):
return self.wait_for_input
@is_wait.setter
def is_wait(self, is_wait):
self.wait_for_input = is_wait
class Threadable:
def thread_target(self):
pass
def start(self):
self.t = threading.Thread(target=self.thread_target)
self.t.setDaemon(True)
self.t.start()
class ProcessWatcher(Threadable):
"""
processがwait for inputになったらcallback
"""
subprocess_status: SubprocessStatus
def __init__(self, subprocess_status, callback):
self.subprocess_status = subprocess_status
self.callback = callback
def wait(self):
time.sleep(1)
def thread_target(self):
while True:
self.wait()
if not self.subprocess_status.is_wait:
continue
self.callback()
class StdoutLineReader(Threadable):
"""
stdoutを1 byteずつ読み込んで、行として処理
"""
line = b""
def __init__(self, fd_stdout: int, callback):
self.fd_stdout = fd_stdout
self.callback = callback
def thread_target(self):
self.stdout = os.fdopen(self.fd_stdout, "rb")
linesep = os.linesep.encode()
while True:
r = self.stdout.read(1)
self.line += r
self.callback(self.line)
if self.line.endswith(linesep):
print("line :", self.line)
self.line = b""
class MainProcess:
count = 0
target_line = b"wait for input"
def __init__(self):
self.fd_stdin_r, self.fd_stdin_w = os.pipe()
self.fd_stdout_r, self.fd_stdout_w = os.pipe()
self.stdout_w = os.fdopen(self.fd_stdin_w, "wb")
self.subprocess_status = SubprocessStatus(False)
self.stdout_line_reader = StdoutLineReader(
self.fd_stdout_r, self.handle_line)
self.process_watcher = ProcessWatcher(
self.subprocess_status, self.when_wait_for_input)
def handle_line(self, line: bytes):
if not line.startswith(self.target_line):
return
self.subprocess_status.wait_for_input = True
def when_wait_for_input(self):
self.subprocess_status.wait_for_input = False
self.stdout_w.write(f'Input:{self.count}\n'.encode())
self.stdout_w.flush()
self.count += 1
def start(self):
self.process = Popen(
args=["python", "sub.py"],
stdin=self.fd_stdin_r,
stdout=self.fd_stdout_w,
stderr=sys.stderr
)
self.stdout_line_reader.start()
self.process_watcher.start()
main_process = MainProcess()
main_process.start()
while True:
time.sleep(1)
詳細
サブプロセスの状態とstdoutからの出力は当然誤差があるため、
SubprocessStatusを作成して、サブプロセスの状態を仮想的に制御します。
StdoutLineReaderはstdoutを一文字ずつ読み続け、改行コードが出現した段階で、リフレッシュします。別スレッドで動かすことで、入力待ち時のブロッキングを回避します。
ProcessWatcherでサブプロセスの状態を監視させ、wait for inputになった時点で
callbackを呼び出します。
仕様かやり方が悪いのかわかりませんが、openしたfile descriptorについて、一度closeしててから、
同一のfile descriptorを再びopenしようとするとLinux,windowsそれぞれでOSエラーがでます。
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.8/os.py", line 1023, in fdopen
return io.open(fd, *args, **kwargs)
OSError: [Errno 9] Bad file descriptor
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2032.0_x64__qbz5n2kfra8p0\lib\os.py", line 1023, in fdopen
return io.open(fd, *args, **kwargs)
OSError: [WinError 6] ハンドルが無効です。
よって、stdinに使うfile descriptorについては開いたままにしています。
※Popen.communicateについて
サブプロセスへの入力が一度きりの場合については、popenのcommunicateが利用できます。ほかの記事を参考にしてみてください。