LoginSignup
0
1

More than 3 years have passed since last update.

Python : 入力待ち(wait for input)のsubprocessにinputを送る

Last updated at Posted at 2021-02-25

概要

subprocessで起動したプロセスが入力待ちになったタイミングで、外部から入力を送りたい場面があります。
業務の中でPythonでこれを行うときになかなか苦戦したので記しておきます。

標準入出力

サブプロセスとして起動する以下のようなプログラムを想定します。

sub.py

import time

if __name__ == "__main__":
    while True:
        time.sleep(5)
        user_input = input("wait for input :")
        print(f"User Input is {user_input}")

五秒ごとにユーザーからの入力を受け付けるいたってシンプルなプログラムです。
これをサブプロセスとしてメインプロセスから起動します。

__main__.py
from subprocess import Popen
import sys
import time

p = Popen(
    args=["python", "sub.py"],
    stdin=sys.stdin,
    stdout=sys.stdout
)

while True:
    time.sleep(1)

コンソールの標準出力と標準入力をサブプロセスに引き渡しているので、当然コンソール画面から入出力ができます。

>> python __main__.py
wait for input : abcdefg
User Input is abcdefg

IOインターフェースの引き渡し

標準入出力を使えれば世話はないですが、それ以外のioインターフェスを利用したい場合はどうすればよいでしょうか。
試しに以下のコードを実行してみます。

__main__.py
import io

stdin = io.BytesIO()
p = Popen(
    args=["python", "sub.py"],
    stdin=stdin,
    stdout=sys.stdout
)

すると以下のエラーが発生します。

io.UnsupportedOperation: fileno

ストリームをプロセス間で共有する場合、file descriptorを利用する必要があります。
mainプロセスで宣言されたBytesIOは、あくまでPythonのメインプロセス内に限定されたioインターフェースで、OSレベルの利用を想定したものではありません。

pythonのIOインターフェースの読み込み

そこで、osモジュールのpipeメソッドで作ったfile descriptorを利用して、ストリームの共有を図ります。

pythonではIO.read(size=-1)でストリームを読み込むことができます。
sizeを指定しない場合は、EOFまで読み込んでくれますが、
os.fdopenで開けたストリームはブロッキングで開かれているため、
指定したサイズのバッファが来るまで待ち続けます。
つまり普通にstdoutを読み込んでしまうと、サブプロセスが入力待ちになった時点で、ブロックされてしまいます。

よってIO.read(1)として、1 byteずつ別スレッドで読み込むことで、サブプロセスが入力待ちになる直前までのアウトプットを取得するアプローチを取ります。

下記のコードはサブプロセスからのoutputを受けて、入力待ちになった段階で別プロセスから入力を送るプログラムです。
以下の二つの処理を別スレッドでwhile loopで回し続けています。

  • stdoutの読み込み
  • subprocessが入力待ちになった時点で入力の送信
main.py
from subprocess import Popen, PIPE
import time
import sys
import io
import os
import threading

from dataclasses import dataclass


@dataclass
class SubprocessStatus:
    wait_for_input: bool

    @property
    def is_wait(self):
        pass

    @is_wait.getter
    def is_wait(self):
        return self.wait_for_input

    @is_wait.setter
    def is_wait(self, is_wait):
        self.wait_for_input = is_wait


class Threadable:
    def thread_target(self):
        pass

    def start(self):
        self.t = threading.Thread(target=self.thread_target)
        self.t.setDaemon(True)
        self.t.start()


class ProcessWatcher(Threadable):
    """
        processがwait for inputになったらcallback
    """
    subprocess_status: SubprocessStatus

    def __init__(self, subprocess_status, callback):
        self.subprocess_status = subprocess_status
        self.callback = callback

    def wait(self):
        time.sleep(1)

    def thread_target(self):
        while True:
            self.wait()
            if not self.subprocess_status.is_wait:
                continue
            self.callback()


class StdoutLineReader(Threadable):
    """
        stdoutを1 byteずつ読み込んで、行として処理
    """
    line = b""

    def __init__(self, fd_stdout: int, callback):
        self.fd_stdout = fd_stdout
        self.callback = callback

    def thread_target(self):
        self.stdout = os.fdopen(self.fd_stdout, "rb")
        linesep = os.linesep.encode()
        while True:
            r = self.stdout.read(1)
            self.line += r
            self.callback(self.line)
            if self.line.endswith(linesep):
                print("line :", self.line)
                self.line = b""


class MainProcess:
    count = 0
    target_line = b"wait for input"

    def __init__(self):
        self.fd_stdin_r, self.fd_stdin_w = os.pipe()
        self.fd_stdout_r, self.fd_stdout_w = os.pipe()

        self.stdout_w = os.fdopen(self.fd_stdin_w, "wb")

        self.subprocess_status = SubprocessStatus(False)
        self.stdout_line_reader = StdoutLineReader(
            self.fd_stdout_r, self.handle_line)
        self.process_watcher = ProcessWatcher(
            self.subprocess_status, self.when_wait_for_input)

    def handle_line(self, line: bytes):
        if not line.startswith(self.target_line):
            return
        self.subprocess_status.wait_for_input = True

    def when_wait_for_input(self):
        self.subprocess_status.wait_for_input = False
        self.stdout_w.write(f'Input:{self.count}\n'.encode())
        self.stdout_w.flush()
        self.count += 1

    def start(self):
        self.process = Popen(
            args=["python", "sub.py"],
            stdin=self.fd_stdin_r,
            stdout=self.fd_stdout_w,
            stderr=sys.stderr
        )
        self.stdout_line_reader.start()
        self.process_watcher.start()


main_process = MainProcess()
main_process.start()
while True:
    time.sleep(1)

詳細

サブプロセスの状態とstdoutからの出力は当然誤差があるため、
SubprocessStatusを作成して、サブプロセスの状態を仮想的に制御します。

StdoutLineReaderはstdoutを一文字ずつ読み続け、改行コードが出現した段階で、リフレッシュします。別スレッドで動かすことで、入力待ち時のブロッキングを回避します。

ProcessWatcherでサブプロセスの状態を監視させ、wait for inputになった時点で
callbackを呼び出します。

仕様かやり方が悪いのかわかりませんが、openしたfile descriptorについて、一度closeしててから、
同一のfile descriptorを再びopenしようとするとLinux,windowsそれぞれでOSエラーがでます。

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.8/os.py", line 1023, in fdopen
    return io.open(fd, *args, **kwargs)
OSError: [Errno 9] Bad file descriptor
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.8_3.8.2032.0_x64__qbz5n2kfra8p0\lib\os.py", line 1023, in fdopen
    return io.open(fd, *args, **kwargs)
OSError: [WinError 6] ハンドルが無効です

よって、stdinに使うfile descriptorについては開いたままにしています。

※Popen.communicateについて
サブプロセスへの入力が一度きりの場合については、popenのcommunicateが利用できます。ほかの記事を参考にしてみてください。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1