LoginSignup
94
129

【業務効率化】ファイルorフォルダの更新を監視するプログラム

Posted at

目次

  • 背景
  • 目的
  • 内容
  • 所感

背景

仕事をしていると、共通ファイルサーバにあるファイルが更新されたらすぐに知りたいと思う場面が多々ありました。毎回フォルダにアクセスして更新されているかを確認するのは面倒であり、ファイルが更新されたら誰かがお知らせしてくれれば便利だなぁと思っていました。

目的

フォルダにあるファイルが更新されたら、通知されるプログラムを作成すること。

内容

コードのコンセプト

プログラムの流れを考えます。
まず、ファイルを監視するのか、フォルダを監視するのかを場合分けする必要があります。ファイルとフォルダの場合についてそれぞれについて深掘りしていきます。

【ファイルの場合】

  1. プログラムを実行した際のファイルの更新日を記録する
  2. 更新日を定期的に取得する
  3. その2つの日付を比較して異なれば通知する

【フォルダの場合】
[ファイルの個数を監視]

  1. プログラムを実行した際のファイルの数を記録する
  2. ファイルの数を定期的に取得する
  3. その2つの個数を比較して異なれば通知する

[ファイルの更新日を監視]

  1. プログラムを実行した際のフォルダ内のすべてのファイルの更新日を記録する
  2. フォルダ内のすべてのファイルの更新日を定期的に取得する
  3. すべてのファイルのうち、更新日が1つでも異なれば通知する

通知するにはplyerというライブラリを使用します。インストールされていない方はpip install plyerでインストールしてください。これを使うことで、ポップアップで通知が表示されます。

また、ファイル情報を常に取得し続けると、CPUのリソースを常時使い続けることになり、PCの動作が重くなりますので、プログラム実行に休憩時間を設けます。それがtime.sleep関数です。この引数に数字を渡すことで、指定した秒数の間はプログラムが停止します。

実際のコード

それでは、実際のコードを以下に示します。

# A script to monitor the update of  both files and a directory using the modified date in the file and the number of the files in the directory.
# If you want to run this script, you may need to install plyer, which is a popup notification library.

# Import libraries

import os
from plyer import notification
import datetime as dt
import time

# Initialize variables.

target = input('Enter the path to a file or a directory that you want to monitor.')
sleep = 10 # waiting seconds


# Define functions

def rtn_modified_time(file_path: str) -> dt.datetime:
    """Given a file path, this function returns the modification date as a datetime type.
    
    Parameters
    ----------
    file_path: str
        file path.
        
    Return
    ------
    modified_time: dt.datetime
        the modification date as a datetime type.
    
    """
    
    file_info = os.stat(file_path)
    modified_time = dt.datetime.fromtimestamp(file_info.st_mtime)
    return modified_time

def notificate_from_dts(dt1: dt.datetime, dt2: dt.datetime, file: str) -> bool:
    """Function that receives two datetime.datetime types and file paths and notifies if they are different.
    
    Parameters
    ----------
    dt1: dt.datetime
        A datetime type assuming original.
    dt2: dt.datetime
        A datetime type assuming modified.
    file: str
        A modified file.
        
    Return
    ------
    bool
        It become a flag of breaking loop.
    
    """
    if dt1 != dt2:
        notification.notify(
            title="【Notification】",
            message=f"{file} is modified.",
            app_name="App name",
            app_icon="notification.ico",
            timeout=30)
        return True
    else:
        return False


# Start monitoring.

if os.path.isfile(target): # When monitoring files.
    file_path = target
    modified_dt1 = rtn_modified_time(file_path) # Get the file modification date at the time of loading.
    while True:
        file = os.path.basename(file_path)
        modified_dt2 = rtn_modified_time(file_path)
        flag = notificate_from_dts(modified_dt1, modified_dt2, file)
        if flag:
            print(f'{file} is modified at {modified_dt2.strftime("%Y/%m/%d %H:%M:%S")}')
            break
        time.sleep(sleep)

