0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

NanoPi NEOとSORACOMとGoogle App EngineでWebAPIを用いた通行量計測器を開発した話

Last updated at Posted at 2023-04-17

はじめに

自社で開発しているIoTデバイスを用いた事例作りのためいろいろと考えていたらイベント開催時にある区画における人の通行量を測定して、さらにインターネット経由でデータを確認したいという話を聞いたのでそれを開発して使ってみたらうまく動作できたので開発手順をまとめます。

外観

IMG_6611.jpg

使ったもの

  • NanoPi NEO2(NanoPi NEOでも大丈夫...だと思う)
  • SORACOM Sim
  • 3G USBドングル AK-020
  • ブルーオメガ(当社製)センシングプラザデバイス
    2入力、2出力を持った入出力IFボードで、エッジデバイスの信号を安全にセンサーや機器とやりとりができます
  • センサテック社製 透過形光電センサ
  • Google App Engine(python)

機能概要

  • NanoPiと光電センサにより、人の通過を検知
  • 検知した数をGoogle App Engineに送信
  • Google App EngineからWebAPIとしてデータを取得

開発環境

  • Python3

環境設定

NanoPi

OSインストール

詳しく説明されている方がおられるのでそちらをご参照下さい。

ライブラリインストール

apt install -y python3-pip python3-flask python3-requests
pip3 install OPi.GPIO

SORACOM Sim + AK-020

NanoPi NEOは、RaspberryPiとほぼ同じ手順で設定ができました。

Google App Engine(以下、GAE)

基本的な準備は以下の記事をご参照下さい。

データ取得用サーバー

以下のコードをGAEにデプロイ。

# requirementsに記載が必要なもの
from flask import Flask, request, jsonify, redirect, render_template

# requirementsに記載しなくても使用できるもの
from datetime import datetime as dt
from datetime import timedelta
import json

# Flask初期設定
app = Flask(__name__)

# 2入力、2出力のためのデータ初期化
types = ["in1", "in2", "out1", "out2"]
signal = {}

signal["status"] = {}

