概略
pythonにおいてAPSスケジューラーを使い、指定時間にスケジューラーへ新しいJobを追加するJobを実行するコードの備忘録。
- JobAはJobBを指定時間に追加するJob
- JobAの実行は指定時間にスケジュールされる
実行の流れは
- JobAを指定時間に実行するようにスケジュールする
- 指定時間にJobAが実行され、JobBがスケジューラーに追加される
- JobBの実行指定時間になるとJobBが実行される
ポイントはスケジューラーをサブスレッドにして開始して、メインスレッドでJobをスケジューラーに追加する。
今回使用した環境
OS : Win11 (Raspberry pi5 でも動作確認済)
python Version: 3.12.9
APScheduler Version: 3.11.0
コード
"""スケジューラーにJobを追加するJobをスケジュールする"""
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime,timedelta
import threading
def JobA(scheduler:BlockingScheduler,job_runtime:datetime):
"""JobA(JoBをスケジューラーに追加する。)
Args:
scheduler (BlockingScheduler): スケジューラー
job_runtime (datetime): JobB実行時間
"""
print(f"JobA実行:{datetime.now()}")
scheduler.add_job(JobB,"date",run_date=job_runtime)
print(f"JobB {job_runtime}の実行をスケジューラーに追加:{datetime.now()}")
def JobB():
"""JobB(JobAによりスケジューラーに追加される)
"""
print(f"JobB実行:{datetime.now()}")
def StartSceduler(scheduler:BlockingScheduler):
"""スケジューラー開始(スレッド用)"""
print(f"スケジューラー開始:{datetime.now()}")
scheduler.start()
if __name__ == "__main__":
global_scheduler = BlockingScheduler()
#スケジューラーのインスタンス
jobA_runtime = datetime(2025,3,8,17,21)
#JobAを実行する時間
arg_tuple = (global_scheduler, jobA_runtime+timedelta(seconds=30))
#JobBをスケジューラーの追加する引数、tuple(スケジューラーインスタンス、JobB開始時間=JobA開始30秒後)
global_scheduler.add_job(JobA,"date",run_date=jobA_runtime,args=arg_tuple)
#JobAをスケジューラーに追加する。
thread = threading.Thread(target=StartSceduler,args=[global_scheduler])
#スレッドのインスタンス、サブスレッドでスケジューラーを開始する。
thread.start()
#スレッド開始する。
thread.join()
#スレッドを実行するためにロック
実行結果
スケジューラー開始:2025-03-08 18:31:17.073882
JobA実行:2025-03-08 18:32:00.000911
JobB 2025-03-08 18:32:30の実行をスケジューラーに追加:2025-03-08 18:32:00.001461
JobB実行:2025-03-08 18:32:30.000939
今回の方法を必要とした背景
やりたいことと問題
- 指定日と指定時間にJobを実行したい
- しかし具体的指定時間は当日にならないと確定しない
解決策(今回の方法)
- 指定日の朝に具体的な時間を調べて、具体的な時間を確定させる
- その具体的な時間を指定してJobをスケジューラーに追加する