はじめに
自社で開発しているIoTデバイスを用いた事例作りのためいろいろと考えていたらイベント開催時にある区画における人の通行量を測定して、さらにインターネット経由でデータを確認したいという話を聞いたのでそれを開発して使ってみたらうまく動作できたので開発手順をまとめます。
外観
使ったもの
- NanoPi NEO2(NanoPi NEOでも大丈夫...だと思う)
- SORACOM Sim
- 3G USBドングル AK-020
- ブルーオメガ(当社製)センシングプラザデバイス
2入力、2出力を持った入出力IFボードで、エッジデバイスの信号を安全にセンサーや機器とやりとりができます - センサテック社製 透過形光電センサ
- Google App Engine(python)
機能概要
- NanoPiと光電センサにより、人の通過を検知
- 検知した数をGoogle App Engineに送信
- Google App EngineからWebAPIとしてデータを取得
開発環境
- Python3
環境設定
NanoPi
OSインストール
詳しく説明されている方がおられるのでそちらをご参照下さい。
ライブラリインストール
apt install -y python3-pip python3-flask python3-requests
pip3 install OPi.GPIO
SORACOM Sim + AK-020
NanoPi NEOは、RaspberryPiとほぼ同じ手順で設定ができました。
Google App Engine(以下、GAE)
基本的な準備は以下の記事をご参照下さい。
データ取得用サーバー
以下のコードをGAEにデプロイ。
# requirementsに記載が必要なもの
from flask import Flask, request, jsonify, redirect, render_template
# requirementsに記載しなくても使用できるもの
from datetime import datetime as dt
from datetime import timedelta
import json
# Flask初期設定
app = Flask(__name__)
# 2入力、2出力のためのデータ初期化
types = ["in1", "in2", "out1", "out2"]
signal = {}
signal["status"] = {}
for t in types:
signal[t] = [
{
"datetime": (dt.now() + timedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S'),
"value" : 0
}
]
signal["status"][t] = 0
@app.route('/')
def index():
global signal
return jsonify(signal["status"])
@app.route('/update', methods=["POST"])
def update():
global signal
name = request.form["name"]
dat = (dt.now() + timedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S')
value = int(request.form["value"])
data = {
"datetime": dat,
"value": value
}
signal[name].append(data)
if value > 0:
signal["status"][name] += 1
return now()
@app.route('/datas', methods=["POST"])
def datas():
# 指定した信号の値のみ取得
global signal
name = request.form["name"]
vals = {
name: signal[name]
}
return jsonify(vals)
@app.route('/all')
def all():
# 全てのデータを取得
global signal
vals = signal
return jsonify(vals)
@app.route('/now')
def now():
# 最新の値のみ取得
global types
global signal
vals = {}
for t in types:
vals[t] = signal[t][-1]
return jsonify(vals)
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8080, debug=True)
計測用クライアント
クライアントに以下のスクリプトを設置。
#!/usr/bin/env python3
import flask
from flask import Flask, Response, request, jsonify
import OPi.GPIO as GPIO
from datetime import datetime as dt
from datetime import timedelta
import time
import requests
import threading
GPIO.BOARD = {
7: 203,
8: 198,
10: 199,
11: 0,
12: 6,
13: 2,
15: 3,
16: 200,
18: 201,
19: 20,
21: 65,
22: 1,
23: 66,
24: 67
}
app = Flask(__name__)
cnt_in1 = 0
dat_in1 = dt.now()
payload = {}
url = "[your server name]"
def reset_count():
global cnt_in1
global dat_in1
global payload
payload = {
"name": "in1",
"value": cnt_in1
}
put_signal()
cnt_in1 = 0
dat_in1 = dt.now()
payload = {
"name": "in1",
"value": cnt_in1
}
put_signal()
def set_count(value):
global cnt_in1
global dat_in1
cnt_in1 = value
dat_in1 = dt.now()
def get_count():
global cnt_in1
global dat_in1
txt = {
"cnt_in1" : cnt_in1,
"dat_in1" : dat_in1
}
return jsonify(txt)
def put_signal():
global url
global payload
res = requests.post(url, data=payload, timeout=1.0)
print(res.text)
def send_signal_gae(num):
global cnt_in1
global dat_in1
global payload
name = {
"8" : "in1",
"10": "in2",
"16": "out1",
"18": "out2"
}
value = GPIO.input(num)
payload = {
"name": name[str(num)],
"value": GPIO.input(num)
}
if num == int(8):
set_output(1, value)
cnt_in1 += value
print(cnt_in1)
flg = False
if cnt_in1 % 10 == 0:
flg = True
if value == 1:
flg = True
if dat_in1 < dt.now() - timedelta(minutes=1):
flg = True
print(flg)
if flg:
payload["value"] = cnt_in1
dat_in1 = dt.now()
thd = threading.Thread(target=put_signal)
thd.start()
else:
thd = threading.Thread(target=put_signal)
thd.join()
def set_output(num, value):
pin = 16 if num == int(1) else 18
val = GPIO.HIGH if value == int(1) else GPIO.LOW
GPIO.output(pin,val)
print("pin :", pin, " / value:", val)
return True
def callback(channel):
print("button pushed %s"%channel, GPIO.input(channel), dt.now())
if channel == 22:
subprocess.run(["shutdown", "-h", "now"])
else:
send_signal_gae(channel)
time.sleep(0.5)
def init():
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BOARD)
GPIO.setup(8, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(10, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(8, GPIO.BOTH, callback=callback, bouncetime=300)
GPIO.add_event_detect(10, GPIO.BOTH, callback=callback, bouncetime=300)
GPIO.setup(22, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(22, GPIO.FALLING, callback=callback, bouncetime=300)
GPIO.setup(16,GPIO.OUT)
GPIO.setup(18,GPIO.OUT)
def print_status():
txt = {
"in1" : GPIO.input(8),
"in2" : GPIO.input(10),
"out1": GPIO.input(16),
"out2": GPIO.input(18),
"stop": GPIO.input(22)
}
return jsonify(txt)
@app.route('/')
def index():
return print_status()
@app.route('/reset')
def reset():
reset_count()
return print_status()
@app.route('/setcnt', methods=["POST"])
def setcnt():
num = request.form["num"]
val = request.form["val"]
set_count(int(val))
return get_count()
@app.route('/getcnt')
def getcnt():
return get_count()
@app.route('/output', methods=["POST"])
def output():
num = 16 if int(request.form["num"]) == 1 else 18
val = GPIO.HIGH if request.form["val"] == "1" else GPIO.LOW
GPIO.output(num,val)
send_signal_gae(num)
return print_status()
if __name__ == "__main__":
init()
try:
app.run(host='::', port=80, debug=True)
finally:
print("terminated")
設置後、以下のコマンドでスクリプトを実行。
python3 main.py
実行したら光電センサーの信号を入り切りし、正しくデータを取得できているかを確認します。
データの取得と表示
以下のコードをJupyter NotebookやGoogle Colaboratoryで実行。
import pandas as pd
import requests
import json
%matplotlib inline
url = "[your server address]"
payload = {'name': 'in1'}
rs = requests.post(url, data=payload)
txt = rs.text
js = json.loads(txt)
df = pd.DataFrame(js["in1"])
df.index = pd.to_datetime(df.pop("datetime"))
df.plot(figsize=(15,5))
実行するとグラフが表示されます。
できた!
その他
Githubにて全ソースコードを公開していますので、そちらもあわせてご参照下さい。