やりたいこと
まさにこれ(先駆者に感謝)
https://dev.classmethod.jp/articles/amazon-connect-voice-mail-from-kinesis-video-stream
記事中に載っているコードはnodejsで書かれていますが、wavへの変換部分をpythonで書いてみた...という内容
boto3で取得
DataEndpointを取得して、コンテンツを取得するまではドキュメントを見ながら書ける
import boto3
kinesis_client = boto3.client('kinesisvideo')
endpoint_response = kinesis_client.get_data_endpoint(
StreamARN=stream_arn,
APIName='GET_MEDIA'
)
data_endpoint = endpoint_response['DataEndpoint']
kinesis_video_client = boto3.client('kinesis-video-media', endpoint_url=data_endpoint)
media_response = kinesis_video_client.get_media(
StreamARN=stream_arn,
StartSelector={
'StartSelectorType': 'FRAGMENT_NUMBER',
'AfterFragmentNumber': start_fragment_number
}
)
# media_response['Payload'] に取得したい中身がある
肝心の中身はドキュメントをみると...
The payload Kinesis Video Streams returns is a sequence of chunks from the specified stream. For information about the chunks, see . The chunks that Kinesis Video Streams returns in the GetMedia call also include the following additional Matroska (MKV) tags:
...「Matroska」(マトリョーシカ) ???
Matroskaについて調べた
Matroskaは動画、音声、字幕などのマルチメディアデータを格納するコンテナフォーマット
データ構造仕様はEBML(Extensible Binary Meta Language)
定義を見るとそれぞれのelementに固有のIDが振られていて、それを元に取得できそう
https://www.matroska.org/technical/elements.html
pythonでEBMLを扱いたい
ライブラリがあった
pip install ebmlite
parserを作ってる方もいて、参考にさせてもらいました
https://github.com/rosealexander/kvsparser
simple_blockの中に必要なバイナリがあるので、それだけのparserを作ってみた
from enum import Enum
from dataclasses import dataclass
from ebmlite import loadSchema, Document
# Enum系は省略(githubを参考に)
@dataclass()
class Fragment:
bytes: bytearray
dom: Document
@property
def simple_blocks(self):
return self.__get_simple_blocks()
def __get_simple_blocks(self):
segment = next(filter(lambda s: s.id == Mkv.SEGMENT.value, self.dom))
cluster = next(filter(lambda c: c.id == Mkv.CLUSTER.value, segment))
simple_blocks = [ b for b in cluster if b.id == Mkv.SIMPLEBLOCK.value ]
return simple_blocks
class KVSParser:
def __init__(self, media):
self.__stream = media['Payload']
self.__schema = loadSchema('matroska.xml')
self.__buffer = bytearray()
@property
def fragments(self):
fl = list()
for chunk in self.__stream:
fragment = self.__parse(chunk)
if fragment:
fl.append(fragment)
return fl
def __parse(self, chunk):
self.__buffer.extend(chunk)
header_elements = [e for e in self.__schema.loads(self.__buffer) if e.id == Ebml.EBML.value]
if header_elements:
offset = header_elements[0].offset
fragment_bytes = self.__buffer[:offset]
fragment_dom = self.__schema.loads(fragment_bytes)
fragment = Fragment(bytes=fragment_bytes, dom=fragment_dom)
self.__buffer = self.__buffer[offset:]
return fragment
上記のparserで、必要なバイナリ部分を取得(エラー処理などは割愛)
parser = KVSParser(media_response)
fragments = parser.fragments
# SIMPLE_BLOCKの中身だけを取得
data_list = list()
for fragment in fragments:
for block in fragment.simple_blocks:
data_list.append(block.value)
# 各chunkの先頭4バイトを破棄して結合する
margin = 4
total_length = 0
for data in data_list:
total_length += len(data) - margin
samples = bytearray(total_length)
pos = 0
for data in data_list:
tmp = data[margin:]
samples[pos:pos+len(tmp)] = tmp
pos += len(tmp)
# wavファイルのフォーマットに変換(後述)
wav = convert_bytearray_to_wav(samples)
# 保存
with open(output_file, 'wb') as file:
file.write(wav)
wavファイルのフォーマットにする
wavファイルはbytesarrayにゴリゴリ書いていくだけ
def convert_bytearray_to_wav(samples):
length = len(samples)
# Single 16bit PCM 8000Hz、ビットオーダーはリトルエンディアン
channel = 1
bit_par_sample = 16
format_code = 1
sample_rate = 8000
header_size = 44
wav = bytearray()
# RIFFチャンク
wav[0:4] = b'RIFF' # チャンクID
wav[4:8] = struct.pack('<I', length + header_size - 12) # ファイルサイズ (このチャンクを含まない)
wav[8:12] = b'WAVE' # Wave ID
# fmtチャンク
wav[12:16] = b'fmt ' # チャンクID
wav[16:20] = struct.pack('<I', 16) # fmtチャンクのバイト数
wav[20:22] = struct.pack('<H', format_code) # フォーマットコード
wav[22:24] = struct.pack('<H', channel) # チャンネル数
wav[24:28] = struct.pack('<I', sample_rate) # サンプリングレート
wav[28:32] = struct.pack('<I', sample_rate * 2) # データ速度
wav[32:34] = struct.pack('<H', 2) # ブロックサイズ
wav[34:36] = struct.pack('<H', bit_par_sample) # サンプルあたりのビット数
# dataチャンク
wav[36:40] = b'data' # チャンクID
wav[40:44] = struct.pack('<I', length) # 波形データのバイト数
offset = header_size
for i in range(0, length, 4):
value = struct.unpack('<I', samples[i:i+4])[0]
wav[offset:offset+4] = struct.pack('<I', value)
offset += 4
return wav
作成できたwavファイルはローカルに保存したり、lambda上で実行するならS3に保存するなりできますね。