LoginSignup
2
0

Amazon Kinesis Video Streams で受け取った動画を定期的に自分のS3に保存する

Last updated at Posted at 2023-05-31

背景

自宅のプランター菜園を常時観察できるようにRaspberry Piに接続したカメラで常時撮影してKinesis Video Streams (KVS) に送信しているのですが、放っておくと所定の時間で動画は削除されてしまいます。勿体ないので保管して見返したり、後々機械学習に利用したりできるようにしたいと思い、S3に定期的に保管する方法を検証してみました。
IMG_3835.jpg

KVSは流れてきた動画をS3に保管する機能を持っていないため、この部分は作りこむ必要があります。

構成

最終的にアーキテクチャはこのようになりました。
AutomatedGetClip.png

Raspberry PiからKVSにデータを流す方法は下記を参考にしました。具体的な実装手順に関してはこの記事では割愛します。
https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html

KVSに流れてきた動画はデフォルトでは24時間保管されています。(最大10年)
全体の流れとしてはこれだけです。

  1. KVSから所定の開始時刻/終了時刻の動画を GetClip API でMP4として取り出し S3 に格納する Lambda Function を作成する
  2. EventBridge で定期的にこの Lambda Function を呼び出す

1つのMP4のファイルに永続的な動画を保管することはできないため、定期的に区切りながらMP4として保管していく必要があります。動画の取得頻度は GetClip API のレスポンスのペイロードに格納できるサイズに依存します。下記のとおり、開始のタイムスタンプから100MBのサイズまたは200フラグメント以下のどちらか小さいほうになるため、撮影している動画のフラグメントを確認します。

Payload
指定したビデオストリームのメディアクリップを含む従来の MP4 ファイル。出力には、指定された開始タイムスタンプから (最初の) 100 MB 分のフラグメントまたは 200 個のフラグメントが含まれます。詳細については、「Kinesis Video Streams Limits」を参照してください。

フラグメントの長さはKVSのマネジメントコンソールにて、 ビデオストリーム - メディア再生 - メディア統計 で確認することが出来ます。私の動画は 1.5秒 になっています。

fragment.png

200個のフラグメントまで取得できるので、私の動画ではGetClipで取得できる動画の長さは5分が上限になります。

1.5秒 x 200 = 300秒 = 5分

さて、これを元に実装していきます。

開始時刻のタイムスタンプを Parameter Store に格納

Lambdaを動かす際に問題になるのが、揮発性です。要するに何かしらの値をずっとLambdaのおなかに保持することが出来ないので、どこか外に置く必要があります。今回は5分毎に動画を切り出していくので、開始時刻のタイムスタンプを更新しながら保持する必要があります。よくある方法としてはDynamoDBに格納するやり方なのですが、たかだかタイムスタンプ1つのためにわざわざDynamoDBを建てるのも面倒&もったいないので、Systems Manager の一機能であるParameter Storeを利用してみることにしました。

starttime.png

設定内容は下記のとおりです。最後のが動画の切り出し毎に更新されていきます。

設定項目
名前 /iot-farm/latest-video-timestamp
利用枠 標準
タイプ 文字列
データ型 text
2023-05-31 00:00:00

同様に、切り出す動画の長さ(300秒)を定義するパラメータストアも作っておくことにしました。(意味があるかはわかりませんが。。。)

KVS から動画を切り出してS3に格納する Lambda Function

Lambdaでは下記の処理を作成していきます

Parameter Store から値を取得する

Parameter Store からは get_parameter というAPIをたたくことでレスポンス内の Value に値が格納されて取り出すことが出来ます。楽ちんですね。

# Get starttime value from Parameter Store
ssm = boto3.client('ssm')
str_starttime = ssm.get_parameter(
    Name='/iot-farm/latest-video-timestamp'
    )['Parameter']['Value']
    
str_duration = ssm.get_parameter(
    Name='/iot-farm/video-duration-seconds'
    )['Parameter']['Value']

動画を切り出す

KVSから動画を切り出すにはGetClip APIを利用することになりますが、これ単体ではデータは取り出せず、あらかじめKVSのエンドポイントを GetDataEndpoint API で取得する必要があります。これを忘れると、 UnknowOperation の例外が発生します。

GetClip API の使用の前提条件として、GetDataEndpoint を使用してエンドポイントを取得し、APINameパラメーターに GET_CLIP を指定する必要があります。

# KVSのエンドポイントを取得
kv = boto3.client('kinesisvideo')
kv_response = kv.get_data_endpoint(
    StreamName=stream_name,
    APIName='GET_CLIP'
    )

そのあと、このエンドポイントを用いてboto3のクライアントを初期化して get_clip を使って動画を切り出しますが、このときの動画クリップの開始時刻と終了時刻の指定は共に datetime 型です。そのため、下記のようにstrptime メソッドを使って文字列型をdatetime型に変換する必要があります。また、終了時刻はtimedelta メソッドを利用して導き出しました。

starttime = datetime.datetime.strptime(str_starttime, "%Y-%m-%d %H:%M:%S")
endtime = starttime + datetime.timedelta(seconds=int(str_duration))

