あらすじ
前回は4GPiとslee-pi3、SORACOMを組み合わせてRaspberryPiをSMS経由で起動してみました。今回はそれを発展させて、クラウドからSMS起動してそれぞれのRaspberryPiからデータを取得して呼び出し元のクラウドで集計する仕組みを作ってみます。
問題
4GPiとslee-pi3, SORACOMを組み合わせると電源が切れたRaspberryPiを起動できることは分かりました。実際の現場は数多くのRaspberryPiにセンサー機器を繋げて値を取得することが多いと思いますが、非同期で起動してセンサーデータ送信が行われるため、バッチがセンサーデータをまとめて処理するのは容易ではありません。
バッチスクリプトがAPIサーバーも兼ねれば可能かもしれませんが、サーバーレスを考慮すると難しくなります。
さらにサーバーレスにした場合、処理が終わるとインスタンスが終了してしまうので集計処理を走らせることが難しくなります。
解決案
Cloud FunctionとCloud Pub/Subを使って複数のRaspberryPiからセンサーデータを一括で取得する仕組みを作ってみました。今回はGCPを利用していますがAWSやAzureでも似たような構成で問題なく実装できると思います。
ポイントは電源が落ちているRaspberryPiから非同期に送られてくるセンサーデータを実行中のバッチシステムで取得する仕組みです。今回はCloud Pub/Subを使うことで実現してみました。
構成図
処理の流れ
- Cloud Function のサーバースクリプト起動
- SORACOM SMS API 呼び出し
- 4GPi / slee-pi3 を使って RaspberryPi をSMS起動
- SORACOM Funk にセンサーデータ送信
- Cloud Pub/Sub にセンサーデータを保存
- 呼び出し元のサーバースクリプトにて集計
バッチ
バッチは可能な限りサーバーレスにしたかったためCloudFunctionで実装しました。SMS通信以外でSORACOM SIMを指した4GPi/slee-pi3をSMS起動するには、IMSIを指定してAPI呼び出しを行う必要があります。
バッチではSORACOM APIを利用して各IMSIに対してAPI呼び出しを行っています。IMSIの数が多いことを考慮して並列呼び出しを行っています。
X-Soracom-API-Key
, X-Soracom-Token
は予めTOKENAPIを利用して取得しておき、環境変数に詰めておく必要があります。
$ curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d "{\"email\": \"<ログインID>\",\"password\": \"<パスワード>\"}" "https://g.api.soracom.io/v1/auth"
{"apiKey": "hogehoge", "token": "fugafuga", ...}
$ export SORACOM_API_KEY="hogehoge"
$ export SORACOM_TOKEN="fugafuga"
IMSIはSORACOMの管理画面から確認できます。IMSIはプログラム内で指定するようにしました。
これでCloud FunctionからSMS起動ができます。以下のように実装してみました。
IMSI_LIST = [
12345678 # dummy imsi
]
def start_batch(request: Request):
request_json = request.get_json(silent=True)
print(request_json)
# imsiの数だけ並列で実行する。多すぎる場合は要検討
with ThreadPoolExecutor(max_workers=len(IMSI_LIST)) as executor:
def call_up(imsi):
# SMS APIを呼び出してRaspberryPiを起動
data = {"body": "message"}
headers = {
"X-Soracom-API-Key": os.getenv("SORACOM_API_KEY"),
"X-Soracom-Token": os.getenv("SORACOM_TOKEN")
}
res = requests.post(
f"https://g.api.soracom.io/v1/subscribers/{imsi}/send_sms",
json=data,
headers=headers
)
print(json.loads(res.content))
return res
features = [executor.submit(call_up, imsi) for imsi in IMSI_LIST]
for feature in features:
print(f"result:{feature.result()}")
...
RaspberryPi
次はセンサーデータを送信するRaspberryPi側のコードを見ていきます。SORACOMがあることでサービスに対してかなり疎結合な実装ができます。SORACOM FunkのURLにのみ依存しているので、SORACOM側でAWSに切り替えるだけでクラウドを変更することもできます。SORACOM便利だな・・・今回は温度のダミーデータを送信してみます。
import requests
import random
SORACOM_FUNK_URL = "http://funk.soracom.io"
def main():
data = {
"device_id": "test_1",
"temperature": random.randint(0, 40)
}
print(data)
res = requests.post(SORACOM_FUNK_URL, json=data)
print(res)
print(res.content)
if __name__ == '__main__':
main()
SORACOM Funkの設定はSIMグループを作ることで可能になります。
センサーデータ受信
SORACOM Funkの受け取り側の実装を行います。先程の実行中のバッチ直接データを渡せれば良いですが、実行中のCloud Functionに直接データを渡す事は不可能です。Pub/Sub使って橋渡しさせることで呼び出し元のバッチがデータを取得できるようにしたいと思います。
先程のバッチ実装に関数を追加します。Functionで受信したデータをPub/Subのtopicに送信して、呼び出し元のバッチで取得できるようにします。
PROJECT_ID = "hoge"
TOPIC_ID = "fuga-topic"
def receive_sensor(request: Request):
request_json = request.get_json(silent=True)
print(request.headers)
print(request_json)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
publisher.publish(topic_path, json.dumps(request_json).encode("utf-8"))
return f'send temperature {request_json}!'
バッチでセンサーデータを取得する
Pub/Subに送信されるデータを呼び出し元のバッチで取得する仕組みを追記していきます。先程のstart_batchにsubscriberの受信処理を追加していきます。
RaspberryPiの故障などでデータは必ず送られてくるとは限らないので、ある程度のタイムアウト時間を決めてデータを待ちます。
TIMEOUT = 2 * 60
PROJECT_ID = "hoge"
SUBSCRIPTION_ID = "fuga-subscription"
def start_batch(request: Request):
...(先程のバッチ実装の続き)
sensors = []
def callback(message: Message):
print(f"Received {message}.")
data = json.loads(message.data.decode("utf-8"))
sensors.append(data)
message.ack()
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
try:
future.result(timeout=TIMEOUT)
except TimeoutError:
future.cancel()
print(mean([sensor["temperature"] for sensor in sensors]))
return json.dumps(sensors)
以上で基本的な実装は終了です。
デプロイ
コードができたらデプロイして動かしてみます。以下のようにシェルスクリプトを実行しておくと便利です。
poetry export --without-hashes > requirements.txt
gcloud functions deploy start-batch \
--entry-point start_batch \
--runtime python37 \
--trigger-http \
--allow-unauthenticated
gcloud functions deploy receive-sensor \
--entry-point receive_sensor \
--runtime python37 \
--trigger-http \
--allow-unauthenticated
rm requirements.txt
ローカル開発時に役立つダミサーバー起動、ダミー呼び出しスクリプトも作っておくと開発がはかどります。(Github)
RaspberryPi側はserviceを作って起動時に自動で実行するようにしておきます。sensor.service
ファイルを作って
[Unit]
Description=SMS Pi Script Service
After=multi-user.target
[Service]
Type=idle
ExecStart=/home/pi/start.sh
[Install]
WantedBy=multi-user.target
サービスとして登録します
sudo cp sensor.service /etc/systemd/system/
sudo systemctl enable sensor
まとめ
センサーデータは実際BigQueryに集約すれば集計しやすいと思いますが、扱う値の取り扱いはプロジェクトや状況により様々です。今回は呼び出し元のバッチが直接センサーデータを扱いたい場合は想定し、Pub/Subなどを駆使することで集計を実現してみました。
すべてのソースはGithubに公開しています。この記事では紹介していないスクリプトや構成管理の仕組みも作っているので手元で動かしやすいかもしれません。