Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
19
Help us understand the problem. What is going on with this article?
@sizumita

PythonでDiscordのボイスチャットの音声を録音する

Discord APIのラッパーのDiscord.pyには音声録音機能がありません。
ですが、ボイスチャットの録音ができるようにする必要があったので、Discord.pyを使って録音する方法を解説します。

この記事はDiscord.py純正のWebSocketのラッパーを使います。もし自分で作りたい場合は こちらの記事 を参照してください。

1. パケットを受け取る

クラスを継承する

こちらの記事 にもあるように、op code == 4を受け取った後に音声が流れてきます。
これを利用するために、DiscordVoiceWebsocketVoiceClientをそれぞれ継承したクラスを作成します。

from discord.gateway import DiscordVoiceWebSocket
from discord import VoiceClient


class MyVoiceWebSocket(DiscordVoiceWebSocket):
    pass


class MyVoiceClient(VoiceClient):
    async def connect_websocket(self) -> MyVoiceWebSocket:
        ws = await MyVoiceWebSocket.from_client(self)
        self._connected.clear()
        while ws.secret_key is None:
            await ws.poll_event()
        self._connected.set()
        return ws

こうすることで、connect時にMyVoiceWebSocketが使われます。

await voice_channel.connect(cls=MyVoiceClient)

音声を受け取る

class MyVoiceWebSocket(DiscordVoiceWebSocket):
    def __init__(self, socket, loop):
        super().__init__(socket, loop)
        self.record_ready = False

    async def received_message(self, msg):
        await super(MyVoiceWebSocket, self).received_message(msg)
        op = msg['op']

        if op == self.SESSION_DESCRIPTION:  # op 5
            self.record_ready = True


class MyVoiceClient(VoiceClient):
    def __init__(self, client, channel):
        super().__init__(client, channel)
        self.record_task = None

    async def recv_voice_packet(self):
        if not self.ws.record_ready:
            raise ValueError("Not Record Ready")

        while True:
            recv = await self.loop.sock_recv(self.socket, 2 ** 16)

    async def connect_websocket(self) -> MyVoiceWebSocket:
        ws = await MyVoiceWebSocket.from_client(self)
        self._connected.clear()
        while ws.secret_key is None:
            await ws.poll_event()
        self._connected.set()
        return ws

op codeが5のメッセージを待つことでrecord_readyがTrueになった後から音声を取得できるようになります。

2. 受け取ったパケットの暗号化を解除する

受け取ったパケットはヘッダーを除いてxsalsa20_poly1305で暗号化されています。これをdecryptします。

参照: https://discord.com/developers/docs/topics/voice-connections#encrypting-and-sending-voice

まず、暗号化用の関数を作成します。

import nacl.secret


class MyVoiceClient(VoiceClient):

    ...

    def decrypt_xsalsa20_poly1305(self, data: bytes) -> tuple:
        box = nacl.secret.SecretBox(bytes(self.secret_key))
        is_rtcp = 200 <= data[1] < 205
        if is_rtcp:
            header, encrypted = data[:8], data[8:]
            nonce = bytearray(24)
            nonce[:8] = header
        else:
            header, encrypted = data[:12], data[12:]
            nonce = bytearray(24)
            nonce[:12] = header
        return header, box.decrypt(bytes(encrypted), bytes(nonce))

    def decrypt_xsalsa20_poly1305_suffix(self, data: bytes) -> tuple:
        box = nacl.secret.SecretBox(bytes(self.secret_key))
        is_rtcp = 200 <= data[1] < 205
        if is_rtcp:
            header, encrypted, nonce = data[:8], data[8:-24], data[-24:]
        else:
            header, encrypted, nonce = data[:12], data[12:-24], data[-24:]
        return header, box.decrypt(bytes(encrypted), bytes(nonce))

    def decrypt_xsalsa20_poly1305_lite(self, data: bytes) -> tuple:
        box = nacl.secret.SecretBox(bytes(self.secret_key))
        is_rtcp = 200 <= data[1] < 205
        if is_rtcp:
            header, encrypted, _nonce = data[:8], data[8:-4], data[-4:]
        else:
            header, encrypted, _nonce = data[:12], data[12:-4], data[-4:]
        nonce = bytearray(24)
        nonce[:4] = _nonce
        return header, box.decrypt(bytes(encrypted), bytes(nonce))

    ...

