はじめに
定周期で実行されるプログラムは色々なシステムで実装されるが、上手に実装しないと意図した時間通りに実行されない。最近Pythonでプログラムの実装をする機会が多いので、Pythonで実行する方法をまとめる。実装の方法次第だが、下記のとおりである。
- [NG] sleepで待つ
- [NG] threadingでタイマーを使う
- [OK] 処理時間を考慮してthreadingとsleepを使う
- [OK] シグナルハンドラを利用する
方法
以降で1秒周期でプログラムを実行する方法についてまとめていく。なお、ヒストグラム作成のために出力した時間(N数)は1000個である。
[NG] sleepによる待機
下記のプログラムに示すように、一定時間待つようにする。
import time
for i in range(1000):
print(time.time())
time.sleep(1)
この方法だと、sleepで1秒待つものの、1周するごとに他の処理の時間が加算され、実質1秒周期にならない。実際、出力した時間の差分をとっていくと、次のヒストグラムのようになる。
[NG] threadingでタイマーを使う
シーケンシャルに実行するには限界がありそうなので、別スレッドを立てて実行することを考える。そのために、定周期の最初で次の定周期用タイマーを定義し、以降で通常の処理を行う。
import time
import threading
def scheduler():
t = threading.Timer(1, scheduler)
t.start()
print(time.time())
t = threading.Thread(target = scheduler)
t.start()
time.sleep(1000)
その結果が下記である。sleepしていたときと同様で、結局スレッドを作成してタイマーかけるまでの時間分だけ、定周期が伸びてしまう。
[OK] 処理時間を考慮してthreadingとsleepを使う
別スレッドをどんどん立てて行くことについては共通だが、途中の処理時間を考慮して、その分の補正を行う。今回は別の記事のコードを例として使用する。
import time
import threading
def worker():
print(time.time())
time.sleep(8)
def scheduler(interval, f, wait = True):
base_time = time.time()
next_time = 0
while True:
t = threading.Thread(target = f)
t.start()
if wait:
t.join()
next_time = ((base_time - time.time()) % interval) or interval
time.sleep(next_time)
scheduler(1, worker, False)
schedulerの最後の引数をTrueにすると、前のスレッドが生存している場合、次のスレッドが作れない。Falseにすることで、並列処理をガンガン行えるようになる。これを使って、周期を確認してみた結果が次のとおりである。多少のばらつきはあるが、平均1秒で実行できるように見える。
[OK] シグナルハンドラを利用する
※これはUnix系環境限定である
ここまではプログラム内で時刻を修正する方向で考えていたが、そもそももっと低レイヤーからの信号を使うことを考える。例えば、マイコンでは割り込みピンなどを利用して物理信号をトリガにすることもあるが、そんなイメージである。そのコードが下記のとおりである。
import signal
import time
def scheduler(arg1, args2):
print(time.time())
signal.signal(signal.SIGALRM, scheduler)
signal.setitimer(signal.ITIMER_REAL, 1, 1)
time.sleep(1000)
実行結果は次のとおりである。先のthreading+sleepと横軸のスケールは揃えているが、明らかに良い精度で1秒で定周期実行できていることがわかる。
まとめ
定周期は単純なsleepでは実現できない。ベストはシグナルハンドラを利用することで、それができなければ、プログラム内で処理時間を考慮した上で、次回の実行を予約するようにする。