この記事は筆者オンリーのAdvent Calendar 202518日目の記事です。
12/17 は SDP(PCMU/8000固定)の実装位置を整理した。今日は、その続きとして RTP受信→PCMUデコード→録音(mixed.wav/meta.json) が現状どう流れているかを実装ベースで固定する。
対象は「受信して音が取れる」まで。送信(TTS→RTP)は別日。
全体の流れ(MVPのRTP受信)
UAC → RTP(PCMU/8000)
↓ UDP受信
transport/packet.rs: run_rtp_udp_loop
↓ RawRtp
rtp/rx.rs: RtpReceiver::handle_raw(RTPパース+簡易ジッタ+PTチェック)
↓ SessionIn::MediaRtpIn
session/session.rs(録音バッファリング)
↓
media/Recorder(mixed.wav + meta.json を出力)
1) RTP受信の入口(transport)
// src/transport/packet.rs:67-
async fn run_rtp_udp_loop(sock: UdpSocket, rtp_rx: RtpReceiver) -> std::io::Result<()> {
let local_port = sock.local_addr()?.port();
println!("[packet] RTP socket bound on port {}", local_port);
let mut buf = vec![0u8; 2048];
loop {
let (len, src) = sock.recv_from(&mut buf).await?;
let data = buf[..len].to_vec();
let raw = RawRtp { src, dst_port: local_port, data };
// rtpレイヤへ委譲(解析と session への転送)
rtp_rx.handle_raw(raw);
}
}
- UDPを読むだけで、RTPの中身は
rtp::rxに委譲。 -
dst_portをRtpPortMapで引いて call_id を見つける。
2) RTPパースと簡易ジッタ処理(rtp::rx)
// src/rtp/rx.rs:24-
pub fn handle_raw(&self, raw: RawRtp) {
// RTCPは早期return
if is_rtcp_packet(&raw.data) { ... }
let call_id_opt = { self.rtp_port_map.lock().unwrap().get(&raw.dst_port).cloned() };
if let Some(call_id) = call_id_opt {
let sess_tx_opt = { self.session_map.lock().unwrap().get(&call_id).cloned() };
if let Some(sess_tx) = sess_tx_opt {
match parse_rtp_packet(&raw.data) {
Ok(pkt) => {
match classify_payload(pkt.payload_type) {
Ok(PayloadKind::Pcmu) => {}
Err(err) => { warn!("[rtp recv] unsupported payload type {}", err.0); return; }
}
if !self.should_accept(&call_id, pkt.sequence_number) {
warn!("[rtp recv] drop late/dup seq={}", pkt.sequence_number);
return;
}
let _ = sess_tx.send(SessionIn::MediaRtpIn {
ts: pkt.timestamp,
payload: pkt.payload,
});
}
Err(e) => { warn!("[rtp recv] RTP parse error: {:?}", e); }
}
}
} else {
warn!("[rtp recv] RTP on port {} without call_id mapping", raw.dst_port);
}
}
- パース本体は
rtp/parser.rs(12バイトヘッダを読む簡易版、拡張ヘッダは未対応)。 - PayloadType は
payload.rsで0 => PCMUのみ許可。 - 簡易ジッタ:
should_acceptで前回 Seq から大きく逆行/重複するものを捨てる(MAX_REORDER=50)。
3) PCMU→PCM と録音(media::Recorder)
// src/media/mod.rs:68-
pub fn push_rx_mulaw(&mut self, pcm_mulaw: &[u8]) { self.push_mulaw(pcm_mulaw); }
fn push_mulaw(&mut self, pcm_mulaw: &[u8]) {
if let Some(w) = self.writer.as_mut() {
for &b in pcm_mulaw {
let _ = w.write_sample(mulaw_to_linear16(b));
self.samples_written += 1;
}
}
}
// μ-law → 16bit PCM
fn mulaw_to_linear16(mu: u8) -> i16 { ... }
// src/media/mod.rs:41-
pub fn start(&mut self) -> Result<()> {
create_dir_all(&self.dir)?;
let spec = WavSpec { channels: 1, sample_rate: 8000, bits_per_sample: 16, sample_format: Int };
let writer = WavWriter::create(self.dir.join("mixed.wav"), spec)?;
self.writer = Some(writer);
self.started_at = Some(SystemTime::now());
Ok(())
}
pub fn stop(&mut self) -> Result<()> {
if let Some(writer) = self.writer.take() { writer.finalize()?; }
self.write_meta()
}
-
mixed.wavに μ-law を 16bit PCM へ起こして追記。サンプルレート 8000, Mono。 -
stop時にmeta.json(callId, startedAt, durationSec, sampleRate, channels, files)を出力。
4) セッションでの受信処理とバッファリング
// src/session/session.rs:209-
(SessState::Established, SessionIn::MediaRtpIn { payload, .. }) => {
self.recorder.push_rx_mulaw(&payload);
if let Some(start) = self.capture_started {
self.capture_payloads.extend_from_slice(&payload);
if start.elapsed() >= Duration::from_secs(10) {
let _ = self.app_tx.send(AppEvent::AudioBuffered {
call_id: self.call_id.clone(),
pcm_mulaw: self.capture_payloads.clone(),
});
self.capture_started = None;
self.capture_payloads.clear();
}
}
let _ = self.tx_up.send(SessionOut::Metrics { name: "rtp_in", value: payload.len() as i64 });
}
- 受信ペイロードをそのまま μ-law として録音へ渡す。
- 最初の10秒を
capture_payloadsに貯めてAppEvent::AudioBufferedでアプリ層へ渡す(ASR用の素片)。 - メトリクス
rtp_inを上げている。
5) 現状の割り切り(MVP)
- コーデックは PCMU(pt=0) 固定。他PTは警告して捨てる。
- ジッタは Seq ベースの簡易廃棄のみ(バッファ・再生順序補正なし)。
- RTP拡張ヘッダ/CSRC/RTCPは未対応(RTCPはログ&転送フックのみ)。
- 録音は受信音のみで OK(送信音も push_tx_mulaw で混ぜられるがMVPでは片方向)。
まとめ(12/18時点)
- UDP受信 → 簡易RTPパース → Seq 逆行を捨てつつ PCMU を
SessionIn::MediaRtpInに渡す。 - セッションで μ-law を録音に流し、10秒分をアプリ向けバッファとして保持。
-
media::Recorderがmixed.wav/meta.jsonをstorage/recordings/<timestamp_callid>/に生成。
次の候補: 簡易ジッタをもう少し賢くする(バッファ+欠落補完)、あるいは送信側(PCM→RTP/PCMU)を掘る。