LoginSignup
36
39

More than 5 years have passed since last update.

Pythonで作ったデーモンにREST APIをつける話

Last updated at Posted at 2016-12-21

目的

Pythonで24時間走らせ続けるデーモンを書いています。稼働状況の確認や設定を動的に変更をするためにREST APIを追加することにしました。メインルーチンに大きな変更を加えることなく、最小の手間で実装する方法を模索しました。

FlaskとFlask-API

REST APIを提供するにはなんらかのウェブサーバが必要です。外部に公開するものではないのでFlaskが同梱している開発用のウェブサーバを使います。開発用ですがスレッドで同時アクセスにも対応していますので、十分実用に耐えます。

REST APIは手軽に実装できてテスト用のUIまで用意されているFlask-APIを使うことにしました。
screenshot.png

以下の実験ではFlask-APIのサンプルを使います。コピペしてexample.pyというファイルで保存します。
http://www.flaskapi.org/#example

マルチスレッド

デーモンのメインルーチンとウェブサーバを同時に走らせるには、asyncioを使うか、スレッドを使うかということになりそうです。Flaskはasyncioで使うことは想定されていないので、スレッドを使うことにします。

メインルーチンとウェブサーバはどちらが主導権を握るべきでしょう?REST APIが付け足しの機能であることを考えるとデーモンがメインスレッド、ウェブサーバが派生したスレッドとするのが良さそうです。(後記:根拠薄いかも。デーモンを管理するという視点ならウェブがメインでデーモンはワーカースレッドの方が自然)

メインルーチンでスレッドを一つ作成し、そこからFlaskのサーバーを起動してみます。1秒毎に時刻を表示して、メインスレッドがブロックされていないことを確認します。

$ diff -u example.py example_threaded.py
--- example.py  2016-12-20 16:19:19.000000000 -0800
+++ example_threaded.py 2016-12-20 16:23:43.000000000 -0800
@@ -1,6 +1,13 @@
+import logging
+import threading
+
 from flask import request, url_for
 from flask.ext.api import FlaskAPI, status, exceptions

+FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
+logging.basicConfig(format=FORMAT, level=logging.INFO)
+log = logging.getLogger()
+
 app = FlaskAPI(__name__)


@@ -53,5 +60,14 @@


 if __name__ == "__main__":
-    app.run(debug=True)
+
+    log.info('start')
+    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+    rest_service_thread.start()
+    log.info('main thread is mine!')
+    import time
+    while True:
+        print(time.ctime())
+        time.sleep(1)
+    rest_service_thread.join()

実行結果

Flask内部のウェブサーバであるwerkzeugが何やらエラーを吐いています。メインスレッドは活きているので時刻は表示されていますが、REST APIは反応しません。

2016-12-20 16:30:32,129 root MainThread start
2016-12-20 16:30:32,130 root MainThread main thread is mine!
Tue Dec 20 16:30:32 2016
2016-12-20 16:30:32,141 werkzeug reset_service  * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Exception in thread reset_service:
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/flask/app.py", line 843, in run
    run_simple(host, port, self, **options)
  File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/serving.py", line 692, in run_simple
    reloader_type)
  File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/_reloader.py", line 242, in run_with_reloader
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
  File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/signal.py", line 47, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread

Tue Dec 20 16:30:33 2016
Tue Dec 20 16:30:34 2016

どうもシグナルハンドラがMainThreadでないとダメだということらしいです。Flaskの開発中の便利機能でファイルの変更監視とリロードを行うためらしいですが、不要ですのでdebug=Trueを削除して再度実行してみます。

実行結果2

今度はうまく行きました。REST APIを http://localhost:5000 で叩いて見るとちゃんと反応します。

2016-12-20 16:38:54,214 root MainThread start
2016-12-20 16:38:54,215 root MainThread main thread is mine!
Tue Dec 20 16:38:54 2016
2016-12-20 16:38:54,224 werkzeug reset_service  * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Tue Dec 20 16:38:55 2016
2016-12-20 16:38:55,840 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:55] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:56 2016
2016-12-20 16:38:56,827 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:56] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:57 2016

これでメインスレッドをデーモン用に確保しつつREST APIを提供する下地ができました。

スレッド間の協調

