0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

簡易ジッタ処理を「小バッファ」に進化させる方針メモ(RTP受信の次の一手)

Posted at

以前の記事では、現行実装の RTP受信(PCMU/8000)→デコード→録音 の流れを固定しました。
今日はその続きとして、MVPで割り切っている「簡易ジッタ(Seq逆行/重複を捨てるだけ)」を、次にどう改善していくかの 方針 をまとめます。

この記事の狙いは「理想のジッタバッファ解説」ではなく、このプロジェクトの制約(PCMU/8k、録音/ASR用途、Tokio、session駆動) に合わせた現実的な進化案を固定することです。


0. 前提:現状のジッタ処理(12/18時点のコード)

  • RTP受信ハンドラ:src/rtp/rx.rs(Seq逆行/重複を捨てるだけ)
  • RTPパーサ:src/rtp/parser.rs(12バイトヘッダのみ、拡張未対応)
  • PayloadType判定:src/rtp/payload.rs0 => PCMU のみ)
// src/rtp/rx.rs:75- (簡易ジッタ)
fn should_accept(&self, call_id: &str, seq: u16) -> bool {
    const MAX_REORDER: u16 = 50; // 遅延廃棄の簡易しきい値
    let mut map = self.jitter.lock().unwrap();
    let state = map.entry(call_id.to_string()).or_default();
    state.accept(seq, MAX_REORDER)
}

#[derive(Default)]
struct JitterState {
    last_seq: Option<u16>,
}

impl JitterState {
    fn accept(&mut self, seq: u16, max_reorder: u16) -> bool {
        match self.last_seq {
            None => { self.last_seq = Some(seq); true }
            Some(last) => {
                if seq == last { return false; }
                let diff_forward = seq.wrapping_sub(last);
                if diff_forward == 0 { return false; }
                // 大きく逆行するものは廃棄(wrap-aroundは許容)
                if diff_forward > max_reorder && last.wrapping_sub(seq) < max_reorder {
                    return false;
                }
                self.last_seq = Some(seq);
                true
            }
        }
    }
}
// src/rtp/parser.rs:12-
pub fn parse_rtp_packet(buf: &[u8]) -> Result<RtpPacket, RtpParseError> {
    if buf.len() < 12 { return Err(RtpParseError::TooShort); }
    let version = buf[0] >> 6;
    if version != 2 { return Err(RtpParseError::UnsupportedVersion(version)); }
    let payload_type = buf[1] & 0b0111_1111;
    let sequence_number = u16::from_be_bytes([buf[2], buf[3]]);
    let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
    let ssrc = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);
    let payload = buf[12..].to_vec(); // csrc/extは未対応
    Ok(RtpPacket { version, padding, extension, csrc_count, marker, payload_type, sequence_number, timestamp, ssrc, payload })
}
// src/rtp/payload.rs:6-
pub fn classify_payload(pt: u8) -> Result<PayloadKind, UnsupportedPayload> {
    match pt {
        0 => Ok(PayloadKind::Pcmu),
        other => Err(UnsupportedPayload(other)),
    }
}

まとめると「PT=0 以外は捨てる」「Seqが逆行/重複なら捨てる」「並べ替えや欠落補完はしない」。MVPとして「録音が取れる」は満たすが、会話品質/ASR品質には物足りない。


1. なぜジッタバッファが要るか(超ざっくり)

電話音声は UDP/RTP なので、普通にこうなります。

  • パケットが 順番通りに届かない
  • パケットが 欠落する
  • 遅れて届く(すでに再生・録音に流した後に来る)

今の「捨てるだけ」だと、ちょいちょい並びが壊れて音が歪む・プチプチ切れる → ASRも崩れます。


2. このプロジェクトでの目標(過剰にやらない)

今回は “本格VoIPスタック” を作るのが目的ではないので、目標はここに絞ります。

  • 小さなバッファ(例:60〜120ms) を持って
  • その範囲で 並べ替え(reorder) する
  • 欠落は「無音を入れる」か「短い欠落は詰める」程度でOK
  • 遅延が大きいパケットは捨てる(リアルタイム優先)

