#はじめに
ラズパイでMAMORIOの近接監視をしてLINEに状況を通知するのをやってみた。
近づいたり離れたりしたらLINEに通知が来るので出退勤の記録に使えるかもしれない。
#環境
2019/8/11時点
- ラズパイ:Raspberry Pi 3 Model b+
- ラズパイのOS:raspbian stretch (2019-04-08)
- Python 2.7.13
- bluez 5.50
- code-oss
- MAMORIO×2個
- iPhone6s
###補足
今回の試みは前の記事と同じ状態から始めたいと思います。以下の状態です。
- げぼ(人)はMAMORIO geboを持っている
- MAMORIO checkはラズパイに貼りつけておく
- げぼが近づいてきたら管理者のLINEに通知する
- げぼがどっかに行ったら管理者のLINEに通知する
#準備:LINE Notify
まずはLINEで通知する部分だけやります。
LINEにはLINE Notifyという無料のサービスがあってHTTPのリクエストからLINEメッセージを送信することができます。
###1)トークンの発行
パソコンからやります。(ラズパイのブラウザでもいいけど)
####1-1)LINE Notifyにアクセス
https://notify-bot.line.me/ja/
####1-2)右上から[ログイン]する
###2)通知テスト
ラズパイから以下のpyを実行します。
access_token変数にはさっきメモったトークンを代入するようしてください。
LINEに通知がきたらテストOK
#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
if __name__ == '__main__':
access_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
url = "https://notify-api.line.me/api/notify"
headers = {'Authorization': 'Bearer ' + access_token}
message = "テスト"
payload = {'message': message}
requests.post(url, headers=headers, params=payload,)
$ python notify_test.py
#準備:BlueZのセットアップ
- MAMORIOはiBeaconです。
- iBeaconはBLEのアドバタイジングという機能で自分がいることを周囲に発信することができます。
- 周囲に発信している情報は「アドバタイズパケット」という短いデータです。
- MAMORIOは30秒に1回程度、アドバタイズパケットを発信しています。
- ラズパイにはBLEが付いているのでアドバタイズパケットを受信することができます。
- ラズパイでPythonでBLEするためにBlueZというモジュールを使います。
- と、いうわけで、ラズパイでMAMORIOとお話するためにBlueZをセットアップします。
- どこかにワークフォルダを作ってその中でやっといたほうがいいかもしれません。
まずはBlueZ本体ではなく、必要なパッケージをインストールします。これは apt intall でOK。
$ sudo apt install bluetooth libusb-dev libdbus-1-dev libglib2.0-dev libudev-dev libical-dev libreadline-dev libdbus-glib-1-dev libbluetooth-dev
そして、本体のインストールですが、
BlueZは上記のようなパッケージが無いようなので、以下の手順でソースを落としてきてビルドします。
$ wget http://www.kernel.org/pub/linux/bluetooth/bluez-5.50.tar.xz
$ xz -dv bluez-5.50.tar.xz
$ tar -xf bluez-5.50.tar
$ cd bluez-5.50/
$ ./configure --enable-experimental
$ make
// makeが長いのでしばらく待ちます・・・
$ sudo make install
以上で準備完了
#PythonでiBeaconを受信する
bluezを使ってiBeaconのアドバタイズパケットを受信するコードを書きます。
bluezのAPIは手順が難しくて説明サイトもないのでコードはおまじない状態です。
ここは思考停止してinterfaces_added()
、properties_changed()
の中を実装していくことにします。
####注意
このソースでimportしているbluezutils
はさっきインストールしたbluezのtest
フォルダに置いてあるbluezutils.py
を使いましょう。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import dbus
import dbus.mainloop.glib
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
import bluezutils
devices = {}
// アドバタイズパケット受信したときの処理
def interfaces_added(path, interfaces):
properties = interfaces["org.bluez.Device1"]
if not properties:
return
if path in devices:
devices[path] = dict(devices[path].items() + properties.items())
else:
devices[path] = properties
// ここら辺で自分がやりたい処理を書く
// devies[path]オブジェクトがアドバタイズパケット
// アドバタイズパケット受信したときの処理
def properties_changed(interface, changed, invalidated, path):
if interface != "org.bluez.Device1":
return
if path in devices:
devices[path] = dict(devices[path].items() + changed.items())
else:
devices[path] = changed
// ここら辺で自分がやりたい処理を書く
// devies[path]オブジェクトがアドバタイズパケット
if __name__ == '__main__':
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = bluezutils.find_adapter()
bus.add_signal_receiver(interfaces_added,
dbus_interface = "org.freedesktop.DBus.ObjectManager",
signal_name = "InterfacesAdded")
bus.add_signal_receiver(properties_changed,
dbus_interface = "org.freedesktop.DBus.Properties",
signal_name = "PropertiesChanged",
arg0 = "org.bluez.Device1",
path_keyword = "path")
om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = om.GetManagedObjects()
for path, interfaces in objects.iteritems():
if "org.bluez.Device1" in interfaces:
devices[path] = interfaces["org.bluez.Device1"]
scan_filter = dict()
scan_filter.update({ "Transport": "le" })
adapter.SetDiscoveryFilter(scan_filter)
adapter.StartDiscovery()
mainloop = GObject.MainLoop()
mainloop.run()
#アドバタイズパケットってなに?
さっきのサンプルでinterfaces_added(),properties_changed()
でのdevies[path]オブジェクトがアドバタイズパケットって事なんですが、これが何か、という話です。
このdevies[path]オブジェクトはdict型でいわゆるディクショナリーです。
詳細資料がないのでよくわからず、わかるところだけ埋めています。
ManufacturerDataが今回重要なデータです。
key | 説明 |
---|---|
AddressType | ? |
Paired | ? |
ServicesResolved | ? |
Adapter | ? |
Alias | ? |
ManufacturerData | 製造者固有データ.iBeaconの詳細情報が詰まっている |
Connected | ? |
UUIDs | ? |
Address | BLEアドレス |
RSSI | RawSignalStrengthInDBm.シグナル強度、いわゆるRSSI。数値が大きいほどシグナルが強い、近くから発信されている、ということ |
Trusted | ? |
Blocked | ? |
def scan_ibeacon(properties):
address = "<unknown>"
manufac = 0
# AddressとManufacturerDataをGETする
for key, value in properties.iteritems():
if type(value) is dbus.String:
value = unicode(value).encode('ascii', 'replace')
if (key == "Address"):
address = value
if (key == "ManufacturerData"):
manufac = value
# print("address = %" % (address))
if type(manufac) is dbus.Dictionary:
// manufac 中身をチェックする
#ManufacturerDataってなに?
iBeaconはManufacturerDataに詳細情報を入れることになっているらしく、バイト配列で、MAMORIOの個体識別IDなどが入っています。
以下のフォーマットです。
index | size | 説明 |
---|---|---|
0-1 | 2 | Bluetooth SIGが企業に発行した識別子 |
2-18 | 16 | UUID |
18-19 | 2 | Major.unsingned short |
20-21 | 2 | Minor.unsingned short |
22 | 1 | Measured Power.signed 8-bit integer |
23 | 1 | ? |
#####UUID
- 16バイトの識別子
- MAMORIOの場合は
b9407f30-f5f8-466e-aff925556b57fe6e
#####Major,Minor
- MAMORIOの個体識別ID
- unsingned shortでラズビアンはリトルエンディアンなので、変換の際は注意しましょう。
というわけなのでUUID,Major,MinorでMAMORIOを特定することができます。
def parse_manufacture(mdata):
uuid = ""
major = 0
minor = 0
if( type(mdata) is dbus.Array and len(mdata) > 21):
# UUID
for item in mdata[2:6]:
uuid = uuid + format(item,'02x')
uuid = uuid + "-"
for item in mdata[6:8]:
uuid = uuid + format(item,'02x')
uuid = uuid + "-"
for item in mdata[8:10]:
uuid = uuid + format(item,'02x')
uuid = uuid + "-"
for item in mdata[10:18]:
uuid = uuid + format(item,'02x')
# Major
# - unsingned int(2バイト)
# - リトルエンディアン
majortmp = bytearray([int(mdata[19]),int(mdata[18])])
major = struct.unpack('H',majortmp)[0]
# Minor
# - unsingned int(2バイト)
# - リトルエンディアン
minortmp = bytearray([int(mdata[21]),int(mdata[20])])
minor = struct.unpack('H',minortmp)[0]
return uuid,major,minor
#特定のMAMORIOを検知したらLINE通知をする
ここまでで
- LINE通知
- 特定のMAMORIOの検知
ができるようになりました。
さっきのソースをちょっと改造するだけです。
特定のMAMORIを検知したらLINE通知をする
MAMORIO_UUID = "b9407f30-f5f8-466e-aff925556b57fe6e"
# target [0]name,[1]major,[2]minor,[3]linetoken
MAMORIO_TARGETS = {
("check",11111,22222,""),
("gebo", 33333,44444,"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"),
("test", 55555,66666,"yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy")
}
def scan_ibeacon(properties):
address = "<unknown>"
manufac = 0
# AddressとManufacturerDataをGETする
for key, value in properties.iteritems():
if type(value) is dbus.String:
value = unicode(value).encode('ascii', 'replace')
if (key == "Address"):
address = value
if (key == "ManufacturerData"):
manufac = value
# print("address = %" % (address))
if type(manufac) is dbus.Dictionary:
# 特定のMAMORIOかどうかをチェックする
uuid = ""
major = 0
minor = 0
for mdata in manufac.values():
uuid,major,minor = parse_manufacture(mdata)
if( uuid.lower() == MAMORIO_UUID.lower()):
#print(" MAMORIO! - uuid = %s" % uuid)
#print(" - major = %d , minor = %d" % (major,minor))
# ターゲットを検索
username = ""
token = ""
founds = [i for i in MAMORIO_TARGETS if i[1]==major and i[2]==minor]
if(len(founds) <= 0):
return
username = founds[0][0]
token = founds[0][3]
print("%s - major = %d , minor = %d -> user = %s" % (datetime.now(),major,minor,username))
# LINE通知する
access_token = token
url = "https://notify-api.line.me/api/notify"
headers = {'Authorization': 'Bearer ' + access_token}
message = username
payload = {'message': message}
requests.post(url, headers=headers, params=payload,)
#細かい考慮
ここまでだとのアドバタイズパケットを検知するたびにLINE通知が来て困りますね。
いったん通知したMAMORIOの個体識別IDをメモリに保存して再通知しないようにしましょう。
そうすると、LINE通知が1回来たら二度とこなくなる・・・困った。
要するに、MAMORIOが無くなったことを検知する必要があります。
定期タイマで保存されているMAMORIOの個体識別IDをチェックして1分間アドバタイズパケットが来ていなかったら削除します。
ここの実装ですが、もう一個MAMORIOがあるんで、これをラズパイに貼りつけておいて定期タイマの代わりにして消込処理をしました(もっと賢いやり方があるね普通は)。
def delete_targets():
datenow = datetime.now()
delete_username =""
for key, value in targets.iteritems():
span = datenow - value
#print(span)
if(span.seconds>60):
delete_username=key
break
if( len(delete_username)>0):
targets.pop(delete_username)
print("delete %s - span = %s " % (delete_username,span))
return delete_username
#実行してみる
そんなこんなで以下のような実装になりました。
- ラズパイに げぼ が近寄ってきたら(gebo用のMAMORIOが近づいてきたら)LINE通知する
- ラズパイから げぼ が離れたら(gebo用のMAMORIOが離れて60秒たったら)LINE通知する
サンプルコードをまとめたものはコチラ
https://github.com/gebogebogebo/notify_ibeacon/blob/master/source/notify_ibeacon.py
#おつかれさまでした
夏休みの課題でした、ラズパイは面白い。
以下のサイトは大変参考になりました。