ファイルの変更を検知して、サーバーにアップロードするような処理を書きたかったので調べました。
ファイルの変更検知は gorakhargosh/watchdog | Github で簡単にできたのですが、アップロード処理を非同期で行う実装に手間取ったので残しておきます。
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent
from concurrent.futures import ThreadPoolExecutor
def upload_file(file_path):
print(f" [upload_file] start. (file={file_path})")
time.sleep(5)
print(f" [upload_file] end. (file={file_path})")
return "success"
def handle_future(future):
try:
success = future.result()
print(f" [handle_future] {success}")
except Exception as e:
print(f" [handle_future] aught an exception: {e}")
class MyEventHandler(FileSystemEventHandler):
def __init__(self, executor):
# FileSystemEventHandlerを継承して、ThreadPoolExecutor をインスタンスに持たせる
self.executor = executor
def on_closed(self, event: FileSystemEvent):
if event.is_directory:
return
print(f"[on_closed] start: {event}")
# ThreadPoolExecutor で upload_fileを別スレッドで実行する
future = self.executor.submit(upload_file, event.src_path)
# コールバックでupload_fileの結果を処理する
future.add_done_callback(handle_future)
print(f"[on_closed] end")
# 最大3スレッドで処理する
with ThreadPoolExecutor(max_workers=3) as executor:
event_handler = MyEventHandler(executor)
target_dir = "/tmp"
observer = Observer()
observer.schedule(event_handler, target_dir, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
finally:
observer.stop()
observer.join()