はじめに
Jupyter notebookやGoogle Colabで、C言語レベルのstdout/stderrを取得するには、以下のコマンドを実行すればよい。後は、Jupyter notebookに出力される
!pip3 install wurlitzer
%load_ext wurlitzer
参考までに外すには以下のコマンドが必要である。
%unload_ext wurlitzer
仕組み
Jupyter notebookのプラグインについて
Jupyter Notebookでは、実行イベントが走ると、IPython.events配下に登録した処理が走る。そこに、Cの標準入出力をリダイレクトする関数が登録され処理する。
Cの標準入出力を取る仕組みについて
Pythonの標準入出力は高レベルAPIでありC言語のAPIに加えコード変換等のラッパーが入る。このため、そのまま標準入出力をPythonとC言語で共通化できない。
さて、C言語のstdout/stderrは、PythonのCとのインターフェースであるctypeにより受け取り、Wurlitzerクラスの__enter__
メソッドので起動されるスレッドでリダイレクトを行いPythonとして出力している。
以下に、ソースコードを示す。ここで、c_stdout_pおよびc_stderr_pが、Cの標準出力とエラー出力のポインターである。また、本__enter__
メソッドは、プラグイン組み込み時(%load_ext)に起動する。
def __enter__(self):
# flush anything out before starting
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
# setup handle
self._setup_handle()
self._control_r, self._control_w = os.pipe()
# create pipe for stdout
pipes = [self._control_r]
names = {self._control_r: 'control'}
if self._stdout:
pipe = self._setup_pipe('stdout')
pipes.append(pipe)
names[pipe] = 'stdout'
if self._stderr:
pipe = self._setup_pipe('stderr')
pipes.append(pipe)
names[pipe] = 'stderr'
# flush pipes in a background thread to avoid blocking
# the reader thread when the buffer is full
flush_queue = Queue()
def flush_main():
while True:
msg = flush_queue.get()
if msg == 'stop':
return
libc.fflush(c_stdout_p)
libc.fflush(c_stderr_p)
flush_thread = threading.Thread(target=flush_main)
flush_thread.daemon = True
flush_thread.start()
def forwarder():
"""Forward bytes on a pipe to stream messages"""
draining = False
flush_interval = 0
poller = select.poll()
for pipe_ in pipes:
poller.register(pipe_, select.POLLIN | select.POLLPRI)
while pipes:
events = poller.poll(int(flush_interval * 1000))
#r = all([(r_[1] == (select.POLLIN | select.POLLPRI)) for r_ in events])
if events:
# found something to read, don't block select until
# we run out of things to read
flush_interval = 0
else:
# nothing to read
if draining:
# if we are draining and there's nothing to read, stop
break
else:
# nothing to read, get ready to wait.
# flush the streams in case there's something waiting
# to be written.
flush_queue.put('flush')
flush_interval = self.flush_interval
continue
for fd, flags in events:
if fd == self._control_r:
draining = True
pipes.remove(self._control_r)
poller.unregister(self._control_r)
os.close(self._control_r)
continue
name = names[fd]
data = os.read(fd, 1024)
if not data:
# pipe closed, stop polling it
pipes.remove(fd)
poller.unregister(fd)
os.close(fd)
else:
handler = getattr(self, '_handle_%s' % name)
handler(data)
if not pipes:
# pipes closed, we are done
break
# stop flush thread
flush_queue.put('stop')
flush_thread.join()
# cleanup pipes
[os.close(pipe) for pipe in pipes]
self.thread = threading.Thread(target=forwarder)
self.thread.daemon = True
self.thread.start()
return self.handle
Colabへの組み込みに関して(%load_ext/%unload_ext)
以下のI/Fにより拡張している。以下の関数で、IPython.eventsに処理を登録している。
-
load_ipython_extension
- sys_pipes_forever
- stop_sys_pipes
- unload_ipython_extension
参考資料
- 概要説明
- Python
-
(Python)PEP343 (Pythonの
__enter__
,__exit__
メソッドに関して) - (Python)ctypes — A foreign function library for Python ((C等による)外部ライブラリ等を呼び出すインターフェース)
-
(Python)PEP343 (Pythonの
-
IPython
- (IPython)IPython extensions (IPythonの拡張インターフェースに関して)
- (IPython)IPython Events(IPythonのEvent発生時の処理について)
- ソースコード
- さまざまなJupyter拡張モジュール