植物撮影装置の稼働状況をslack経由でnotifyしてくれるpythonプログラムを作った話
こんな感じになりました。
撮影装置がエラーで停止する。
↓
なんとかしてくれというnoticeがslackで来る。
経緯
理化学研究所CSRSが開発した植物自動育成&撮影装置。詳細は上のプレス(日本語)か、論文(英語、オープンアクセス)を参照。
植物をベルトコンベアで回しながら備え付けのカメラで経時的に写真や生育データをPCに貯めていく装置なのですが、どうしてもエラーでときたま止まってしまうことがあるそうです。原因の詳細は把握していませんが、部品破損、土や溶液の付着による劣化などによるハード的な原因から、給水タンクの水涸れなど様々だと思われます。さらに担当研究員が常駐する部屋と制御PC付きの装置がある部屋は研究棟が異なるため、正常に稼働しているかどうかのモニタリングを随時行い続けるのが大変だったそう。ネットワーク経由で保存先のフォルダの最新の更新時刻を見ればいいじゃないかという話だと当初は思ったのですが、1装置にカメラの種類だけフォルダがあること、装置の数だけさらに掛け算で増えることを考慮すると、とてもとてもという状況でした。撮影装置を利用する側として共同研究の話を進めているときに、これでは自分の実験データ取得にも支障が出るということで急遽つくることにしました。
制御ソフトにエラー対処機能を実装するのは困難(一度完成しているハード制御するソフトウェアに手を出したくない)と判断したので、外部からフォルダの更新状況を監視し、slackにその状況を通知するシンプルなプログラムを作りました。以下備忘録と情報共有。
アイディア
-python
-監視すべきフォルダを指定する
-サブフォルダが生成される場合があるので、recursiveに指定フォルダ内を探索する。
-指定したインターバルを挟んでフォルダ内のファイルに差分がなければ通知する。
-1日に1回程度、プログラムが通常稼働していることを通知する。
- 通知する方法はtwitter*かslack
* twitterのapi登録のステップにどれだけの時間かかるかわからなかったのでslackにした。
- pythonからslackへの自動投稿はqiitaでincoming webhookとかで検索してよく使われているコードを利用することにした。
プログラム
1. 必要ライブラリの読み込みと自動インストール
遠隔でpythonを実行してもらう場合、pip install~~~を指示すること自体が実はハードルが高いことがわかったので、その部分もスクリプトに内包した。
from pip._internal import main as _main
import importlib
def _import(name, module, ver=None):
try:
globals()[name] = importlib.import_module(module)
except ImportError:
try:
if ver is None:
_main(['install', module])
else:
_main(['install', '{}=={}'.format(module, ver)])
globals()[name] = importlib.import_module(module)
except:
print("can't import: {}".format(module))
_import('requests','requests')
_import('argparse','argparse')
_import('json','json')
_import('threading','threading')
参考
https://vaaaaaanquish.hatenablog.com/entry/2018/12/02/210647
https://stackoverflow.com/questions/12332975/installing-python-module-within-code
起動時にライブラリがなくインポートエラーを吐く場合、インストールしてから再度インポートを試みる。
requests, argparse, json, threadingは標準インストールされてないことが多かったので指定した。
2. argparseでコマンドライン引数の管理
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--interval',default=1)
parser.add_argument('-d', '--dir', nargs='+', help="pass abs directory pass as list", required = True)
parser.add_argument('-n', '--name',default="RIPPS監視プログラム_第一号")
interval: ファイル確認頻度(時間)
dir: 監視フォルダ(絶対パス)、複数指定可能
name:slack投稿者名
下のように使う
python monitor.py --dir PATH1 PATH2 --interval 1.5 --name RIPPS_monitoring_robot_1
3. 他ハードコード変数
SLACK_WEBHOOK = "https://hooks.slack.com/services/XXXXXXXXXXXXX"
start = time.time()
4. 初回監視フォルダスキャンとslack投稿
def initiation(path,nfiles):
message = "%sの監視を開始します。 監視下のフォルダには現在%d個のファイルがあります。%s時間ごとに更新チェックします。" % (path,nfiles,args.interval)
ellapsed = int(time.time() - start)
payload_dic = {
"icon_emoji": ':heartpulse:',
"text": message,
"username": args.name + "_" + str(int(ellapsed/(60*60))),
"channel": "#general", # #も必要
}
try:
r = requests.post(SLACK_WEBHOOK, data=json.dumps(payload_dic))
except requests.ConnectionError:
print(requests.ConnectionError)
print("slackに接続できませんでした。")
befores = []
for i, path_to_watch in enumerate(args.dir):
print(path_to_watch)
assert os.path.isdir(path_to_watch) == True, print("%s is not a valid directory" % path_to_watch)
if path_to_watch[-1] != "/":
path_to_watch += "/**"
else:
path_to_watch += "**"
before = dict ([(f, None) for f in glob.glob(path_to_watch,recursive=True)])
initiation(path_to_watch,len(before))
args.dir[i] = path_to_watch
befores.append(before)
- 複数ディレクトリを監視するオプションを入れた場合、args.dirはリストになるので、for loopで処理する。単一フォルダでも対応可能。
- ディレクトリじゃなかった場合、assertion errorではじく。
- glob.globのrecursive=Trueかつパスの末端に**がついているとサブディレクトリまでサーチする(はず)なので、入力引数の末端に**を付加しておく。フォルダをterminalやcommand promptに引っ張って落としても対応可能なようにしておくため。パスの末端に〜〜を付加してください、という指示は結構複雑だし、使用者が抜け落とすことがあるので。
- glob.globで取得したファイル一覧(正確にはフォルダの数も入ってしまっているけど)を辞書型にして、辞書のlengthをinitiation functionにわたす。
- initiation functionでslackのチャンネルに投稿する。
- 初期状態の監視フォルダのファイル名をbeforesリストに格納しておいて、後に差分比較用に使う。
以下は2つのフォルダを監視するよう設定した場合のbotの挙動
5.フォルダ監視
def errorpostslack(path):
error_message = "監視対象フォルダ(%s)が%s時間以上更新されていません" % (path,args.interval) #更新なしメッセージ
ellapsed = int(time.time() - start)
payload_dic = {
"icon_emoji": ':cry:',
"text": error_message,
"username": args.name + "_" + str(int(ellapsed/(60*60))),
"channel": "#general", # #も必要
}
try:
r = requests.post(SLACK_WEBHOOK, data=json.dumps(payload_dic))
except requests.ConnectionError:
print(requests.ConnectionError)
print("slackに接続できませんでした。")
while 1:
time.sleep(float(args.interval)*60*60)
for i, (before, path_to_watch) in enumerate(zip(befores,args.directory)):
after = dict ([(f, None) for f in glob.glob(path_to_watch,recursive=True)])
added = [f for f in after if not f in before]
removed = [f for f in before if not f in after]
if added:
print ("Added: ", ", ".join (added))
#goodpostslack(added)
pass
elif removed:
print ("Removed: ", ", ".join (removed))
pass
else:
errorpostslack(path_to_watch)
befores[i] = after
- 設定したinterval(時間)をもとにインターバル(秒)を挟んでループを実行。
- 初期スキャンで取得したbeforesリストとdirectoryをzipで同時に吐き出す。enumerateはここでは意味がないが、バグとり用の名残。
- 2回目以降は更新されたbeforesを利用する
- 各ループごとに、最新のglob.globによるファイル一覧を取得する。beforeと比べ、ファイルが増えていた場合はaddedに、消えていた場合はremovedのリストを作成する。変化がない場合はadded、removedともにemptyで、elseの処理が実行される。
- added removedともにpassはいらないけれど、printをコメントアウトしたとき用に残してある。
- ファイル更新がなかった場合、errorpostslack functionを呼んでエラーメッセージをslackに投稿する
6.生存報告
きちんと撮影装置が稼働して、画像がたまり続けていた場合、通知が何もない。かといってインターバルごとに正常通知がきつづけるのも煩わしい。ということで一日一回の生存報告する機能を追加する。
def dailynotice():
message = "1日1回の定期報告です。"
ellapsed = int(time.time() - start)
for i, (before, path_to_watch) in enumerate(zip(befores,args.dir)):
nfiles = len(glob.glob(path_to_watch,recursive=True))
message += "%sのフォルダ下には%d個のフォルダ・ファイルが存在します" % (path_to_watch,nfiles)
payload_dic = {
"icon_emoji": ':smile:',
"text": message,
"username": args.name + "_" + str(int(ellapsed/(60*60))),
"channel": "#general", # #も必要
}
try:
r = requests.post(SLACK_WEBHOOK, data=json.dumps(payload_dic))
except requests.ConnectionError:
print(requests.ConnectionError)
print("slackに接続できませんでした。")
while 1:
dailynotice()
time.sleep(24*60*60)
7. エラー検知と生存報告の共存
while loopが2つあるのでthreadingで同時に回す。
pseudocode
def error_check():
監視フォルダの更新チェック関数
def daily_check():
生存報告
t1 = Thread(target = error_check)
t2 = Thread(target = daily_check)
t1.start()
t2.start()
8.最後に
- コードはgithubにアップした。https://github.com/totti0223/ripps_utility
- slackのincoming webhookのurlもコマンドライン引数にしようかと迷ったが使用者が混乱するのでハードコードにした。
- macのオートメーターとか、フリーのソフトウェアとか他の方法も多分に存在するだろうけれども、pythonで作る方が早かった。ご存知でしたらぜひ言語、osプラットフォーム問わず教えて下さい(slack経由じゃなくても)。
- 生存報告、エラー報告ごとにアイコンを変えているのに、同じ投稿者が続けてポストするとスレッドまとめられて反映されずに悲しい。。
- 苦肉の策で投稿者名(username)の最後にプログラム起動からの時間をmergeすることによって対処。美しくない。投稿者名が同じでもアイコンが1つずつ表示されるときもあって法則性がよくわからない。
payload_dic = {
"icon_emoji": ':heartpulse:',
"text": message,
"username": args.name + "_" + str(int(ellapsed/(60*60))),
"channel": "#general", # #も必要
}