はじめに
個人開発の一環で、特定のディレクトリ内のファイルが変更された場合、それを検知して外部のAPIにリクエストを投げるPythonプログラムを作りました。
Pythonでディレクトリ内のファイル監視&変更イベントを検知してなんらかの処理を行う手段としては、watchdogライブラリが有名どころかと思います。
watchdogでいいかなぁと思いつつ、他のライブラリを検索してみたところ、watchfilesというライブラリを発見しました。
こちらが非常にシンプルなAPIで使いやすかったので簡単に概要と使い方等を共有しようと思います。
想定読者
- watchfilesの概要と簡単な使い方を知りたい人
- ディレクトリやファイルの変更をトリガーにさくっと処理を行いたい人
watchfilesとは?
前述の通り、ファイルやディレクトリの変更を監視し、変更をトリガーに何らかの処理を実行するためのライブラリです。
なお、watchfilesはuvicornのホットリロード機能でも採用されているようです。 1
watchfilesの主な特徴としては以下です。
高性能
watchfilesは、今流行りのRust言語で実装されており、バックエンドにnotifyライブラリを使用しています。
Rustの効率的な言語特性を活かしているとのことで高パフォーマンスが特徴のようです。
また、watchfilesはnotifyに基づいて実装されているので、クロスプラットフォームに対応しています。
Platforms
・Linux / Android: inotify
・macOS: FSEvents or kqueue, see features
・Windows: ReadDirectoryChangesW
・iOS / FreeBSD / NetBSD / OpenBSD / DragonflyBSD: kqueue
シンプルなAPI
watchfilesのAPIは非常にシンプルです。
これはwatchdogと比較するとわかりやすいので、両者の最低限のサンプルコードを載せます。
※ディレクトリ内のファイルの変更を検知し、その内容をprint出力するだけの処理です。
- watchdogの実装
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 監視対象ディレクトリ内に更新があったときに呼ばれるイベントハンドラ
class EventHandler(FileSystemEventHandler):
def on_modified(self,e):
print(f"{e.event_type} : {e.src_path}")
observer = Observer()
observer.schedule(EventHandler(), path="/path/to/directory", recursive=True)
observer.start()
try:
while True:
time.sleep(1) # CPU使用率を抑えるために1秒間隔でスリープ
except KeyboardInterrupt:
observer.stop()
observer.join()
- watchfileの実装
from watchfiles import watch
# 監視対象ディレクトリ内に更新があったときは、このforループに入る
for changes in watch("/path/to/directory"):
print(f"Changes detected: {changes}")
若干の違いはあれど同じ処理を行っていますが、watchfilesのほうがコード量が少ないことがわかると思います。
前者のwatchdogでは、オブザーバー/イベントハンドラのインスタンス作成に加えて、監視を開始/停止するための明示的なstart/stop処理等を記述する必要がありますが、watchfilesはそれらの複雑な処理を内部でカプセル化しています。
これによって、開発時はファイル監視のための詳細な処理を意識することなく、本来のタスク(監視イベント検知後の処理)に注力することができるということです。
watchfilesの基本的な使い方
以下のようなディレクトリ構造を前提に、簡単に使い方を説明します。
.
└── src
├── main.py # watchfilesの実装コード
└── target_dir # 監視対象ディレクトリ
└── test.txt
変更を検知したら任意の関数を実行したい場合
watch()
またはrun_process()
を使います。
watch()
を使う
前述した通り、for文のイテラブルオブジェクトとしてwatch()
を定義し、watch()
の引数に監視対象のディレクトリ or ファイルのパスを指定することで変更を検知することができます。
以下のコードは、src/target_dir
ディレクトリ内の変更を監視し、変更の内容が更新の場合のみ別の関数を実行しています。
from watchfiles import watch
action_type_map = {
1: "Added",
2: "Modified",
3: "Deleted",
}
# 更新イベントを検知したときに実行するトリガー関数
def changed_func(path):
print(path)
watch_directory = "src/target_dir"
# 特定のファイルだけ監視したい場合は、以下のようにファイル名まで指定する
# watch_file = "src/target_dir/test.txt"
for changes in watch(watch_directory):
for action, path in changes:
# 変更のタイプが「更新」の場合、別の関数を実行する
if action_type_map.get(action) == "Modified":
changed_func(path)
なお、watch()
から返却されるchanges
の値は、以下の通りsetの中にtupleが含まれたネスト構造となっています。
{(<Change.modified: 2>, 'src/target_dir/test.txt')}
tupleの最初の要素は変更の種類(追加/更新/削除のいずれかを示すenum型)が含まれており、2番目の要素には変更が発生したファイルまたはディレクトリのパスが格納されています。
enum型の値は上記のaction_type_map
に定義した通り、変更の種類に応じて異なる数値が返却されます。
run_process()
を使う
run_process()
を使えば、watchと同じように任意の関数を実行できます。
記述方法は、以下のようにrun_process()
の第1引数に監視対象のパスを指定し、target引数にcallableなオブジェクトを渡します。
import os
from watchfiles import run_process
action_type_map = {
1: "Added",
2: "Modified",
3: "Deleted",
}
# 変更イベントを検知したときに実行するトリガー関数
def changed_func():
# changesの内容は、環境変数から取得できる
changes = os.getenv("WATCHFILES_CHANGES")
if action_type_map.get(action) == "Modified":
print("changed_func called due to changes:", changes)
if __name__ == "__main__":
run_process("src/target_dir", target=changed_func)
run_process()
は内部的にmultiprocessing.Process
を使って新しいプロセスを生成して変更監視および処理を実行しているようです。
トリガー関数の処理を実行中に新しい変更を検知した場合はどうなる?
例えばトリガー関数の処理が重く、最初の処理が終わらない状態で立て続けに2つ目の変更を検知した場合はどうなるでしょうか。
以下のようにwatch()
とrun_process()
のトリガー関数内にsleepを仕込んで検証してみます。
# 更新イベントを検知したときに実行する処理を定義する
def changed_func(path):
# 重い処理を想定
time.sleep(10)
結果としては以下のようになりました。
-
watch()
- 最初の処理が終わったら2つ目の処理が実行されます。すなわち、2つ目の処理は最初の処理が終わるまで待たされるが、両方の処理は確実に実行されます。
- これは、
watch()
がシングルプロセスで動作し、変更を逐次処理するからです。
-
run_process()
- 最初の処理は強制終了され、2つ目の変更処理が即座に実行されます。
- これは、
run_process()
が新しい変更を検出すると、現在進行中のプロセスを中断し、変更に基づいて新しいプロセスを再度開始するためです。
watch()
とrun_process()
の使い分け
watch()
の特性を踏まえると、変更を検知した際は順序を維持して確実に処理を行いたい場合に利用できそうです。
例えば、ファイルの変更履歴を記録する場合や、変更内容を外部に通知する等の処理が考えられます。
一方でrun_process()
は、最新の変更に素早く反応し、プロセスの完了を待たずに処理をリスタートしたい場合に適していると思います。
一例としては、Webアプリケーションの開発時に最新の変更内容に基づいて設定ファイルを再読み込み→リロードを実行するようなケースで有効に使えると思います。
変更を検知したら任意のコマンドも実行できる
run_process()
のtarget
引数にstr型を渡すことで任意のコマンドを実行することができます。
from watchfiles import run_process
command = "echo 'Start Process'"
if __name__ == '__main__':
run_process("src/target_dir", target=command)
内部的にはsubprocess.Popen
でコマンドを実行するみたいです。
利用用途としては様々ありますが、変更を検知したら別のスクリプトを実行する等があるかなと思います。
非同期処理
今回私は利用しませんでしたが、watchとrun_processにはそれぞれ非同期版のawatch()
とarun_process()
もあります。
asyncioと組み合わせることで並行処理ができるようです。
パフォーマンスが求められる環境で効率的に処理を実行したい場合に使えると思います。
CLIコマンドでrun_process()
を実行できる
以下のようにCLIからwatchfiles
コマンドを実行できます。
watchfiles ['コマンド'] [監視対象のパス]
watchfilesコマンドを実行すると、内部的にはrun_process()
(subprocess.Popen
)で処理されるみたいです。
利用ケースとしては、監視&処理のスクリプト(今回紹介したmain.pyのようなもの)を書かずに直接ファイルやディレクトリの変更を監視し、特定のコマンドを実行する際に便利かなと思います。
公式ドキュメントでは、簡易的なWebサーバのホットリロードを実行するサンプルコードが紹介されています。
さいごに
以上、簡単にwatchfilesの概要と使い方の紹介でした。
なお、本記事では紹介していませんが、watch()
やrun_process()
には様々な引数を渡せる仕様となっています。
引数の内容に応じて細かい制御ができるようなので、興味がある方はドキュメントを参照してみてください。
-
デフォルトでは利用されていませんが、ホットリロードする際の監視対象をより
詳細にコントロールしたい場合に別途インストールして使うようです。 ↩