14
19

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ラズパイとMAMORIOとLINEで出退勤を記録する仕組みを作る

Last updated at Posted at 2019-08-11

#はじめに
ラズパイでMAMORIOの近接監視をしてLINEに状況を通知するのをやってみた。
近づいたり離れたりしたらLINEに通知が来るので出退勤の記録に使えるかもしれない。

#環境
2019/8/11時点

###補足
今回の試みは前の記事と同じ状態から始めたいと思います。以下の状態です。

サンプルコードはGitHubに置いときます

#概要図
component.png

  • げぼ(人)はMAMORIO geboを持っている
  • MAMORIO checkはラズパイに貼りつけておく
  • げぼが近づいてきたら管理者のLINEに通知する
  • げぼがどっかに行ったら管理者のLINEに通知する

#準備:LINE Notify
まずはLINEで通知する部分だけやります。
LINEにはLINE Notifyという無料のサービスがあってHTTPのリクエストからLINEメッセージを送信することができます。

###1)トークンの発行
パソコンからやります。(ラズパイのブラウザでもいいけど)

####1-1)LINE Notifyにアクセス
https://notify-bot.line.me/ja/

####1-2)右上から[ログイン]する

####1-3)マイページ→[トークンを発行する]

####1-4)トークンが生成されるのでメモっとく

###2)通知テスト
ラズパイから以下のpyを実行します。
access_token変数にはさっきメモったトークンを代入するようしてください。
LINEに通知がきたらテストOK

notify_test.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
if __name__ == '__main__':
        access_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
        url = "https://notify-api.line.me/api/notify"
        headers = {'Authorization': 'Bearer ' + access_token}
        message = "テスト"
        payload = {'message': message}
        requests.post(url, headers=headers, params=payload,)
実行する
$ python notify_test.py

#準備:BlueZのセットアップ

  • MAMORIOはiBeaconです。
  • iBeaconはBLEのアドバタイジングという機能で自分がいることを周囲に発信することができます。
  • 周囲に発信している情報は「アドバタイズパケット」という短いデータです。
  • MAMORIOは30秒に1回程度、アドバタイズパケットを発信しています。
  • ラズパイにはBLEが付いているのでアドバタイズパケットを受信することができます。
  • ラズパイでPythonでBLEするためにBlueZというモジュールを使います。
  • と、いうわけで、ラズパイでMAMORIOとお話するためにBlueZをセットアップします。
  • どこかにワークフォルダを作ってその中でやっといたほうがいいかもしれません。

まずはBlueZ本体ではなく、必要なパッケージをインストールします。これは apt intall でOK。

必要パッケージのインストール
$ sudo apt install bluetooth libusb-dev libdbus-1-dev libglib2.0-dev libudev-dev libical-dev libreadline-dev libdbus-glib-1-dev libbluetooth-dev

そして、本体のインストールですが、
BlueZは上記のようなパッケージが無いようなので、以下の手順でソースを落としてきてビルドします。

BlueZのビルド
$ wget http://www.kernel.org/pub/linux/bluetooth/bluez-5.50.tar.xz
$ xz -dv bluez-5.50.tar.xz
$ tar -xf bluez-5.50.tar
$ cd bluez-5.50/
$ ./configure --enable-experimental
$ make
// makeが長いのでしばらく待ちます・・・
$ sudo make install

以上で準備完了

#PythonでiBeaconを受信する
bluezを使ってiBeaconのアドバタイズパケットを受信するコードを書きます。
bluezのAPIは手順が難しくて説明サイトもないのでコードはおまじない状態です。
ここは思考停止してinterfaces_added()properties_changed()の中を実装していくことにします。

####注意
このソースでimportしているbluezutilsはさっきインストールしたbluezのtestフォルダに置いてあるbluezutils.pyを使いましょう。

iBeaconのアドバタイズパケットを受信する
#!/usr/bin/python
# -*- coding: utf-8 -*-

import dbus
import dbus.mainloop.glib
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
import bluezutils

devices = {}

// アドバタイズパケット受信したときの処理
def interfaces_added(path, interfaces):
        properties = interfaces["org.bluez.Device1"]

        if not properties:
                return

        if path in devices:
                devices[path] = dict(devices[path].items() + properties.items())
        else:
                devices[path] = properties

        // ここら辺で自分がやりたい処理を書く
        // devies[path]オブジェクトがアドバタイズパケット

