背景
自宅のプランター菜園を常時観察できるようにRaspberry Piに接続したカメラで常時撮影してKinesis Video Streams (KVS) に送信しているのですが、放っておくと所定の時間で動画は削除されてしまいます。勿体ないので保管して見返したり、後々機械学習に利用したりできるようにしたいと思い、S3に定期的に保管する方法を検証してみました。
KVSは流れてきた動画をS3に保管する機能を持っていないため、この部分は作りこむ必要があります。
構成
Raspberry PiからKVSにデータを流す方法は下記を参考にしました。具体的な実装手順に関してはこの記事では割愛します。
https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html
KVSに流れてきた動画はデフォルトでは24時間保管されています。(最大10年)
全体の流れとしてはこれだけです。
- KVSから所定の開始時刻/終了時刻の動画を GetClip API でMP4として取り出し S3 に格納する Lambda Function を作成する
- EventBridge で定期的にこの Lambda Function を呼び出す
1つのMP4のファイルに永続的な動画を保管することはできないため、定期的に区切りながらMP4として保管していく必要があります。動画の取得頻度は GetClip API のレスポンスのペイロードに格納できるサイズに依存します。下記のとおり、開始のタイムスタンプから100MBのサイズまたは200フラグメント以下のどちらか小さいほうになるため、撮影している動画のフラグメントを確認します。
Payload
指定したビデオストリームのメディアクリップを含む従来の MP4 ファイル。出力には、指定された開始タイムスタンプから (最初の) 100 MB 分のフラグメントまたは 200 個のフラグメントが含まれます。詳細については、「Kinesis Video Streams Limits」を参照してください。
フラグメントの長さはKVSのマネジメントコンソールにて、 ビデオストリーム - メディア再生 - メディア統計 で確認することが出来ます。私の動画は 1.5秒 になっています。
200個のフラグメントまで取得できるので、私の動画ではGetClipで取得できる動画の長さは5分が上限になります。
1.5秒 x 200 = 300秒 = 5分
さて、これを元に実装していきます。
開始時刻のタイムスタンプを Parameter Store に格納
Lambdaを動かす際に問題になるのが、揮発性です。要するに何かしらの値をずっとLambdaのおなかに保持することが出来ないので、どこか外に置く必要があります。今回は5分毎に動画を切り出していくので、開始時刻のタイムスタンプを更新しながら保持する必要があります。よくある方法としてはDynamoDBに格納するやり方なのですが、たかだかタイムスタンプ1つのためにわざわざDynamoDBを建てるのも面倒&もったいないので、Systems Manager の一機能であるParameter Storeを利用してみることにしました。
設定内容は下記のとおりです。最後の値が動画の切り出し毎に更新されていきます。
設定項目 | 値 |
---|---|
名前 | /iot-farm/latest-video-timestamp |
利用枠 | 標準 |
タイプ | 文字列 |
データ型 | text |
値 | 2023-05-31 00:00:00 |
同様に、切り出す動画の長さ(300秒)を定義するパラメータストアも作っておくことにしました。(意味があるかはわかりませんが。。。)
KVS から動画を切り出してS3に格納する Lambda Function
Lambdaでは下記の処理を作成していきます
Parameter Store から値を取得する
Parameter Store からは get_parameter というAPIをたたくことでレスポンス内の Value に値が格納されて取り出すことが出来ます。楽ちんですね。
# Get starttime value from Parameter Store
ssm = boto3.client('ssm')
str_starttime = ssm.get_parameter(
Name='/iot-farm/latest-video-timestamp'
)['Parameter']['Value']
str_duration = ssm.get_parameter(
Name='/iot-farm/video-duration-seconds'
)['Parameter']['Value']
動画を切り出す
KVSから動画を切り出すにはGetClip APIを利用することになりますが、これ単体ではデータは取り出せず、あらかじめKVSのエンドポイントを GetDataEndpoint API で取得する必要があります。これを忘れると、 UnknowOperation の例外が発生します。
GetClip API の使用の前提条件として、GetDataEndpoint を使用してエンドポイントを取得し、APINameパラメーターに GET_CLIP を指定する必要があります。
# KVSのエンドポイントを取得
kv = boto3.client('kinesisvideo')
kv_response = kv.get_data_endpoint(
StreamName=stream_name,
APIName='GET_CLIP'
)
そのあと、このエンドポイントを用いてboto3のクライアントを初期化して get_clip を使って動画を切り出しますが、このときの動画クリップの開始時刻と終了時刻の指定は共に datetime 型です。そのため、下記のようにstrptime メソッドを使って文字列型をdatetime型に変換する必要があります。また、終了時刻はtimedelta メソッドを利用して導き出しました。
starttime = datetime.datetime.strptime(str_starttime, "%Y-%m-%d %H:%M:%S")
endtime = starttime + datetime.timedelta(seconds=int(str_duration))
作成した開始時刻と終了時刻を用いてget_clipを呼び出します。stream_nameはKVSのストリーム名です。
kvs = boto3.client('kinesis-video-archived-media', endpoint_url=kv_response['DataEndpoint'])
response = kvs.get_clip(
StreamName=stream_name,
ClipFragmentSelector={
'FragmentSelectorType': 'SERVER_TIMESTAMP',
'TimestampRange': {
'StartTimestamp': starttime,
'EndTimestamp': endtime
}
}
)
S3 への格納
切り出した動画をS3 に保管します。一旦Lambdaの/tmpに保管してからアップロードする方法もありますが、二度手間になるので、今回はペイロードをインメモリでそのままアップロードするようにしました。動画の実体は response 内の Payload に格納されているため、read() メソッドでバイナリで取り出し、そのままS3にput_objectしてあげます。Lambdaはデフォルトで128MBのメモリが搭載されていますが、実行前に念のため確認してください。
bucket_name = "YOUR-BUCKET-NAME"
body = response['Payload'].read()
key = "video/" + starttime.strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"
s3 = boto3.client('s3')
put_response = s3.put_object(
Body=body,
Bucket=bucket_name,
Key=key
)
Parameter Store の更新
最後に、切り出した動画の続きから切り出せるように開始時刻を新しくしたパラメータで更新します。PutParameter APIを呼び出すだけなので簡単です。その際、終了時刻をstrftimeメソッドで文字列に変換して格納します。
ssm_put_response = ssm.put_parameter(
Name='/iot-farm/latest-video-timestamp',
Value=endtime.strftime("%Y-%m-%d %H:%M:%S"),
Overwrite=True
)
コードの全貌
全体のコードはこちらです。
import json
import boto3
import datetime
def lambda_handler(event, context):
# Get starttime value from Parameter Store
ssm = boto3.client('ssm')
str_starttime = ssm.get_parameter(
Name='/iot-farm/latest-video-timestamp'
)['Parameter']['Value']
str_duration = ssm.get_parameter(
Name='/iot-farm/video-duration-seconds'
)['Parameter']['Value']
# Parameters
stream_name = event['stream_name']
starttime = datetime.datetime.strptime(str_starttime, "%Y-%m-%d %H:%M:%S")
endtime = starttime + datetime.timedelta(seconds=int(str_duration))
# KVSのエンドポイントを取得
kv = boto3.client('kinesisvideo')
kv_response = kv.get_data_endpoint(
StreamName=stream_name,
APIName='GET_CLIP'
)
kvs = boto3.client('kinesis-video-archived-media', endpoint_url=kv_response['DataEndpoint'])
response = kvs.get_clip(
StreamName=stream_name,
ClipFragmentSelector={
'FragmentSelectorType': 'SERVER_TIMESTAMP',
'TimestampRange': {
'StartTimestamp': starttime,
'EndTimestamp': endtime
}
}
)
bucket_name = "YOUR-BUCKET-NAME"
body = response['Payload'].read()
key = "video/" + starttime.strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"
s3 = boto3.client('s3')
put_response = s3.put_object(
Body=body,
Bucket=bucket_name,
Key=key
)
ssm_put_response = ssm.put_parameter(
Name='/iot-farm/latest-video-timestamp',
Value=endtime.strftime("%Y-%m-%d %H:%M:%S"),
Overwrite=True
)
EventBridge Scheduler で5分毎にLambdaが起動するよう設定する
定期的にLambdaをキックするために利用します。
EventBridge のマネジメントコンソールにてスケジューラ をクリックし、スケジュールを作成をクリックします。
スケジュールの設定などをしていきます。繰り返し実行ができるように 定期的なスケジュール を選択してrateベースのスケジュールを選択してください。また、実効間隔(rate式)は 5 minites に設定します。
ターゲットの選択 にて AWS Lambda を選択し、上で作成したLambda関数とLambda関数の引数(ペイロード)を指定します。
{
"stream_name": "YOUR-STREAM-NAME", #ストリーム名
"bucket_name": "YOURMP4BUCKET", #出力先バケット
"folder": "video/" #バケット内の出力先フォルダー
}
再試行回数を5回程度に減らしておきます。
最後にスケジュールを作成をクリックして完了です。
結果
放っておくと、このように5分毎に動画が切り出されてS3に格納されるようになりました。
また Parameter Store も5分毎にバージョンが上がり続けてます。どこまで上げられるのかわかりませんが、やはりDynamoDBに・・・といずれなるかもしれません。。。
まとめ
GetClip API + Lambda + EventBridgeの組み合わせで定期的にKVSの動画を切り出して保管できるようになりました。今後KVS自体に機能が実装されていくかもしれませんが、この内容が今現在で困っている方の参考になれば幸いです。