はじめに
この記事は前回Pythonで始める録音機能付きDiscord Bot: (4) 音楽ファイルを再生するの続きです.
この記事では音声録音の実装を始める上での準備段階としてDiscord APIを利用した音声通信APIを叩いてみて,どのように通信が行われているかを把握してみます.
記事は全7回を予定しており現在5記事まで執筆を終えています.
- Pythonで始める録音機能付きDiscord Bot: (1) 入門 discord.py
- Pythonで始める録音機能付きDiscord Bot: (2) 便利機能(Bot拡張,Cog,Embed)
- Pythonで始める録音機能付きDiscord Bot: (3) Databaseとの連携
- Pythonで始める録音機能付きDiscord Bot: (4) 音楽ファイルを再生する
- Pythonで始める録音機能付きDiscord Bot: (5) Discord APIを直接操作する
タグから初心者
を外しているように,以下の処理は少し面倒で主に低レイヤの部分をいじっています.
通信の流れ
Discordで音声の送受信を行う際の通信の概略図を以下に示します.
概略図のため詳しい説明は省いていますが様々なプロセスを経て接続が行われているということがつかめればよいかと思います.
この流れをdiscord.pyを用いることなく実装し,Discordの音声の送受信を詳しく取り扱います.
以降の情報はすべて公式リファレンス(音声接続のGateway,通常のGateway)をもとに記述しています.
Gatewayへの接続
DiscordのGatewayは通常のGateway音声に関する情報を送受信するGatewayがあります.音声Gatewayに接続するためのエンドポイントURLを得るためにはまず,通常のGatewayで認証を行った後音声Gatewayに接続するための情報を送信します.
まずはPythonでWebSocket接続を試みるためのスクリプトを作成します.
op10 Hello
import json
import asyncio
import aiohttp
from pprint import pprint
class Gateway:
def __init__(self, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
loop.create_task(self.receive_data())
async def receive_data(self):
async with aiohttp.ClientSession() as session:
socket = await session.ws_connect(self.endpoint)
while True:
packet = await socket.receive()
if packet.type in (aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSING,
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.ERROR):
print(packet)
print('== 接続終了 ==')
break
pprint(json.loads(packet.data))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
ws = Gateway(loop)
loop.run_forever()
これを実行するとGatewayに接続し,Gatewayから受け取ったデータを逐一表示するためのコルーチンreceive_data
がGateway中でcreate_task
関数から呼び出され処理を開始します.これを実行すると,DiscordのGatewayから以下のようなデータが送信されます.
{'d': {'_trace': ['["gateway-prd-main-xwmj",{"micros":0.0}]'],
'heartbeat_interval': 41250},
'op': 10,
's': None,
't': None}
Discordから送信されるデータはd
とop
,まれにt
を使用してデータを表現します.op
ではデータの種類が,d
ではデータの情報本体が格納されます.t
は基本的にNone
ですが,より詳細に情報を伝える必要がある場合にはその情報の詳細を伝える文字列になります.
op1 Heartbeat
ここではop=10
です.これはHello
と呼ばれる返信で,名前の通り始めて接続した際に送られるデータです.Helloで重要なデータはheartbeat_interval
です.ここでは41250
となっていますが,これは指定されたミリ秒(41.25秒)ごとにGatewayが継続して接続できていることを伝えるためHeartbeat
と呼ばれる簡単なデータを送信する必要があります.このHeartbeat処理を行う補助クラスとしてthreading.Thread
を継承したクラスを作成します.run
関数に所望の処理を書きインスタンスからstart関数を呼ぶことで,別スレッドでの処理が行われるようになります.
import json
import asyncio
import aiohttp
import threading
from pprint import pprint
class HeartbeatHandler(threading.Thread):
def __init__(self, ws, interval):
self.ws = ws
self.interval = interval
self.stop_ev = threading.Event()
super().__init__()
def run(self):
self.send()
while not self.stop_ev.wait(self.interval):
self.send()
def send(self):
data = self.get_payload()
asyncio.run_coroutine_threadsafe(
self.ws.socket.send_json(data),
self.ws.loop
)
print('== 送信 ==')
print(data)
def stop(self):
self.stop_ev.set()
def get_payload(self):
raise NotImplementedError
class GatewayHeartbeat(HeartbeatHandler):
def __init__(self, ws, interval):
super().__init__(ws, interval)
def get_payload(self):
return {'op': 1, 'd': None}
class Gateway:
def __init__(self, loop=None):
if loop is None:
self.loop = asyncio.get_event_loop()
else:
self.loop = loop
self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
self.loop.create_task(self.receive_data())
async def receive_data(self):
async with aiohttp.ClientSession() as session:
self.socket = await session.ws_connect(self.endpoint)
while True:
packet = await self.socket.receive()
if packet.type in (aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSING,
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.ERROR):
print(packet)
print('== 接続終了 ==')
break
print('== 受信 ==')
pprint(json.loads(packet.data))
await self.handle_message(json.loads(packet.data))
if hasattr(self, 'heartbeat'):
self.heartbeat.stop()
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 10:
self.heartbeat = GatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
self.heartbeat.start()
return
if __name__ == "__main__":
loop = asyncio.get_event_loop()
ws = Gateway(loop)
loop.run_forever()
run_coroutine_threadsafe
という**如何にも!!**な関数があるのでそれを使用します.これを実行すると40秒ごとに相互に通信している様子が出力されます.
== 受信 ==
{'d': {'_trace': ['["gateway-prd-main-w7j9",{"micros":0.0}]'],
'heartbeat_interval': 41250},
'op': 10,
's': None,
't': None}
== 送信 ==
{'op': 1, 'd': None}
== 受信 ==
{'d': None, 'op': 11, 's': None, 't': None}
== 送信 ==
{'op': 1, 'd': None}
== 受信 ==
{'d': None, 'op': 11, 's': None, 't': None}
...
これを行わないと40秒経過後にDiscordのGatewayから接続の解除が行われてしまいます.ただし単にHeartbeatのみ行っている場合,Discord側から再接続の要求がくる場合があります.とりあえずここでは,再接続等の処理は行わないことにします.
op2 Identify
続いて接続者情報をGatewayに知らせるためにBotのトークンを送信する必要があります.op2でこれらの情報を送信しますがペイロードにはBotのトークンのほか簡易的な接続者情報をproperties
に付加します.また,大規模Botを運用しておりSharding
を行っている場合にはさらに追加の処理が必要となりますがここでは小規模なBotでShardingを使っていない前提で処理を行います.
class Gateway:
def __init__(self, loop=None):
if loop is None:
self.loop = asyncio.get_event_loop()
else:
self.loop = loop
self.endpoint = 'wss://gateway.discord.gg/?v=6&encoding=json'
self.loop.create_task(self.receive_data())
self.identified = asyncio.Event()
async def receive_data(self):
async with aiohttp.ClientSession() as session:
self.socket = await session.ws_connect(self.endpoint)
while True:
packet = await self.socket.receive()
if packet.type in (aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSING,
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.ERROR):
print('== 接続終了 ==')
print(packet)
break
print('== 受信 ==')
pprint(json.loads(packet.data))
await self.handle_message(json.loads(packet.data))
if hasattr(self, 'heartbeat'):
self.heartbeat.stop()
async def identify(self):
payload = {
'op': 2,
'd': {
'token': 'BOT_TOKEN',
'properties': {
'$os': 'linux',
'$browser': 'python',
'$device': 'python',
},
'v': 3
}
}
print('== 送信 ==')
print(payload)
await self.socket.send_json(payload)
self.identified.set()
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 10:
self.heartbeat = GatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
self.heartbeat.start()
await self.identify()
return
Botのトークンを送信することで認証が行われ,Botの情報やBotを導入したサーバーの情報が受信されるようになります.様々な情報が送信されるため見落としがちになりますがop=0
でt=READY
という情報が送信されていれば,Gatewayを使って相互通信する準備が完了したことになります.また,d
の中にあるsession_id
は音声接続の際に使用しますので保管しておきます.
{'d': {
...
'session_id': 'f0d7bba081bc0df51e43c1eef8092adcb',
...
},
'op': 0,
's': 1,
't': 'READY'}
Voice Gatewayへの接続
op4 Gateway Voice State Update
音声Gatewayに接続するための情報を得るためには通常のGatewayにop=4
でその接続したい旨を送信する必要があります.
op=4
ではサーバーと音声チャンネルのIDと,自身のミュート状態を指定してGatewayに送信します.それによりサーバーが使用している音声GatewayのエンドポイントURLを入手することができます.
class Gateway:
...
async def voice_state_update(self):
payload = {
'op': 4,
'd': {
'guild_id': '705...',
'channel_id': '706...',
"self_mute": False, # ミュートするか否か
"self_deaf": False, # スピーカーミュートするか否か
}
}
print('== 送信 ==')
print(payload)
await self.socket.send_json(payload)
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 10:
self.heartbeat = GatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
self.heartbeat.start()
await self.identify()
return
if op == 0:
if t == 'READY':
self.session_id = d['session_id']
await self.voice_state_update()
これを実行するとBotが音声チャンネルに接続され,以下の2つのデータが受信されます.
== 受信 ==
{'d': {'channel_id': '705...',
'deaf': False,
'guild_id': '706...',
'member': ...,
'mute': False,
'self_deaf': False,
'self_mute': False,
'self_video': False,
'session_id': 'f0d7bba081bc0df51e43c1eef8092adcb',
'suppress': False,
'user_id': '743...'},
'op': 0,
's': 3,
't': 'VOICE_STATE_UPDATE'}
== 受信 ==
{'d': {'endpoint': 'japan396.discord.media:80',
'guild_id': '705...',
'token': '0123456789abcdef'},
'op': 0,
's': 4,
't': 'VOICE_SERVER_UPDATE'}
下のVOICE_SERVER_UPDATE
のendpoint
が音声Gatewayのエンドポイントであり,認証トークンとしてtoken
を使用します.
op3 Heartbeat
ここから,音声Gatewayと通信を開始します.
先ほど得られたエンドポイントに新たにWebSocket通信するためのクラスを作成します.
class Gateway:
...
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 10:
self.heartbeat = GatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
self.heartbeat.start()
await self.identify()
return
if op == 0:
if t == 'READY':
self.session_id = d['session_id']
await self.voice_state_update()
if t == 'VOICE_SERVER_UPDATE':
self.voice_endpoint = d['endpoint']
self.token = d['token']
self.voice_gw = VoiceGateway(self, self.loop)
class VoiceGateway:
def __init__(self, gateway, loop=None):
self.gateway = gateway
if loop is None:
self.loop = asyncio.get_event_loop()
else:
self.loop = loop
self.endpoint = f'wss://{gateway.voice_endpoint.replace(":80", "")}/?v=4'
self.loop.create_task(self.receive_data())
self.identified = asyncio.Event()
async def receive_data(self):
async with aiohttp.ClientSession() as session:
self.socket = await session.ws_connect(self.endpoint)
while True:
packet = await self.socket.receive()
if packet.type in (aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSING,
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.ERROR):
print('** 接続終了 **')
print(packet)
break
print('** 受信 **')
pprint(json.loads(packet.data))
await self.handle_message(json.loads(packet.data))
async def handle_message(self, msg):
pass
これを実行しうまく動作すると,はじめのGatewayと同様にHeartbeatの間隔を返信してきます.
== 受信 ==
{'d': ...
'op': 0,
's': 5,
't': 'VOICE_SERVER_UPDATE'}
** 受信 **
{'d': {'heartbeat_interval': 13750.25, 'v': 4}, 'op': 8}
接続を維持するため,今度はop=3
でHeartbeatを送信します.データとしてはタイムスタンプを与えます.
import json
import asyncio
import aiohttp
import threading
import time # <- 追加
from pprint import pprint
class VoiceGatewayHeartbeat(HeartbeatHandler):
def __init__(self, ws, interval):
super().__init__(ws, interval)
def get_payload(self):
# ミリ秒単位の時間を'd'に設定
return {'op': 3, 'd': time.time_ns()//1000}
class VoiceGateway:
...
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 8:
self.heartbeat = VoiceGatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
return
このHeartbeatはBotの認証を行った後に動かす必要があるため,通信は開始せずこのままにしておきます.
op0 Identify
音声Gatewayで認証を行うにはサーバーID,BotユーザーのID,session_id
,token
をペイロードにしてop=0
で送信します.
class VoiceGateway:
...
async def identify(self):
payload = {
'op': 0,
'd': {
'token': self.gateway.token,
'user_id': '743853432007557210',
'server_id': '705052322761277540',
'session_id': self.gateway.session_id,
}
}
print('** 送信 **')
print(payload)
await self.socket.send_json(payload)
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 8:
await self.identify()
self.heartbeat = VoiceGatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
self.heartbeat.start()
return
上手く認証されるとop2 Ready
を受信します.それぞれip
,port
は音声情報を得るためのアドレス,modes
はDiscordが対応している音声暗号化方式,ssrc
が識別子に相当します.
** 受信 **
{'d': {'heartbeat_interval': 13750.25, 'v': 4}, 'op': 8}
** 送信 **
{'op': 0, 'd': {'token': '871d40956f7cf34a', 'user_id': '743853432007557210', 'server_id': '705052322761277540', 'session_id': 'c412a670dbed864b559a25009459f15a'}}
== 送信 ==
{'op': 3, 'd': 1598314493140616}
** 受信 **
{'d': {'experiments': ['bwe_conservative_link_estimate',
'bwe_remote_locus_client'],
'ip': '123.123.123.123',
'modes': ['aead_aes256_gcm',
'xsalsa20_poly1305_lite',
'xsalsa20_poly1305_suffix',
'xsalsa20_poly1305'],
'port': 50004,
'ssrc': 364117},
'op': 2}
** 受信 **
{'d': 1598314493140616, 'op': 6}
== 送信 ==
{'op': 3, 'd': 1598314506891112}
** 受信 **
{'d': 1598314506891112, 'op': 6}
UDP接続
先ほどの通信で得られたIPにUDP接続をして音声データを得るのですが,このIPはNATを通じて難読化されているので,外部公開されているアドレスとポートを取得する必要があります.その取得のためには,以下のようなUDPパケットを先ほどのip
,port
のサーバーに送信します.
フィールド | 説明 | サイズ |
---|---|---|
タイプ | 0x1 | 2バイト |
長さ | 70 | 2バイト |
SSRC | 符号なし整数 | 4バイト |
IPアドレス | asciiコード(余剰分は0x0 (ヌル文字)詰めする.送信時は0でおk) |
64バイト |
ポート | 符号なし整数(送信時は0でおk) | 2バイト |
これを送信すると,IPとポートにデータが入った同じ74バイトのパケットが送信されるので,このパケットからIPとポートの情報を取得します.
import json
import asyncio
import aiohttp
import threading
import time
import socket # <- 追加
import struct # <- 追加
from pprint import pprint
class VoiceGateway:
...
async def ip_discovering(self):
self.udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp.setblocking(False)
packet = bytearray(74)
packet[:2] = struct.pack('>H', 1)
packet[2:4] = struct.pack('>H', 70)
packet[4:8] = struct.pack('>I', self.ssrc)
self.udp.sendto(bytes(packet), (self.ip, self.port))
data = await self.loop.sock_recv(self.udp, 2048)
self.external_ip, self.external_port = struct.unpack_from(
'>64sH', data, 8
)
self.external_ip = self.external_ip.decode(encoding='ascii').rstrip('\x00')
print(self.external_ip, self.external_port)
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 8:
await self.identify()
self.heartbeat = VoiceGatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
self.heartbeat.start()
return
if op == 2:
self.ip = d['ip']
self.port = d['port']
self.modes = d['modes']
self.ssrc = d['ssrc']
await self.ip_discovering()
UDPのパケットデータの作成にはstruct
(標準パッケージ)を使用しています.これを実行するとイベントループを利用してUDPパケットが受信されてコンソールにIPとポートが出力されます.
** 受信 **
{'d': ...,
'op': 2}
201.158.201.158 54345
このような面倒なプロセスを踏む理由は,暗号化された音声を復号するためのカギを取得する為です.この過程で得られた外部IPとポートを音声Gatewayに送信することで返信として,復号化のカギを入手することができます.音声暗号化にはlibsodiumと呼ばれるものを使用しており,Pythonの場合PyNaClパッケージを追加することでlibsodiumを利用した暗号化と復号が行えます.
op1 Select Protocol
libsodiumで使用する鍵を手に入れられるようにします.op1のPayload中のmode
は先ほどop2
で得られたmodesの中から暗号化方式をいずれか選択する必要があるのですが,ここでは一貫してxsalsa20_poly1305
を利用するものとします.op1
を送信すると返信としてop4 Session Description
が送信されます.このペイロードのなかに復号のための鍵があるのでそれを取り出します.
class VoiceGateway:
...
async def select_protocol(self):
payload = {
'op': 1,
'd': {
'protocol': 'udp',
'data': {
'address': self.external_ip,
'port': self.external_port,
'mode': 'xsalsa20_poly1305'
}
}
}
print('** 送信 **')
print(payload)
await self.socket.send_json(payload)
async def receive_audio_packet(self):
while True:
data = await self.loop.sock_recv(self.udp, 2048)
print('** 音声受信 **')
print(data)
async def handle_message(self, msg):
op = msg.get('op')
d = msg.get('d')
t = msg.get('t')
if op == 8:
await self.identify()
self.heartbeat = VoiceGatewayHeartbeat(
self, d['heartbeat_interval'] / 1000
)
self.heartbeat.start()
return
if op == 2:
self.ip = d['ip']
self.port = d['port']
self.modes = d['modes']
self.ssrc = d['ssrc']
await self.ip_discovering()
await self.select_protocol()
if op == 4:
self.secret_key = d['secret_key']
self.loop.create_task(self.receive_audio_packet())
op4
を受信後,UDPソケットに音声データが送信されるようになるのでcreate_taskを行い音声データを受信するタスクを開始しています.
** 送信 **
{'op': 1, 'd': {'protocol': 'udp', 'data': {'address': '106.73.199.128', 'port': 42057, 'mode': 'xsalsa20_poly1305'}}}
** 受信 **
{'d': {'audio_codec': 'opus',
...
'mode': 'xsalsa20_poly1305',
'secret_key': [244,
157,
...
214],
'video_codec': None},
'op': 4}
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(\x9fI\xb9\xd6\x00G\xce\xa2\xa4\x85M[\xed\xd3\x0fu\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xca\xa9\xec'
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(\x00\x9c^\x83\x90\xc5V\xafX\xff\x14\x97\xf5\xf1/\xad\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xcb\xa9\x02'
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(j\x88B\\O\xd0\rs`\xc1_\x92\xc6\xe6\xe7=\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xc8\xa9\xfd'
** 音声受信 **
b'\x81\xc9\x00\x07\x00\x07\xdd(\x05\x02\xf56\x8a\x13\x9e\xc2\xb6\x8c,\xe6r5\x0e\n\x15\x89|\xa6W\x1e\xc3U\x06\xc8\xd5S\x8fJ\x08\xfcx\xff\xe9\x83k\xc9\xa9\x14'
Discordの音声の送受信に使用されるプロトコルはRTPとRTCPです.それぞれ音声データが格納されているパケットはRTPパケットであり一度に20ms分の音声データを送信し,RTCPパケットはそれらの音声データに関する補助的な情報を送信しています.
RTPとRTCPの見分け方は,パケットの2バイト目の値に着目します.プロトコルの定義上RTCPは2バイト目が200~204の範囲となるのでそこで見分けることが可能です.
また,RTPのヘッダ長を計算するにはX=1バイト目の4ビット目
,CC=1バイト目の5-8ビット目
に着目します.それぞれのビットの役割の説明は致しませんが,
$X=0$であれば
$$
header\_length=12+4\times CC
$$
$X=1$であれば
$$
header\_length=16+4\times CC+4\times len(EX\_header)
$$
のようにして計算できます.len(EX_header)は14+4×CC
バイト目からの2バイトの値に相当する,追加ヘッダ長を表す値です.
詳しくはWikipediaに乗っている表などを参考にしてください.
今回は,RTPヘッダのうち音声の送信時間であるTimestamp
のみを取得できれば問題ないので,APIリファレンスを参考に4-8バイト目を取り出します.
これで音声データはとりあえず取り出せるようになりました.
おわりに
これらの情報からRTPパケットからペイロードとヘッダを分割することができるようになりましたが,問題はまだいくつかあります.
- 送られるデータは暗号化されたデータであること
- 送られた音声はOpusという形式のストリーミング音声であること
前者は,鍵をすでに手に入れているためこれをもとに復号化すればよいことになります.後者は少し複雑な処理が必要でlibopus
というCのライブラリをPythonから利用できるようにし,そのデコード関数を呼べば通常のWavのデータとして保存することが可能になります.
この2つをどうにかクリアできれば音声データを保存することが可能になります.次回はどのようにデータを取り出せばいいかという今回の低いレイヤから得た知識をもとに既存のdiscord.pyを拡張し,音声データを保存するようにします.