次に、パケットを解読します。また、RTCPパケットは破棄します。

import struct


class RTCPacket:
    def __init__(self, header, decrypted):
        self.version = (header[0] & 0b11000000) >> 6
        self.padding = (header[0] & 0b00100000) >> 5
        self.extend = (header[0] & 0b00010000) >> 4
        self.cc = header[0] & 0b00001111
        self.marker = header[1] >> 7
        self.payload_type = header[1] & 0b01111111
        self.offset = 0
        self.ext_length = None
        self.ext_header = None
        self.csrcs = None
        self.profile = None
        self.real_time = None

        self.header = header
        self.decrypted = decrypted
        self.seq, self.timestamp, self.ssrc = struct.unpack_from('>HII', header, 2)


class MyVoiceClient(VoiceClient):

    ...

    async def recv_voice_packet(self):
        if not self.ws.record_ready:
            raise ValueError("Not Record Ready")

        while True:
            recv = await self.loop.sock_recv(self.socket, 2 ** 16)
            if 200 <= recv[1] < 205:
                continue
            decrypt_func = getattr(self, f'decrypt_{self.mode}')
            header, data = decrypt_func(recv)
            packet = RTCPacket(header, data)

次に、拡張ヘッダーが存在している場合はそれをデータから取り除く必要があるので、そのための関数を作成し呼び出します。

参照: RFC 5285 https://tools.ietf.org/html/rfc5285#section-4.3

ここで、Discord独自の仕様として、拡張ヘッダーの次にまた一つ0か2のデータが存在する場合があり、その場合の処理も必要です。

また、後々使用するためにパケットが作成された時間を記録します。

import time


class RTCPacket:
    ...

    def set_real_time(self):
        self.real_time = time.time()

    def calc_extension_header_length(self) -> None:
        if not (self.decrypted[0] == 0xbe and self.decrypted[1] == 0xde and len(self.decrypted) > 4):
            return
        self.ext_length = int.from_bytes(self.decrypted[2:4], "big")
        offset = 4
        for i in range(self.ext_length):
            byte_ = self.decrypted[offset]
            offset += 1
            if byte_ == 0:
                continue
            offset += 1 + (0b1111 & (byte_ >> 4))

        # Discordの仕様
        if self.decrypted[offset + 1] in [0, 2]:
            offset += 1
        self.decrypted = self.decrypted[offset + 1:]


class MyVoiceClient(VoiceClient):
    ...

    async def recv_voice_packet(self):
        ...

        while True:
            ...

            packet = RTCPacket(header, data)
            packet.set_real_time()
            packet.calc_extension_header_length()

3. decryptしたデータを保管する

BufferDecoderクラスを作り、opusとしてデコードされていないパケットを保存します.

ここでなぜ随時デコードしないかというと、ネットワークなのでパケットの順番が前後することがあり、前後した場合は並び替えてデコードしないといけないからです。

また、デコードする際はユーザーごとにデコードするため、ssrcを使用してユーザーごとにデータを保管します。そのためのクラスを作成します。

from collections import defaultdict


class PacketQueue:
    def __init__(self):
        self.queues = defaultdict(list)

    def push(self, packet):
        self.queues[packet.ssrc].append(packet)


class BufferDecoder:
    def __init__(self):
        self.queue = PacketQueue()

    def recv_packet(self, packet):
        self.queue.push(packet)


class MyVoiceClient(VoiceClient):
    def __init__(self, client, channel):
        ...

        self.decoder = None

    async def recv_voice_packet(self):
        ...
        while True:
            ...
            self.decoder.recv_packet(packet)

録音するためのメイン関数を作成します。


class MyVoiceClient(VoiceClient):
    async def record(self, record_time=30):
        if self.is_recording:
            raise ValueError("Already recording")

        # init
        self.decoder = None
        self.is_recording = True
        self.decoder = BufferDecoder()

        # do record
        self.record_task = self.loop.create_task(self.recv_voice_packet())
        await asyncio.sleep(record_time)

        self.record_task.cancel()

        # clear data
        self.record_task = None
        self.is_recording = False

