はじめに
この記事はNTTドコモ R&D Advent Calendar 2021の21日目の記事です。
こんにちわ、NTTドコモ サービスイノベーション部の川嶋です。普段の業務では画像認識技術の研究開発やそのサービス化に取り組んでいます。
最近、外出時にペットが元気にしているか気になって仕方がないので、遠隔でペットの様子を観察できるアプリケーションを作ってみたいと思います。
なお、本記事の執筆にはデグーのぎんちゃんに多大なご協力をいただきました。この場をお借りして深く感謝いたします。
この記事は何?
下記のAWSサービスを組み合わせて開発したペット見守りアプリケーションの実装例の紹介です。
- Amazon Kinesis Video Streams
- Amazon API Gateway
- Lambda
- DynamoDB
システムの概要
デグーは回し車で走り込むことが大好きです。そこで、ぎんちゃんが回し車で元気に走っている様子をコスパよく録画したいので、下記の機能を持つアプリケーションを実装します。
- 回し車の回転をトリガーにAmazon Kinesis Video Streamsに動画を配信
- 配信した動画の再生用URLをLINEに通知
- Amazon API Gateway, Lambda, DynamoDBを用いて回し車の回転日時を保存
準備
ubuntuとdockerが使えるPC、RTSP配信できるネットワークカメラ、回し車を用意します。
- OS: Ubuntu 20.04.3 LTS
- docker: 20.10.11
- ネットワークカメラ:iPhoneにLive-ReporterをインストールしてRTSP配信しました
- 回し車:市販されているもの
なお、回転を検知するための仕掛けとして、回し車の両端にマーカーを貼り付けました。
黄色のマーカーがついている側の網目だけ二重になっていて反対側より重くなっているため、安定時は必ず黄色のマーカーが下にくるようになっています。
次にアプリケーションの実行環境を構築します。まずは、AWSで提供されているkinesis video stream GStreamer C++ プロデューサー SDKが使える公式イメージをpullしてきます。
sudo docker pull 546150905175.dkr.ecr.us-west-2.amazonaws.com/kinesis-video-producer-sdk-cpp-amazon-linux:latest
pullしてきたイメージをベースイメージに設定したDockerfileを作成します。
python3を導入した上で、pipを用いてopencvなどのpython packageを導入し、これから実装するソースコードをコピーしています。また、今回はAWSのリソース、認証情報を環境変数に設定していますが、セキュリティ的にはあまりよろしくない方法だと思うので自己責任でお願いします。
FROM 546150905175.dkr.ecr.us-west-2.amazonaws.com/kinesis-video-producer-sdk-cpp-amazon-linux
RUN yum install -y \
python3 \
python3-dev \
python3-setuptools \
python3-pip
RUN pip3 install --upgrade pip setuptools && \
pip3 install opencv-python && \
pip3 install boto3
COPY ./stream.py /opt/amazon-kinesis-video-streams-producer-sdk-cpp/build/
ENV AWS_ACCESS_KEY_ID="XXXX"
ENV AWS_SECRET_ACCESS_KEY="XXXX"
ENV AWS_DEFAULT_REGION="ap-northeast-1"
ENV KVS_STREAM_NAME="XXXX"
ENV KVS_PRODUCER_BUILD_PATH="/opt/amazon-kinesis-video-streams-producer-sdk-cpp/build/"
実装
ここでは、回し車の回転をトリガーにAmazon Kinesis Video Streamsに動画を配信し、再生用URLを通知・回転日時をデータベースに保存するソースコードを作成します。
回し車の回転を検知するアルゴリズムは下記の通りです。
- OpenCVを用いて回し車の端に付与した黄色のマーカーを検出(
detect_marker
)
すでに2の判定が満たされていた場合、2をスキップして3に進む - マーカーの位置が基準線(黒色の直線)より上にあるか判定し、1に戻る
- マーカーの位置が基準線(黒色の直線)より下にあるか判定
- 3の判定が満たされていない場合は1に戻り、3の判定が満たされていた場合は回し車が回転したとみなし、判定結果をリセットして1に戻る
この手順で回し車の回転をカウントしている様子を描画してみたところ、正確にカウントできていました。
(撮影場所に応じて個別に色検出の閾値をチューニングする必要があったり、背景に同系色のモノがあるとマーカーの位置を特定できなかったりと何かとずさんなアルゴリズムではありますが、、、)
実装は下記のようになりました。
回し車の回転を検知しながらKinesis Video Streamsに動画をアップロードするため、multiprocessingを用いて動画配信部分は別プロセスで並列処理するようにしています。回し車の回転が検知されたとき、このプロセスの処理が終了していれば、新しいプロセスが開始されます。
import datetime
import os
import subprocess
import time
import requests
import json
from multiprocessing import Process
import numpy as np
import boto3
import cv2
KVS_STREAM_NAME = os.environ["KVS_STREAM_NAME"]
KVS_PRODUCER_BUILD_PATH = os.environ["KVS_PRODUCER_BUILD_PATH"]
RTSP_URL = "XXXX"
APP_NAME = "kvs_gstreamer_sample"
LINE_NOTIFY_TOKEN = "XXXX"
API_ENDPOINT = "https://XXXX.execute-api.ap-northeast-1.amazonaws.com/APItest/dynamodbctrl"
LINE_NOTIFY_API = "https://notify-api.line.me/api/notify"
RECORD_SEC = 5 #録画時間
EXPIRATION_MIN = 60 #再生用URLの有効期間
EXPIRATION_SEC = 60 * EXPIRATION_MIN #再生用URLの有効期間
YELLOW_MARKER_LOWER = np.array([0,70,100]) #色検出閾値の下限
YELLOW_MARKER_UPPER = np.array([30,150,200]) #色検出閾値の上限
BASELINE_Y = 350 #基準線:黄色のマーカーが到達するy座標の最小値より少し大きな値を設定します
kvs = boto3.client("kinesisvideo")
def detect_marker(frame, lower, upper):
marker_mask = cv2.inRange(frame, lower, upper)
marker_mask = cv2.medianBlur(marker_mask, 3)
contours, _ = cv2.findContours(marker_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return False, False
else:
max_cnt = max(contours, key=lambda x: cv2.contourArea(x))
mu = cv2.moments(max_cnt)
if mu["m00"] != 0:
x,y= int(mu["m10"]/mu["m00"]) , int(mu["m01"]/mu["m00"])
else:
return False, False
return x, y
def upload_video():
""" Upload video using Amazon Kinesis Video Streams Producer SDK C++ """
print("start recording")
start = time.time()
kvs_app = f"{KVS_PRODUCER_BUILD_PATH}/{APP_NAME}"
try:
a = subprocess.run(
[kvs_app, KVS_STREAM_NAME, RTSP_URL],
cwd=KVS_PRODUCER_BUILD_PATH,
timeout=RECORD_SEC
)
except subprocess.TimeoutExpired:
end = time.time()
print("record finished")
url = get_session_url(start, end) #再生用URLの取得
notify_url(url, start) #再生用URLをLINE通知
print("record interrupted")
def get_session_url(start, end):
""" Get HLS streaming session URL """
endpoint = kvs.get_data_endpoint(
APIName="GET_HLS_STREAMING_SESSION_URL",
StreamName=KVS_STREAM_NAME
)['DataEndpoint']
kvam = boto3.client("kinesis-video-archived-media", endpoint_url=endpoint)
url = kvam.get_hls_streaming_session_url(
StreamName=KVS_STREAM_NAME,
PlaybackMode="ON_DEMAND",
ContainerFormat="MPEG_TS",
DisplayFragmentTimestamp="ALWAYS",
Expires=EXPIRATION_SEC,
HLSFragmentSelector={
"FragmentSelectorType": "PRODUCER_TIMESTAMP",
"TimestampRange": {
"StartTimestamp": start,
"EndTimestamp": end,
}
},
)['HLSStreamingSessionURL']
print(f"HLS session URL: {url}")
return url
def notify_url(url, timestamp):
""" Notify HLS streaming session URL"""
date = datetime.datetime.fromtimestamp(timestamp)
message = f"""ぎんちゃんが走り出しました。
日時: {date.strftime('%Y/%m/%d %H:%M:%S')}
再生用URL: {url} ({EXPIRATION_MIN}分のみ有効です)
"""
headers = {'Authorization': f'Bearer {LINE_NOTIFY_TOKEN}'}
data = {'message': f'{message}'}
requests.post(LINE_NOTIFY_API, headers = headers, data = data)
def main():
p = Process(target=upload_video)
cap = cv2.VideoCapture(RTSP_URL)
flag1, flag2 = False, False
count = 0
while True:
ret, frame = cap.read()
if ret:
_, y_y = detect_marker(frame, YELLOW_MARKER_LOWER, YELLOW_MARKER_UPPER)
if not y_y: #マーカーが検出されなかったとき以降の処理を全てスキップする
continue
if y_y < BASELINE_Y:
flag1 = True
elif flag1 and y_y > BASELINE_Y:
flag2 = True
if flag1 and flag2:
data = json.dumps({"OperationType":"PUT", "Keys":{"date":str(datetime.datetime.now())}})
requests.post(API_ENDPOINT, data = data) #回し車の回転日時をDBに格納
flag1 = False
flag2 = False
count += 1
if p.exitcode is None:
if count == 1:
p.start()
else:
p = Process(target=upload_video)
p.start()
else:
break
if __name__ == '__main__':
main()
実行結果
準備で用意したDockerfileをビルドし、イメージを作成します。
コンテナを起動し、作成したソースコード(stream.py)を実行するとアプリケーションが開始されます。
docker build -t test .
docker run -it test python3 stream.py
回し車の回転を検知すると、以下のようにLINE Notifyから通知が届きます。また、通知されたリンクに飛ぶと、録画された動画をオンデマンドで再生することができます。これで外出時でもぎんちゃんが家で元気に走っていることを確認できそうです。
また、DynamoDBのテーブルには回し車が回転した日時が保存されています。
日付ごとにレコードの数をカウントして回し車の直径を考慮すれば、毎日の走行距離を算出することもできるので、ぎんちゃんの体調管理にも活用できるかもしれません。
おわりに
本記事では簡易的なペット見守りアプリケーションの実装方法を紹介しました。
今後はラズパイで温湿度センサーを使った部屋の気温・湿度のモニタリングやエアコンなどの家電操作などの機能を実装して、遠隔でもぎんちゃんが過ごしやすい環境を整えられるようにしたいです。
ペット(特にハムスターとかデグー)を飼っているエンジニアの方はぜひ参考にしてみてください。