0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

この記事は筆者オンリーのAdvent Calendar 202518日目の記事です。

12/17 は SDP(PCMU/8000固定)の実装位置を整理した。今日は、その続きとして RTP受信→PCMUデコード→録音(mixed.wav/meta.json) が現状どう流れているかを実装ベースで固定する。

対象は「受信して音が取れる」まで。送信(TTS→RTP)は別日。


全体の流れ(MVPのRTP受信)

UAC → RTP(PCMU/8000)
↓ UDP受信
transport/packet.rs: run_rtp_udp_loop
↓ RawRtp
rtp/rx.rs: RtpReceiver::handle_raw(RTPパース+簡易ジッタ+PTチェック)
SessionIn::MediaRtpIn
session/session.rs(録音バッファリング)

media/Recorder(mixed.wav + meta.json を出力)


1) RTP受信の入口(transport)

// src/transport/packet.rs:67-
async fn run_rtp_udp_loop(sock: UdpSocket, rtp_rx: RtpReceiver) -> std::io::Result<()> {
    let local_port = sock.local_addr()?.port();
    println!("[packet] RTP socket bound on port {}", local_port);
    let mut buf = vec![0u8; 2048];
    loop {
        let (len, src) = sock.recv_from(&mut buf).await?;
        let data = buf[..len].to_vec();
        let raw = RawRtp { src, dst_port: local_port, data };
        // rtpレイヤへ委譲(解析と session への転送)
        rtp_rx.handle_raw(raw);
    }
}
  • UDPを読むだけで、RTPの中身は rtp::rx に委譲。
  • dst_portRtpPortMap で引いて call_id を見つける。

2) RTPパースと簡易ジッタ処理(rtp::rx)

// src/rtp/rx.rs:24-
pub fn handle_raw(&self, raw: RawRtp) {
    // RTCPは早期return
    if is_rtcp_packet(&raw.data) { ... }

    let call_id_opt = { self.rtp_port_map.lock().unwrap().get(&raw.dst_port).cloned() };
    if let Some(call_id) = call_id_opt {
        let sess_tx_opt = { self.session_map.lock().unwrap().get(&call_id).cloned() };
        if let Some(sess_tx) = sess_tx_opt {
            match parse_rtp_packet(&raw.data) {
                Ok(pkt) => {
                    match classify_payload(pkt.payload_type) {
                        Ok(PayloadKind::Pcmu) => {}
                        Err(err) => { warn!("[rtp recv] unsupported payload type {}", err.0); return; }
                    }
                    if !self.should_accept(&call_id, pkt.sequence_number) {
                        warn!("[rtp recv] drop late/dup seq={}", pkt.sequence_number);
                        return;
                    }
                    let _ = sess_tx.send(SessionIn::MediaRtpIn {
                        ts: pkt.timestamp,
                        payload: pkt.payload,
                    });
                }
                Err(e) => { warn!("[rtp recv] RTP parse error: {:?}", e); }
            }
        }
    } else {
        warn!("[rtp recv] RTP on port {} without call_id mapping", raw.dst_port);
    }
}
  • パース本体は rtp/parser.rs(12バイトヘッダを読む簡易版、拡張ヘッダは未対応)。
  • PayloadType は payload.rs0 => PCMU のみ許可。
  • 簡易ジッタ: should_accept で前回 Seq から大きく逆行/重複するものを捨てる(MAX_REORDER=50)。

3) PCMU→PCM と録音(media::Recorder)

// src/media/mod.rs:68-
pub fn push_rx_mulaw(&mut self, pcm_mulaw: &[u8]) { self.push_mulaw(pcm_mulaw); }
fn push_mulaw(&mut self, pcm_mulaw: &[u8]) {
    if let Some(w) = self.writer.as_mut() {
        for &b in pcm_mulaw {
            let _ = w.write_sample(mulaw_to_linear16(b));
            self.samples_written += 1;
        }
    }
}
// μ-law → 16bit PCM
fn mulaw_to_linear16(mu: u8) -> i16 { ... }
// src/media/mod.rs:41-
pub fn start(&mut self) -> Result<()> {
    create_dir_all(&self.dir)?;
    let spec = WavSpec { channels: 1, sample_rate: 8000, bits_per_sample: 16, sample_format: Int };
    let writer = WavWriter::create(self.dir.join("mixed.wav"), spec)?;
    self.writer = Some(writer);
    self.started_at = Some(SystemTime::now());
    Ok(())
}
pub fn stop(&mut self) -> Result<()> {
    if let Some(writer) = self.writer.take() { writer.finalize()?; }
    self.write_meta()
}
  • mixed.wav に μ-law を 16bit PCM へ起こして追記。サンプルレート 8000, Mono。
  • stop 時に meta.json(callId, startedAt, durationSec, sampleRate, channels, files)を出力。

4) セッションでの受信処理とバッファリング

// src/session/session.rs:209-
(SessState::Established, SessionIn::MediaRtpIn { payload, .. }) => {
    self.recorder.push_rx_mulaw(&payload);
    if let Some(start) = self.capture_started {
        self.capture_payloads.extend_from_slice(&payload);
        if start.elapsed() >= Duration::from_secs(10) {
            let _ = self.app_tx.send(AppEvent::AudioBuffered {
                call_id: self.call_id.clone(),
                pcm_mulaw: self.capture_payloads.clone(),
            });
            self.capture_started = None;
            self.capture_payloads.clear();
        }
    }
    let _ = self.tx_up.send(SessionOut::Metrics { name: "rtp_in", value: payload.len() as i64 });
}
  • 受信ペイロードをそのまま μ-law として録音へ渡す。
  • 最初の10秒を capture_payloads に貯めて AppEvent::AudioBuffered でアプリ層へ渡す(ASR用の素片)。
  • メトリクス rtp_in を上げている。

5) 現状の割り切り(MVP)

  • コーデックは PCMU(pt=0) 固定。他PTは警告して捨てる。
  • ジッタは Seq ベースの簡易廃棄のみ(バッファ・再生順序補正なし)。
  • RTP拡張ヘッダ/CSRC/RTCPは未対応(RTCPはログ&転送フックのみ)。
  • 録音は受信音のみで OK(送信音も push_tx_mulaw で混ぜられるがMVPでは片方向)。

まとめ(12/18時点)

  • UDP受信 → 簡易RTPパース → Seq 逆行を捨てつつ PCMU を SessionIn::MediaRtpIn に渡す。
  • セッションで μ-law を録音に流し、10秒分をアプリ向けバッファとして保持。
  • media::Recordermixed.wav / meta.jsonstorage/recordings/<timestamp_callid>/ に生成。

次の候補: 簡易ジッタをもう少し賢くする(バッファ+欠落補完)、あるいは送信側(PCM→RTP/PCMU)を掘る。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?