研究室入退室管理システムについて
新型コロナウイルスの感染拡大の影響によって、僕の所属している研究室では滞在していた時間をGoogleフォームで送信しなければなりません。
入力項目は、「氏名・学籍番号・時刻」なのですが、学籍番号と時刻の入力がめちゃくちゃ手間です。
学籍番号は11桁の数字、時刻は2020-10-19 13:00~20:00のように入力しなければなりません。
しかも、この報告みんな忘れがちで先生からお怒りのメールが届いたりしました。
そこで、研究室に滞在していた時間を自動で記録し、「氏名・学籍番号・時刻」を事前入力したGoogleフォームのURLをLINEに送信してくれるシステムを開発してみました。これでもう怒られることはありません。
研究室に入室した場合、その日の21時になるとこんな感じで送られてきます。
「本当に楽。便利。」って一人で感動してました。
システム全体像
- パケット監視ノード
- Raspberry Pi 3B+にBUFFALO製のWi-Fiアダプタを接続した構成になっています。
- パケットキャプチャを行い、パケット解析によって誰が何時に滞在していたかを取得します。
- サーバ
- MacでMySQLを動作させて、ユーザや入退室情報の管理を行っています。
サーバの機能
サーバには、
- ユーザ情報や各ユーザの入退室情報を管理する
- 21時に各ユーザのスマートフォンへ通知する
といった機能を実装しています。
MySQLによってデータを管理し、LINE Notifyを利用して通知を行っています。
データの管理
データの管理では、ユーザの情報を管理するテーブルと入退室の記録を管理するテーブルを利用しています。
(データベースの設計についても詳しくないので、詳しい方教えてください。。。)
userテーブル
ユーザの情報を管理するテーブル(user)は以下のようになっています。
id | student_id | name | full_name | token | mac_addr |
---|---|---|---|---|---|
1 | 23456789111 | yamada | 山田花太郎 | fasdijfoasjJFioasdj | aa:aa:aa:aa:aa:aa |
- id : 自動的に連番で振られるユニークなID
- student_id : 学籍番号 (Googleフォームに埋め込む)
- name : ユーザ名
- full_name : フルネーム (Googleフォームに埋め込む)
- token : LINE Notifyのトークン
- mac_addr : スマートフォンのMACアドレス
なかなかクリティカルな情報を保持しているのがuserテーブルです。
研究でブロックチェーン使っているので、今後はブロックチェーンにデータ記録したいなあなんて思ってます。
accessテーブル
各ユーザの入退室記録を管理するテーブル(access)は以下のようになっています。
id | date | entry_time | exit_time |
---|---|---|---|
1 | 2020-11-09 | 2020-11-09 13:33:01 | 2020-11-09 20:53:02 |
- id : userテーブルのIDと連携する
- student_id : 学籍番号
- entry_time : 入室時間
- exit_time : 退室時間
なぜ、dateが必要なのか忘れました。これがaccessテーブルです。
MySQLの操作(Python)
Pythonのクラスとしてデータベースに対する操作をまとめました。
いくつか例を載せておきます。
class db_func:
def __init__(self):
# コネクションの作成
self.conn = mydb.connect()
# コネクションが切れた時に再接続してくれるよう設定
self.conn.ping(reconnect=True)
# 接続できているかどうか確認
try:
print(self.conn.is_connected())
except mydb.Error as err:
print("Something went wrong: {}".format(err))
def create_user(self, student_id, user_name, full_name, token, mac_addr):
try:
cur = self.conn.cursor()
# クエリを作成
query = """
INSERT INTO user (student_id, name, full_name, token, mac_addr)
VALUES ('{0}','{1}','{2}','{3}', '{4}');
""".format(student_id, user_name, full_name,token, mac_addr)
print("")
print("ユーザの作成")
print("")
# 実行&コミット
cur.execute(query)
self.conn.commit()
except mydb.Error as err:
self.conn.rollback()
return False
def get_access_info(self, field_name, wh_field, value):
# DB操作用にカーソルを作成
cur = self.conn.cursor()
query = "SELECT {0} FROM access where {1} = '{2}';".format(
field_name, wh_field, value
)
cur.execute(query)
get_value = cur.fetchall()
# 取得した値を返す
return get_value
このような感じで必要なクエリをたくさん定義したクラスを用意してデータベースの操作を行っています。
パケット監視ノードの機能
パケットキャプチャできるようにする
Wi-Fiアダプタを使用したパケットキャプチャについての詳細はこちらを参照してください。
パケットキャプチャの詳細
パケットキャプチャはtcpdump
を使用して行います。
パケットの取得についてですが、そのままキャプチャしてしまうと膨大なパケット量になるため、
今回は必要なパケットのみを取得するために以下のようなフィルタリングを行っています。
sudo tcpdump -w ~/pcap/wlan-%F-%T.pcap -G 180 -i wlan0 type data subtype null -s 42
-w : 書き込みファイル名を指定 (%F=2020-01-15 %T=10:14:39 のようなフォーマットになる)
-G : ファイルを何秒で分割するかを指定
-i : どのネットワークインタフェースを対象指定
type : パケットのタイプを指定
subtype : パケットのサブタイプを指定
-s : 取得バイト数の指定
パケットの計測実験を行ったところ、スマートフォンがWi-Fiに接続すると継続的に送信するパケットがあることが判明したため、
今回はそのようなパケットを取得するためのコマンドを採用しました。
(プローブリクエストとかキャプチャしたらもっと効率いいのかな?詳しい方教えてください。。。)
上記コマンドを実行すると、3分毎にパケットキャプチャファイル(pcapファイル)が生成されます。
パケットの解析
上記手順でパケットの取得は行えました。解析に移りたいと思います。
解析といっても大したことはしておらず、
- キャプチャしたパケットから送信元/送信先MACアドレスを取り出す
- userテーブルに記録されているMACアドレスと一致するかどうか検索
- 一致した場合、そのユーザとパケットの到着時間を取得
といった流れを実装しました。
scapyというPythonで書かれたパケット解析のライブラリを使用しています。WireSharkより詳細な解析が可能なのが特徴です。
以下、解析部分のソースコードです。
from scapy.all import *
class wlan_pcap:
def __init__(self):
# データベースへの接続
self.db = db_func.db_func()
def pcap_reader(self, filename, output_file):
# MACアドレスをキーとした辞書を作成する
access_data = {}
# MACアドレスのリストを取得
column_name = "mac_addr"
table_name = "user"
addr_list = self.get_addr_list(column_name, table_name)
try:
for packet in PcapReader(filename):
# 802.11レイヤーのパケットのみを処理
if packet.haslayer(Dot11) and packet.type==2: # タイプ2 = Data Frames
# flagの特定部分を抽出
DS_flag = packet.FCfield & 0x3
# DS(Distribution System)に向かうかどうかを判定
toDS = DS_flag & 0x01 != 0
fromDS = DS_flag & 0x2 != 0
if toDS and not fromDS:
# MACアドレスがuserテーブルに登録されているかどうか
if packet.addr2 in addr_list:
# 送信者(=スマホ)のMACアドレスを格納
TA = packet.addr2
# パケットの到着時間を取得
packet_time = dt.datetime.fromtimestamp(packet.time, dt.timezone(dt.timedelta(hours=9)))
access_data[TA] = packet_time
else:
pass
def get_addr_list(self, column_name, table_name):
# userテーブルから登録されているMACアドレスを取得する
addr_list = self.db.select(column_name, table_name)
return addr_list
入退室記録機能
パケットをキャプチャ/解析する機能が実装できたので、それを利用した入退室記録機能の説明に移ります。
入退室記録については、**「ユーザのID・日付・入室/退室時刻」**を引数としてaccess_manage()を呼び出すことで更新を行います。
以下に関数の中身を載せています。この関数では、src/pi_src/db_func.pyに実装しているaccess_info_manage()を呼び出すことで、accessテーブルを更新しています。何を言っているのか分かりにくいかと思いますが、
- access_manage.pyのaccess_manage() : 「入室記録を更新する or 退室記録を更新する」を判定してaccess_info_manage()を呼び出す
- db_func.pyのaccess_info_manage() : MySQLに接続し、accessテーブルを実際に更新する
といった役割分担です。
def access_manage(self, match_user, time_flag=False):
# クエリに含める情報の設定
field_name = "*"
table_name = "access"
wh_field = "id"
user_id = match_user["id"]
# 過去の入退室記録があるかどうか検索
access_info = self.db.get_where(field_name, table_name, wh_field ,user_id)
# 入退室時間(=パケット到着時間)を取り出す
access_time = match_user["time"]
# 入退室に関する記録がない場合
if len(access_info) == 0:
# 初めての入室記録を生成
self.db.access_info_manage(user_id, today, access_time ,True)
print("")
print("入退室記録なし")
return True
# 入室記録はあるが日付が変わっている場合
elif str(date) != str(today):
# 入室記録の生成
self.db.access_info_manage(user_id, today, access_time ,True)
print("")
print("今日はまだ入室してないと思います。")
return True
# 入退室記録があり、退出時間を更新する場合
else:
# 退室記録を生成/更新
self.db.access_info_manage(user_id, today, access_time)
print("")
print("退出記録更新")
return False
def access_info_manage(self, user_id, date, access_time, entry_flag=False):
cur = self.conn.cursor()
# flagが「入室」の場合
if entry_flag:
# クエリの作成
query = """
INSERT INTO access (id, date, entry_time)
VALUES ({0}, '{1}', '{2}');
""".format(user_id, date, access_time)
else:
query = """
UPDATE access SET exit_time = '{0}'
WHERE date = '{1}' and id = {2} ;
""".format(access_time, date, user_id)
# 実行&コミット
cur.execute(query)
self.conn.commit()
入退室記録の更新
上記まで、パケットをキャプチャ/解析し、accessテーブルに記録する機能を実装できました。
それぞれの機能は独立しています。
残りの作業としては、独立した機能をいい感じの流れで呼び出して入退室記録を定期的に更新するものを作る必要があります。
tcpdumpはファイル分割しているので、バックグラウンドで実行し続けておきます。
sudo tcpdump -w ~/pcap/wlan-%F-%T.pcap -G 180 -i wlan0 type data subtype null -s 42 &
以下のような流れで定期更新を行います。
- 解析対象のpcapファイルを取得
- パケット解析処理を呼び出す
- 入退室記録を更新する
- 解析対象のpcapファイルを取得
class pcap_analyzer:
def __init__(self):
# パケット解析ライブラリ
self.wp = wlan_pcap.wlan_pcap()
# データベースへの接続
self.db = db_func.db_func()
# アクセス管理するライブラリ
self.am = access_manage.access()
def main(self):
# 処理対象のpcapファイルを取得する
path = "/home/pi/pcap"
file_list = self.get_pcap(path)
pcap_file = os.path.join(path, file_list[0])
# 入退室管理を更新する
self.access_data = self.wp.pcap_reader(pcap_file, "test")
if self.access_data != -1:
self.access_update()
else:
print("pcap読み込み時に何らかのエラー発生")
# 処理対象のpcapファイルを削除する
self.wp.delete_file(pcap_file)
if __name__ == "__main__":
pa = pcap_analyzer()
pa.main()
これを定期的に実行すれば、いい感じに更新できます。
そこで、crontabを利用して3分毎にpcap_analyzer.pyを実行します。
crontab -e
# エディタが開くので、最後の行に以下を追加
*/3 * * * * cd [pcap_analyzer.pyがあるディレクトリ] ; [使用しているpythonのパス] pcap_analyzer.py
## 例 ##
*/3 * * * * cd /home/pi/RAS_src ; /home/pi/python-venv/RAS/bin/python /home/pi/RAS_src/pcap_analyzer.py
これで3分毎に入退室記録を更新するプログラムが実行されるようになりました。
補足ですが、今回はtcpdumpによるパケットキャプチャファイル生成の仕様を利用しています。
tcpdumpでファイルの分割を行っているため、ファイル数が2になった際に、1つ目のファイルへのデータ書き込みが終わったと判断できます。
そのため、ファイル数が2になったら1つ目のファイルを取得してくる関数( get_pcap() )を用意しました。
def get_pcap(self, path):
# tcpdumpの仕様を利用したファイル取得
file_list = self.wp.get_file(path)
print(file_list)
if len(file_list) >= 2:
return file_list
else:
print("既定のpcapfile数を下回ってます。")
sys.exit(1)
通知機能
最後に、通知機能です。
やっていることは単純で、21時になると
- accessテーブルから今日の入室記録があるユーザを検索
- 入室記録がある場合、そのユーザへ通知するために必要な情報をuserテーブルから取得
- Googleフォームへ事前入力するためにデータを整形してURLを作成
- LINE Notifyを使用して各ユーザへURLを送信する
といった機能を持つプログラムを実行しています。
全部載せると長くなるので特徴的な部分だけ以下に載せます。
1,2の処理を終えた後、以下の関数が実行されます。
def form_info_create(self):
self.form_info = []
# 入室記録があるユーザ分ループで処理
for (user, access) in zip(self.user_info, self.access_info):
# 必要なデータを取得して整形する
date = str(access[1]) + "%20"
entry_time = str(access[2])[11:16]
exit_time = str(access[3])[11:16]
access_time = date + entry_time + "~" +exit_time
print(access_time)
# Googleフォームへ値を埋め込んで送信するために必要なデータを定義
form_dic = {
"full_name" : user[1],
"student_id": user[2],
"token" : user[3],
"access_time" : access_time
}
self.form_info.append(form_dic)
print(self.form_info)
# LINE Notifyを利用する
self.line.line_push(self.form_info)
最後の行のline_push()はsrc/pi_src/line_func.pyに定義されています。
以下がline_push()です。
class line_func:
def __init__(self):
# LINE通知APIのエンドポイント
self.line_notify_api = 'https://notify-api.line.me/api/notify'
# GoogleフォームのURL
self.url = "https://docs.google.com/forms~~~~~~~~"
# 入力欄の識別子
self.entry = {
"name": 208832475,
"id": 913668226,
"time": 1243542068
}
def line_push(self, user_list):
# ユーザごとのURLを保持
url_list = []
for user_dic in user_list:
# ユーザ情報の取得
full_name = user_dic["full_name"]
student_id = user_dic["student_id"]
access_time = user_dic["access_time"]
token = user_dic["token"]
# GoogleフォームのURLを生成
name = "entry.{0}={1}&".format(self.entry["name"], full_name)
id = "entry.{0}={1}&".format(self.entry["id"], student_id)
time = "entry.{0}={1}".format(self.entry["time"], access_time)
url = self.url + name + id + time
# 各ユーザへ送信する
headers = {'Authorization': f'Bearer {token}'}
data = {'message': f'{url}'}
status = requests.post(self.line_notify_api, headers = headers, data = data)
print(status)
これで通知を行えるようになったので、先ほどと同じようにcrontabに登録します。
今回は毎日夜21時に実行したいので、以下のようにします。
crontab -e
# エディタが開くので、最後の行に以下を追加
0 21 * * * cd [notification.pyがあるディレクトリ] ; [使用しているpythonのパス] notification.py
これで、入退室記録が21時に届くようになりました。
最後に
入退室管理システムを実装してみました。メルカリさんがWi-Fiログで勤怠管理をしていて、それをヒントに今回はパケットキャプチャベースのものを試作しました。
説明不足・説明が下手な部分があるかと思いますが、ご勘弁ください。指摘等あればコメントください。
ソースコードは公開しています。
https://github.com/is0356xi/Room_Access_System