0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【Python3】開発で使えるシリーズ その① コマンド実行

Posted at

テーマ:コマンド実行

【ポイント】
1. パイプで繋げてコマンドを実行可能
2. 複数パイプでのコマンドにも対応

import logging
import subprocess


logger = logging.getLogger('baseLogger')


class Command(object):
    def __init__(self, command):
        self.command = command
        self.arg = None
        self.pipe_args = []
        self.sep()

        self._stdout = None

    def sep(self):
        """subprocess.Popenの引数に渡す為、コマンドをブランク区切りでリスト化する。
           Pipe(|)が存在する場合は、最初に実行するコマンドをブランク区切りでリスト化し、
           pipeで繋げて実行したいコマンドは、ブランクで区切りらずに、そのままリストに格納。

        Args:
            self.command
        Set:
            self.arg
            self.pipe_args
        Return:
            None
        Raise:
            None
        """
        if '|' in self.command:
            command = self.command.split('|')
            self.arg = command.pop(0).split()
            self.pipe_args = command
        else:
            self.arg = self.command.split()

    def execute(self):
        """コマンドを実行する。
           PIPEで実行される時に使用する為、stdoutをインベントリ変数にsetする。

        Args:
            self.arg
        Set:
            self._stdout
        Return:
            r: subprocess.Popenモジュール
        Raise:
            subprocess.SubprocessError:
            subprocess.TimeoutExpired:
        """
        try:
            r = subprocess.Popen(self.arg, stdout=subprocess.PIPE)
            self._stdout = r.stdout
            return r

        except subprocess.SubprocessError:
            return None
        except subprocess.TimeoutExpired:
            return None

    def execute_pipe(self):
        """PIPEで繋がったコマンドを実行する。

        Args:
            self.pipe_args
            self._stdout
        Set:
            self._stdout
        Return:
            r: subprocess.Popenモジュール
        Raise:
            subprocess.SubprocessError:
            subprocess.TimeoutExpired:
        """
        if self.execute() is None:
            return None
        try:
            for arg in self.pipe_args:
                arg = arg.split()
                r = subprocess.Popen(
                    arg, stdin=self._stdout, stdout=subprocess.PIPE)
                self._stdout = r.stdout
            return r

        except subprocess.SubprocessError:
            pass
        except subprocess.TimeoutExpired:
            pass


if __name__ == '__main__':
    c = Command('ls | grep lesson')
    r = c.execute_pipe()
    print(r.communicate()[0])
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?