Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
9
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

@Shirataki2

Pythonで始める録音機能付きDiscord Bot: (5) Discord APIを直接操作する

はじめに

この記事は前回Pythonで始める録音機能付きDiscord Bot: (4) 音楽ファイルを再生するの続きです.

この記事では音声録音の実装を始める上での準備段階としてDiscord APIを利用した音声通信APIを叩いてみて,どのように通信が行われているかを把握してみます.

記事は全7回を予定しており現在5記事まで執筆を終えています.

  1. Pythonで始める録音機能付きDiscord Bot: (1) 入門 discord.py
  2. Pythonで始める録音機能付きDiscord Bot: (2) 便利機能(Bot拡張,Cog,Embed)
  3. Pythonで始める録音機能付きDiscord Bot: (3) Databaseとの連携
  4. Pythonで始める録音機能付きDiscord Bot: (4) 音楽ファイルを再生する
  5. Pythonで始める録音機能付きDiscord Bot: (5) Discord APIを直接操作する

タグから初心者を外しているように,以下の処理は少し面倒で主に低レイヤの部分をいじっています.

通信の流れ

Discordで音声の送受信を行う際の通信の概略図を以下に示します.

main.png

概略図のため詳しい説明は省いていますが様々なプロセスを経て接続が行われているということがつかめればよいかと思います.

この流れをdiscord.pyを用いることなく実装し,Discordの音声の送受信を詳しく取り扱います.

以降の情報はすべて公式リファレンス(音声接続のGateway,通常のGateway)をもとに記述しています.

Gatewayへの接続

DiscordのGatewayは通常のGateway音声に関する情報を送受信するGatewayがあります.音声Gatewayに接続するためのエンドポイントURLを得るためにはまず,通常のGatewayで認証を行った後音声Gatewayに接続するための情報を送信します.

まずはPythonでWebSocket接続を試みるためのスクリプトを作成します.

op10 Hello

import json
import asyncio
import aiohttp
from pprint import pprint


class Gateway:
    def __init__(self, loop=None):
        if loop is None:
            loop = asyncio.get_event_loop()
        self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
        loop.create_task(self.receive_data())

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print(packet)
                    print('== 接続終了 ==')
                    break
                pprint(json.loads(packet.data))

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ws = Gateway(loop)
    loop.run_forever()

これを実行するとGatewayに接続し,Gatewayから受け取ったデータを逐一表示するためのコルーチンreceive_dataがGateway中でcreate_task関数から呼び出され処理を開始します.これを実行すると,DiscordのGatewayから以下のようなデータが送信されます.

{'d': {'_trace': ['["gateway-prd-main-xwmj",{"micros":0.0}]'],
       'heartbeat_interval': 41250},
 'op': 10,
 's': None,
 't': None}

Discordから送信されるデータはdop,まれにtを使用してデータを表現します.opではデータの種類が,dではデータの情報本体が格納されます.tは基本的にNoneですが,より詳細に情報を伝える必要がある場合にはその情報の詳細を伝える文字列になります.

op1 Heartbeat

ここではop=10です.これはHelloと呼ばれる返信で,名前の通り始めて接続した際に送られるデータです.Helloで重要なデータはheartbeat_intervalです.ここでは41250となっていますが,これは指定されたミリ秒(41.25秒)ごとにGatewayが継続して接続できていることを伝えるためHeartbeatと呼ばれる簡単なデータを送信する必要があります.このHeartbeat処理を行う補助クラスとしてthreading.Threadを継承したクラスを作成します.run関数に所望の処理を書きインスタンスからstart関数を呼ぶことで,別スレッドでの処理が行われるようになります.

import json
import asyncio
import aiohttp
import threading
from pprint import pprint


class HeartbeatHandler(threading.Thread):
    def __init__(self, ws, interval):
        self.ws = ws
        self.interval = interval
        self.stop_ev = threading.Event()
        super().__init__()

    def run(self):
        self.send()
        while not self.stop_ev.wait(self.interval):
            self.send()

    def send(self):
        data = self.get_payload()
        asyncio.run_coroutine_threadsafe(
            self.ws.socket.send_json(data),
            self.ws.loop
        )
        print('== 送信 ==')
        print(data)

    def stop(self):
        self.stop_ev.set()

    def get_payload(self):
        raise NotImplementedError