4. ユーザーごとにデコードする

BufferDecoderにデコードするための関数を作成します。また、PacketQueueからssrcを指定してseqの順番を揃えてデータをイテレートさせます。

MAX_SRC = 65535


class PacketQueue:

    def get_all_ssrc(self):
        return self.queues.keys()

    def get_packets(self, ssrc: int):
        last_seq = None
        packets = self.queues[ssrc]

        while packets:
            if last_seq is None:
                packet = packets.pop(0)
                last_seq = packet.seq
                yield packet
                continue

            if last_seq == MAX_SRC:
                last_seq = -1

            if packets[0].seq - 1 == last_seq:
                packet = packets.pop(0)
                last_seq = packet.seq
                yield packet
                continue

            # 順番がおかしかったときの場合
            for i in range(1, min(1000, len(packets))):
                if packets[i].seq - 1 == last_seq:
                    packet = packets.pop(0)
                    last_seq = packet.seq
                    yield packet
                    break
            else:
                #  該当するパケットがなかった場合、破損していたとみなす
                yield None

        # 終了
        yield -1


class BufferDecoder:
    async def decode(self):
        pass

5. Opusをデコードし、合成する

ここで、Opusのデコーダーが必要になります。が、Discord.pyのOpus Decoderには問題点が存在しているのと、floatの配列で音声を受け取りたいので自分でDecoderを作成します。

from discord.opus import Decoder as DiscordDecoder
from discord.opus import exported_functions, OpusError, c_float_ptr
import sys
import ctypes
import os
import logging
import struct

log = logging.getLogger(__name__)


_lib = None


def libopus_loader(name):
    # create the library...
    lib = ctypes.cdll.LoadLibrary(name)

    # register the functions...
    for item in exported_functions:
        func = getattr(lib, item[0])

        try:
            if item[1]:
                func.argtypes = item[1]

            func.restype = item[2]
        except KeyError:
            pass

        try:
            if item[3]:
                func.errcheck = item[3]
        except KeyError:
            log.exception("Error assigning check function to %s", func)

    return lib


def _load_default():
    global _lib
    try:
        if sys.platform == 'win32':
            _basedir = os.path.dirname(os.path.abspath(__file__))
            _bitness = struct.calcsize('P') * 8
            _target = 'x64' if _bitness > 32 else 'x86'
            _filename = os.path.join(_basedir, 'bin', 'libopus-0.{}.dll'.format(_target))
            _lib = libopus_loader(_filename)
        else:
            _lib = libopus_loader(ctypes.util.find_library('opus'))
    except Exception:
        _lib = None

    return _lib is not None


def is_loaded():
    global _lib
    return _lib is not None


class Decoder(DiscordDecoder):
    @staticmethod
    def packet_get_nb_channels(data: bytes) -> int:
        return 2

    def decode_float(self, data, *, fec=False):
        if not is_loaded():
            _load_default()
        if data is None and fec:
            raise OpusError("Invalid arguments: FEC cannot be used with null data")

        if data is None:
            frame_size = self._get_last_packet_duration() or self.SAMPLES_PER_FRAME
            channel_count = self.CHANNELS
        else:
            frames = self.packet_get_nb_frames(data)
            channel_count = self.packet_get_nb_channels(data)
            samples_per_frame = self.packet_get_samples_per_frame(data)
            frame_size = frames * samples_per_frame

        pcm = (ctypes.c_float * (frame_size * channel_count))()
        pcm_ptr = ctypes.cast(pcm, c_float_ptr)

        ret = _lib.opus_decode_float(self._state, data, len(data) if data else 0, pcm_ptr, frame_size, fec)

        return pcm[:ret * channel_count]

これを使用してデコードを開始します。

