Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
79
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

Organization

Pythonで定周期で実行する方法と検証

はじめに

定周期で実行されるプログラムは色々なシステムで実装されるが、上手に実装しないと意図した時間通りに実行されない。最近Pythonでプログラムの実装をする機会が多いので、Pythonで実行する方法をまとめる。実装の方法次第だが、下記のとおりである。

  • [NG] sleepで待つ
  • [NG] threadingでタイマーを使う
  • [OK] 処理時間を考慮してthreadingとsleepを使う
  • [OK] シグナルハンドラを利用する

方法

以降で1秒周期でプログラムを実行する方法についてまとめていく。なお、ヒストグラム作成のために出力した時間(N数)は1000個である。

[NG] sleepによる待機

下記のプログラムに示すように、一定時間待つようにする。

import time

for i in range(1000):
    print(time.time())
    time.sleep(1)

この方法だと、sleepで1秒待つものの、1周するごとに他の処理の時間が加算され、実質1秒周期にならない。実際、出力した時間の差分をとっていくと、次のヒストグラムのようになる。

sleep-test.png

[NG] threadingでタイマーを使う

シーケンシャルに実行するには限界がありそうなので、別スレッドを立てて実行することを考える。そのために、定周期の最初で次の定周期用タイマーを定義し、以降で通常の処理を行う。

import time
import threading

def scheduler():
    t = threading.Timer(1, scheduler)
    t.start()
    print(time.time())

t = threading.Thread(target = scheduler)
t.start()

time.sleep(1000)

その結果が下記である。sleepしていたときと同様で、結局スレッドを作成してタイマーかけるまでの時間分だけ、定周期が伸びてしまう。

thread-test.png

[OK] 処理時間を考慮してthreadingとsleepを使う

別スレッドをどんどん立てて行くことについては共通だが、途中の処理時間を考慮して、その分の補正を行う。今回は別の記事のコードを例として使用する。

import time
import threading

def worker():
    print(time.time())
    time.sleep(8)

def scheduler(interval, f, wait = True):
    base_time = time.time()
    next_time = 0
    while True:
        t = threading.Thread(target = f)
        t.start()
        if wait:
            t.join()
        next_time = ((base_time - time.time()) % interval) or interval
        time.sleep(next_time)

scheduler(1, worker, False)

schedulerの最後の引数をTrueにすると、前のスレッドが生存している場合、次のスレッドが作れない。Falseにすることで、並列処理をガンガン行えるようになる。これを使って、周期を確認してみた結果が次のとおりである。多少のばらつきはあるが、平均1秒で実行できるように見える。

thread-dev-test.png

[OK] シグナルハンドラを利用する

※これはUnix系環境限定である
ここまではプログラム内で時刻を修正する方向で考えていたが、そもそももっと低レイヤーからの信号を使うことを考える。例えば、マイコンでは割り込みピンなどを利用して物理信号をトリガにすることもあるが、そんなイメージである。そのコードが下記のとおりである。

import signal
import time


def scheduler(arg1, args2):
    print(time.time())

signal.signal(signal.SIGALRM, scheduler)
signal.setitimer(signal.ITIMER_REAL, 1, 1)

time.sleep(1000)

実行結果は次のとおりである。先のthreading+sleepと横軸のスケールは揃えているが、明らかに良い精度で1秒で定周期実行できていることがわかる。

signal-test.png

まとめ

定周期は単純なsleepでは実現できない。ベストはシグナルハンドラを利用することで、それができなければ、プログラム内で処理時間を考慮した上で、次回の実行を予約するようにする。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
79
Help us understand the problem. What are the problem?