Help us understand the problem. What is going on with this article?

Pythonで定周期で実行する方法と検証

More than 1 year has passed since last update.

はじめに

定周期で実行されるプログラムは色々なシステムで実装されるが、上手に実装しないと意図した時間通りに実行されない。最近Pythonでプログラムの実装をする機会が多いので、Pythonで実行する方法をまとめる。実装の方法次第だが、下記のとおりである。

  • [NG] sleepで待つ
  • [NG] threadingでタイマーを使う
  • [OK] 処理時間を考慮してthreadingとsleepを使う
  • [OK] シグナルハンドラを利用する

方法

以降で1秒周期でプログラムを実行する方法についてまとめていく。なお、ヒストグラム作成のために出力した時間(N数)は1000個である。

[NG] sleepによる待機

下記のプログラムに示すように、一定時間待つようにする。

import time

for i in range(1000):
    print(time.time())
    time.sleep(1)

この方法だと、sleepで1秒待つものの、1周するごとに他の処理の時間が加算され、実質1秒周期にならない。実際、出力した時間の差分をとっていくと、次のヒストグラムのようになる。

sleep-test.png

[NG] threadingでタイマーを使う

シーケンシャルに実行するには限界がありそうなので、別スレッドを立てて実行することを考える。そのために、定周期の最初で次の定周期用タイマーを定義し、以降で通常の処理を行う。

import time
import threading

def scheduler():
    t = threading.Timer(1, scheduler)
    t.start()
    print(time.time())

t = threading.Thread(target = scheduler)
t.start()

time.sleep(1000)

その結果が下記である。sleepしていたときと同様で、結局スレッドを作成してタイマーかけるまでの時間分だけ、定周期が伸びてしまう。

thread-test.png

[OK] 処理時間を考慮してthreadingとsleepを使う

別スレッドをどんどん立てて行くことについては共通だが、途中の処理時間を考慮して、その分の補正を行う。今回は別の記事のコードを例として使用する。

import time
import threading

def worker():
    print(time.time())
    time.sleep(8)

def scheduler(interval, f, wait = True):
    base_time = time.time()
    next_time = 0
    while True:
        t = threading.Thread(target = f)
        t.start()
        if wait:
            t.join()
        next_time = ((base_time - time.time()) % interval) or interval
        time.sleep(next_time)

scheduler(1, worker, False)

schedulerの最後の引数をTrueにすると、前のスレッドが生存している場合、次のスレッドが作れない。Falseにすることで、並列処理をガンガン行えるようになる。これを使って、周期を確認してみた結果が次のとおりである。多少のばらつきはあるが、平均1秒で実行できるように見える。

thread-dev-test.png

[OK] シグナルハンドラを利用する

※これはUnix系環境限定である
ここまではプログラム内で時刻を修正する方向で考えていたが、そもそももっと低レイヤーからの信号を使うことを考える。例えば、マイコンでは割り込みピンなどを利用して物理信号をトリガにすることもあるが、そんなイメージである。そのコードが下記のとおりである。

import signal
import time


def scheduler(arg1, args2):
    print(time.time())

signal.signal(signal.SIGALRM, scheduler)
signal.setitimer(signal.ITIMER_REAL, 1, 1)

time.sleep(1000)

実行結果は次のとおりである。先のthreading+sleepと横軸のスケールは揃えているが、明らかに良い精度で1秒で定周期実行できていることがわかる。

signal-test.png

まとめ

定周期は単純なsleepでは実現できない。ベストはシグナルハンドラを利用することで、それができなければ、プログラム内で処理時間を考慮した上で、次回の実行を予約するようにする。

montblanc18
Software Developer & Engineer. Futureという会社で働いています。得意分野は制御系とNW。元は工場勤務の社内SEです。 C/C++生まれRuby育ちなPythonista、現在はGopherに片足ツッコミ中。かつて宇宙物理と小型衛星と天文学で闘い、製鉄で立ち回った後、ITの世界へ。
http://montblanc18.github.io
future
ITを武器とした課題解決型のコンサルティングサービスを提供します
http://future-architect.github.io/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away