この記事はRust Advent Calendar 2019 3日目の記事です。
概要
actix-webを使った動画のストリーミング配信の実装方法を紹介します。ストリーミング配信にはいくつかのプロトコルがあるようなのですが、ここではIPカメラでも使われており実装も簡単なMJPEG over HTTP
と呼ばれる手法を使ってみます。
これができるとカメラで撮影した動画をリアルタイムでブラウザなどに表示させられるようになります。
動機
もともとC++で実装していたのcam2webを見かけてそれをRustで書いてみようと思ったのがきっかけで始めました。せっかくHTTP使うのならフレームワークも使ってみたいので、自転車本にも紹介されていたactix-webを選んでいます。
思った以上に苦労してしまいましたが、ひとまず動かせるようになったのでここで紹介記事書いてみます。
MJPEG over HTTPとは
HTTP応答によりサーバーが任意のタイミングで複数の文書を返し、 紙芝居的にレンダリングを切り替えさせるものです。この形式で送れる文章にもいくつかのバリエーションがあるのですが、こちらによると、特にHTTP上でmultipart/x-mixed-replace
によってJPEGを送信することを、MJPEG over HTTP や MJPEG と呼ぶそうです。
HTTPとしてのイメージは下記のようになります。いくつか省略していますが、最初に2行がレスポンスヘッダで、その後に--myboundary
から始まる各文章(ブロック)がJPEG画像一枚一枚に対応します。
HTTP/1.1 200 OK
content-type: multipart/x-mixed-replace; boundary=myboundary
--myboundary
Content-Length:74429
Content-Type:image/jpeg
(JPEG画像)
--myboundary
Content-Length:74429
Content-Type:image/jpeg
(JPEG画像)
MIMEタイプの参考:multipart/x-mixed-replace
HTTPのイメージはこちらを参考にしました:https://wiki.suikawiki.org/n/multipart%2Fx-mixed-replace#section-例
actix-webでstreamingを扱うサンプルを求めて
ここを調べるのが今回一番手間取りましたacitx-webにはstreamingも扱えるとあるのですが、実際にそれを利用した例がなかなか見つかりませんでした。
また、actix-web streaming
でググるとこちらのissueが見つかりました。が、ヒントしかなくてコードがない。
そこでactix-webを勉強していればそのうち理解できるかなと期待して、こちらのexamplesをしばらく触っていました。すると、その中にserver-send-eventsというものがありました。websocketかと思ったのですが、今回のとは異なるMIMEタイプでのストリーム処理でした。
fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder {
let rx = broadcaster.lock().unwrap().new_client();
HttpResponse::Ok()
.header("content-type", "text/event-stream")
.no_chunking()
.streaming(rx)
}
今回はこれをもとに実装していくことにします。
実装
少し前置きが長くなりましたが、順番に実装していきます。
大まかな構成ですが、actix-webのメインが画像のキャプチャ、JPEG変換などを行うBroadcasterを保持する形をとっています。
このBroadcasterは名前の通り複数のクライアントに一斉に送信する作りとなっています。新しいリクエストがあったら、クライアントをこのBroadcasterに登録することで、Multi-clientにも対応しているわけです。
Webサーバーの設定
Broadcaster::create()
で画像配信用の構造体を初期化しておきます。この返り値はData<Mutex<Broadcaster>>
という型を持っており、.register_data()
を通してサーバーアプリケーション全体で共有することができます。
ルートへアクセスがあったらデフォルトのhtmlを返しておくようにします。今回は実行ファイル単体で動くようにhtmlの中身を埋め込んでしまいます。本記事の一番下にサンプル置いていますが、重要ではないのでここでは割愛。
use actix_web::web::Data;
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use std::sync::Mutex;
mod broadcaster;
use broadcaster::Broadcaster;
fn main() {
env_logger::init();
let data = Broadcaster::create();
HttpServer::new(move || {
App::new()
.register_data(data.clone())
.route("/", web::get().to(index))
.route("/streaming", web::get().to(new_client))
})
.bind("0.0.0.0:8080")
.expect("Unable to bind port")
.run()
.unwrap();
}
/// Return a default html
fn index() -> impl Responder {
let content = include_str!("index.html");
HttpResponse::Ok()
.header("Content-Type", "text/html")
.body(content)
}
streamingレスポンス
MPEG over HTTPを実装するにあたって、ここからが大事なところです。
/streaming
側へのレスポンスは、Motion JPEGを開始する準備だけしておきます。先ほど示したHTTPのイメージのうちの、ヘッダ部分に相当する部分だけということです。
/// Register a new client and return a response
fn new_client(broadcaster: Data<Mutex<Broadcaster>>) -> impl Responder {
let rx = broadcaster.lock().unwrap().new_client();
HttpResponse::Ok()
.header("Cache-Control", "no-store, must-revalidate")
.header("Pragma", "no-cache")
.header("Expires", "0")
.header("Connection", "close")
.header(
"Content-Type",
format!("multipart/x-mixed-replace; boundary={}", BOUNDARY),
)
.no_chunking()
.streaming(rx) // now starts streaming
}
new_client()
の実装はserver-send-events、MIMEタイプなどのheaderはcam2webをそれぞれ参考にしました。
BroadcasterとClient
次に画像を配信するBroadcaster周りを作っていきます。Broadcasterの各Clientにはchannelを通してデータを送るようにするため、Broadcaster自身は(channelの片方である)Senderをvectorに保持しておきます。
/// Hold clients channels
pub struct Broadcaster {
clients: Vec<Sender<Bytes>>,
}
逆にClientは(channelのもう片方である)Recieverをラップしているだけです。また、このClientは最終的に.streaming()
に渡すので、Stream traitを実装しておきます。この時、actix-web用にErrorタイプを指定しておく必要があるようです。
// wrap Receiver in own type, with correct error type
pub struct Client(Receiver<Bytes>);
impl Stream for Client {
type Item = Bytes;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll().map_err(ErrorInternalServerError)
}
}
Broadcasterの詳細
次にBroadcasterの実装に入ります。この構造体はcreate()
で初期化します。同時にカメラの初期化もやってしまいます。
impl Broadcaster {
fn new() -> Self {
Broadcaster {
clients: Vec::new(),
}
}
pub fn create() -> Data<Mutex<Self>> {
// Data ≃ Arc
let me = Data::new(Mutex::new(Broadcaster::new()));
Broadcaster::spawn_capture(me.clone());
me
}
...
}
新しいでクライアントからの要求があったときは、.new_client()
を通してclients
にSenderを追加します。Client(rx)
はStream traitを実装してあり、最初のほうに出てきたactix-webの.streaming()
に渡す構造体です。
impl Broadcaster {
...
pub fn new_client(&mut self) -> Client {
let (tx, rx) = channel(100);
self.clients.push(tx);
Client(rx)
}
...
}
JPEG画像の生成
ここまででactix-webのストリーミングにデータを流し込む準備が一通りできました。最後に
- キャプチャ
- JPEG変換とメッセージブロックへの整形
- 送信
を見ます。
キャプチャ
Windows用にはescapiを使ってみました。macos用にはopencvを使っています。このあたりもいろいろ躓いたのですが、それはどこかのネタにしようと思います。
impl Broadcaster {
...
#[cfg(target_os = "windows")]
fn spawn_capture(me: Data<Mutex<Self>>) {
const WIDTH: u32 = 320;
const HEIGHT: u32 = 240;
let camera =
escapi::init(0, WIDTH, HEIGHT, FRAME_RATE).expect("Could not initialize the camera");
let (width, height) = (camera.capture_width(), camera.capture_height());
std::thread::spawn(move || loop {
let pixels = camera.capture();
let frame = match pixels {
Ok(pixels) => {
// Lets' convert it to RGB.
let mut buffer = vec![0; width as usize * height as usize * 3];
for i in 0..pixels.len() / 4 {
buffer[i * 3] = pixels[i * 4 + 2];
buffer[i * 3 + 1] = pixels[i * 4 + 1];
buffer[i * 3 + 2] = pixels[i * 4];
}
buffer
}
_ => {
warn!("failed to capture");
vec![0; width as usize * height as usize * 3]
}
};
let msg = Broadcaster::make_message_block(&frame, WIDTH, HEIGHT);
me.lock().unwrap().send_image(&msg);
});
}
...
}
注:このキャプチャループは終了方法を実装していません。これのせいでactix-webをctrl-Cで終了させるのを妨げている気がしてなりませんが、まだ調べ切れていません。
loopの最後のほうでキャプチャした画像からメッセージを作って、それを送っていますね。次にそれを見てみます。
JPEG変換とメッセージブロックへの整形
メッセージのブロックはContent-LenghthとContent-Typeのヘッダー相当の部分 + JPEG画像からなります。ここではImage crateのJPEGEncoderを使って変換して、それをbodyにセットします。
impl Broadcaster {
...
fn make_message_block(frame: &[u8], width: u32, height: u32) -> Vec<u8> {
let mut buffer = Vec::new();
let mut encoder = image::jpeg::JPEGEncoder::new(&mut buffer);
encoder
.encode(&frame, width, height, image::ColorType::RGB(8))
.unwrap();
let mut msg = format!(
"--{}\r\nContent-Length:{}\r\nContent-Type:image/jpeg\r\n\r\n",
super::BOUNDARY, buffer.len()
)
.into_bytes();
msg.extend(buffer);
msg
}
...
}
送信
ここまでくれば後はchannelに流しこむだけです。送信のたびにクライアントに送信できたかチェックしていますが、もう少し頻度下げていいのかとも思います。
impl Broadcaster {
...
fn send_image(&mut self, msg:&[u8]) {
let mut ok_clients = Vec::new();
for client in self.clients.iter() {
let result = client.clone().try_send(Bytes::from(&msg[..]));
if let Ok(()) = result {
ok_clients.push(client.clone());
}
}
self.clients = ok_clients;
}
...
}
ストリーミングのチェック
お疲れさまでした。ここまで出来ればあとはブラウザから127.0.0.1:8080/streaming
にアクセスすれば動画っぽく見えると思います。
これ以外にもルートにアクセスされたらデフォルトのHTMLを返すようにしてみます。とは言っても、内容は何にもないですが。
<html lang="en">
<head>
</head>
<body>
<img src="/streaming" alt="Live streaming" width="320">
</body>
</html>
キャプチャ周りの残念なところ
- キャプチャのパラメータが手抜きハードコーディング><
- カメラ周りの終了処理が・・・
- 残念ながらspawnしたthreadをactix-webから終了させる方法がすぐには分かりませんでした。
まとめ
actix-webに関するサンプルコードがなかなか見つからずに苦労はしたものの、出来上がったものをコード量が少なくて驚きました。その分、各ステートメントの密度が高くて初心者にはきついです。そもそもWeb周りの知識がなさ過ぎたのもありますが・・・。
これ一個動かしておくと、複数のブラウザから同時に見ることができます。ブラウザをたくさん開いて動画を見せてあげると、なんと息子君が楽しそうに眺めてくれていたので僕は満足です。
コードはこちらにおいています。手持ちのわずかな環境でしか動作チェックできていないので、なかなか動かないかもしれませんが、遊んでもらえると幸いです。
引用資料
用語等について
実装の参考にさせていただいたもの。