LoginSignup
5
6

More than 5 years have passed since last update.

Pythonのwatchdogを使ってFTP受信ファイルを安全に処理する

Last updated at Posted at 2019-04-24

FTP送信されてくるデータファイルを定時バッチで取り込む必要がありました。

要件をまとめると。

  • 10分毎に上位システムからデータがFTP送信されてくる。
  • 受信ファイルは同じファイル名で毎回上書きされるため、処理漏れに注意する。
  • 受信データを下位システムに反映するバッチは既存のバッチを拡張して実装済み。
  • 既存バッチは定期的に起動されている。
  • 双方の処理で同じファイルを食い合うことがないようにファイルを連携する。

FTP送信もデータ取り込みバッチも定時実行なのですがファイルを食い合ってエラーとならないようにすることが重要な点です。
FTPはそもそも送信側が制御しているため、こちら(受信側)は処理完了を待たなければなりません。
送信側の改変はしないとのことで、送信中にロックファイルを置いてもらうこともできません。
とにかくファイルの送信完了を待って、受信ファイルを作業ファイルへコピーします。
コピー先のファイルにはファイル名にタイムスタンプを付けて履歴管理もします。
データ取り込みバッチはコピー先のファイルを処理対象とします。

既存バッチはPython3で実装されていましたので、同じPython3でファイルをコピーするバッチを作成します。
調べてみるとwatchdogというファイルの更新を監視できるライブラリが良さそうなので、これを使おうと思います。
参考:ファイル監視にwatchdogがかなり便利な件

しかし参考先の記事にもある通り、on_modifiedイベントが2回発生してしまいます。
試しに大きなサイズのファイルを監視中のフォルダにコピーしてみたところ、on_modifiedイベントが2回と言わず多数回発生しました。
つまりこれは、ファイルの更新中はイベントがミリ秒単位で発生し続けているということを意味していると思われます。
そもそも同じファイルを何度もコピーする必要はないので、ファイルの更新が完了するのを待つことにしました。
on_modifiedイベントが発生しなくなったら更新が完了したと判断できるという発想です。
出来たコードはこんな感じです。

monitor_file.py
import datetime
import os
import shutil
import sys
import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

dic_modified_time = {}


class ChangeHandler(FileSystemEventHandler):

    def on_modified(self, event):
        if event.is_directory:
            return

        if os.path.splitext(event.src_path)[-1].lower() in ('.csv'):
            # CSVファイルのみ監視し、更新フラグと更新日時を記録
            dic_modified_time[event.src_path] = datetime.datetime.now()


def copy_file():
    temp_dic = dic_modified_time.copy()
    for file_path in temp_dic.keys():
        try:
            # 更新されたファイルが1秒以上更新なしの状態になった場合、更新が完了したものとみなして処理
            if (datetime.datetime.now() - dic_modified_time[file_path]).total_seconds() > 1:
                # 作業ファイルへコピー
                work_file = '{0}.work'.format(file_path)
                shutil.copy(file_path, work_file)
                # 作業ファイルをリネームにて移動
                min_file = '{0}_{1}'.format(file_path, datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
                os.renames(work_file, min_file)
                del dic_modified_time[file_path]
                print('copy file completed. {0} to {1}'.format(file_path, min_file))

        except Exception as e:
            print(e)
            # エラーが起きても処理を続行
            continue


def init():
    print('folder monitoring started...')

    while 1:
        event_handler = ChangeHandler()
        observer = Observer()
        observer.schedule(event_handler, os.path.abspath('C:/ftp'), recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
                copy_file()

        except KeyboardInterrupt:
            print('stop with KeyboardInterrupt.')
            observer.stop()
            sys.exit()

        observer.join()


if __name__ in '__main__':
    init()

解説です。
イベントハンドラではon_modified時にファイル名単位で更新日時を記録(上書き)していきます。(21行目)
※ファイル生成時にもon_modifiedは発生しているので処理対象はon_modifiedだけとしました。

イベントハンドラとは別にスリープ(1秒)を挟んだループ処理内で、ファイルの更新日時をチェックする処理を呼んでいます。(56行目)
ファイルの最終更新時間から1秒以上経過した場合(29行目)、ファイル更新が終わっているものと判断してファイルをコピーします。(32行目)
ファイルのコピー時にはFTP送信は終わっていて、次の送信までは(今回の場合は10分近い)間があるため、ファイルの食い合いは発生しません。
しかし後続の処理(定期的に実行されるデータ取り込みバッチ)がファイルを取りに来てしまう可能性があります。
そのため、一旦ワークファイルにコピーをしています。このワークファイルはどこからも参照されません。

コピーが終わったらリネームによるファイル移動を行います。(35行目)
このリネームというのはファイルの移動が終わるまで他からファイルを参照することができない処理です。
後続の処理はこのリネーム後のファイルを処理対象としていますから、ファイルを食い合うことはありません。

watchdogは便利なライブラリだと思いますが、イベントの発生の仕方に癖があるというか、ハマりかけました。
ともあれ、要件は満たすことができました。

5
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
6