class GatewayHeartbeat(HeartbeatHandler):
    def __init__(self, ws, interval):
        super().__init__(ws, interval)

    def get_payload(self):
        return {'op': 1, 'd': None}


class Gateway:
    def __init__(self, loop=None):
        if loop is None:
            self.loop = asyncio.get_event_loop()
        else:
            self.loop = loop
        self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
        self.loop.create_task(self.receive_data())

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            self.socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await self.socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print(packet)
                    print('== 接続終了 ==')
                    break
                print('== 受信 ==')
                pprint(json.loads(packet.data))
                await self.handle_message(json.loads(packet.data))
            if hasattr(self, 'heartbeat'):
                self.heartbeat.stop()

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ws = Gateway(loop)
    loop.run_forever()

run_coroutine_threadsafeという如何にも!!な関数があるのでそれを使用します.これを実行すると40秒ごとに相互に通信している様子が出力されます.

== 受信 ==
{'d': {'_trace': ['["gateway-prd-main-w7j9",{"micros":0.0}]'],
       'heartbeat_interval': 41250},
 'op': 10,
 's': None,
 't': None}
== 送信 ==
{'op': 1, 'd': None}
== 受信 ==
{'d': None, 'op': 11, 's': None, 't': None}
== 送信 ==
{'op': 1, 'd': None}
== 受信 ==
{'d': None, 'op': 11, 's': None, 't': None}
...

これを行わないと40秒経過後にDiscordのGatewayから接続の解除が行われてしまいます.ただし単にHeartbeatのみ行っている場合,Discord側から再接続の要求がくる場合があります.とりあえずここでは,再接続等の処理は行わないことにします.

op2 Identify

続いて接続者情報をGatewayに知らせるためにBotのトークンを送信する必要があります.op2でこれらの情報を送信しますがペイロードにはBotのトークンのほか簡易的な接続者情報をpropertiesに付加します.また,大規模Botを運用しておりShardingを行っている場合にはさらに追加の処理が必要となりますがここでは小規模なBotでShardingを使っていない前提で処理を行います.

class Gateway:
    def __init__(self, loop=None):
        if loop is None:
            self.loop = asyncio.get_event_loop()
        else:
            self.loop = loop
        self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
        self.loop.create_task(self.receive_data())
        self.identified = asyncio.Event()

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            self.socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await self.socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print('== 接続終了 ==')
                    print(packet)
                    break
                print('== 受信 ==')
                pprint(json.loads(packet.data))
                await self.handle_message(json.loads(packet.data))
            if hasattr(self, 'heartbeat'):
                self.heartbeat.stop()

    async def identify(self):
        payload = {
            'op': 2,
            'd': {
                'token': 'BOT_TOKEN',
                'properties': {
                    '$os': 'linux',
                    '$browser': 'python',
                    '$device': 'python',
                },
                'v': 3
            }
        }
        print('== 送信 ==')
        print(payload)
        await self.socket.send_json(payload)
        self.identified.set()

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            await self.identify()
            return

Botのトークンを送信することで認証が行われ,Botの情報やBotを導入したサーバーの情報が受信されるようになります.様々な情報が送信されるため見落としがちになりますがop=0t=READYという情報が送信されていれば,Gatewayを使って相互通信する準備が完了したことになります.また,dの中にあるsession_idは音声接続の際に使用しますので保管しておきます.

{'d': {
       ...
       'session_id': 'f0d7bba081bc0df51e43c1eef8092adcb',
       ... 
      },
 'op': 0,
 's': 1,
 't': 'READY'}

Voice Gatewayへの接続

op4 Gateway Voice State Update

音声Gatewayに接続するための情報を得るためには通常のGatewayにop=4でその接続したい旨を送信する必要があります.

op=4ではサーバーと音声チャンネルのIDと,自身のミュート状態を指定してGatewayに送信します.それによりサーバーが使用している音声GatewayのエンドポイントURLを入手することができます.

class Gateway:
    ...

    async def voice_state_update(self):
        payload = {
            'op': 4,
            'd': {
                'guild_id': '705...',
                'channel_id': '706...',
                "self_mute": False,  # ミュートするか否か
                "self_deaf": False,  # スピーカーミュートするか否か
            }
        }
        print('== 送信 ==')
        print(payload)
        await self.socket.send_json(payload)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            await self.identify()
            return
        if op == 0:
            if t == 'READY':
                self.session_id = d['session_id']
                await self.voice_state_update()