作成した開始時刻と終了時刻を用いてget_clipを呼び出します。stream_nameはKVSのストリーム名です。

kvs = boto3.client('kinesis-video-archived-media', endpoint_url=kv_response['DataEndpoint'])
response = kvs.get_clip(
    StreamName=stream_name,
    ClipFragmentSelector={
        'FragmentSelectorType': 'SERVER_TIMESTAMP',
        'TimestampRange': {
            'StartTimestamp': starttime,
            'EndTimestamp': endtime
        }
    }
)

S3 への格納

切り出した動画をS3 に保管します。一旦Lambdaの/tmpに保管してからアップロードする方法もありますが、二度手間になるので、今回はペイロードをインメモリでそのままアップロードするようにしました。動画の実体は response 内の Payload に格納されているため、read() メソッドでバイナリで取り出し、そのままS3にput_objectしてあげます。Lambdaはデフォルトで128MBのメモリが搭載されていますが、実行前に念のため確認してください。

bucket_name = "YOUR-BUCKET-NAME"
body = response['Payload'].read()
key = "video/" + starttime.strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"

s3 = boto3.client('s3')
put_response = s3.put_object(
    Body=body,
    Bucket=bucket_name,
    Key=key
    )

Parameter Store の更新

最後に、切り出した動画の続きから切り出せるように開始時刻を新しくしたパラメータで更新します。PutParameter APIを呼び出すだけなので簡単です。その際、終了時刻をstrftimeメソッドで文字列に変換して格納します。

ssm_put_response = ssm.put_parameter(
    Name='/iot-farm/latest-video-timestamp',
    Value=endtime.strftime("%Y-%m-%d %H:%M:%S"),
    Overwrite=True
    )

コードの全貌

全体のコードはこちらです。

import json
import boto3
import datetime

def lambda_handler(event, context):
    # Get starttime value from Parameter Store
    ssm = boto3.client('ssm')
    str_starttime = ssm.get_parameter(
        Name='/iot-farm/latest-video-timestamp'
        )['Parameter']['Value']
        
    str_duration = ssm.get_parameter(
        Name='/iot-farm/video-duration-seconds'
        )['Parameter']['Value']
    
    # Parameters    
    stream_name = event['stream_name']
    starttime = datetime.datetime.strptime(str_starttime, "%Y-%m-%d %H:%M:%S")
    endtime = starttime + datetime.timedelta(seconds=int(str_duration))
    
    # KVSのエンドポイントを取得
    kv = boto3.client('kinesisvideo')
    kv_response = kv.get_data_endpoint(
        StreamName=stream_name,
        APIName='GET_CLIP'
        )
    
    kvs = boto3.client('kinesis-video-archived-media', endpoint_url=kv_response['DataEndpoint'])
    response = kvs.get_clip(
        StreamName=stream_name,
        ClipFragmentSelector={
            'FragmentSelectorType': 'SERVER_TIMESTAMP',
            'TimestampRange': {
                'StartTimestamp': starttime,
                'EndTimestamp': endtime
            }
        }
    )
    
    bucket_name = "YOUR-BUCKET-NAME"
    body = response['Payload'].read()
    key = "video/" + starttime.strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"
    
    s3 = boto3.client('s3')
    put_response = s3.put_object(
        Body=body,
        Bucket=bucket_name,
        Key=key
        )
        
    
    ssm_put_response = ssm.put_parameter(
        Name='/iot-farm/latest-video-timestamp',
        Value=endtime.strftime("%Y-%m-%d %H:%M:%S"),
        Overwrite=True
        )

EventBridge Scheduler で5分毎にLambdaが起動するよう設定する

定期的にLambdaをキックするために利用します。

EventBridge のマネジメントコンソールにてスケジューラ をクリックし、スケジュールを作成をクリックします。

event01.png

スケジュールの設定などをしていきます。繰り返し実行ができるように 定期的なスケジュール を選択してrateベースのスケジュールを選択してください。また、実効間隔(rate式)は 5 minites に設定します。

event02.png

ターゲットの選択 にて AWS Lambda を選択し、上で作成したLambda関数とLambda関数の引数(ペイロード)を指定します。

{
    "stream_name": "YOUR-STREAM-NAME", #ストリーム名
    "bucket_name": "YOURMP4BUCKET", #出力先バケット
    "folder": "video/" #バケット内の出力先フォルダー
}

event03.png

再試行回数を5回程度に減らしておきます。

event04.png

最後にスケジュールを作成をクリックして完了です。

結果

放っておくと、このように5分毎に動画が切り出されてS3に格納されるようになりました。

s3.jpg

また Parameter Store も5分毎にバージョンが上がり続けてます。どこまで上げられるのかわかりませんが、やはりDynamoDBに・・・といずれなるかもしれません。。。

parameter02.jpg

まとめ

GetClip API + Lambda + EventBridgeの組み合わせで定期的にKVSの動画を切り出して保管できるようになりました。今後KVS自体に機能が実装されていくかもしれませんが、この内容が今現在で困っている方の参考になれば幸いです。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0