else: # When monitoring files.
    dir_path = target
    time_dic1 = {} # key is file name, value is the file modification date.
    original_files = [] # List of original files.
    for file in os.listdir(dir_path):
        time_dic1[file] = rtn_modified_time(dir_path + f'\\{file}')
        original_files.append(file)
        
    while True:
        flag_while = False
        files = os.listdir(dir_path)
        
        if len(set(original_files)) != len(set(files)): # When number of files in directory changes.
            if len(set(original_files)) > len(set(files)):
                msg = 'removed'
            elif len(set(original_files)) < len(set(files)):
                msg = 'added'
            flag_while = True
            diff_file = list(set(original_files) ^ set(files))
            notification.notify(
                title="【Notification】",
                message=f"{diff_file[0]} is {msg}.",
                app_name="App name",
                app_icon="notification.ico",
                timeout=30)
            print(f'{diff_file[0]} is {msg} at {dt.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}')
            break
            
        for file in files: # When individual files in a folder are updated.
            modified_dt1 = time_dic1[file]
            modified_dt2 = rtn_modified_time(dir_path + f'\\{file}')
            flag = notificate_from_dts(modified_dt1, modified_dt2, file)
            if flag:
                flag_while = True
                print(f'{file} is modified at {modified_dt2.strftime("%Y/%m/%d %H:%M:%S")}')
                break
        
        if flag_while:
            break
            
        time.sleep(sleep)

_=input('Enter any key to close this window.') # To stay at the command prompt.

実行環境によって表示形式は異なりますが、コマンドプロンプトで実行した場合、以下のような手順になります。

  1. ファイルもしくはフォルダのパスを聞かれるので、パスを入力する。
  2. ファイルもしくはフォルダを任意の時間間隔 (上記のコードでは10秒)で監視する。
  3. ファイルもしくはフォルダに変更があればポップアップ通知が表示される。

下の図は、フォルダを監視対象とする場合で実行した結果です。bbb.txtというファイルに変更を加えるとポップアップが現れて、音とともにお知らせしてくれます。
image.png

コードの詳細

初期設定

ライブラリのインポートは割愛します。
まず、ファイルもしくはフォルダのパスをinputで聞きます。また、sleep変数に何秒間の休憩時間を設定します。

# Initialize variables.

target = input('Enter the path to a file or a directory that you want to monitor.')
sleep = 10 # waiting seconds

関数の定義

続いて、関数の定義をします。ここでの関数定義の意義は、繰り返し処理をまとめることというより、複雑な処理をまとめることです。
rtn_modified_timeは、ファイルパスを受け取って、そのファイルの更新日をdatetime.datetime型で返す関数です。os.stat関数にファイルパスを渡すと、メタデータのタイムスタンプ情報を持つos.stat_resultオブジェクトを取得でき、.st_mtimeメソッドで更新日のUNIX時間にアクセスすることができます。それをdatetime.datetime型に変換しているのが、dt.datetime.fromtimestamp関数です。
notificate_from_dtsは、2つのdatetime.datetime型とファイルパスを受け取って、その時間が異なっていればTrue、同じならFalseを返す関数です。またTrueの場合、ポップアップでお知らせしてくれます。

# Define functions

def rtn_modified_time(file_path: str) -> dt.datetime:
    """Given a file path, this function returns the modification date as a datetime type.
    
    Parameters
    ----------
    file_path: str
        file path.
        
    Return
    ------
    modified_time: dt.datetime
        the modification date as a datetime type.
    
    """
    
    file_info = os.stat(file_path)
    modified_time = dt.datetime.fromtimestamp(file_info.st_mtime)
    return modified_time

def notificate_from_dts(dt1: dt.datetime, dt2: dt.datetime, file: str) -> bool:
    """Function that receives two datetime.datetime types and file paths and notifies if they are different.
    
    Parameters
    ----------
    dt1: dt.datetime
        A datetime type assuming original.
    dt2: dt.datetime
        A datetime type assuming modified.
    file: str
        A modified file.
        
    Return
    ------
    bool
        It become a flag of breaking loop.
    
    """
    if dt1 != dt2:
        notification.notify(
            title="【Notification】",
            message=f"{file} is modified.",
            app_name="App name",
            app_icon="notification.ico",
            timeout=30)
        return True
    else:
        return False

ファイルの監視

ここからが監視のスタートですが、まずファイルを監視する場合です。
os.path.isfile関数は、渡された文字列がファイルまでのパスならTrue、それ以外はFalseを返すので、これをif文の条件に設定しています。先程定義したrtn_modified_time関数で、監視開始時点のファイルの更新日をmodified_dt1に記録しておきます。

そして、ループが始まり、指定した時間間隔で対象のファイルの更新日を取得し続け、監視開始時点のファイル更新日と異なっていればflag変数がTrueとなり、ポップアップが表示されるのと同時にコマンドプロンプトにも表示され、ループを脱出します。

# Start monitoring.

