Pythonで作ったデーモンにREST APIをつける話

More than 1 year has passed since last update.


目的

Pythonで24時間走らせ続けるデーモンを書いています。稼働状況の確認や設定を動的に変更をするためにREST APIを追加することにしました。メインルーチンに大きな変更を加えることなく、最小の手間で実装する方法を模索しました。


FlaskとFlask-API

REST APIを提供するにはなんらかのウェブサーバが必要です。外部に公開するものではないのでFlaskが同梱している開発用のウェブサーバを使います。開発用ですがスレッドで同時アクセスにも対応していますので、十分実用に耐えます。

REST APIは手軽に実装できてテスト用のUIまで用意されているFlask-APIを使うことにしました。

screenshot.png

以下の実験ではFlask-APIのサンプルを使います。コピペしてexample.pyというファイルで保存します。

http://www.flaskapi.org/#example


マルチスレッド

デーモンのメインルーチンとウェブサーバを同時に走らせるには、asyncioを使うか、スレッドを使うかということになりそうです。Flaskはasyncioで使うことは想定されていないので、スレッドを使うことにします。

メインルーチンとウェブサーバはどちらが主導権を握るべきでしょう?REST APIが付け足しの機能であることを考えるとデーモンがメインスレッド、ウェブサーバが派生したスレッドとするのが良さそうです。(後記:根拠薄いかも。デーモンを管理するという視点ならウェブがメインでデーモンはワーカースレッドの方が自然)

メインルーチンでスレッドを一つ作成し、そこからFlaskのサーバーを起動してみます。1秒毎に時刻を表示して、メインスレッドがブロックされていないことを確認します。

$ diff -u example.py example_threaded.py

--- example.py 2016-12-20 16:19:19.000000000 -0800
+++ example_threaded.py 2016-12-20 16:23:43.000000000 -0800
@@ -1,6 +1,13 @@
+import logging
+import threading
+
from flask import request, url_for
from flask.ext.api import FlaskAPI, status, exceptions

+FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
+logging.basicConfig(format=FORMAT, level=logging.INFO)
+log = logging.getLogger()
+
app = FlaskAPI(__name__)

@@ -53,5 +60,14 @@

if __name__ == "__main__":
- app.run(debug=True)
+
+ log.info('start')
+ rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+ rest_service_thread.start()
+ log.info('main thread is mine!')
+ import time
+ while True:
+ print(time.ctime())
+ time.sleep(1)
+ rest_service_thread.join()

実行結果

Flask内部のウェブサーバであるwerkzeugが何やらエラーを吐いています。メインスレッドは活きているので時刻は表示されていますが、REST APIは反応しません。

2016-12-20 16:30:32,129 root MainThread start

2016-12-20 16:30:32,130 root MainThread main thread is mine!
Tue Dec 20 16:30:32 2016
2016-12-20 16:30:32,141 werkzeug reset_service * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Exception in thread reset_service:
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/flask/app.py", line 843, in run
run_simple(host, port, self, **options)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/serving.py", line 692, in run_simple
reloader_type)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/_reloader.py", line 242, in run_with_reloader
signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread

Tue Dec 20 16:30:33 2016
Tue Dec 20 16:30:34 2016

どうもシグナルハンドラがMainThreadでないとダメだということらしいです。Flaskの開発中の便利機能でファイルの変更監視とリロードを行うためらしいですが、不要ですのでdebug=Trueを削除して再度実行してみます。

実行結果2

今度はうまく行きました。REST APIを http://localhost:5000 で叩いて見るとちゃんと反応します。

2016-12-20 16:38:54,214 root MainThread start

2016-12-20 16:38:54,215 root MainThread main thread is mine!
Tue Dec 20 16:38:54 2016
2016-12-20 16:38:54,224 werkzeug reset_service * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Tue Dec 20 16:38:55 2016
2016-12-20 16:38:55,840 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:55] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:56 2016
2016-12-20 16:38:56,827 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:56] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:57 2016

