3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

actix-webのhttpサーバーをアプリケーションに埋め込む

Last updated at Posted at 2021-01-02

環境

windows 10
rustc 1.49.0
actix_web 3

TL;DR

#[actix_web::main]
async fn run(
    addr: String,
    tx_start: mpsc::Sender<Result<(),String>>,
    rx_stop:mpsc::Receiver<()>,
) {
    let server = HttpServer::new(||
        App::new()
            .route("/", web::get().to(greet))
    ).bind(addr);

    let server = match server {
        Ok(s) => s.run(),
        Err(_) => {
            tx_start.send(Err("can't bind ip adrres".to_owned())).unwrap();
            return;
        }
    };
    tx_start.send(Ok(())).unwrap();

    rx_stop.recv().unwrap();
    server.stop(true).await;
}

ソースはgithubにあります
git clone https://github.com/taC-h/actix_embedding.git

なぜ

httpserverを埋め込みたい
だけど,actix-webのサンプルはどれも↓のように#[actix_web::main]やら#[actix_web::rt]やら書いてあるのでそのままでは移植できない

use actix_web::{web, App, HttpServer, Responder, HttpResponse};

async fn greet() -> impl Responder {
    HttpResponse::Ok().body("hello world")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/", web::get().to(greet))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

マクロ展開する

actix_web::mainのソースはこんな感じ

#[proc_macro_attribute]
pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
    use quote::quote;

    let mut input = syn::parse_macro_input!(item as syn::ItemFn);
    let attrs = &input.attrs;
    let vis = &input.vis;
    let sig = &mut input.sig;
    let body = &input.block;
    let name = &sig.ident;

    if sig.asyncness.is_none() {
        return syn::Error::new_spanned(sig.fn_token, "only async fn is supported")
            .to_compile_error()
            .into();
    }

    sig.asyncness = None;

    (quote! {
        #(#attrs)*
        #vis #sig {
            actix_web::rt::System::new(stringify!(#name))
                .block_on(async move { #body })
        }
    })
    .into()
}

うーん,展開後のコードは普通の関数っぽい

確認のためにマクロ展開コマンドcargo expandを使う
インストールには
cargo +nightly install cargo-expand

rustcのマクロ展開オプションと違ってハイライトがつくので見やすい
ということで↓のコードを展開

use actix_web::{web, App, HttpServer, Responder, HttpResponse};

async fn greet() -> impl Responder {
    HttpResponse::Ok().body("hello world")
}

#[actix_web::main]
async fn run() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .route("/", web::get().to(greet))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

fn main() {
    run().unwrap();
}

展開後

#![feature(prelude_import)]
#[prelude_import]
use std::prelude::v1::*;
#[macro_use]
extern crate std;
use actix_web::{web, App, HttpServer, Responder, HttpResponse};
async fn greet() -> impl Responder {
    HttpResponse::Ok().body("hello world")
}
fn run() -> std::io::Result<()> {
    actix_web::rt::System::new("run").block_on(async move {
        {
            HttpServer::new(|| App::new().route("/", web::get().to(greet)))
                .bind("127.0.0.1:8080")?
                .run()
                .await
        }
    })
}
fn main() {
    run().unwrap();
}

大丈夫そう
別スレッドで実行すればいい感じ

use std::thread;
fn block(){
    let mut s = String::new();
    let _ = std::io::stdin().read_line(&mut s).unwrap();
}

fn main() {
    thread::spawn(|| run.unwrap(););
    block();
}

mpscで開始,終了制御

さらに外部から開始のエラーハンドリングと,終了ができると良さげ
公式サンプルの axtix/examples/shutdown-server を見ながら変形

まず終了処理の実装から

use std::sync::mpsc;

#[actix_web::main]
async fn run(//引数の追加
    addr: String,
    rx: mpsc::Receiver<()>,
) -> std::io::Result<()> {
    let server = HttpServer::new(||
        App::new()
            .route("/", web::get().to(greet))
    ).bind(addr)?
    .run();
    //ここから追加分
    rx.recv().unwrap();
    server.stop(true).await;
    Ok(())
}

fn main() {
    let addr = "127.0.0.1:8080";
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        run(addr.to_owned(), tx_start, rx_stop)
    });

    println!("start server http://{}", addr);
    block();
    tx.send(()).unwrap();
    handle.join().unwrap().unwrap();

    println!("stopped");
    block();
}

サーバー自体を別スレッドで実行するのでシグナルの送信はメインスレッドから
サーバーアドレスもmoveして指定できるように
ちなみに&strで受けようとすると,actix_web::mainのライフタイム境界エラーが出る

enterを押せばtxからシグナルを送信してrx.recv()のブロッキングが解除される仕組み
サーバー自体はrun()した時点で稼働している

次に開始のエラーハンドリング

#[actix_web::main]
async fn run(
    addr: String,
    tx_start: mpsc::Sender<Result<(),String>>,
    rx_stop:mpsc::Receiver<()>,
) /*-> std::io::Result*/ {
    let server = HttpServer::new(||
        App::new()
            .route("/", web::get().to(greet))
    ).bind(addr);

    let server = match server {
        Ok(s) => s.run(),//構築成功
        Err(_) => {//構築失敗
            tx_start.send(Err("can't bind ip adrres".to_owned())).unwrap();
            return;
        }
    };
    tx_start.send(Ok(())).unwrap();//構築成功

    rx_stop.recv().unwrap();
    server.stop(true).await;
    //Ok(())
}

fn main() {
    let addr = "127.0.0.1:8080";
    let (tx_stop, rx_stop) = mpsc::channel();//リネーム
    let (tx_start, rx_start) = mpsc::channel();//追加
    let handle = thread::spawn(move || {
        run(addr.to_owned(), tx_start, rx_stop)
    });

    rx_start.recv().unwrap().unwrap_or_else(|e| {//サーバー構築失敗時処理
        eprintln!("error: {}",e);//標準エラー出力へ
        std::process::exit(1);
    });

    println!("start server http://{}", addr);
    block();

    tx_stop.send(()).unwrap();
    handle.join().unwrap();

    println!("stopped");
    block();
}

?を使わずmatchで処理することでtx_start.send(/**/)でエラーを送信
成功した場合は空の値を送信してrx_start.recv()のブロッキングのみ解除する仕組み

runの戻り値はhandle::join()した時まで取り出せないから必要ない
?を消したので空にできる

最終コード

use actix_web::{web, App, HttpServer, Responder, HttpResponse};
use std::{
    sync::mpsc,
    thread,
};

fn block(){
    let mut s = String::new();
    let _ = std::io::stdin().read_line(&mut s).unwrap();
}

async fn greet() -> impl Responder {
    HttpResponse::Ok().body("hello world")
}

#[actix_web::main]
async fn run(
    addr: String,
    tx_start: mpsc::Sender<Result<(),String>>,
    rx_stop:mpsc::Receiver<()>,
) {
    let server = HttpServer::new(||
        App::new()
            .route("/", web::get().to(greet))
    ).bind(addr);

    let server = match server {
        Ok(s) => s.run(),
        Err(_) => {
            tx_start.send(Err("can't bind ip adrres".to_owned())).unwrap();
            return;
        }
    };
    tx_start.send(Ok(())).unwrap();//start成功シグナルを送信

    rx_stop.recv().unwrap();//stopシグナルを受信
    server.stop(true).await;//終了を待つ
}

fn main() {
    let addr = "127.0.0.1:8080";
    let (tx_stop, rx_stop) = mpsc::channel();//サーバーstopシグナル用
    let (tx_start, rx_start) = mpsc::channel();//同start成功シグナル用
    let handle = thread::spawn(move || {
        run(addr.to_owned(), tx_start, rx_stop)
    });

    rx_start.recv().unwrap().unwrap_or_else(|e| {//サーバー構築失敗時処理
        eprintln!("error: {}",e);//標準エラー出力へ
        std::process::exit(1);
    });

    println!("start server http://{}", addr);
    block();

    tx_stop.send(()).unwrap();//stopシグナルを送信する
    handle.join().unwrap();//終了を待つ

    println!("stopped");
    block();//終了の確認用
}

参考資料

actix_web::mainのソース

公式サンプル(シャットダウン編)

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?