REST APIからデーモンにコマンドを送るだけであれば一番簡単なのはQueueを使う方法です。PythonのQueueはスレッドセーフなのでグローバルなQueueオブジェクトを作り、複数のスレッドからput/getしても不整合が起きません。でもそれだけで済むケースは限られているでしょう。

example_threaded.pyをさらに改造して、notes変数をメインスレッドとウェブサーバのスレッドから更新する実験をしてみます。メインスレッドではSTDINから入力された文字列をnotesに追加します。

notesはディクショナリなので、同時にアクセスされると矛盾が生じたり、特にforループでアクセス中にキーの追加削除が起こるとエラーになります。そこでsynchronizedデコレータでロックし、クリチカルセクションを保護するようにしてみました。

$ diff -u example_threaded.py example_threaded2.py
--- example_threaded.py 2016-12-20 17:17:10.000000000 -0800
+++ example_threaded2.py    2016-12-20 17:29:06.000000000 -0800
@@ -23,7 +23,38 @@
         'text': notes[key]
     }

+def synchronized(lock):
+    """ Synchronization decorator. """

+    def wrap(f):
+        def newFunction(*args, **kw):
+            lock.acquire()
+            try:
+                return f(*args, **kw)
+            finally:
+                lock.release()
+        return newFunction
+    return wrap
+
+glock = threading.Lock()
+
+@synchronized(glock)
+def list_notes():
+    return [note_repr(idx) for idx in sorted(notes.keys())]
+
+@synchronized(glock)
+def add_note(note):
+    idx = max(notes.keys()) + 1
+    notes[idx] = note
+    return idx
+@synchronized(glock)
+def update_note(key, note):
+    notes[key] = note
+
+@synchronized(glock)
+def delete_note(key):
+    return notes.pop(key, None)
+
 @app.route("/", methods=['GET', 'POST'])
 def notes_list():
     """
@@ -31,12 +62,11 @@
     """
     if request.method == 'POST':
         note = str(request.data.get('text', ''))
-        idx = max(notes.keys()) + 1
-        notes[idx] = note
+        idx = add_note(note)
         return note_repr(idx), status.HTTP_201_CREATED

     # request.method == 'GET'
-    return [note_repr(idx) for idx in sorted(notes.keys())]
+    return list_notes()


 @app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
