この記事は筆者オンリーのAdvent Calendar 202516日目の記事です。
前回(12/15)は、現行実装ベースで「ここまでをMVP」と定義しました。
今日はその続きとして、MVPの中核である SIP UAS の最小実装(INVITE / ACK / BYE) について、現行 sip がどうなっているかを 実装ベースで まとめます。
この記事の狙いは「SIPの教科書」ではなく、このリポジトリでは何がどこにあって、何ができていて、何が未対応か を固定することです。
(PRACK/UPDATE/Session-Timer、厳密な再送タイマ等はMVP外なので後回し)
この記事のスコープ(MVP範囲)
MVPとして「最低限ここが動く」を、SIP目線で言い直すとこうです。
-
INVITEを受ける →180 Ringing/200 OK (+SDP)を返す -
ACKを受ける(2xx ACKとして扱う) -
BYEを受ける →200 OKを返す - UDP再送は「最後の応答キャッシュ」で最低限吸収する(厳密タイマは後)
sip の構成と責務(現行)
現行 sip は「パース」「ビルド」「簡易トランザクション」「コア処理」を分けています。
message.rs
- SIPメッセージの構造体(Request/Response/ヘッダ)
-
core_headersでVia/From/To/Call-ID/CSeqなど主要ヘッダを取り出せる
#![allow(dead_code)]
#[derive(Debug, Clone)]
// Request or Response の種別
pub enum SipMessage {
Request(SipRequest),
Response(SipResponse),
}
#[derive(Debug, Clone)]
pub struct SipRequest {
pub method: SipMethod,
pub uri: String, // とりあえず String, 後で構造化しても良い
#[allow(dead_code)]
pub version: String,
pub headers: Vec<SipHeader>,
pub body: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct SipResponse {
pub version: String,
pub status_code: u16,
pub reason_phrase: String,
pub headers: Vec<SipHeader>,
pub body: Vec<u8>,
}
#[derive(Debug, Clone)]
pub enum SipMethod {
Invite,
Ack,
Bye,
Cancel,
Options,
Register,
#[allow(dead_code)]
Unknown(String),
}
#[derive(Debug, Clone)]
pub struct SipHeader {
pub name: String,
pub value: String,
}
impl SipHeader {
pub fn new(name: impl Into<String>, value: impl Into<String>) -> Self {
Self {
name: name.into(),
value: value.into(),
}
}
}
impl SipRequest {
pub fn header_value(&self, name: &str) -> Option<&str> {
self.headers
.iter()
.find(|h| h.name.eq_ignore_ascii_case(name))
.map(|h| h.value.as_str())
}
/// よく使う基本ヘッダを構造化して返す(存在しない場合は Err)
pub fn core_headers(&self) -> anyhow::Result<CoreHeaders> {
let via = self
.header_value("Via")
.ok_or_else(|| anyhow::anyhow!("missing Via"))?
.to_string();
let from = self
.header_value("From")
.ok_or_else(|| anyhow::anyhow!("missing From"))?
.to_string();
let to = self
.header_value("To")
.ok_or_else(|| anyhow::anyhow!("missing To"))?
.to_string();
let call_id = self
.header_value("Call-ID")
.ok_or_else(|| anyhow::anyhow!("missing Call-ID"))?
.to_string();
let cseq_raw = self
.header_value("CSeq")
.ok_or_else(|| anyhow::anyhow!("missing CSeq"))?;
let (cseq_num, cseq_method) = parse_cseq(cseq_raw)?;
Ok(CoreHeaders {
via,
from,
to,
call_id,
cseq: cseq_num,
cseq_method,
})
}
}
#[derive(Debug, Clone)]
pub struct CoreHeaders {
pub via: String,
pub from: String,
pub to: String,
pub call_id: String,
pub cseq: u32,
pub cseq_method: String,
}
fn parse_cseq(raw: &str) -> anyhow::Result<(u32, String)> {
let mut parts = raw.split_whitespace();
let num_str = parts
.next()
.ok_or_else(|| anyhow::anyhow!("CSeq missing number"))?;
let method = parts
.next()
.ok_or_else(|| anyhow::anyhow!("CSeq missing method"))?;
let num: u32 = num_str
.parse()
.map_err(|_| anyhow::anyhow!("invalid CSeq number"))?;
Ok((num, method.to_string()))
}
#[derive(Debug, Clone)]
pub struct SipUri {
pub scheme: String,
pub user: Option<String>,
pub host: String,
pub port: Option<u16>,
pub params: Vec<(String, String)>,
}
#[derive(Debug, Clone)]
pub struct NameAddr {
pub display: Option<String>,
pub uri: SipUri,
pub params: Vec<(String, String)>,
}
#[derive(Debug, Clone)]
pub struct Via {
pub sent_protocol: String,
pub sent_by: String,
pub params: Vec<(String, String)>,
}
#[derive(Debug, Clone)]
pub struct CSeq {
pub num: u32,
pub method: String,
}
/// よく使う共通ヘッダを構造化して持つ(最初に出現したもののみ)
#[derive(Debug, Clone, Default)]
pub struct CommonHeaders {
pub via: Option<Via>,
pub from: Option<NameAddr>,
pub to: Option<NameAddr>,
pub contact: Option<NameAddr>,
pub call_id: Option<String>,
pub cseq: Option<CSeq>,
pub max_forwards: Option<u32>,
pub content_length: Option<usize>,
}
parse.rs
- テキスト → 構造体
- Start-Line(Request/Response)を判定し、ヘッダを分解(CRLF/折り返し対応)
- 個別ヘッダの構造化は
protocols/に委譲
#![allow(dead_code)]
use anyhow::{anyhow, Result};
use nom::{
branch::alt,
bytes::complete::{tag, take_till1, take_until, take_while1},
character::complete::{digit1, not_line_ending, space1},
combinator::{map, map_res},
sequence::{terminated, tuple},
IResult,
};
use crate::sip::message::{
CSeq, CommonHeaders, NameAddr, SipHeader, SipMessage, SipMethod, SipRequest, SipResponse,
SipUri, Via,
};
use crate::sip::protocols::{
CSeqHeader, ContentLengthHeader, HeaderCodec, MaxForwardsHeader, NameAddrHeader, ViaHeader,
};
enum StartLine {
Request {
method: SipMethod,
uri: String,
version: String,
},
Response {
version: String,
status: u16,
reason: String,
},
}
pub fn parse_sip_message(input: &str) -> Result<SipMessage> {
let (head, body) = split_head_and_body(input);
let (start_line, headers) = parse_head(head)?;
match start_line {
StartLine::Request {
method,
uri,
version,
} => Ok(SipMessage::Request(SipRequest {
method,
uri,
version,
headers,
body: body.as_bytes().to_vec(),
})),
StartLine::Response {
version,
status,
reason,
} => Ok(SipMessage::Response(SipResponse {
version,
status_code: status,
reason_phrase: reason,
headers,
body: body.as_bytes().to_vec(),
})),
}
}
fn split_head_and_body(input: &str) -> (&str, &str) {
if let Some(pos) = input.find("\r\n\r\n") {
let (head, rest) = input.split_at(pos);
return (head, &rest[4..]);
}
if let Some(pos) = input.find("\n\n") {
let (head, rest) = input.split_at(pos);
return (head, &rest[2..]);
}
(input, "")
}
fn parse_head(input: &str) -> Result<(StartLine, Vec<SipHeader>)> {
let (rest, start) =
parse_start_line(input).map_err(|e| anyhow!("failed to parse start line: {:?}", e))?;
let headers =
parse_headers_block(rest).map_err(|e| anyhow!("failed to parse headers: {:?}", e))?;
Ok((start, headers))
}
fn parse_start_line(input: &str) -> IResult<&str, StartLine> {
alt((
map(terminated(parse_request_line, parse_crlf), |v| {
StartLine::Request {
method: v.0,
uri: v.1,
version: v.2,
}
}),
map(terminated(parse_status_line, parse_crlf), |v| {
StartLine::Response {
version: v.0,
status: v.1,
reason: v.2,
}
}),
))(input)
}
fn parse_request_line(input: &str) -> IResult<&str, (SipMethod, String, String)> {
let (rest, (method_raw, _, uri, _, version)) = tuple((
take_while1(|c: char| c != ' '),
space1,
take_till1(|c| c == ' ' || c == '\r' || c == '\n'),
space1,
take_while1(|c: char| c != '\r' && c != '\n'),
))(input)?;
let method = parse_method(method_raw);
Ok((rest, (method, uri.to_string(), version.to_string())))
}
fn parse_status_line(input: &str) -> IResult<&str, (String, u16, String)> {
let (rest, (_, _, code, _, reason)) = tuple((
tag("SIP/2.0"),
space1,
map_res(digit1, |d: &str| d.parse::<u16>()),
space1,
not_line_ending,
))(input)?;
Ok((
rest,
("SIP/2.0".to_string(), code, reason.trim().to_string()),
))
}
fn parse_headers_block(input: &str) -> Result<Vec<SipHeader>> {
let mut headers = Vec::new();
let mut current = String::new();
for raw_line in input.lines() {
let line = raw_line.trim_end_matches('\r');
if line.is_empty() {
continue;
}
if line.starts_with(' ') || line.starts_with('\t') {
// header folding: append to previous header value
if current.is_empty() {
continue;
}
current.push(' ');
current.push_str(line.trim_start());
continue;
}
if !current.is_empty() {
headers.push(parse_header_line_nom(¤t)?);
}
current.clear();
current.push_str(line);
}
if !current.is_empty() {
headers.push(parse_header_line_nom(¤t)?);
}
Ok(headers)
}
fn parse_header_line_nom(input: &str) -> Result<SipHeader> {
type NomErr<'a> = nom::Err<nom::error::Error<&'a str>>;
let res: IResult<&str, (&str, &str, &str, &str), nom::error::Error<&str>> = tuple((
take_until(":"),
tag(":"),
nom::character::complete::space0,
not_line_ending,
))(input);
let (_, (name, _, _, value)) =
res.map_err(|e: NomErr| anyhow!("invalid SIP header line {:?}: {:?}", input, e))?;
Ok(SipHeader {
name: name.trim().to_string(),
value: value.trim().to_string(),
})
}
fn parse_crlf(input: &str) -> IResult<&str, &str> {
alt((tag("\r\n"), tag("\n")))(input)
}
fn parse_method(token: &str) -> SipMethod {
match token.to_ascii_uppercase().as_str() {
"INVITE" => SipMethod::Invite,
"ACK" => SipMethod::Ack,
"BYE" => SipMethod::Bye,
"CANCEL" => SipMethod::Cancel,
"OPTIONS" => SipMethod::Options,
"REGISTER" => SipMethod::Register,
other => SipMethod::Unknown(other.to_string()),
}
}
/// 以下は個別ヘッダの構造化パーサ
pub fn parse_via_header(value: &str) -> Result<ViaHeader> {
ViaHeader::parse(value)
}
pub fn parse_name_addr(value: &str) -> Result<NameAddrHeader> {
NameAddrHeader::parse(value)
}
pub fn parse_cseq(value: &str) -> Result<CSeqHeader> {
CSeqHeader::parse(value)
}
pub fn parse_uri(input: &str) -> Result<SipUri> {
super::protocols::name_addr::parse_uri(input)
}
fn parse_params(input: &str) -> Vec<(String, String)> {
super::protocols::name_addr::parse_params(input)
}
/// 生ヘッダ配列から、よく使うヘッダを構造化でまとめて返す
pub fn collect_common_headers(headers: &[SipHeader]) -> CommonHeaders {
let mut common = CommonHeaders::default();
for h in headers {
match h.name.to_ascii_lowercase().as_str() {
"via" => {
if common.via.is_none() {
common.via = parse_via_header(&h.value).ok().map(|v| Via {
sent_protocol: v.sent_protocol,
sent_by: v.sent_by,
params: v.params,
});
}
}
"from" => {
if common.from.is_none() {
common.from = parse_name_addr(&h.value).ok().map(|n| NameAddr {
display: n.display,
uri: n.uri,
params: n.params,
});
}
}
"to" => {
if common.to.is_none() {
common.to = parse_name_addr(&h.value).ok().map(|n| NameAddr {
display: n.display,
uri: n.uri,
params: n.params,
});
}
}
"contact" => {
if common.contact.is_none() {
common.contact = parse_name_addr(&h.value).ok().map(|n| NameAddr {
display: n.display,
uri: n.uri,
params: n.params,
});
}
}
"call-id" => {
if common.call_id.is_none() {
common.call_id = Some(h.value.clone());
}
}
"cseq" => {
if common.cseq.is_none() {
common.cseq = parse_cseq(&h.value).ok().map(|c| CSeq {
num: c.num,
method: c.method,
});
}
}
"max-forwards" => {
if common.max_forwards.is_none() {
common.max_forwards = MaxForwardsHeader::parse(&h.value).ok().map(|m| m.hops);
}
}
"content-length" => {
if common.content_length.is_none() {
common.content_length =
ContentLengthHeader::parse(&h.value).ok().map(|c| c.length);
}
}
_ => {}
}
}
common
}
builder.rs
- 構造体 → バイト列(用途別のビルダ関数がある)
-
response_provisional_from_request(180など) -
response_final_with_sdp(INVITEの200+SDP) -
response_simple_from_request(BYE/REGISTERの200など)
-
-
To-tagは無ければ;tag=rustbotを付与(MVPの割り切り)
#![allow(dead_code)]
use std::fmt::{self, Write};
use crate::session::types::Sdp;
use crate::sip::message::{SipHeader, SipMethod, SipRequest, SipResponse};
/// 追加で使いやすい Builder スタイル
pub struct SipResponseBuilder {
status_code: u16,
reason_phrase: String,
headers: Vec<SipHeader>,
body: Vec<u8>,
}
pub struct SipRequestBuilder {
method: SipMethod,
uri: String,
headers: Vec<SipHeader>,
body: Vec<u8>,
}
impl SipResponseBuilder {
pub fn new(code: u16, reason: impl Into<String>) -> Self {
Self {
status_code: code,
reason_phrase: reason.into(),
headers: Vec::new(),
body: Vec::new(),
}
}
pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.headers.push(SipHeader::new(name, value));
self
}
pub fn body(mut self, body: impl Into<Vec<u8>>, content_type: Option<&str>) -> Self {
self.body = body.into();
if let Some(ct) = content_type {
let has_ct = self
.headers
.iter()
.any(|h| h.name.eq_ignore_ascii_case("Content-Type"));
if !has_ct {
self.headers.push(SipHeader::new("Content-Type", ct));
}
}
self
}
pub fn build(mut self) -> SipResponse {
ensure_content_length(&mut self.headers, self.body.len());
SipResponse {
version: "SIP/2.0".to_string(),
status_code: self.status_code,
reason_phrase: self.reason_phrase,
headers: self.headers,
body: self.body,
}
}
}
impl SipRequestBuilder {
pub fn new(method: SipMethod, uri: impl Into<String>) -> Self {
Self {
method,
uri: uri.into(),
headers: Vec::new(),
body: Vec::new(),
}
}
pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.headers.push(SipHeader::new(name, value));
self
}
pub fn body(mut self, body: impl Into<Vec<u8>>, content_type: Option<&str>) -> Self {
self.body = body.into();
if let Some(ct) = content_type {
let has_ct = self
.headers
.iter()
.any(|h| h.name.eq_ignore_ascii_case("Content-Type"));
if !has_ct {
self.headers.push(SipHeader::new("Content-Type", ct));
}
}
self
}
pub fn build(mut self) -> SipRequest {
ensure_content_length(&mut self.headers, self.body.len());
SipRequest {
method: self.method,
uri: self.uri,
version: "SIP/2.0".to_string(),
headers: self.headers,
body: self.body,
}
}
}
fn ensure_content_length(headers: &mut Vec<SipHeader>, body_len: usize) {
let has_len = headers
.iter()
.any(|h| h.name.eq_ignore_ascii_case("Content-Length"));
if !has_len {
headers.push(SipHeader::new("Content-Length", body_len.to_string()));
}
}
fn render_headers(headers: &[SipHeader], out: &mut String) {
for h in headers {
// SIP は CRLF 区切り
let _ = writeln!(out, "{}: {}\r", h.name, h.value);
}
}
#[allow(dead_code)]
fn method_to_str(method: &SipMethod) -> &str {
match method {
SipMethod::Invite => "INVITE",
SipMethod::Ack => "ACK",
SipMethod::Bye => "BYE",
SipMethod::Cancel => "CANCEL",
SipMethod::Options => "OPTIONS",
SipMethod::Register => "REGISTER",
SipMethod::Unknown(token) => token.as_str(),
}
}
#[allow(dead_code)]
pub fn build_request(
method: SipMethod,
uri: impl Into<String>,
mut headers: Vec<SipHeader>,
body: Vec<u8>,
) -> SipRequest {
ensure_content_length(&mut headers, body.len());
SipRequest {
method,
uri: uri.into(),
version: "SIP/2.0".to_string(),
headers,
body,
}
}
pub fn build_response(
status_code: u16,
reason_phrase: impl Into<String>,
mut headers: Vec<SipHeader>,
body: Vec<u8>,
) -> SipResponse {
ensure_content_length(&mut headers, body.len());
SipResponse {
version: "SIP/2.0".to_string(),
status_code,
reason_phrase: reason_phrase.into(),
headers,
body,
}
}
impl fmt::Display for SipRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut out = String::new();
let mut headers = self.headers.clone();
ensure_content_length(&mut headers, self.body.len());
let _ = writeln!(
out,
"{} {} {}\r",
method_to_str(&self.method),
self.uri,
self.version
);
render_headers(&headers, &mut out);
out.push_str("\r\n");
f.write_str(&out)
}
}
impl SipRequest {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = self.to_string().into_bytes();
buf.extend_from_slice(&self.body);
buf
}
}
/// リクエストヘッダから 1xx/空ボディレスポンスを組み立てる(To-tag を付与)。
pub fn response_provisional_from_request(
req: &SipRequest,
code: u16,
reason: &str,
) -> Option<SipResponse> {
let via = req.header_value("Via")?;
let from = req.header_value("From")?;
let mut to = req.header_value("To")?.to_string();
let call_id = req.header_value("Call-ID")?;
let cseq = req.header_value("CSeq")?;
if !to.to_ascii_lowercase().contains("tag=") {
to = format!("{to};tag=rustbot");
}
Some(
SipResponseBuilder::new(code, reason)
.header("Via", via)
.header("From", from)
.header("To", to)
.header("Call-ID", call_id)
.header("CSeq", cseq)
.build(),
)
}
/// リクエストヘッダ+SDPから 2xx 応答を組み立てる。
pub fn response_final_with_sdp(
req: &SipRequest,
code: u16,
reason: &str,
contact_ip: &str,
sip_port: u16,
answer: &Sdp,
) -> Option<SipResponse> {
let via = req.header_value("Via")?;
let from = req.header_value("From")?;
let mut to = req.header_value("To")?.to_string();
let call_id = req.header_value("Call-ID")?;
let cseq = req.header_value("CSeq")?;
if !to.to_ascii_lowercase().contains("tag=") {
to = format!("{to};tag=rustbot");
}
let sdp = format!(
concat!(
"v=0\r\n",
"o=rustbot 1 1 IN IP4 {ip}\r\n",
"s=Rust PCMU Bot\r\n",
"c=IN IP4 {ip}\r\n",
"t=0 0\r\n",
"m=audio {rtp} RTP/AVP {pt}\r\n",
"a=rtpmap:{pt} {codec}\r\n",
"a=sendrecv\r\n",
),
ip = answer.ip,
rtp = answer.port,
pt = answer.payload_type,
codec = answer.codec
);
Some(
SipResponseBuilder::new(code, reason)
.header("Via", via)
.header("From", from)
.header("To", to)
.header("Call-ID", call_id)
.header("CSeq", cseq)
.header("Contact", format!("sip:rustbot@{contact_ip}:{sip_port}"))
.body(sdp.as_bytes(), Some("application/sdp"))
.build(),
)
}
/// BYE/REGISTER など 2xx 空ボディ応答。
pub fn response_simple_from_request(
req: &SipRequest,
code: u16,
reason: &str,
) -> Option<SipResponse> {
let via = req.header_value("Via")?;
let from = req.header_value("From")?;
let mut to = req.header_value("To")?.to_string();
let call_id = req.header_value("Call-ID")?;
let cseq = req.header_value("CSeq")?;
if !to.to_ascii_lowercase().contains("tag=") {
to = format!("{to};tag=rustbot");
}
Some(
SipResponseBuilder::new(code, reason)
.header("Via", via)
.header("From", from)
.header("To", to)
.header("Call-ID", call_id)
.header("CSeq", cseq)
.build(),
)
}
impl fmt::Display for SipResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut out = String::new();
let mut headers = self.headers.clone();
ensure_content_length(&mut headers, self.body.len());
let _ = writeln!(
out,
"{} {} {}\r",
self.version, self.status_code, self.reason_phrase
);
render_headers(&headers, &mut out);
out.push_str("\r\n");
f.write_str(&out)
}
}
impl SipResponse {
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = self.to_string().into_bytes();
buf.extend_from_slice(&self.body);
buf
}
}
transaction.rs
- 簡易サーバトランザクション
- INVITE/非INVITEで「最後に送ったレスポンス」をキャッシュし、再送に応答
- 非INVITEは Timer J相当の期限管理のみ持つ(詳細タイマは未実装)
use std::net::SocketAddr;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InviteTxState {
Proceeding,
Completed,
Confirmed,
Terminated,
}
#[derive(Debug)]
#[allow(dead_code)]
pub enum InviteTxAction {
Retransmit(Vec<u8>),
Timeout,
}
/// INVITE サーバトランザクションの簡易実装(UDP前提)。
/// - 2xx 送信時は即 Terminated
/// - 3xx–6xx の再送や Timer G/H/I の詳細は後続で拡張予定
pub struct InviteServerTransaction {
pub state: InviteTxState,
pub peer: SocketAddr,
last_provisional: Option<Vec<u8>>,
last_final: Option<Vec<u8>>,
pub invite_req: Option<crate::sip::SipRequest>,
}
impl InviteServerTransaction {
pub fn new(peer: SocketAddr) -> Self {
Self {
state: InviteTxState::Proceeding,
peer,
last_provisional: None,
last_final: None,
invite_req: None,
}
}
pub fn remember_provisional(&mut self, resp: Vec<u8>) {
self.last_provisional = Some(resp);
}
pub fn on_final_sent(&mut self, resp: Vec<u8>, status: u16) {
self.last_final = Some(resp);
if status >= 300 {
self.state = InviteTxState::Completed;
} else {
self.state = InviteTxState::Terminated;
}
}
pub fn on_retransmit(&self) -> Option<InviteTxAction> {
match self.state {
InviteTxState::Proceeding => self
.last_provisional
.as_ref()
.cloned()
.map(InviteTxAction::Retransmit),
InviteTxState::Completed => self
.last_final
.as_ref()
.cloned()
.map(InviteTxAction::Retransmit),
_ => None,
}
}
pub fn on_ack(&mut self) -> Option<InviteTxAction> {
if self.state == InviteTxState::Completed {
self.state = InviteTxState::Confirmed;
// Timer I → Terminated を予定。ここでは即時に Terminated にしておく。
self.state = InviteTxState::Terminated;
}
None
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NonInviteTxState {
Trying,
Completed,
Terminated,
}
/// 非 INVITE サーバトランザクション(簡易版)。最終応答の再送と Timer J 相当の期限管理のみ。
pub struct NonInviteServerTransaction {
pub state: NonInviteTxState,
#[allow(dead_code)]
#[allow(dead_code)]
pub peer: SocketAddr,
pub last_request: Option<crate::sip::SipRequest>,
pub last_final: Option<Vec<u8>>,
pub expires_at: std::time::Instant,
}
impl NonInviteServerTransaction {
pub fn new(peer: SocketAddr, req: crate::sip::SipRequest) -> Self {
Self {
state: NonInviteTxState::Trying,
peer,
last_request: Some(req),
last_final: None,
expires_at: std::time::Instant::now() + std::time::Duration::from_secs(32),
}
}
pub fn on_final_sent(&mut self, resp: Vec<u8>) {
self.last_final = Some(resp);
self.state = NonInviteTxState::Completed;
self.expires_at = std::time::Instant::now() + std::time::Duration::from_secs(32);
}
pub fn on_retransmit(&self) -> Option<Vec<u8>> {
match self.state {
NonInviteTxState::Completed => self.last_final.clone(),
_ => None,
}
}
}
mod.rs
-
SipCore本体 - transportからの入力を受けて、パース→イベント化
- sessionからの
SessionOutを受けてレスポンス送信 - MVPでは
INVITE/ACK/BYEのみハンドルし、それ以外はUnknown
pub mod builder;
pub mod message;
pub mod parse;
pub mod protocols;
pub mod transaction;
pub mod tx;
#[allow(unused_imports)]
pub use message::{SipHeader, SipMessage, SipMethod, SipRequest, SipResponse};
pub use parse::parse_sip_message;
#[allow(unused_imports)]
pub use crate::sip::builder::{SipRequestBuilder, SipResponseBuilder};
#[allow(unused_imports)]
pub use crate::sip::parse::{
collect_common_headers, parse_cseq as parse_cseq_header, parse_name_addr, parse_uri,
parse_via_header,
};
#[allow(unused_imports)]
pub use protocols::*;
use crate::session::types::{CallId, Sdp, SessionOut};
use crate::sip::builder::{
response_final_with_sdp, response_provisional_from_request, response_simple_from_request,
};
use crate::sip::transaction::{
InviteServerTransaction, InviteTxAction, InviteTxState, NonInviteServerTransaction,
NonInviteTxState,
};
use crate::sip::tx::{SipTransportRequest, SipTransportTx};
use crate::transport::SipInput;
use std::collections::HashMap;
use std::time::Instant;
use tokio::time::sleep;
/// sip 層から session 層へ渡すイベント(設計ドキュメントの「sip→session 通知」と対応)
#[derive(Debug)]
pub enum SipEvent {
/// INVITE を受けたときの session への通知(call_id/from/to/offer を引き渡す)
IncomingInvite {
call_id: CallId,
from: String,
to: String,
offer: Sdp,
},
/// 既存ダイアログに対する ACK
Ack {
call_id: CallId,
},
/// 既存ダイアログに対する BYE
Bye {
call_id: CallId,
},
/// トランザクションのタイムアウト通知(Timer J など)
TransactionTimeout {
call_id: CallId,
},
Unknown,
}
#[derive(Clone)]
pub struct SipConfig {
pub advertised_ip: String,
pub sip_port: u16,
#[allow(dead_code)]
pub advertised_rtp_port: u16,
}
/// SIP 処理のエントリポイント。トランザクション状態と送信経路を保持する。
pub struct SipCore {
cfg: SipConfig,
transport_tx: SipTransportTx,
invites: HashMap<CallId, InviteContext>,
non_invites: HashMap<CallId, NonInviteServerTransaction>,
}
struct InviteContext {
tx: InviteServerTransaction,
req: SipRequest,
}
fn parse_offer_sdp(body: &[u8]) -> Option<Sdp> {
let s = std::str::from_utf8(body).ok()?;
let mut ip = None;
let mut port = None;
let mut pt = None;
for line in s.lines() {
let line = line.trim();
if line.starts_with("c=IN IP4 ") {
let v = line.trim_start_matches("c=IN IP4 ").trim();
ip = Some(v.to_string());
} else if line.starts_with("m=audio ") {
let cols: Vec<&str> = line.split_whitespace().collect();
if cols.len() >= 4 {
port = cols[1].parse::<u16>().ok();
pt = cols[3].parse::<u8>().ok();
}
}
}
Some(Sdp {
ip: ip?,
port: port?,
payload_type: pt.unwrap_or(0),
codec: "PCMU/8000".to_string(),
})
}
fn decode_sip_text(data: &[u8]) -> Result<String, ()> {
String::from_utf8(data.to_vec()).map_err(|_| ())
}
#[derive(Debug)]
#[allow(dead_code)]
struct CoreHeaderSnapshot {
// トランザクション導入時に再利用するためのコアヘッダ(現状は挙動維持のまま取り出す)
via: String,
from: String,
to: String,
call_id: String,
cseq: String,
}
impl CoreHeaderSnapshot {
fn from_request(req: &SipRequest) -> Self {
Self {
via: req.header_value("Via").unwrap_or("").to_string(),
from: req.header_value("From").unwrap_or("").to_string(),
to: req.header_value("To").unwrap_or("").to_string(),
call_id: req.header_value("Call-ID").unwrap_or("").to_string(),
cseq: req.header_value("CSeq").unwrap_or("").to_string(),
}
}
}
impl SipCore {
pub fn new(cfg: SipConfig, transport_tx: SipTransportTx) -> Self {
Self {
cfg,
transport_tx,
invites: std::collections::HashMap::new(),
non_invites: std::collections::HashMap::new(),
}
}
/// SIP ソケットで受けた datagram を処理し、必要ならレスポンス送信と session へのイベントを返す。
/// トランザクション状態機械(INVITEサーバトランザクション)を内部に持つ。
pub fn handle_input(&mut self, input: &SipInput) -> Vec<SipEvent> {
let mut events = self.prune_expired();
let text = match decode_sip_text(&input.data) {
Ok(t) => t,
Err(_) => return vec![SipEvent::Unknown],
};
let msg = match parse_sip_message(&text) {
Ok(m) => m,
Err(_) => return vec![SipEvent::Unknown],
};
let mut ev = match msg {
SipMessage::Request(req) => self.handle_request(req, input.src),
SipMessage::Response(_) => vec![SipEvent::Unknown],
};
events.append(&mut ev);
events
}
fn handle_request(&mut self, req: SipRequest, peer: std::net::SocketAddr) -> Vec<SipEvent> {
let headers = CoreHeaderSnapshot::from_request(&req);
match req.method {
SipMethod::Invite => self.handle_invite(req, headers, peer),
SipMethod::Ack => self.handle_ack(headers.call_id),
SipMethod::Bye => self.handle_non_invite(req, headers, peer, 200, "OK", true),
SipMethod::Register => self.handle_non_invite(req, headers, peer, 200, "OK", false),
_ => vec![SipEvent::Unknown],
}
}
fn handle_invite(
&mut self,
req: SipRequest,
headers: CoreHeaderSnapshot,
peer: std::net::SocketAddr,
) -> Vec<SipEvent> {
// 再送判定: 既存トランザクションがあれば最新レスポンスを再送し、イベントは出さない
if let Some(ctx) = self.invites.get_mut(&headers.call_id) {
if let Some(action) = ctx.tx.on_retransmit() {
self.send_tx_action(action, peer);
}
return vec![];
}
// 新規 INVITE: トランザクション生成(レスポンスは SessionOut 経由で送るためここでは送信しない)
let mut tx = InviteServerTransaction::new(peer);
tx.invite_req = Some(req.clone());
let ctx = InviteContext {
tx,
req: req.clone(),
};
self.invites.insert(headers.call_id.clone(), ctx);
let offer = parse_offer_sdp(&req.body).unwrap_or_else(|| Sdp::pcmu("0.0.0.0", 0));
vec![SipEvent::IncomingInvite {
call_id: headers.call_id,
from: headers.from,
to: headers.to,
offer,
}]
}
fn handle_ack(&mut self, call_id: CallId) -> Vec<SipEvent> {
let (action, terminate, peer_opt) = if let Some(ctx) = self.invites.get_mut(&call_id) {
let peer = ctx.tx.peer;
let action = ctx.tx.on_ack();
let terminate = ctx.tx.state == InviteTxState::Terminated;
(action, terminate, Some(peer))
} else {
(None, false, None)
};
if let Some(action) = action {
if let Some(peer) = peer_opt {
self.send_tx_action(action, peer);
}
}
if terminate {
self.invites.remove(&call_id);
}
vec![SipEvent::Ack { call_id }]
}
fn handle_non_invite(
&mut self,
req: SipRequest,
headers: CoreHeaderSnapshot,
peer: std::net::SocketAddr,
_status: u16,
_reason: &str,
emit_bye_event: bool,
) -> Vec<SipEvent> {
let tx = self
.non_invites
.entry(headers.call_id.clone())
.or_insert_with(|| NonInviteServerTransaction::new(peer, req.clone()));
if let Some(resp) = tx.on_retransmit() {
self.send_payload(peer, resp);
return if emit_bye_event {
vec![SipEvent::Bye {
call_id: headers.call_id,
}]
} else {
vec![]
};
}
// 最終応答は SessionOut::SipSendBye200 等から送るため、ここでは送信しない
tx.last_request = Some(req);
if emit_bye_event {
vec![SipEvent::Bye {
call_id: headers.call_id,
}]
} else {
vec![]
}
}
fn send_tx_action(&self, action: InviteTxAction, peer: std::net::SocketAddr) {
match action {
InviteTxAction::Retransmit(resp) => self.send_payload(peer, resp),
InviteTxAction::Timeout => { /* Timer H/I 経由の通知などは未使用 */ }
}
}
pub fn handle_session_out(&mut self, call_id: &CallId, out: SessionOut) {
match out {
SessionOut::SipSend180 => {
if let Some(ctx) = self.invites.get_mut(call_id) {
if let Some(resp) = response_provisional_from_request(&ctx.req, 180, "Ringing")
{
let bytes = resp.to_bytes();
ctx.tx.remember_provisional(bytes.clone());
let peer = ctx.tx.peer;
self.send_payload(peer, bytes);
}
}
}
SessionOut::SipSend200 { answer } => {
if let Some(ctx) = self.invites.get_mut(call_id) {
if let Some(resp) = response_final_with_sdp(
&ctx.req,
200,
"OK",
&self.cfg.advertised_ip,
self.cfg.sip_port,
&answer,
) {
let bytes = resp.to_bytes();
ctx.tx.on_final_sent(bytes.clone(), 200);
let peer = ctx.tx.peer;
self.send_payload(peer, bytes);
}
}
}
SessionOut::SipSendBye200 => {
if let Some(tx) = self.non_invites.get_mut(call_id) {
if let Some(req) = tx.last_request.clone() {
if let Some(resp) = response_simple_from_request(&req, 200, "OK") {
let bytes = resp.to_bytes();
tx.on_final_sent(bytes.clone());
let peer = tx.peer;
let final_resp = tx.last_final.clone();
let expires = tx.expires_at;
self.send_payload(peer, bytes.clone());
// Timer J 相当の再送(送信キュー経由)
if let Some(final_resp) = final_resp {
let transport_tx = self.transport_tx.clone();
let src_port = self.cfg.sip_port;
tokio::spawn(async move {
let mut interval = std::time::Duration::from_millis(500);
while Instant::now() < expires {
let _ = transport_tx.send(SipTransportRequest {
dst: peer,
src_port,
payload: final_resp.clone(),
});
sleep(interval).await;
interval = std::cmp::min(
interval * 2,
std::time::Duration::from_secs(4),
);
}
});
}
}
}
}
}
_ => { /* 他の SessionOut は現状未配線 */ }
}
}
fn send_payload(&self, dst: std::net::SocketAddr, payload: Vec<u8>) {
if let Some(first_line) = payload
.split(|b| *b == b'\n')
.next()
.and_then(|line| std::str::from_utf8(line).ok())
{
log::info!("[sip ->] to {} {}", dst, first_line.trim());
} else {
log::info!("[sip ->] to {} len={}", dst, payload.len());
}
let _ = self.transport_tx.send(SipTransportRequest {
dst,
src_port: self.cfg.sip_port,
payload,
});
}
fn prune_expired(&mut self) -> Vec<SipEvent> {
let now = Instant::now();
let mut events = Vec::new();
self.non_invites.retain(|call_id, tx| {
let alive = tx.state != NonInviteTxState::Terminated && tx.expires_at > now;
if !alive {
events.push(SipEvent::TransactionTimeout {
call_id: call_id.clone(),
});
}
alive
});
events
}
}
tx.rs
- transportの送信キュー型(
TransportSendRequest)を再エクスポート
#![allow(dead_code)]
pub use crate::transport::send::{
TransportSendRequest as SipTransportRequest, TransportSendTx as SipTransportTx,
};
protocols/
-
Via/Name-Addr/CSeq/Max-Forwards/Content-Lengthなど個別ヘッダのパーサ群
use crate::sip::message::SipHeader;
use crate::sip::protocols::HeaderCodec;
use anyhow::{anyhow, Result};
#[derive(Debug, Clone)]
pub struct ContentLengthHeader {
pub length: usize,
}
impl HeaderCodec for ContentLengthHeader {
const NAME: &'static str = "Content-Length";
fn parse(value: &str) -> Result<Self> {
let len = value
.trim()
.parse::<usize>()
.map_err(|_| anyhow!("invalid Content-Length"))?;
Ok(Self { length: len })
}
fn to_header(&self) -> SipHeader {
SipHeader::new(Self::NAME, self.length.to_string())
}
}
use crate::sip::message::SipHeader;
use crate::sip::protocols::HeaderCodec;
use anyhow::{anyhow, Result};
#[derive(Debug, Clone)]
pub struct CSeqHeader {
pub num: u32,
pub method: String,
}
impl HeaderCodec for CSeqHeader {
const NAME: &'static str = "CSeq";
fn parse(value: &str) -> Result<Self> {
let mut iter = value.split_whitespace();
let num_str = iter.next().ok_or_else(|| anyhow!("CSeq missing number"))?;
let method = iter.next().ok_or_else(|| anyhow!("CSeq missing method"))?;
let num = num_str
.parse::<u32>()
.map_err(|_| anyhow!("invalid CSeq num"))?;
Ok(Self {
num,
method: method.to_string(),
})
}
fn to_header(&self) -> SipHeader {
SipHeader::new(Self::NAME, format!("{} {}", self.num, self.method))
}
}
use crate::sip::message::SipHeader;
use crate::sip::protocols::HeaderCodec;
use anyhow::{anyhow, Result};
#[derive(Debug, Clone)]
pub struct MaxForwardsHeader {
pub hops: u32,
}
impl HeaderCodec for MaxForwardsHeader {
const NAME: &'static str = "Max-Forwards";
fn parse(value: &str) -> Result<Self> {
let hops = value
.trim()
.parse::<u32>()
.map_err(|_| anyhow!("invalid Max-Forwards"))?;
Ok(Self { hops })
}
fn to_header(&self) -> SipHeader {
SipHeader::new(Self::NAME, self.hops.to_string())
}
}
#![allow(dead_code)]
use anyhow::{anyhow, Result};
use crate::sip::message::{SipHeader, SipUri};
use crate::sip::protocols::HeaderCodec;
#[derive(Debug, Clone)]
pub struct NameAddrHeader {
pub display: Option<String>,
pub uri: SipUri,
pub params: Vec<(String, String)>,
}
pub type FromHeader = NameAddrHeader;
pub type ToHeader = NameAddrHeader;
pub type ContactHeader = NameAddrHeader;
impl HeaderCodec for NameAddrHeader {
const NAME: &'static str = "Name-Addr"; // 実際のヘッダ名は利用側で指定
fn parse(value: &str) -> Result<Self> {
parse_name_addr(value)
}
fn to_header(&self) -> SipHeader {
let mut value = String::new();
if let Some(disp) = &self.display {
value.push_str(disp);
value.push(' ');
}
value.push('<');
value.push_str(&format_uri(&self.uri));
value.push('>');
for (k, v) in &self.params {
if v.is_empty() {
value.push_str(&format!(";{}", k));
} else {
value.push_str(&format!(";{}={}", k, v));
}
}
SipHeader::new(Self::NAME, value)
}
}
pub fn parse_name_addr(value: &str) -> Result<NameAddrHeader> {
let value = value.trim();
let (display, uri_and_params) = if let Some(start) = value.find('<') {
let end = value
.find('>')
.ok_or_else(|| anyhow!("invalid name-addr"))?;
let display = value[..start].trim().trim_matches('"');
let uri = &value[start + 1..end];
let after = value[end + 1..].trim();
(
if display.is_empty() {
None
} else {
Some(display.to_string())
},
(uri, after),
)
} else {
(None, (value, ""))
};
let uri = parse_uri(uri_and_params.0)?;
let params = parse_params(uri_and_params.1);
Ok(NameAddrHeader {
display,
uri,
params,
})
}
pub fn parse_uri(input: &str) -> Result<SipUri> {
// 超簡易: scheme:user@host:port;param=val
let mut rest = input.trim();
let (scheme, after_scheme) = rest
.split_once(':')
.ok_or_else(|| anyhow!("uri missing scheme"))?;
rest = after_scheme;
let (user_part, host_part) = if let Some(idx) = rest.find('@') {
let (u, h) = rest.split_at(idx);
(Some(u.to_string()), &h[1..])
} else {
(None, rest)
};
let mut host = host_part.to_string();
let mut port = None;
let mut params = Vec::new();
if let Some(idx) = host_part.find(';') {
host = host_part[..idx].to_string();
let param_str = &host_part[idx + 1..];
params = parse_params(param_str);
}
if let Some(idx) = host.find(':') {
let p = host[idx + 1..].parse::<u16>().ok();
port = p;
host = host[..idx].to_string();
}
Ok(SipUri {
scheme: scheme.to_string(),
user: user_part,
host,
port,
params,
})
}
pub fn parse_params(input: &str) -> Vec<(String, String)> {
input
.split(';')
.filter(|s| !s.trim().is_empty())
.filter_map(|p| {
let mut iter = p.splitn(2, '=');
let k = iter.next()?.trim();
let v = iter.next().unwrap_or("").trim();
Some((k.to_string(), v.to_string()))
})
.collect()
}
fn format_uri(uri: &SipUri) -> String {
let mut s = String::new();
s.push_str(&uri.scheme);
s.push(':');
if let Some(u) = &uri.user {
s.push_str(u);
s.push('@');
}
s.push_str(&uri.host);
if let Some(p) = uri.port {
s.push(':');
s.push_str(&p.to_string());
}
for (k, v) in &uri.params {
if v.is_empty() {
s.push_str(&format!(";{}", k));
} else {
s.push_str(&format!(";{}={}", k, v));
}
}
s
}
use anyhow::{anyhow, Result};
use crate::sip::message::SipHeader;
use crate::sip::protocols::HeaderCodec;
#[derive(Debug, Clone)]
pub struct ViaHeader {
pub sent_protocol: String,
pub sent_by: String,
pub params: Vec<(String, String)>,
}
impl HeaderCodec for ViaHeader {
const NAME: &'static str = "Via";
fn parse(value: &str) -> Result<Self> {
// 例: "SIP/2.0/UDP 192.0.2.1:5060;branch=z9hG4bK123;rport"
let mut parts = value.splitn(2, ' ');
let proto = parts
.next()
.ok_or_else(|| anyhow!("Via missing protocol"))?
.trim();
let rest = parts
.next()
.ok_or_else(|| anyhow!("Via missing sent-by"))?
.trim();
let mut sent_by = rest.to_string();
let mut params = Vec::new();
if let Some(idx) = rest.find(';') {
sent_by = rest[..idx].trim().to_string();
let param_str = &rest[idx + 1..];
params = super::name_addr::parse_params(param_str);
}
Ok(ViaHeader {
sent_protocol: proto.to_string(),
sent_by,
params,
})
}
fn to_header(&self) -> SipHeader {
let mut value = format!("{} {}", self.sent_protocol, self.sent_by);
for (k, v) in &self.params {
if v.is_empty() {
value.push_str(&format!(";{}", k));
} else {
value.push_str(&format!(";{}={}", k, v));
}
}
SipHeader::new(Self::NAME, value)
}
}
入出力のI/F(現行)
このプロジェクトのMVPでは、SIPモジュールのI/Fは以下の形になっています。
transport → sip
SipInput { src, data: Vec<u8> }-
transport/packet.rsで UDP 受信したバイト列をそのまま渡す -
BytesではなくVec<u8>を採用している(現状の実装方針)
sip → session
-
SipEventIncomingInvite{call_id, from, to, offer}AckByeTransactionTimeoutUnknown
- 元のリクエスト構造体は渡さず、主要情報を抽出したイベントにしている
session → sip
SessionOut::SipSend180SessionOut::SipSend200{ answer }-
SessionOut::SipSendBye200
などをSipCore::handle_session_outが受けて送信する
sip → transport
SipTransportRequest { dst, src_port, payload }-
payloadはbuilderが生成するVec<u8>
パーサ/ビルダの実装粒度(MVPの割り切り)
パース方針
- 「よくあるSIPを読む」路線
- CRLF/折り返し対応
- 必須ヘッダの存在チェックは
core_headersで実施 -
Content-Lengthを見て body を拾う簡易形
200 OK(INVITE)のSDP
-
PCMU / 8000Hz固定(MVP仕様) -
response_final_with_sdpが担当 -
Contactはsip:rustbot@{contact_ip}:{sip_port}を設定
個別ヘッダの構造化
-
CSeqやName-Addr/Viaの構造化はprotocols/に分離
トランザクション/再送の扱い(現行)
UDPでのSIPは再送が普通に起きるので、MVPでは「最後の応答キャッシュ」で最低限耐える作りにしています。
INVITE
- Proceedingで1xxを記憶
- Completedで最終応答を記憶
- 2xx送信時は Terminated
- 再送INVITEには最後の応答を返す(
InviteTxAction::Retransmit)
ACK
- Completed状態で受信すると Terminated に遷移
※ Timer I等は未実装(MVPの割り切り)
非INVITE
- 最終応答をキャッシュし、期限内は再送に応答
- Timer J相当の32秒を
transaction.rsで保持
prune(期限切れ処理)
-
SipCore::prune_expiredで期限切れを捌き、必要ならSipEvent::TransactionTimeoutを上位へ通知
たたき記事(理想像)との差分 / 残TODO
現状はMVPとして成立している一方で、以下は今後の改善候補です。
- I/Fが
SipSend/Bytesベースではなく、Vec<u8>と transportキュー直結になっている - Timers用の独立ファイルは無く、再送期限は
transaction.rsとmod.rsに内包 - sessionへ渡すイベントは「主要フィールドのみ」で、元のリクエスト構造体は渡していない
- PRACK/UPDATE/Session-Timer等は未対応(MVPスコープ通り)
まとめ(現状のMVP達成度)
-
INVITE/ACK/BYEの受信 -
180/200の送信 -
BYEに200を返す - UDP再送に対して「最後の応答キャッシュ」で最低限の耐性を持つ
…という SIP UASの最小要件は実装済みです。