これを実行するとBotが音声チャンネルに接続され,以下の2つのデータが受信されます.

== 受信 ==
{'d': {'channel_id': '705...',
       'deaf': False,
       'guild_id': '706...',
       'member': ...,
       'mute': False,
       'self_deaf': False,
       'self_mute': False,
       'self_video': False,
       'session_id': 'f0d7bba081bc0df51e43c1eef8092adcb',
       'suppress': False,
       'user_id': '743...'},
 'op': 0,
 's': 3,
 't': 'VOICE_STATE_UPDATE'}
== 受信 ==
{'d': {'endpoint': 'japan396.discord.media:80',
       'guild_id': '705...',
       'token': '0123456789abcdef'},
 'op': 0,
 's': 4,
 't': 'VOICE_SERVER_UPDATE'}

下のVOICE_SERVER_UPDATEendpointが音声Gatewayのエンドポイントであり,認証トークンとしてtokenを使用します.

op3 Heartbeat

ここから,音声Gatewayと通信を開始します.

先ほど得られたエンドポイントに新たにWebSocket通信するためのクラスを作成します.

class Gateway:
    ...

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 10:
            self.heartbeat = GatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            await self.identify()
            return
        if op == 0:
            if t == 'READY':
                self.session_id = d['session_id']
                await self.voice_state_update()
            if t == 'VOICE_SERVER_UPDATE':
                self.voice_endpoint = d['endpoint']
                self.token = d['token']
                self.voice_gw = VoiceGateway(self, self.loop)


class VoiceGateway:
    def __init__(self, gateway, loop=None):
        self.gateway = gateway
        if loop is None:
            self.loop = asyncio.get_event_loop()
        else:
            self.loop = loop
        self.endpoint = f'wss://{gateway.voice_endpoint.replace(":80", "")}/?v=4'
        self.loop.create_task(self.receive_data())
        self.identified = asyncio.Event()

    async def receive_data(self):
        async with aiohttp.ClientSession() as session:
            self.socket = await session.ws_connect(self.endpoint)
            while True:
                packet = await self.socket.receive()
                if packet.type in (aiohttp.WSMsgType.CLOSED,
                                   aiohttp.WSMsgType.CLOSING,
                                   aiohttp.WSMsgType.CLOSE,
                                   aiohttp.WSMsgType.ERROR):
                    print('** 接続終了 **')
                    print(packet)
                    break
                print('** 受信 **')
                pprint(json.loads(packet.data))
                await self.handle_message(json.loads(packet.data))

    async def handle_message(self, msg):
        pass

これを実行しうまく動作すると,はじめのGatewayと同様にHeartbeatの間隔を返信してきます.

== 受信 ==
{'d': ...
 'op': 0,
 's': 5,
 't': 'VOICE_SERVER_UPDATE'}
** 受信 **
{'d': {'heartbeat_interval': 13750.25, 'v': 4}, 'op': 8}

接続を維持するため,今度はop=3でHeartbeatを送信します.データとしてはタイムスタンプを与えます.

import json
import asyncio
import aiohttp
import threading
import time # <- 追加
from pprint import pprint