for t in types:
    signal[t] = [
        {
            "datetime": (dt.now() + timedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S'),
            "value" : 0
        }
    ]
    
    signal["status"][t] = 0

@app.route('/')
def index():
    
    global signal
    
    return jsonify(signal["status"])

@app.route('/update', methods=["POST"])
def update():

    global signal
    
    name = request.form["name"]
    
    dat = (dt.now() + timedelta(hours=9)).strftime('%Y-%m-%d %H:%M:%S')
    value = int(request.form["value"])
    
    data = {
        "datetime": dat,
        "value": value
    }
    
    signal[name].append(data)
    
    if value > 0:
        signal["status"][name] += 1
    
    return now()

@app.route('/datas', methods=["POST"])
def datas():
    
    # 指定した信号の値のみ取得
    
    global signal
    
    name = request.form["name"]
    vals = {
        name: signal[name]
    }
    
    return jsonify(vals)

@app.route('/all')
def all():
    
    # 全てのデータを取得
    
    global signal
    
    vals = signal
    
    return jsonify(vals)

@app.route('/now')
def now():
    
    # 最新の値のみ取得
    
    global types
    global signal
    
    vals = {}
    
    for t in types:
        vals[t] = signal[t][-1]
    
    return jsonify(vals)

if __name__ == '__main__':

    app.run(host='127.0.0.1', port=8080, debug=True)

計測用クライアント

クライアントに以下のスクリプトを設置。

#!/usr/bin/env python3

import flask
from flask import Flask, Response, request, jsonify

import OPi.GPIO as GPIO

from datetime import datetime as dt
from datetime import timedelta

import time
import requests
import threading

GPIO.BOARD = {
    7:    203,
    8:    198,
    10:   199,
    11:   0,
    12:   6,
    13:   2,
    15:   3,
    16:   200,
    18:   201,
    19:   20,
    21:   65,
    22:    1,
    23:   66,
    24:   67
}

app = Flask(__name__)

cnt_in1 = 0
dat_in1 = dt.now()

payload = {}
url = "[your server name]"

def reset_count():

    global cnt_in1
    global dat_in1
    
    global payload
    
    payload = {
        "name": "in1",
        "value": cnt_in1
    }
    
    put_signal()

    cnt_in1 = 0
    dat_in1 = dt.now()
    
    payload = {
        "name": "in1",
        "value": cnt_in1
    }
    
    put_signal()

def set_count(value):

    global cnt_in1
    global dat_in1
    
    cnt_in1 = value
    dat_in1 = dt.now()

def get_count():

    global cnt_in1
    global dat_in1
    
    txt = {
        "cnt_in1" : cnt_in1,
        "dat_in1" : dat_in1
    }

    return jsonify(txt)

def put_signal():

    global url
    global payload

    res = requests.post(url, data=payload, timeout=1.0)
    
    print(res.text)

def send_signal_gae(num):

    global cnt_in1
    global dat_in1
    global payload

    name = {
        "8" : "in1",
        "10": "in2",
        "16": "out1",
        "18": "out2"
    }

    value = GPIO.input(num)

    payload = {
        "name": name[str(num)],
        "value": GPIO.input(num)
    }

    if num == int(8):

        set_output(1, value)

        cnt_in1 += value
        print(cnt_in1)
        
        flg = False
        
        if cnt_in1 % 10 == 0:
            flg = True
        if value == 1:
            flg = True
        if dat_in1 < dt.now() - timedelta(minutes=1):
            flg = True
            
        print(flg)

        if flg:
            
            payload["value"] = cnt_in1
            dat_in1 = dt.now()
            
            thd = threading.Thread(target=put_signal)
            thd.start()

    else:
        
        thd = threading.Thread(target=put_signal)
        thd.join()

def set_output(num, value):

    pin = 16 if num == int(1) else 18
    val = GPIO.HIGH if value == int(1) else GPIO.LOW

    GPIO.output(pin,val)
    print("pin :", pin, " / value:", val)

    return True

def callback(channel):

    print("button pushed %s"%channel, GPIO.input(channel), dt.now())

    if channel == 22:
        subprocess.run(["shutdown", "-h", "now"])
    else:
        send_signal_gae(channel)
        time.sleep(0.5)

def init():

    GPIO.setwarnings(False)
    GPIO.setmode(GPIO.BOARD)

    GPIO.setup(8, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    GPIO.setup(10, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    GPIO.add_event_detect(8, GPIO.BOTH, callback=callback, bouncetime=300)
    GPIO.add_event_detect(10, GPIO.BOTH, callback=callback, bouncetime=300)

    GPIO.setup(22, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    GPIO.add_event_detect(22, GPIO.FALLING, callback=callback, bouncetime=300)

    GPIO.setup(16,GPIO.OUT)
    GPIO.setup(18,GPIO.OUT)

def print_status():

    txt = {
        "in1" : GPIO.input(8),
        "in2" : GPIO.input(10),
        "out1": GPIO.input(16),
        "out2": GPIO.input(18),
        "stop": GPIO.input(22)
    }

    return jsonify(txt)

@app.route('/')
def index():

    return print_status()

@app.route('/reset')
def reset():

    reset_count()
    
    return print_status()

@app.route('/setcnt', methods=["POST"])
def setcnt():

    num = request.form["num"]
    val = request.form["val"]
    set_count(int(val))
    
    return get_count()

@app.route('/getcnt')
def getcnt():

    return get_count()

@app.route('/output', methods=["POST"])
def output():

    num = 16 if int(request.form["num"]) == 1 else 18
    val = GPIO.HIGH if request.form["val"] == "1" else GPIO.LOW

    GPIO.output(num,val)
    send_signal_gae(num)

    return print_status()

if __name__ == "__main__":
    init()

    try:
        app.run(host='::', port=80, debug=True)

    finally:
        print("terminated")

設置後、以下のコマンドでスクリプトを実行。

python3 main.py

実行したら光電センサーの信号を入り切りし、正しくデータを取得できているかを確認します。

データの取得と表示

以下のコードをJupyter NotebookやGoogle Colaboratoryで実行。

import pandas as pd
import requests
import json

%matplotlib inline

url = "[your server address]"

payload = {'name': 'in1'}

rs = requests.post(url, data=payload)

txt = rs.text

js = json.loads(txt)

df = pd.DataFrame(js["in1"])
df.index = pd.to_datetime(df.pop("datetime"))

df.plot(figsize=(15,5))

実行するとグラフが表示されます。

Unknown.png

できた!

その他

Githubにて全ソースコードを公開していますので、そちらもあわせてご参照下さい。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?