// アドバタイズパケット受信したときの処理
def properties_changed(interface, changed, invalidated, path):
        if interface != "org.bluez.Device1":
                return

        if path in devices:
                devices[path] = dict(devices[path].items() + changed.items())
        else:
                devices[path] = changed

        // ここら辺で自分がやりたい処理を書く
        // devies[path]オブジェクトがアドバタイズパケット

if __name__ == '__main__':
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

        bus = dbus.SystemBus()
        adapter = bluezutils.find_adapter()


        bus.add_signal_receiver(interfaces_added,
                        dbus_interface = "org.freedesktop.DBus.ObjectManager",
                        signal_name = "InterfacesAdded")

        bus.add_signal_receiver(properties_changed,
                        dbus_interface = "org.freedesktop.DBus.Properties",
                        signal_name = "PropertiesChanged",
                        arg0 = "org.bluez.Device1",
                        path_keyword = "path")

        om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
        objects = om.GetManagedObjects()

        for path, interfaces in objects.iteritems():
                if "org.bluez.Device1" in interfaces:
                        devices[path] = interfaces["org.bluez.Device1"]

        scan_filter = dict()
        scan_filter.update({ "Transport": "le" })
        adapter.SetDiscoveryFilter(scan_filter)
        adapter.StartDiscovery()

        mainloop = GObject.MainLoop()
        mainloop.run()

#アドバタイズパケットってなに?
さっきのサンプルでinterfaces_added(),properties_changed()でのdevies[path]オブジェクトがアドバタイズパケットって事なんですが、これが何か、という話です。

このdevies[path]オブジェクトはdict型でいわゆるディクショナリーです。
詳細資料がないのでよくわからず、わかるところだけ埋めています。
ManufacturerDataが今回重要なデータです。

key 説明
AddressType ?
Paired ?
ServicesResolved ?
Adapter ?
Alias ?
ManufacturerData 製造者固有データ.iBeaconの詳細情報が詰まっている
Connected ?
UUIDs ?
Address BLEアドレス
RSSI RawSignalStrengthInDBm.シグナル強度、いわゆるRSSI。数値が大きいほどシグナルが強い、近くから発信されている、ということ
Trusted ?
Blocked ?
iBeaconをスキャンしたときの処理
def scan_ibeacon(properties):
        address = "<unknown>"
        manufac = 0

        # AddressとManufacturerDataをGETする
        for key, value in properties.iteritems():
                if type(value) is dbus.String:
                        value = unicode(value).encode('ascii', 'replace')
                if (key == "Address"):
                        address = value
                if (key == "ManufacturerData"):
                        manufac = value

        # print("address = %" % (address))

        if type(manufac) is dbus.Dictionary:
                // manufac 中身をチェックする

#ManufacturerDataってなに?
iBeaconはManufacturerDataに詳細情報を入れることになっているらしく、バイト配列で、MAMORIOの個体識別IDなどが入っています。
以下のフォーマットです。

index size 説明
0-1 2 Bluetooth SIGが企業に発行した識別子
2-18 16 UUID
18-19 2 Major.unsingned short
20-21 2 Minor.unsingned short
22 1 Measured Power.signed 8-bit integer
23 1 ?

#####UUID

  • 16バイトの識別子
  • MAMORIOの場合はb9407f30-f5f8-466e-aff925556b57fe6e

#####Major,Minor

  • MAMORIOの個体識別ID
  • unsingned shortでラズビアンはリトルエンディアンなので、変換の際は注意しましょう。

というわけなのでUUID,Major,MinorでMAMORIOを特定することができます。

ManufacturerDataデータをパースする
def parse_manufacture(mdata):
        uuid = ""
        major = 0
        minor = 0
        if( type(mdata) is dbus.Array and len(mdata) > 21):
                # UUID
                for item in mdata[2:6]:
                        uuid = uuid + format(item,'02x')
                uuid = uuid + "-"
                for item in mdata[6:8]:
                        uuid = uuid + format(item,'02x')
                uuid = uuid + "-"
                for item in mdata[8:10]:
                        uuid = uuid + format(item,'02x')
                uuid = uuid + "-"
                for item in mdata[10:18]:
                        uuid = uuid + format(item,'02x')

                # Major
                # - unsingned int(2バイト)
                # - リトルエンディアン
                majortmp = bytearray([int(mdata[19]),int(mdata[18])])
                major = struct.unpack('H',majortmp)[0]

                # Minor
                # - unsingned int(2バイト)
                # - リトルエンディアン
                minortmp = bytearray([int(mdata[21]),int(mdata[20])])
                minor = struct.unpack('H',minortmp)[0]
        return uuid,major,minor

