LoginSignup
2
1

More than 3 years have passed since last update.

Jupyter notebook/Google Colab等で、C言語レベルのstdout/stderrを出力する

Last updated at Posted at 2019-06-12

はじめに

Jupyter notebookやGoogle Colabで、C言語レベルのstdout/stderrを取得するには、以下のコマンドを実行すればよい。後は、Jupyter notebookに出力される

!pip3 install wurlitzer
%load_ext wurlitzer

参考までに外すには以下のコマンドが必要である。

%unload_ext wurlitzer

仕組み

Jupyter notebookのプラグインについて

Jupyter Notebookでは、実行イベントが走ると、IPython.events配下に登録した処理が走る。そこに、Cの標準入出力をリダイレクトする関数が登録され処理する。

Cの標準入出力を取る仕組みについて

Pythonの標準入出力は高レベルAPIでありC言語のAPIに加えコード変換等のラッパーが入る。このため、そのまま標準入出力をPythonとC言語で共通化できない。
さて、C言語のstdout/stderrは、PythonのCとのインターフェースであるctypeにより受け取り、Wurlitzerクラスの__enter__メソッドので起動されるスレッドでリダイレクトを行いPythonとして出力している。
以下に、ソースコードを示す。ここで、c_stdout_pおよびc_stderr_pが、Cの標準出力とエラー出力のポインターである。また、本__enter__メソッドは、プラグイン組み込み時(%load_ext)に起動する。

    def __enter__(self):
        # flush anything out before starting
        libc.fflush(c_stdout_p)
        libc.fflush(c_stderr_p)
        # setup handle
        self._setup_handle()
        self._control_r, self._control_w = os.pipe()

        # create pipe for stdout
        pipes = [self._control_r]
        names = {self._control_r: 'control'}
        if self._stdout:
            pipe = self._setup_pipe('stdout')
            pipes.append(pipe)
            names[pipe] = 'stdout'
        if self._stderr:
            pipe = self._setup_pipe('stderr')
            pipes.append(pipe)
            names[pipe] = 'stderr'

        # flush pipes in a background thread to avoid blocking
        # the reader thread when the buffer is full
        flush_queue = Queue()

        def flush_main():
            while True:
                msg = flush_queue.get()
                if msg == 'stop':
                    return
                libc.fflush(c_stdout_p)
                libc.fflush(c_stderr_p)

        flush_thread = threading.Thread(target=flush_main)
        flush_thread.daemon = True
        flush_thread.start()

        def forwarder():
            """Forward bytes on a pipe to stream messages"""
            draining = False
            flush_interval = 0
            poller = select.poll()
            for pipe_ in pipes:
                poller.register(pipe_, select.POLLIN | select.POLLPRI)

            while pipes:
                events = poller.poll(int(flush_interval * 1000))
                #r = all([(r_[1] == (select.POLLIN | select.POLLPRI)) for r_ in events])
                if events:
                    # found something to read, don't block select until
                    # we run out of things to read
                    flush_interval = 0
                else:
                    # nothing to read
                    if draining:
                        # if we are draining and there's nothing to read, stop
                        break
                    else:
                        # nothing to read, get ready to wait.
                        # flush the streams in case there's something waiting
                        # to be written.
                        flush_queue.put('flush')
                        flush_interval = self.flush_interval
                        continue

                for fd, flags in events:
                    if fd == self._control_r:
                        draining = True
                        pipes.remove(self._control_r)
                        poller.unregister(self._control_r)
                        os.close(self._control_r)
                        continue
                    name = names[fd]
                    data = os.read(fd, 1024)
                    if not data:
                        # pipe closed, stop polling it
                        pipes.remove(fd)
                        poller.unregister(fd)
                        os.close(fd)
                    else:
                        handler = getattr(self, '_handle_%s' % name)
                        handler(data)
                if not pipes:
                    # pipes closed, we are done
                    break
            # stop flush thread
            flush_queue.put('stop')
            flush_thread.join()
            # cleanup pipes
            [os.close(pipe) for pipe in pipes]

        self.thread = threading.Thread(target=forwarder)
        self.thread.daemon = True
        self.thread.start()

        return self.handle

Colabへの組み込みに関して(%load_ext/%unload_ext)

以下のI/Fにより拡張している。以下の関数で、IPython.eventsに処理を登録している。

参考資料

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1