2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Amazon Connect〜KinesisVideoStreamsで保存された音声をpythonで取得したい

Last updated at Posted at 2023-06-15

やりたいこと

まさにこれ(先駆者に感謝)
https://dev.classmethod.jp/articles/amazon-connect-voice-mail-from-kinesis-video-stream

記事中に載っているコードはnodejsで書かれていますが、wavへの変換部分をpythonで書いてみた...という内容

boto3で取得

DataEndpointを取得して、コンテンツを取得するまではドキュメントを見ながら書ける

import boto3

kinesis_client = boto3.client('kinesisvideo')

endpoint_response = kinesis_client.get_data_endpoint(
    StreamARN=stream_arn,
    APIName='GET_MEDIA'
)
data_endpoint = endpoint_response['DataEndpoint']

kinesis_video_client = boto3.client('kinesis-video-media', endpoint_url=data_endpoint)

media_response = kinesis_video_client.get_media(
    StreamARN=stream_arn,
    StartSelector={
        'StartSelectorType': 'FRAGMENT_NUMBER',
        'AfterFragmentNumber': start_fragment_number
    }
)

# media_response['Payload'] に取得したい中身がある

肝心の中身はドキュメントをみると...

The payload Kinesis Video Streams returns is a sequence of chunks from the specified stream. For information about the chunks, see . The chunks that Kinesis Video Streams returns in the GetMedia call also include the following additional Matroska (MKV) tags:

...「Matroska」(マトリョーシカ) ???

Matroskaについて調べた

Matroskaは動画、音声、字幕などのマルチメディアデータを格納するコンテナフォーマット

データ構造仕様はEBML(Extensible Binary Meta Language)

定義を見るとそれぞれのelementに固有のIDが振られていて、それを元に取得できそう
https://www.matroska.org/technical/elements.html

pythonでEBMLを扱いたい

ライブラリがあった

pip install ebmlite

parserを作ってる方もいて、参考にさせてもらいました
https://github.com/rosealexander/kvsparser

simple_blockの中に必要なバイナリがあるので、それだけのparserを作ってみた

from enum import Enum
from dataclasses import dataclass
from ebmlite import loadSchema, Document

# Enum系は省略(githubを参考に)

@dataclass()
class Fragment:

    bytes: bytearray
    dom: Document

    @property
    def simple_blocks(self):
        return self.__get_simple_blocks()
    
    def __get_simple_blocks(self):
        segment = next(filter(lambda s: s.id == Mkv.SEGMENT.value, self.dom))
        cluster = next(filter(lambda c: c.id == Mkv.CLUSTER.value, segment))
        simple_blocks = [ b for b in cluster if b.id == Mkv.SIMPLEBLOCK.value ]
        return simple_blocks


class KVSParser:

    def __init__(self, media):
        self.__stream = media['Payload']
        self.__schema = loadSchema('matroska.xml')
        self.__buffer = bytearray()

    @property
    def fragments(self):
        fl = list()

        for chunk in self.__stream:
            fragment = self.__parse(chunk)
            if fragment:
                fl.append(fragment)

        return fl

    def __parse(self, chunk):
        self.__buffer.extend(chunk)
        header_elements = [e for e in self.__schema.loads(self.__buffer) if e.id == Ebml.EBML.value]
        if header_elements:
            offset = header_elements[0].offset
            fragment_bytes = self.__buffer[:offset]
            fragment_dom = self.__schema.loads(fragment_bytes)
            fragment = Fragment(bytes=fragment_bytes, dom=fragment_dom)
            self.__buffer = self.__buffer[offset:]
            return fragment

上記のparserで、必要なバイナリ部分を取得(エラー処理などは割愛)

parser = KVSParser(media_response)
fragments = parser.fragments

# SIMPLE_BLOCKの中身だけを取得
data_list = list()

for fragment in fragments:
    for block in fragment.simple_blocks:
        data_list.append(block.value)

# 各chunkの先頭4バイトを破棄して結合する
margin = 4
total_length = 0
for data in data_list:
    total_length += len(data) - margin

samples = bytearray(total_length)
pos = 0
for data in data_list:
    tmp = data[margin:]
    samples[pos:pos+len(tmp)] = tmp
    pos += len(tmp)

# wavファイルのフォーマットに変換(後述)
wav = convert_bytearray_to_wav(samples)

# 保存
with open(output_file, 'wb') as file:
    file.write(wav)

wavファイルのフォーマットにする

wavファイルはbytesarrayにゴリゴリ書いていくだけ

def convert_bytearray_to_wav(samples):
    length = len(samples)

    # Single 16bit PCM 8000Hz、ビットオーダーはリトルエンディアン
    channel = 1
    bit_par_sample = 16
    format_code = 1
    sample_rate = 8000

    header_size = 44
    wav = bytearray()

    # RIFFチャンク
    wav[0:4] = b'RIFF'                                          # チャンクID
    wav[4:8] = struct.pack('<I', length + header_size - 12)     # ファイルサイズ (このチャンクを含まない)
    wav[8:12] = b'WAVE'                                         # Wave ID

    # fmtチャンク
    wav[12:16] = b'fmt '                                # チャンクID
    wav[16:20] = struct.pack('<I', 16)                  # fmtチャンクのバイト数
    wav[20:22] = struct.pack('<H', format_code)         # フォーマットコード
    wav[22:24] = struct.pack('<H', channel)             # チャンネル数
    wav[24:28] = struct.pack('<I', sample_rate)         # サンプリングレート
    wav[28:32] = struct.pack('<I', sample_rate * 2)     # データ速度
    wav[32:34] = struct.pack('<H', 2)                   # ブロックサイズ
    wav[34:36] = struct.pack('<H', bit_par_sample)      # サンプルあたりのビット数

    # dataチャンク
    wav[36:40] = b'data'                                # チャンクID
    wav[40:44] = struct.pack('<I', length)              # 波形データのバイト数

    offset = header_size
    for i in range(0, length, 4):
        value = struct.unpack('<I', samples[i:i+4])[0]
        wav[offset:offset+4] = struct.pack('<I', value)
        offset += 4

    return wav

作成できたwavファイルはローカルに保存したり、lambda上で実行するならS3に保存するなりできますね。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?