#特定のMAMORIOを検知したらLINE通知をする
ここまでで

  • LINE通知
  • 特定のMAMORIOの検知
    ができるようになりました。
    さっきのソースをちょっと改造するだけです。
特定のMAMORIを検知したらLINE通知をする
特定のMAMORIを検知したらLINE通知をする

MAMORIO_UUID = "b9407f30-f5f8-466e-aff925556b57fe6e"

# target [0]name,[1]major,[2]minor,[3]linetoken
MAMORIO_TARGETS = {
        ("check",11111,22222,""),
        ("gebo", 33333,44444,"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"),
        ("test", 55555,66666,"yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy")
        }
        
def scan_ibeacon(properties):
        address = "<unknown>"
        manufac = 0

        # AddressとManufacturerDataをGETする
        for key, value in properties.iteritems():
                if type(value) is dbus.String:
                        value = unicode(value).encode('ascii', 'replace')
                if (key == "Address"):
                        address = value
                if (key == "ManufacturerData"):
                        manufac = value

        # print("address = %" % (address))

        if type(manufac) is dbus.Dictionary:
        
                # 特定のMAMORIOかどうかをチェックする
                uuid = ""
                major = 0
                minor = 0
                for mdata in manufac.values():
                        uuid,major,minor = parse_manufacture(mdata)

                if( uuid.lower() == MAMORIO_UUID.lower()):
                        #print("    MAMORIO! - uuid = %s" % uuid)
                        #print("    - major = %d , minor = %d" % (major,minor))

                        # ターゲットを検索
                        username = ""
                        token = ""
                        founds = [i for i in MAMORIO_TARGETS if i[1]==major and i[2]==minor]
                        if(len(founds) <= 0):
                                return
                        username = founds[0][0]
                        token = founds[0][3]

                        print("%s - major = %d , minor = %d -> user = %s" % (datetime.now(),major,minor,username))
                        
                        # LINE通知する
                        access_token = token
                        url = "https://notify-api.line.me/api/notify"
                        headers = {'Authorization': 'Bearer ' + access_token}
                        message = username
                        payload = {'message': message}
                        requests.post(url, headers=headers, params=payload,)                        

#細かい考慮
ここまでだとのアドバタイズパケットを検知するたびにLINE通知が来て困りますね。
いったん通知したMAMORIOの個体識別IDをメモリに保存して再通知しないようにしましょう。
そうすると、LINE通知が1回来たら二度とこなくなる・・・困った。
要するに、MAMORIOが無くなったことを検知する必要があります
定期タイマで保存されているMAMORIOの個体識別IDをチェックして1分間アドバタイズパケットが来ていなかったら削除します。
ここの実装ですが、もう一個MAMORIOがあるんで、これをラズパイに貼りつけておいて定期タイマの代わりにして消込処理をしました(もっと賢いやり方があるね普通は)。

検知したMAMORIOが60秒更新されていなければ削除する
def delete_targets():
        datenow = datetime.now()
        delete_username =""
        for key, value in targets.iteritems():
                span = datenow - value
                #print(span)
                if(span.seconds>60):
                        delete_username=key
                        break

        if( len(delete_username)>0):                        
                targets.pop(delete_username)
                print("delete %s - span = %s " % (delete_username,span))

        return delete_username

#実行してみる
そんなこんなで以下のような実装になりました。

  • ラズパイに げぼ が近寄ってきたら(gebo用のMAMORIOが近づいてきたら)LINE通知する
  • ラズパイから げぼ が離れたら(gebo用のMAMORIOが離れて60秒たったら)LINE通知する
IMG_6455_p.png

サンプルコードをまとめたものはコチラ
https://github.com/gebogebogebo/notify_ibeacon/blob/master/source/notify_ibeacon.py

#おつかれさまでした
夏休みの課題でした、ラズパイは面白い。

以下のサイトは大変参考になりました。

14
19
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
19

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?