if os.path.isfile(target): # When monitoring files.
    file_path = target
    modified_dt1 = rtn_modified_time(file_path) # Get the file modification date at the time of loading.
    while True:
        file = os.path.basename(file_path)
        modified_dt2 = rtn_modified_time(file_path)
        flag = notificate_from_dts(modified_dt1, modified_dt2, file)
        if flag:
            print(f'{file} is modified at {modified_dt2.strftime("%Y/%m/%d %H:%M:%S")}')
            break
        time.sleep(sleep)

フォルダの監視

フォルダを監視する場合はやや複雑なコードとなっています。
まず、フォルダ内の全てのファイルパスと監視開始時点の更新日をtime_dic1に記録します。別途、全てのファイルパスのリストをoriginal_filesに保管します。

そして、ループが始まり、初期設定としてflag_whileはループを脱出しても良いかのフラグとなっています。while文の末尾にチェックが入ります。files変数は監視を開始してからのフォルダ内のファイルパスのリストを保管します。

まず、ファイルの数が異なっていないかをチェックします。if len(set(original_files)) != len(set(files)):の条件式ですね。その中でもファイルが削除されたのか、追加されたのかの条件によってmsg変数の中身を分岐しています。そして、diff_file = list(set(original_files) ^ set(files))で更新前後の差分のファイルパスを抽出しています。これはリストをset化することでリスト内の重複が削除される性質を利用しています。後はお知らせをしてループを脱出します。

続いて、各ファイルが更新されていないかをチェックします。for file in files:から始まるブロックですね。ここでは、各ファイルの更新日が監視開始時点から変化していないかをチェックします。異なっていれば、お知らせをしてループを脱出します。

最後に、ループの外にinput関数を置いておくことで、ループを脱出した直後にコマンドプロンプトがすぐに閉じてしまうことを防いでいます。お知らせのポップアップを見逃した場合でも、コマンドプロンプトの履歴を見れば更新されたかを確認できるようにするためです。

else: # When monitoring files.
    dir_path = target
    time_dic1 = {} # key is file name, value is the file modification date.
    original_files = [] # List of original files.
    for file in os.listdir(dir_path):
        time_dic1[file] = rtn_modified_time(dir_path + f'\\{file}')
        original_files.append(file)
        
    while True:
        flag_while = False
        files = os.listdir(dir_path)
        
        if len(set(original_files)) != len(set(files)): # When number of files in directory changes.
            if len(set(original_files)) > len(set(files)):
                msg = 'removed'
            elif len(set(original_files)) < len(set(files)):
                msg = 'added'
            flag_while = True
            diff_file = list(set(original_files) ^ set(files))
            notification.notify(
                title="【Notification】",
                message=f"{diff_file[0]} is {msg}.",
                app_name="App name",
                app_icon="notification.ico",
                timeout=30)
            print(f'{diff_file[0]} is {msg} at {dt.datetime.now().strftime("%Y/%m/%d %H:%M:%S")}')
            break
            
        for file in files: # When individual files in a folder are updated.
            modified_dt1 = time_dic1[file]
            modified_dt2 = rtn_modified_time(dir_path + f'\\{file}')
            flag = notificate_from_dts(modified_dt1, modified_dt2, file)
            if flag:
                flag_while = True
                print(f'{file} is modified at {modified_dt2.strftime("%Y/%m/%d %H:%M:%S")}')
                break
        
        if flag_while:
            break
            
        time.sleep(sleep)

_=input('Enter any key to close this window.') # To stay at the command prompt.

所感

もう少しきれいにコードを書けるような気がしますが、意図した通りの動きをしました。

ポップアップでお知らせしてくれるplyerライブラリは色々な使い方がありそうです。記事にするつもりはないですが、会社のスケジュールの通知は実際にこれを使用しています。会議のたびに腕時計で開始時間のちょっと前にアラームを設定したり、「やべえ!打ち合わせの時間が過ぎている!」なんてことはなくなり、すごく快適になりました。
他にも面白い使い方や便利な利用用途があればコメントしてくれると嬉しいです。

ちなみに、icoファイルは任意のものでOKですが、jpgなどの画像から変換したものを使用すると、うまく動作しない場合がありますので、注意してください。

また、ポップアップの表示時間がコードで設定したものより短いという事象は、恐らくPC設定の通知表示時間によるものであると考えられます。このような事象に遭遇したらPC設定を一度確認してみてください。

記載したコードは、いくらでもコピペして再利用していただいて構いません。
最後まで読んでいただき、ありがとうございました。:v:

94
129
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
94
129