class VoiceGatewayHeartbeat(HeartbeatHandler):
    def __init__(self, ws, interval):
        super().__init__(ws, interval)

    def get_payload(self):
        # ミリ秒単位の時間を'd'に設定
        return {'op': 3, 'd': time.time_ns()//1000}

class VoiceGateway:
    ...

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            return

このHeartbeatはBotの認証を行った後に動かす必要があるため,通信は開始せずこのままにしておきます.

op0 Identify

音声Gatewayで認証を行うにはサーバーID,BotユーザーのID,session_id,tokenをペイロードにしてop=0で送信します.

class VoiceGateway:
    ...

    async def identify(self):
        payload = {
            'op': 0,
            'd': {
                'token': self.gateway.token,
                'user_id': '743853432007557210',
                'server_id': '705052322761277540',
                'session_id': self.gateway.session_id,
            }
        }
        print('** 送信 **')
        print(payload)
        await self.socket.send_json(payload)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            await self.identify()
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return

上手く認証されるとop2 Readyを受信します.それぞれip,portは音声情報を得るためのアドレス,modesはDiscordが対応している音声暗号化方式,ssrcが識別子に相当します.

** 受信 **
{'d': {'heartbeat_interval': 13750.25, 'v': 4}, 'op': 8}
** 送信 **
{'op': 0, 'd': {'token': '871d40956f7cf34a', 'user_id': '743853432007557210', 'server_id': '705052322761277540', 'session_id': 'c412a670dbed864b559a25009459f15a'}}
== 送信 ==
{'op': 3, 'd': 1598314493140616}
** 受信 **
{'d': {'experiments': ['bwe_conservative_link_estimate',
                       'bwe_remote_locus_client'],
       'ip': '123.123.123.123',
       'modes': ['aead_aes256_gcm',
                 'xsalsa20_poly1305_lite',
                 'xsalsa20_poly1305_suffix',
                 'xsalsa20_poly1305'],
       'port': 50004,
       'ssrc': 364117},
 'op': 2}
** 受信 **
{'d': 1598314493140616, 'op': 6}
== 送信 ==
{'op': 3, 'd': 1598314506891112}
** 受信 **
{'d': 1598314506891112, 'op': 6}

UDP接続

先ほどの通信で得られたIPにUDP接続をして音声データを得るのですが,このIPはNATを通じて難読化されているので,外部公開されているアドレスとポートを取得する必要があります.その取得のためには,以下のようなUDPパケットを先ほどのip,portのサーバーに送信します.

フィールド 説明 サイズ
タイプ 0x1 2バイト
長さ 70 2バイト
SSRC 符号なし整数 4バイト
IPアドレス asciiコード(余剰分は0x0(ヌル文字)詰めする.送信時は0でおk) 64バイト
ポート 符号なし整数(送信時は0でおk) 2バイト

これを送信すると,IPとポートにデータが入った同じ74バイトのパケットが送信されるので,このパケットからIPとポートの情報を取得します.

import json
import asyncio
import aiohttp
import threading
import time
import socket # <- 追加
import struct # <- 追加
from pprint import pprint

class VoiceGateway:
    ...

    async def ip_discovering(self):
        self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp.setblocking(False)
        packet = bytearray(74)
        packet[:2] = struct.pack('>H', 1)
        packet[2:4] = struct.pack('>H', 70)
        packet[4:8] = struct.pack('>I', self.ssrc)
        self.udp.sendto(bytes(packet), (self.ip, self.port))
        data = await self.loop.sock_recv(self.udp, 2048)
        self.external_ip, self.external_port = struct.unpack_from(
            '>64sH', data, 8
        )
        self.external_ip = self.external_ip.decode(encoding='ascii').rstrip('\x00')
        print(self.external_ip, self.external_port)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            await self.identify()
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return
        if op == 2:
            self.ip = d['ip']
            self.port = d['port']
            self.modes = d['modes']
            self.ssrc = d['ssrc']
            await self.ip_discovering()

UDPのパケットデータの作成にはstruct(標準パッケージ)を使用しています.これを実行するとイベントループを利用してUDPパケットが受信されてコンソールにIPとポートが出力されます.

** 受信 **
{'d': ...,
 'op': 2}
201.158.201.158 54345

このような面倒なプロセスを踏む理由は,暗号化された音声を復号するためのカギを取得する為です.この過程で得られた外部IPとポートを音声Gatewayに送信することで返信として,復号化のカギを入手することができます.音声暗号化にはlibsodiumと呼ばれるものを使用しており,Pythonの場合PyNaClパッケージを追加することでlibsodiumを利用した暗号化と復号が行えます.

op1 Select Protocol

libsodiumで使用する鍵を手に入れられるようにします.op1のPayload中のmodeは先ほどop2で得られたmodesの中から暗号化方式をいずれか選択する必要があるのですが,ここでは一貫してxsalsa20_poly1305を利用するものとします.op1を送信すると返信としてop4 Session Descriptionが送信されます.このペイロードのなかに復号のための鍵があるのでそれを取り出します.

class VoiceGateway:
    ...

    async def select_protocol(self):
        payload = {
            'op': 1,
            'd': {
                'protocol': 'udp',
                'data': {
                    'address': self.external_ip,
                    'port': self.external_port,
                    'mode': 'xsalsa20_poly1305'
                }
            }
        }
        print('** 送信 **')
        print(payload)
        await self.socket.send_json(payload)

    async def receive_audio_packet(self):
        while True:
            data = await self.loop.sock_recv(self.udp, 2048)
            print('** 音声受信 **')
            print(data)

    async def handle_message(self, msg):
        op = msg.get('op')
        d = msg.get('d')
        t = msg.get('t')
        if op == 8:
            await self.identify()
            self.heartbeat = VoiceGatewayHeartbeat(
                self, d['heartbeat_interval'] / 1000
            )
            self.heartbeat.start()
            return
        if op == 2:
            self.ip = d['ip']
            self.port = d['port']
            self.modes = d['modes']
            self.ssrc = d['ssrc']
            await self.ip_discovering()
            await self.select_protocol()
        if op == 4:
            self.secret_key = d['secret_key']
            self.loop.create_task(self.receive_audio_packet())

op4を受信後,UDPソケットに音声データが送信されるようになるのでcreate_taskを行い音声データを受信するタスクを開始しています.

** 送信 **
{'op': 1, 'd': {'protocol': 'udp', 'data': {'address': '106.73.199.128', 'port': 42057, 'mode': 'xsalsa20_poly1305'}}}
** 受信 **
{'d': {'audio_codec': 'opus',
       ...
       'mode': 'xsalsa20_poly1305',
       'secret_key': [244,
                      157,
                      ...
                      214],
       'video_codec': None},
 'op': 4}
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(\x9fI\xb9\xd6\x00G\xce\xa2\xa4\x85M[\xed\xd3\x0fu\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xca\xa9\xec'
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(\x00\x9c^\x83\x90\xc5V\xafX\xff\x14\x97\xf5\xf1/\xad\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xcb\xa9\x02'
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(j\x88B\\O\xd0\rs`\xc1_\x92\xc6\xe6\xe7=\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xc8\xa9\xfd'
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(\x05\x02\xf56\x8a\x13\x9e\xc2\xb6\x8c,\xe6r5\x0e\n\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xc9\xa9\x14'

Discordの音声の送受信に使用されるプロトコルはRTPとRTCPです.それぞれ音声データが格納されているパケットはRTPパケットであり一度に20ms分の音声データを送信し,RTCPパケットはそれらの音声データに関する補助的な情報を送信しています.

RTPとRTCPの見分け方は,パケットの2バイト目の値に着目します.プロトコルの定義上RTCPは2バイト目が200~204の範囲となるのでそこで見分けることが可能です.

また,RTPのヘッダ長を計算するにはX=1バイト目の4ビット目,CC=1バイト目の5-8ビット目に着目します.それぞれのビットの役割の説明は致しませんが,

$X=0$であれば

$$
header\_length=12+4\times CC
$$

$X=1$であれば

$$
header\_length=16+4\times CC+4\times len(EX\_header)
$$

のようにして計算できます.len(EX_header)は14+4×CCバイト目からの2バイトの値に相当する,追加ヘッダ長を表す値です.

詳しくはWikipediaに乗っている表などを参考にしてください.

今回は,RTPヘッダのうち音声の送信時間であるTimestampのみを取得できれば問題ないので,APIリファレンスを参考に4-8バイト目を取り出します.

これで音声データはとりあえず取り出せるようになりました.

おわりに

これらの情報からRTPパケットからペイロードとヘッダを分割することができるようになりましたが,問題はまだいくつかあります.

  • 送られるデータは暗号化されたデータであること
  • 送られた音声はOpusという形式のストリーミング音声であること

前者は,鍵をすでに手に入れているためこれをもとに復号化すればよいことになります.後者は少し複雑な処理が必要でlibopusというCのライブラリをPythonから利用できるようにし,そのデコード関数を呼べば通常のWavのデータとして保存することが可能になります.

この2つをどうにかクリアできれば音声データを保存することが可能になります.次回はどのようにデータを取り出せばいいかという今回の低いレイヤから得た知識をもとに既存のdiscord.pyを拡張し,音声データを保存するようにします.

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
9
Help us understand the problem. What are the problem?