ポイント:
低遅延を保ちつつ、録音/ASRに渡すPCMの“順序”を最低限安定させる


3. 設計方針:RTP層で“並べ替え”、sessionには“整った音”を渡す

現状でも rtp/rx.rs がRTPパースと簡易ジッタを担当しているので、拡張方針もシンプルです。

  • rtp 層:Seqをキーに小さな並べ替えバッファを持つ
  • session 層:届いた「整ったペイロード」を録音/ASRバッファへ流すだけ

session側に「Seqの管理」や「並べ替え」が漏れると境界が崩れるので、順序補正はrtp層で完結させます。


4. 実装の形(小バッファreorderの最小案)

4.1 追加したい状態(call_idごと)

  • expected_seq: u16:次に欲しいSeq
  • buf: BTreeMap<u16, Vec<u8>>:Seq→payload
  • last_flush: Instant:いつフラッシュしたか
  • max_depth: usize:バッファ上限(パケット数)
  • max_wait: Duration:並べ替え待ち時間(例:80ms)

4.2 受信時の動き(ざっくり)

  1. RTPをパースして (seq, ts, payload) を得る
  2. buf[seq]=payload として貯める
  3. expected_seq から連続して存在する分を 順に取り出して session に送る
  4. max_wait を超えても expected_seq が来ない(欠落)場合:
    • 欠落扱いにして expected_seq を進める
    • 欠落補完はMVP+1なら「無音payload」を入れるのが楽(後述)

4.3 欠落補完(やるなら最小)

PCMUなら「無音っぽい値」を入れることで、録音が極端に崩れにくいです。
(厳密なPLCは後回し)

  • 欠落フレーム数が少ないうちは「無音payload(同じ長さのμ-law)」を入れて穴埋め
  • 欠落が連続するなら、一定閾値で「メディア途絶」として上位に通知(後日)

5. どこに置くか(このリポジトリの配置案)

現状 rtp/rx.rsshould_accept があるので、段階的にこうするのが自然です。

  • src/rtp/jitter.rs(新規)
    • JitterBuffer(call_idごとの状態を保持)
    • push(pkt) -> Vec<OrderedPayload>(出せる分を返す)
  • src/rtp/rx.rs
    • parse_rtp_packet した後に jitter.push() を呼ぶ
    • 返ってきた OrderedPayloadSessionIn::MediaRtpIn で流す

※ rtp_port_map/session_map へのアクセスは現状 Mutex なので、まずは設計優先でOK。
性能は後から詰める(DashMap等に置換)でも間に合う。


6. パラメータの目安(最初のおすすめ)

MVP+1の目安としてはこのくらいが現実的です。

  • バッファ遅延:80ms(20msフレーム換算で4パケット分)
  • MAX_REORDER:現行 50 は大きいので、reorder導入後は 10〜20 でも良い
  • max_depth50パケット くらい(異常系で膨らまないための上限)
  • 欠落補完:最初は「無音挿入」だけで十分

7. テスト/デバッグ方針(これがないと沼)

ジッタは再現が難しいので、最低限これを用意したいです。

  • 疑似入力:Seqが入れ替わったRTP列(例:1,2,4,3,5)
  • 欠落入力:1,2,4,5(3欠落)
  • 遅延入力:1,2,3,5(4が遅れて最後に来る)

期待:

  • 出力は基本 1,2,3,4,5 の順に揃う(揃わない場合は欠落補完/破棄の方針通り)
  • bufが無限に増えない(上限で捨てる)
  • メトリクス(drop/reorder/missing)が取れると最高

まとめ(12/19の固定点)

  • 12/18の「捨てるだけ簡易ジッタ」から一歩進めて、小バッファでreorderするのが次の一手
  • 目的は本格VoIPではなく、録音/ASRに渡す音の順序を安定させること
  • 実装は rtp 層に閉じて、sessionには「整ったpayload」だけを渡す
  • 欠落補完は最初は「無音挿入」で十分(やりすぎない)

次回(12/20)は、逆側―― 送信(TTS→PCMU→RTP) の現状と、20ms刻みで送るタイミング制御の話を書く予定です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?