■はじめに
この記事は「DSL Advent Calendar 2019」の22日目の記事です。
もうすぐクリスマス、大晦日、お正月と、ビッグイベントを控えてこころなしか世間も
浮足立ってせわしなくなってくる季節と思われます。皆様はいかがお過ごしでしょうか?
アドベントカレンダーも終盤、少ない人数で回しているとこなんかは精神崩壊一歩手前、
今日まで書き続けられているソロプレイヤーは人間卒業間近ですね。
このアドカレのメンバーはDSL関係者なのですが、私はOB枠での参加となります!
学部で卒業し、とあるITベンチャーでエンジニアをしていますが、
入社して約半年、勉強してきたことをまとめ、紹介していきたいと思います。
■機械学習×Webアプリで意識すべきこと
さて、機械学習をWeb上で行うためには以下の点に気を付けなければなりません。
- 前処理、学習、予測など時間のかかる処理を動かしつつもWebサーバーを動かし続けなければならない
- 処理の開始・終了時にグラフィックメモリの操作が必要な場合がある
この点に対応するためにマルチプロセスかつ、それぞれのプロセスの開始と終了の処理を管理できるような
システムをプログラミングします。めんどくさいですね。
■設計思想1:async/await
まずはノンブロッキングIOの金字塔async/awaitです。
フロントエンドに手を出したことがある方ならあたりまえのように使っているかもしれませんが
実はPythonにもあります。
しかし、javascriptのasync/awaitとは違い、asyncを付けた関数は必ずコルーチンオブジェクトを
返すので、イベントループ内でしか実行できません。
■設計思想2:System of Systems
具体的なシステムの設計の仕方としてSystem of Systemsという考え方があります。
本来ならばソフトウェア設計ではなく業務プロセスなどもっと別分野で
用いられるものっぽい?ですが今回はこれをうまくプロセス管理の部分に落とし込みます。
>1.システムを入れ子構造に
ひとつのシステムは0個以上のシステムから構成されます。
このとき、親のシステムに対し子のシステムをサブシステムと呼び、
全てのサブシステムが起動しおえることで親のシステムが「起動した」扱いになり、
全てのサブシステムが終了することで親のシステムが「終了した」扱いになります。
>2.システムの状態
システムは以下の表の状態をとります。
各状態から遷移できる状態は決まっており、initialからいきなりrunningなどへ遷移することはできません。
状態 | 説明 | 遷移可能 |
---|---|---|
initial | システムが作成された直後に初期値として与えられる状態 | ready, disabled |
ready | システムを実行するための準備が完了したことを表す状態 | running |
running | システムが実行中であるときの状態 | completed, intermitted, terminated |
completed | システムが正常に実行完了したことを表す状態 | - |
disabled | システムが実行できないことを表す状態、実行不可の原因を取り除くことでreadyに遷移できる | ready |
intermitted | システムが停止中であることを表す状態、システムがrunningである間は何度でもintermittedとrunningを行ったり来たりできる(実際にそう作りこむことは困難) | running |
terminated | システムが強制終了したときの状態、disabledと違ってここから遷移することはできない | - |
以下の図が簡単な状態遷移図です。
途中でエラーもおきず正常に処理が進んだ場合、青いルートを通ります。
予期せぬ事態で処理を進めることができなくなった場合は赤いルートを通りdisabledやterminatedとなります。
また、緑のルートは基本的に人間による判断・操作で遷移が開始されます。
>3.システムの遷移
前項ではシステムの各状態の紹介、もとい定義を行いました。
次は状態の遷移、図でいうと矢印の定義を行います。
定義というとちょっと堅苦しいですが、しっかりしておくことでプログラムを書く際に悩まないようにしておきましょう。
先ほどと同じように表と図を用意しました。
遷移 | 説明 |
---|---|
activate(活性化) | 実行に必要な材料集めを行うprepare関数を実行 |
disable(無効化) | 状態が格納された変数の値をdisabledに変更 |
enable(有効化) | 状態が格納された変数の値をreadyに変更 |
start(開始) | 機械学習など重い処理や無限ループを行うmain関数を実行 |
complete(完了) | メモリの開放などを行うshutdown関数を実行 |
suspend(中断) | 実行中のmain関数に中断シグナルを送ります |
resume(再開) | 中断中のmain関数に再開シグナルを送ります |
terminate(強制終了) | メモリの開放などを行うteardown関数を実行 |
prepare関数やらmain関数やら新たな単語が出てきましたが、
これらを用意しておくことでプログラムが書きやすくなります。
具体的なイメージとしては大元となるSystemクラスを継承させて各システムを作っていくときに、
activateやstartをオーバーライドするときに必ずsuper()を挿入しなくてはいけません。
(状態変更やロギングなどは遷移のたびに行うため)
これがわずらわしいので、各システム特有の処理をprepareやmainなど別の関数に逃がすことで解決します。
■プログラム例
タイトルでは機械学習を謳ってはいますが、簡単のために今回は時間のかかる処理としてsleep関数で代用します。
まずは大元のSystemクラスを作成します。
class System():
def __init__(self, name):
self.name = name
self.state = "initial"
self.kwargs = {}
self.log(self.state)
def log(self, msg):
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
line = f"{date}\t[{self.name}]\tpid:{os.getpid():05d}\t{msg}"
print(line)
def prepare(self, **kwargs):
pass
def main(self):
pass
def activate(self):
self.prepare(**self.kwargs)
self.state = "ready"
self.log(self.state)
def start(self):
self.state = "running"
self.log(self.state)
self.main()
def complete(self):
self.state = "completed"
self.log(self.state)
def transit(self):
self.activate()
self.start()
self.complete()
async def run(self, **kwargs):
self.kwargs = kwargs
executor = ProcessPoolExecutor(max_workers=None)
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, self.transit)
sleepを並列で実行するだけなので上で延々と紹介した状態や遷移をすべて実装していません
コンストラクタ__init__ではこのシステムの名付けと初期状態の設定を行っています。
transitでは青ルートの遷移を順番に実行しています。disableやterminateを実装する際は
この部分にtry-exceptを入れてあげると綺麗に書けると思います。
最後のasync関数として定義されたrunでは、run_in_executorによってtransitをコルーチン関数として扱えるようにしています。
またprepareなどではユーザーによって引数を取る場合があるので可変長引数として
transit、さらにはactiveへと渡したいところですが、どうもこのrun_inexecutor、マルチプロセスの場合
可変長引数を渡そうとするとエラーを吐いてしまいます。しかたがないのでインスタンス変数kwargsに格納しています。
次に、"sleep関数を実行するシステム"を実行するシステムを作ります。
ややこしい言い回しですが、もし複数のシステムを実行したいとなったときに、
__main__に直接書いてしまうのは避けたいのでラップシステムとしてappSystemを作ります。
class appSystem(System):
def prepare(self):
pass
def main(self):
sleep1 = sleepSystem("sleepSystem1")
sleep2 = sleepSystem("sleepSystem2")
systems = asyncio.gather(
sleep1.run(sleep=5),
sleep2.run(sleep=3)
)
loop = asyncio.get_event_loop()
loop.run_until_complete(systems)
ここでわざわざactivetとprepare、startとmainのように処理を分けた意味が出てきますね。
今回はただのsleepなのでprepareには特に書くことがありません。インスタンス格納した変数を無理やり書いてもいいが…
main内で5秒間sleepするsleepSystem1と3秒間sleepするsleepSystem2を実行します。
sleepSystemは以下のような単純なシステムです。
class sleepSystem(System):
def prepare(self, sleep=3):
self.sleep = sleep
def main(self):
time.sleep(self.sleep)
あとはメイン関数でappSystem.run()をイベントループに追加してあげます。13
def main():
app = appSystem("appSystem")
loop = asyncio.get_event_loop()
loop.run_until_complete(app.run())
if __name__ == "__main__":
main()
それでは実行してみましょう。
2019-12-14 16:43:28.843830 [appSystem] pid:30360 initial
2019-12-14 16:43:29.196505 [appSystem] pid:21020 ready
2019-12-14 16:43:29.196505 [appSystem] pid:21020 running
2019-12-14 16:43:29.197501 [sleepSystem1] pid:21020 initial
2019-12-14 16:43:29.197501 [sleepSystem2] pid:21020 initial
2019-12-14 16:43:29.799470 [sleepSystem1] pid:29720 ready
2019-12-14 16:43:29.803496 [sleepSystem1] pid:29720 running
2019-12-14 16:43:29.872484 [sleepSystem2] pid:18868 ready
2019-12-14 16:43:29.872484 [sleepSystem2] pid:18868 running
2019-12-14 16:43:32.873678 [sleepSystem2] pid:18868 completed
2019-12-14 16:43:34.804446 [sleepSystem1] pid:29720 completed
2019-12-14 16:43:34.804446 [appSystem] pid:21020 completed
左から順番に、日付、システム名、PID、状態となっています。
sleepSystem1とsleepSystem2がrunning状態になった時刻がほぼ同時刻であること、
またそれらが別プロセスとなっており同時に進行し、3、5秒後にcompleted状態遷移、
そしてappSystemのcompletedが確認できます。
最後にプログラム全体を載せておきます。
import asyncio
import time
from datetime import datetime
import os
from concurrent.futures import ProcessPoolExecutor
class System():
def __init__(self, name):
self.name = name
self.state = "initial"
self.kwargs = {}
self.log(self.state)
def log(self, msg):
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
line = f"{date}\t[{self.name}]\tpid:{os.getpid():05d}\t{msg}"
print(line)
def prepare(self, **kwargs):
pass
def main(self):
pass
def activate(self):
self.prepare(**self.kwargs)
self.state = "ready"
self.log(self.state)
def start(self):
self.state = "running"
self.log(self.state)
self.main()
def complete(self):
self.state = "completed"
self.log(self.state)
def transit(self):
self.activate()
self.start()
self.complete()
async def run(self, **kwargs):
self.kwargs = kwargs
executor = ProcessPoolExecutor(max_workers=None)
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, self.transit)
class appSystem(System):
def prepare(self):
pass
def main(self):
sleep1 = sleepSystem("sleepSystem1")
sleep2 = sleepSystem("sleepSystem2")
systems = asyncio.gather(
sleep1.run(sleep=5),
sleep2.run(sleep=3)
)
loop = asyncio.get_event_loop()
loop.run_until_complete(systems)
class sleepSystem(System):
def prepare(self, sleep=3):
self.sleep = sleep
def main(self):
time.sleep(self.sleep)
def main():
app = appSystem("appSystem")
loop = asyncio.get_event_loop()
loop.run_until_complete(app.run())
if __name__ == "__main__":
main()
■最後に
駆け足になりましたが機械学習のためのWebアプリ設計の一例を紹介させていただきました。
プログラム例はsleepだけでwwwサーバーや機械学習の実装をしているわけではないですが
考え方自体は同じなので手間取る部分は少ないと思います。
(具体的に書きすぎると会社的にアレなのでかなり簡略化しています)
また、システム間の通信は基本WebSocketで行います。
wwwSystemとは別にwebsocketSystemを作成してappSystemのサブシステムにするといいでしょう。
というわけでいかがだったでしょうか?
まだ長期運用はしていないですが個人的にはきれいな設計で気に入っています。
■参考