from itertools import zip_longest
import numpy as np

    async def _decode(self, ssrc):
        decoder = Decoder()
        pcm = []
        start_time = None

        last_timestamp = None

        for packet in self.queue.get_packets(ssrc):
            if packet == -1:
                # 終了
                break
            if packet is None:
                # パケット破損の場合
                data = decoder.decode_float(None)
                pcm += data
                last_timestamp = None
                continue

            if start_time is None:
                start_time = packet.real_time
            else:
                start_time = min(packet.real_time, start_time)

            if len(packet.decrypted) < 10:
                # パケットがdiscordから送られてくる無音のデータだった場合: https://discord.com/developers/docs/topics/voice-connections#voice-data-interpolation
                last_timestamp = packet.timestamp
                continue

            if last_timestamp is not None:
                elapsed = (packet.timestamp - last_timestamp) / Decoder.SAMPLING_RATE
                if elapsed > 0.02:
                    # 無音期間
                    margin = [0] * 2 * int(Decoder.SAMPLE_SIZE * (elapsed - 0.02) * Decoder.SAMPLING_RATE)
                    pcm += margin

            data = decoder.decode_float(packet.decrypted)
            pcm += data
            last_timestamp = packet.timestamp

        del decoder

        return dict(data=pcm, start_time=start_time)

    async def decode(self):
        file = BytesIO()
        wav = wave.open(file, "wb")
        wav.setnchannels(Decoder.CHANNELS)
        wav.setsampwidth(Decoder.SAMPLE_SIZE // Decoder.CHANNELS)
        wav.setframerate(Decoder.SAMPLING_RATE)
        pcm_list = []

        for ssrc in self.queue.get_all_ssrc():
            pcm_list.append(await self._decode(ssrc))

        pcm_list.sort(key=lambda x: x["start_time"])

        if not pcm_list:
            # 音声がなかった場合
            wav.close()
            file.seek(0)
            return file
        # 録音が始まった時刻
        first_time = pcm_list[0]["start_time"]
        for pcm in pcm_list:
            # 録音が始まった時刻からのマージンをつける
            pcm["data"] = [0] * int(48000 * 2 * (pcm["start_time"] - first_time)) + pcm["data"]

        right_channel = []
        left_channel = []

        i = 0
        for bytes_ in zip_longest(*map(lambda x: x["data"], pcm_list)):
            # 左右のチャンネルにそれぞれ音声を合成してから入れる処理
            result = 0
            for b in bytes_:
                if b is None:
                    continue
                # 音声の合成
                # result = x + y - (x * y * -1) when x < 0 and y < 0
                # result = x + y - (x * y) when x > 0 and y > 0
                # otherwise, result = x + y
                if result < 0 and b < 0:
                    result = result + b - (result * b * -1)
                elif result > 0 and b > 0:
                    result = result + b - (result * b)
                else:
                    result = result + b

            # クリッピングの対処
            if result > 1:
                if not i % 2:
                    right_channel.append(1)
                else:
                    left_channel.append(1)
            elif result < -1:
                if not i % 2:
                    right_channel.append(-1)
                else:
                    left_channel.append(-1)
            else:
                if not i % 2:
                    right_channel.append(result)
                else:
                    left_channel.append(result)
            i += 1

        # 左右のチャンネルの大きさが違う場合があるので、その場合の処理
        left_len = len(left_channel)
        right_len = len(right_channel)
        if left_len != right_len:
            if not left_len % 2:
                if left_len > right_len:
                    right_channel += [0] * (left_len - right_len)
                else:
                    right_channel = right_channel[:left_len]
            elif not right_len % 2:
                if right_len > left_len:
                    left_channel += [0] * (right_len - left_len)
                else:
                    left_channel = left_channel[:right_len]

        audio = np.array([left_channel, right_channel]).T

        # Convert to (little-endian) 16 bit integers.
        audio = (audio * (2 ** 15 - 1)).astype(np.int16)

        wav.writeframes(audio.tobytes())
        wav.close()
        file.seek(0)

        return file

6. デコードした音声を返す

先ほど作った関数を使用してデータを返却します。型はBytesIOです。

class MyVoiceClient(VoiceClient):
    async def record(self, record_time=30):
        return self.decoder.decode()

終わりに

Opusの拡張ヘッダーにDiscord独自の仕様が存在していてかなり苦戦しましたが、参考になれば幸いです。

また、困ったときに助けていただいた @tignear さん、 @Shirataki2 さん、ありがとうございました。

19
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
sizumita
安曇野でpythonとかやってます。
proglove
国内最大級学生エンジニアオンラインコミュニティー

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
19
Help us understand the problem. What is going on with this article?