@@ -46,11 +76,11 @@
     """
     if request.method == 'PUT':
         note = str(request.data.get('text', ''))
-        notes[key] = note
+        update_note(key, note)
         return note_repr(key)

     elif request.method == 'DELETE':
-        notes.pop(key, None)
+        delete_note(key)
         return '', status.HTTP_204_NO_CONTENT

     # request.method == 'GET'
@@ -60,14 +90,15 @@


 if __name__ == "__main__":
-
+
     log.info('start')
-    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict())
     rest_service_thread.start()
     log.info('main thread is mine!')
-    import time
-    while True:
-        print(time.ctime())
-        time.sleep(1)
+    import sys
+    for line in iter(sys.stdin):
+        if not line:
+            break
+        add_note(line.strip())
     rest_service_thread.join()

実行結果

起動後、端末から適当に文字列を入れてエンターし、REST APIから一覧を取得すると入力した文字列が追加になっていることが確認できました。

行儀の良いデーモン

バックグラウンドプロセスのことをざっくりデーモンと呼びますが、行儀の良い(well behaved)デーモンはやらなければいけないことがたくさんあります。

  • umaskを設定
  • forkして親プロセスをexit。子はinitに養子に出す。
  • 端末のシグナルを受け取らないように、子プロセスを別プロセスグループとし、新たなセッションにする。(必ずしもセッションリーダーである必要はない。System-Vでは制御端末を持てない非セッションリーダにすることが推奨されている。後述)
  • カレントディレクトリを/にする
  • 不要なファイルデスクリプタは閉じる
  • stdin, stdout, stderrを/dev/nullに設定する
  • 安全なPATHを設定する
  • 二重に起動しないようロックファイルを作る(PIDファイル)
  • シグナルを正しくハンドルするか無視する
  • syslogなどでログを残す
  • chrootする

といろいろありますがPythonならdaemonizepython-daemonで簡単にできます。ここではpython-daemonを最小限の設定で使って見ます。stdinは使えませんので、代わりに3秒ごとにnoteを追加するようにして見ました。REST APIをアクセスすれば3秒ごとにnotesが増えているのが確認できます。

--- example_threaded2.py    2016-12-25 16:25:08.000000000 -0800
+++ example_daemon.py   2016-12-25 16:29:29.000000000 -0800
@@ -1,8 +1,9 @@
 import logging
 import threading
+import daemon

 from flask import request, url_for
-from flask.ext.api import FlaskAPI, status, exceptions
+from flask_api import FlaskAPI, status, exceptions

 FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
 logging.basicConfig(format=FORMAT, level=logging.INFO)
@@ -89,16 +90,16 @@
     return note_repr(key)


-if __name__ == "__main__":
-
+def main():
     log.info('start')
     rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
     rest_service_thread.start()
     log.info('main thread is mine!')
-    import sys
-    for line in iter(sys.stdin):
-        if not line:
-            break
-        add_note(line.strip())
+    import time
+    for i in range(10):
+        add_note("note{}".format(i))
+        time.sleep(3)
     rest_service_thread.join()

+with daemon.DaemonContext():
+    main()

実行して見ます

$ python example_daemon.py
$

&なしで起動しても一瞬でシェルに戻ってきます。プログラムが自分自身でデーモン化しています。psで確認して見ましょう。psコマンドはLinuxのもの。

$ ps xao pid,ppid,pgid,sid,comm | grep python
 2860     1  2859  2859 python

親プロセスIDは1で養子になっています。セッションIDとプロセスグループIDが一致しています。ちなみにログインシェルのbashのセッションIDは2693でしたので、期待通り新規のセッションになっています。どうもforkを2回しているようでpidとpgidが異なっています。forkを2回する理由は次の通り(Advanced Programming in Unix 第13章より)

Under System V–based systems, some people recommend calling fork again at this point, terminating the parent, and continuing the daemon in the child. This guarantees that the daemon is not a session leader, which prevents it from acquiring a controlling terminal under the System V rules (Section 9.6). Alternatively, to avoid acquiring a controlling terminal, be sure to specify O_NOCTTY whenever opening a terminal device.

lsof -p 2860でファイルデスクリプターを見て見ます。

lsof -p 2860
COMMAND  PID USER   FD   TYPE  DEVICE SIZE/OFF    NODE NAME
python  2860 root  cwd    DIR   254,1     4096       2 /
python  2860 root  rtd    DIR   254,1     4096       2 /
python  2860 root  txt    REG   254,1  3781768  264936 /usr/bin/python2.7
python  2860 root  mem    REG   254,1    47712 1045078 /lib/x86_64-linux-gnu/libnss_files-2.19.so
python  2860 root  mem    REG   254,1    54248  391882 /usr/lib/python2.7/lib-dynload/_json.x86_64-linux-gnu.so
python  2860 root  mem    REG   254,1    18904 1044589 /lib/x86_64-linux-gnu/libuuid.so.1.3.0
python  2860 root  mem    REG   254,1    31048  265571 /usr/lib/x86_64-linux-gnu/libffi.so.6.0.2
python  2860 root  mem    REG   254,1   141184  392622 /usr/lib/python2.7/lib-dynload/_ctypes.x86_64-linux-gnu.so
python  2860 root  mem    REG   254,1    10464  796274 /usr/lib/python2.7/dist-packages/markupsafe/_speedups.so
python  2860 root  mem    REG   254,1    29464  392892 /usr/lib/python2.7/lib-dynload/_hashlib.x86_64-linux-gnu.so
python  2860 root  mem    REG   254,1  2066816  264782 /usr/lib/x86_64-linux-gnu/libcrypto.so.1.0.0
python  2860 root  mem    REG   254,1   395176  264784 /usr/lib/x86_64-linux-gnu/libssl.so.1.0.0
python  2860 root  mem    REG   254,1    97872  392612 /usr/lib/python2.7/lib-dynload/_ssl.x86_64-linux-gnu.so
python  2860 root  mem    REG   254,1    11248  392000 /usr/lib/python2.7/lib-dynload/resource.x86_64-linux-gnu.so
python  2860 root  mem    REG   254,1  1607712  269275 /usr/lib/locale/locale-archive
python  2860 root  mem    REG   254,1  1738176 1045067 /lib/x86_64-linux-gnu/libc-2.19.so
python  2860 root  mem    REG   254,1  1051056 1045072 /lib/x86_64-linux-gnu/libm-2.19.so
python  2860 root  mem    REG   254,1   109144 1044580 /lib/x86_64-linux-gnu/libz.so.1.2.8
python  2860 root  mem    REG   254,1    10680 1045291 /lib/x86_64-linux-gnu/libutil-2.19.so
python  2860 root  mem    REG   254,1    14664 1045071 /lib/x86_64-linux-gnu/libdl-2.19.so
python  2860 root  mem    REG   254,1   137440 1044987 /lib/x86_64-linux-gnu/libpthread-2.19.so
python  2860 root  mem    REG   254,1   140928 1044988 /lib/x86_64-linux-gnu/ld-2.19.so
python  2860 root    0u   CHR     1,3      0t0    5593 /dev/null
python  2860 root    1u   CHR     1,3      0t0    5593 /dev/null
python  2860 root    2u   CHR     1,3      0t0    5593 /dev/null
python  2860 root    3u  IPv4 3596677      0t0     TCP localhost:5000 (LISTEN)

おぉ!STDIN(0),STDOUT(1),STDERR(2)が見事に/dev/nullです。そしてカレントワーキングディレクトリ(cwd)は/になっています。ルートディレクトリ(rtd)も/ですが、これはchrootする設定を引き渡すことで変更可能です。真に正統派デーモンですね!ログアウトしてもrcスクリプトから起動してもちゃんと動きます。

まとめ

  • メインルーチンをメインスレッドで、Flaskの開発用ウェブサーバを派生スレッドで稼働させることに成功しました。
  • メインルーチンとREST APIで変数の共有・更新が可能なことを確認しました。
  • 行儀の良いデーモンとして実装することに成功しました。

まだ色々と実験中ですので、もっと軽量な方法、グローバルロック無しで実現する方法があるよ、という方がいらっしゃいましたら是非お知らせください。

最終の状態のコード

コードはPython3ですが、数行の修正でPython2でも動くはずです。threaded=Trueは複数のリクエストを同時に受け付けるためのもので本論のスレッドとは関係ありません。

import logging
import threading
import daemon

from flask import request, url_for
from flask_api import FlaskAPI, status, exceptions

FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
log = logging.getLogger()

app = FlaskAPI(__name__)


notes = {
    0: 'do the shopping',
    1: 'build the codez',
    2: 'paint the door',
}

def note_repr(key):
    return {
        'url': request.host_url.rstrip('/') + url_for('notes_detail', key=key),
        'text': notes[key]
    }

def synchronized(lock):
    """ Synchronization decorator. """

    def wrap(f):
        def newFunction(*args, **kw):
            lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                lock.release()
        return newFunction
    return wrap

glock = threading.Lock()

@synchronized(glock)
def list_notes():
    return [note_repr(idx) for idx in sorted(notes.keys())]

@synchronized(glock)
def add_note(note):
    idx = max(notes.keys()) + 1
    notes[idx] = note
    return idx
@synchronized(glock)
def update_note(key, note):
    notes[key] = note

@synchronized(glock)
def delete_note(key):
    return notes.pop(key, None)

@app.route("/", methods=['GET', 'POST'])
def notes_list():
    """
    List or create notes.
    """
    if request.method == 'POST':
        note = str(request.data.get('text', ''))
        idx = add_note(note)
        return note_repr(idx), status.HTTP_201_CREATED

    # request.method == 'GET'
    return list_notes()


@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
def notes_detail(key):
    """
    Retrieve, update or delete note instances.
    """
    if request.method == 'PUT':
        note = str(request.data.get('text', ''))
        update_note(key, note)
        return note_repr(key)

    elif request.method == 'DELETE':
        delete_note(key)
        return '', status.HTTP_204_NO_CONTENT

    # request.method == 'GET'
    if key not in notes:
        raise exceptions.NotFound()
    return note_repr(key)


def main():
    log.info('start')
    rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
    rest_service_thread.start()
    log.info('main thread is mine!')
    import time
    for i in range(10):
        add_note("note{}".format(i))
        time.sleep(3)
    rest_service_thread.join()

with daemon.DaemonContext():
    main()
36
39
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
36
39