これでメインスレッドをデーモン用に確保しつつREST APIを提供する下地ができました。


スレッド間の協調

REST APIからデーモンにコマンドを送るだけであれば一番簡単なのはQueueを使う方法です。PythonのQueueはスレッドセーフなのでグローバルなQueueオブジェクトを作り、複数のスレッドからput/getしても不整合が起きません。でもそれだけで済むケースは限られているでしょう。

example_threaded.pyをさらに改造して、notes変数をメインスレッドとウェブサーバのスレッドから更新する実験をしてみます。メインスレッドではSTDINから入力された文字列をnotesに追加します。

notesはディクショナリなので、同時にアクセスされると矛盾が生じたり、特にforループでアクセス中にキーの追加削除が起こるとエラーになります。そこでsynchronizedデコレータでロックし、クリチカルセクションを保護するようにしてみました。

$ diff -u example_threaded.py example_threaded2.py

--- example_threaded.py 2016-12-20 17:17:10.000000000 -0800
+++ example_threaded2.py 2016-12-20 17:29:06.000000000 -0800
@@ -23,7 +23,38 @@
'text': notes[key]
}

+def synchronized(lock):
+ """ Synchronization decorator. """

+ def wrap(f):
+ def newFunction(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
+ return newFunction
+ return wrap
+
+glock = threading.Lock()
+
+@synchronized(glock)
+def list_notes():
+ return [note_repr(idx) for idx in sorted(notes.keys())]
+
+@synchronized(glock)
+def add_note(note):
+ idx = max(notes.keys()) + 1
+ notes[idx] = note
+ return idx
+@synchronized(glock)
+def update_note(key, note):
+ notes[key] = note
+
+@synchronized(glock)
+def delete_note(key):
+ return notes.pop(key, None)
+
@app.route("/", methods=['GET', 'POST'])
def notes_list():
"""
@@ -31,12 +62,11 @@
"""
if request.method == 'POST':
note = str(request.data.get('text', ''))
- idx = max(notes.keys()) + 1
- notes[idx] = note
+ idx = add_note(note)
return note_repr(idx), status.HTTP_201_CREATED

# request.method == 'GET'
- return [note_repr(idx) for idx in sorted(notes.keys())]
+ return list_notes()

@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
@@ -46,11 +76,11 @@
"""
if request.method == 'PUT':
note = str(request.data.get('text', ''))
- notes[key] = note
+ update_note(key, note)
return note_repr(key)

elif request.method == 'DELETE':
- notes.pop(key, None)
+ delete_note(key)
return '', status.HTTP_204_NO_CONTENT

# request.method == 'GET'
@@ -60,14 +90,15 @@

if __name__ == "__main__":
-
+
log.info('start')
- rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+ rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict())
rest_service_thread.start()
log.info('main thread is mine!')
- import time
- while True:
- print(time.ctime())
- time.sleep(1)
+ import sys
+ for line in iter(sys.stdin):
+ if not line:
+ break
+ add_note(line.strip())
rest_service_thread.join()

実行結果

起動後、端末から適当に文字列を入れてエンターし、REST APIから一覧を取得すると入力した文字列が追加になっていることが確認できました。


行儀の良いデーモン

バックグラウンドプロセスのことをざっくりデーモンと呼びますが、行儀の良い(well behaved)デーモンはやらなければいけないことがたくさんあります。


  • umaskを設定

  • forkして親プロセスをexit。子はinitに養子に出す。

  • 端末のシグナルを受け取らないように、子プロセスを別プロセスグループとし、新たなセッションにする。(必ずしもセッションリーダーである必要はない。System-Vでは制御端末を持てない非セッションリーダにすることが推奨されている。後述)

  • カレントディレクトリを/にする

  • 不要なファイルデスクリプタは閉じる

  • stdin, stdout, stderrを/dev/nullに設定する

  • 安全なPATHを設定する

  • 二重に起動しないようロックファイルを作る(PIDファイル)

  • シグナルを正しくハンドルするか無視する

  • syslogなどでログを残す

  • chrootする

といろいろありますがPythonならdaemonizepython-daemonで簡単にできます。ここではpython-daemonを最小限の設定で使って見ます。stdinは使えませんので、代わりに3秒ごとにnoteを追加するようにして見ました。REST APIをアクセスすれば3秒ごとにnotesが増えているのが確認できます。

--- example_threaded2.py    2016-12-25 16:25:08.000000000 -0800

+++ example_daemon.py 2016-12-25 16:29:29.000000000 -0800
@@ -1,8 +1,9 @@
import logging
import threading
+import daemon

from flask import request, url_for
-from flask.ext.api import FlaskAPI, status, exceptions
+from flask_api import FlaskAPI, status, exceptions

FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
@@ -89,16 +90,16 @@
return note_repr(key)

-if __name__ == "__main__":
-
+def main():
log.info('start')
rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
rest_service_thread.start()
log.info('main thread is mine!')
- import sys
- for line in iter(sys.stdin):
- if not line:
- break
- add_note(line.strip())
+ import time
+ for i in range(10):
+ add_note("note{}".format(i))
+ time.sleep(3)
rest_service_thread.join()

+with daemon.DaemonContext():
+ main()

実行して見ます

$ python example_daemon.py

$

&なしで起動しても一瞬でシェルに戻ってきます。プログラムが自分自身でデーモン化しています。psで確認して見ましょう。psコマンドはLinuxのもの。

$ ps xao pid,ppid,pgid,sid,comm | grep python

2860 1 2859 2859 python

親プロセスIDは1で養子になっています。セッションIDとプロセスグループIDが一致しています。ちなみにログインシェルのbashのセッションIDは2693でしたので、期待通り新規のセッションになっています。どうもforkを2回しているようでpidとpgidが異なっています。forkを2回する理由は次の通り(Advanced Programming in Unix 第13章より)


Under System V–based systems, some people recommend calling fork again at this point, terminating the parent, and continuing the daemon in the child. This guarantees that the daemon is not a session leader, which prevents it from acquiring a controlling terminal under the System V rules (Section 9.6). Alternatively, to avoid acquiring a controlling terminal, be sure to specify O_NOCTTY whenever opening a terminal device.


lsof -p 2860でファイルデスクリプターを見て見ます。

lsof -p 2860

COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
python 2860 root cwd DIR 254,1 4096 2 /
python 2860 root rtd DIR 254,1 4096 2 /
python 2860 root txt REG 254,1 3781768 264936 /usr/bin/python2.7
python 2860 root mem REG 254,1 47712 1045078 /lib/x86_64-linux-gnu/libnss_files-2.19.so
python 2860 root mem REG 254,1 54248 391882 /usr/lib/python2.7/lib-dynload/_json.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 18904 1044589 /lib/x86_64-linux-gnu/libuuid.so.1.3.0
python 2860 root mem REG 254,1 31048 265571 /usr/lib/x86_64-linux-gnu/libffi.so.6.0.2
python 2860 root mem REG 254,1 141184 392622 /usr/lib/python2.7/lib-dynload/_ctypes.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 10464 796274 /usr/lib/python2.7/dist-packages/markupsafe/_speedups.so
python 2860 root mem REG 254,1 29464 392892 /usr/lib/python2.7/lib-dynload/_hashlib.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 2066816 264782 /usr/lib/x86_64-linux-gnu/libcrypto.so.1.0.0
python 2860 root mem REG 254,1 395176 264784 /usr/lib/x86_64-linux-gnu/libssl.so.1.0.0
python 2860 root mem REG 254,1 97872 392612 /usr/lib/python2.7/lib-dynload/_ssl.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 11248 392000 /usr/lib/python2.7/lib-dynload/resource.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 1607712 269275 /usr/lib/locale/locale-archive
python 2860 root mem REG 254,1 1738176 1045067 /lib/x86_64-linux-gnu/libc-2.19.so
python 2860 root mem REG 254,1 1051056 1045072 /lib/x86_64-linux-gnu/libm-2.19.so
python 2860 root mem REG 254,1 109144 1044580 /lib/x86_64-linux-gnu/libz.so.1.2.8
python 2860 root mem REG 254,1 10680 1045291 /lib/x86_64-linux-gnu/libutil-2.19.so
python 2860 root mem REG 254,1 14664 1045071 /lib/x86_64-linux-gnu/libdl-2.19.so
python 2860 root mem REG 254,1 137440 1044987 /lib/x86_64-linux-gnu/libpthread-2.19.so
python 2860 root mem REG 254,1 140928 1044988 /lib/x86_64-linux-gnu/ld-2.19.so
python 2860 root 0u CHR 1,3 0t0 5593 /dev/null
python 2860 root 1u CHR 1,3 0t0 5593 /dev/null
python 2860 root 2u CHR 1,3 0t0 5593 /dev/null
python 2860 root 3u IPv4 3596677 0t0 TCP localhost:5000 (LISTEN)

おぉ!STDIN(0),STDOUT(1),STDERR(2)が見事に/dev/nullです。そしてカレントワーキングディレクトリ(cwd)は/になっています。ルートディレクトリ(rtd)も/ですが、これはchrootする設定を引き渡すことで変更可能です。真に正統派デーモンですね!ログアウトしてもrcスクリプトから起動してもちゃんと動きます。


まとめ


  • メインルーチンをメインスレッドで、Flaskの開発用ウェブサーバを派生スレッドで稼働させることに成功しました。

  • メインルーチンとREST APIで変数の共有・更新が可能なことを確認しました。

  • 行儀の良いデーモンとして実装することに成功しました。

まだ色々と実験中ですので、もっと軽量な方法、グローバルロック無しで実現する方法があるよ、という方がいらっしゃいましたら是非お知らせください。


最終の状態のコード

コードはPython3ですが、数行の修正でPython2でも動くはずです。threaded=Trueは複数のリクエストを同時に受け付けるためのもので本論のスレッドとは関係ありません。

import logging

import threading
import daemon

from flask import request, url_for
from flask_api import FlaskAPI, status, exceptions

FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
log = logging.getLogger()

app = FlaskAPI(__name__)

notes = {
0: 'do the shopping',
1: 'build the codez',
2: 'paint the door',
}

def note_repr(key):
return {
'url': request.host_url.rstrip('/') + url_for('notes_detail', key=key),
'text': notes[key]
}

def synchronized(lock):
""" Synchronization decorator. """

def wrap(f):
def newFunction(*args, **kw):
lock.acquire()
try:
return f(*args, **kw)
finally:
lock.release()
return newFunction
return wrap

glock = threading.Lock()

@synchronized(glock)
def list_notes():
return [note_repr(idx) for idx in sorted(notes.keys())]

@synchronized(glock)
def add_note(note):
idx = max(notes.keys()) + 1
notes[idx] = note
return idx
@synchronized(glock)
def update_note(key, note):
notes[key] = note

@synchronized(glock)
def delete_note(key):
return notes.pop(key, None)

@app.route("/", methods=['GET', 'POST'])
def notes_list():
"""
List or create notes.
"""

if request.method == 'POST':
note = str(request.data.get('text', ''))
idx = add_note(note)
return note_repr(idx), status.HTTP_201_CREATED

# request.method == 'GET'
return list_notes()

@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
def notes_detail(key):
"""
Retrieve, update or delete note instances.
"""

if request.method == 'PUT':
note = str(request.data.get('text', ''))
update_note(key, note)
return note_repr(key)

elif request.method == 'DELETE':
delete_note(key)
return '', status.HTTP_204_NO_CONTENT

# request.method == 'GET'
if key not in notes:
raise exceptions.NotFound()
return note_repr(key)

def main():
log.info('start')
rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
rest_service_thread.start()
log.info('main thread is mine!')
import time
for i in range(10):
add_note("note{}".format(i))
time.sleep(3)
rest_service_thread.join()

with daemon.DaemonContext():
main()