3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Rustで通知システムを作る

Last updated at Posted at 2023-07-28

伝統的には、ウェブページが新たなデータを受け取るために、サーバーにリクエストを送信しなければなりません。すなわち、ページがサーバーからデータを要求します。サーバー送信イベントによって、サーバーがウェブページにメッセージをプッシュ送信することにより、サーバーからウェブページへ新たなデータをいつでも送信することができます。入ってくるメッセージは、ウェブページ内の イベントおよびデータ として扱うことができます。(MDN Web docs - サーバ送信イベント)

サーバーからクライアントに通知したい!

基本サーバークライアント方式では、クライアントがリクエストを送り、サーバーはそのリクエストの内容に応じた処理を行ってからその結果をレスポンスとしてクライアントに送ります。

じゃあ・・・・・・サーバー側で更新があったらどうするの?
クライアントのデータや状態はサーバーにリクエスト→レスポンスで更新されます。なのでクライアントはサーバーの状態を知ることは(リクエストを送らない限り)ないのです。

いくつか方法はありますが、

  • 定期的にクライアントがリクエストを送る → もちろんサーバーの状態を最新のものを取ってくれば有効なのでありではありますが、無駄撃ちする場合もあるので、ちょっと非効率
  • WebSocketとかの双方向通信 → もちろんありな手段だけど、通信プロトコルをわざわざ変えないといけないので実装が少しつらい、便利だけど小規模なアプリケーションには過剰かも
  • 今日の本題:Server-Sent-Events → サーバーが更新を通知すればいいのよ!

今日はそんな話。

つくったもの

Tokio

今回のSSEを実装するために必要なクレートがTokio。非同期I/Oや非同期プログラミングを行う際のサポートをしてくれる便利なランタイム。いろんな非同期処理のできるクレートが依存しているやべーやつ。
この記事で語るにはちょっと余白が足りないので()ドキュメント読んでください(無理解の言い訳)

実装について

イベント処理

struct Events {
    clients: Arc<Mutex<HashMap<u64, tokio::sync::mpsc::UnboundedSender<String>>>>,
    last_id: AtomicU64,
}

impl Events {
    pub fn new() -> Self {
        Self {
            clients: Arc::new(Mutex::new(HashMap::new())),
            last_id: AtomicU64::new(0),
        }
    }

    pub async fn subscribe(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let id = self.last_id.fetch_add(1, Ordering::SeqCst);
        self.clients.lock().await.insert(id, tx);
        rx
    }

    pub async fn notify(&self, msg: String) {
        let mut clients = self.clients.lock().await;
        clients.retain(|_, sender| sender.send(msg.clone()).is_ok());
    }
}

ここで扱うのはtokio::sync::mpscモジュール。 Multiple Producer, Single Consumerの略称で、非同期タスク間(今回はサーバとクライアント)のメッセージの送受信に関するやり取りをサポート。

  • async fn subcribe(&self) -> tokio::sync::mpsc::UnboudedReceiver クライアントとサーバのチャネルを確保する関数。sub(下に)-scribe(書く)でサインだから定期購読とかの意味になる。
  • async fn notify(&self, msg: String) 購読している全クライアントに引数のmsgの内容を通知する。

RustのSSEの実装はこの2つのやり取りを定義しておけば最低限はなんとかなる。多分。

サーバ側の処理(ハンドラの定義)

  • 今回は簡単に/pingにアクセスされたのを擬似的なサーバの更新として処理。
    なので同じクライアントでやってるように見えるけど内部処理で言えば別物。
    フレームワークは例によってAxumを使います。
  • SSEであることを明示するため、レスポンスヘッダにはCONTENT_TYPE: text/event-streamを追記すること。
async fn sse(events: Arc<Events>) -> impl IntoResponse {
    let rx = events.subscribe().await;
    let stream = UnboundedReceiverStream::new(rx).map(|msg| Ok::<_, hyper::Error>(format!("data: {}\n\n", msg)));

    
    Response::builder()
        .header(header::CONTENT_TYPE, "text/event-stream")
        .body(Body::wrap_stream(stream))
        .unwrap()
}

async fn ping(events: Arc<Events>) -> &'static str {
    events.notify("pong!".to_string()).await;
    "sent ping!"
}

サーバ側の処理(ルーティング)

  • SSEの性質上、イベントを購読するサーバとクライアントの状態はまとめて管理されるべきなので、一つのデータに対して複数の参照でアクセスする必要がある。そこで、once_cell::sync::Lazy<Arc<T>>を使うことで一度だけ初期化され変更されないこととする。この参照をcloneすることで、一つのイベントインスタンスに対して複数の参照で安全にアクセスすることができる。
static EVENTS: once_cell::sync::Lazy<Arc<Events>> = once_cell::sync::Lazy::new(|| Arc::new(Events::new()));

// fn main
    let app = Router::new()
        .route("/events", get({
            let events = Arc::clone(&EVENTS);
            move || sse(Arc::clone(&events))
        }))
        .route("/ping", get({
            let events = Arc::clone(&EVENTS);
            move || ping(Arc::clone(&events))
        }))
        // add cors any
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
        );

クライアントの処理(SSE購読とか)

クライアントにおけるイベント購読がvar source = new EventSource("url")
通知のメッセージを受け取るのがsource.onmessage

HTML抜粋版

<!DOCTYPE html>
<html>
<body>

<h1>Server-Sent Events Demo</h1>
<div id="result"></div>

<script>
var source = new EventSource("http://localhost:3000/events");

source.onmessage = function(event) {
  document.getElementById("result").innerHTML += event.data + "<br>";
};

function pingServer() {
  fetch('http://localhost:3000/ping')
    .then(response => response.text())
    .then(data => console.log(data));
}
</script>

<button onclick="pingServer()">Ping Server</button>

</body>
</html>

このクライアントはトチ狂ってRustで書かれているのでサンプルは全部Cargoで動かせます。

client.rs
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server, StatusCode};
use hyper::header::CONTENT_TYPE;
use std::convert::Infallible;

const HTML: &'static str = r#"
<!DOCTYPE html>
<html>
<body>

<h1>Server-Sent Events Demo</h1>
<div id="result"></div>

<script>
var source = new EventSource("http://localhost:3000/events");

source.onmessage = function(event) {
  document.getElementById("result").innerHTML += event.data + "<br>";
};

function pingServer() {
  fetch('http://localhost:3000/ping')
    .then(response => response.text())
    .then(data => console.log(data));
}
</script>

<button onclick="pingServer()">Ping Server</button>

</body>
</html>
"#;

async fn handle_request(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "text/html")
        .body(Body::from(HTML))
        .unwrap())
}

#[tokio::main]
async fn main() {
    let make_svc = make_service_fn(|_conn| {
        async { Ok::<_, Infallible>(service_fn(handle_request)) }
    });

    let addr = ([127, 0, 0, 1], 3001).into();
    
    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

今回は通信の最低限を実装したので、正直SSEらしさがあるかと言うのは若干不安。
(クライアントを2本以上同時に起動したらそれっぽくなりそう)
実際にAPIの通知システムとして実装した例もあるのでそちらも興味があれば読んでください(なお可読性は保障できません)

おまけ: Nginxかませる場合

[error] 23#23: *2 upstream timed out (110: Connection timed out) while reading upstream, client: 172.22.0.1, server: localhost, request: "GET /events HTTP/1.1", upstream: "http://172.22.0.3:3000/events", host: "localhost:8081", referrer: "http://localhost:5174/"

SSEの実装をしたサーバをプロキシサーバと一緒に動かすと上みたいなエラーが出るかもしれない。
サービスを走らせるとき、Nginxなどのプロキシサーバを使って管理する場合、このSSEの実装でもうひと工夫しないといけない。

それはNginxのX-Accelが原因で、これ自体はバックエンドのレスポンスのヘッダーで決定される場所への内部リダイレクトを可能にする、つまりバックエンドでコンテンツ以外の認証などの処理をしたあとにコンテンツ送信を行うようにするための仕組みで、大量のデータ送信などの際にはバッファリングをしてくれるためNginxの優しさとも言える。

ただ、SSEのような長時間接続を保持するような通信と相性は悪い。接続のためのデータをバッファリングしてしまい、購読するために許容される待ち時間を越えるのでタイムアウトが起きてしまう。

なので、購読のためのエンドポイントのみバッファリングを無効化することで解決する。
レスポンスヘッダにX-Accel-Buffering: noとすればOK。

    let rx = events.subscribe().await;
    let stream = UnboundedReceiverStream::new(rx).map(|msg| Ok::<_, hyper::Error>(format!("data: {}\n\n", msg)));

    let response = Response::builder()
        .header("Content-Type", "text/event-stream")
        .header("X-Accel-Buffering", "no") // add here
        .body(Body::wrap_stream(stream))
        .unwrap();

ただこの処理はあくまでNginx固有の処理のため、他のプロキシサーバを扱う際は「hoge sse config」などで調べてたり、バッファリングに関するリファレンスを読んでから実装しましょう。

サーバ全コード

use axum::{routing::get, Router, response::IntoResponse};
use tower_http::cors::{Any, CorsLayer};
use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
use tokio::sync::Mutex;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::StreamExt;

use std::collections::HashMap;
use hyper::Body;
use http::{Response, header};

struct Events {
    clients: Arc<Mutex<HashMap<u64, tokio::sync::mpsc::UnboundedSender<String>>>>,
    last_id: AtomicU64,
}

impl Events {
    pub fn new() -> Self {
        Self {
            clients: Arc::new(Mutex::new(HashMap::new())),
            last_id: AtomicU64::new(0),
        }
    }

    pub async fn subscribe(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let id = self.last_id.fetch_add(1, Ordering::SeqCst);
        self.clients.lock().await.insert(id, tx);
        rx
    }

    pub async fn notify(&self, msg: String) {
        let mut clients = self.clients.lock().await;
        clients.retain(|_, sender| sender.send(msg.clone()).is_ok());
    }
}

static EVENTS: once_cell::sync::Lazy<Arc<Events>> = once_cell::sync::Lazy::new(|| Arc::new(Events::new()));

async fn sse(events: Arc<Events>) -> impl IntoResponse {
    let rx = events.subscribe().await;
    let stream = UnboundedReceiverStream::new(rx).map(|msg| Ok::<_, hyper::Error>(format!("data: {}\n\n", msg)));

    
    Response::builder()
        .header(header::CONTENT_TYPE, "text/event-stream")
        .body(Body::wrap_stream(stream))
        .unwrap()
}

async fn ping(events: Arc<Events>) -> &'static str {
    events.notify("pong!".to_string()).await;
    "sent ping!"
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/events", get({
            let events = Arc::clone(&EVENTS);
            move || sse(Arc::clone(&events))
        }))
        .route("/ping", get({
            let events = Arc::clone(&EVENTS);
            move || ping(Arc::clone(&events))
        }))
        // add cors any
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
        );

    
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

依存関係

Cargo.toml
[dependencies]
axum = "0.6.19"
futures = "0.3.17"
tokio = { version = "1.14.0", features = ["full"] }
tokio-stream = "0.1.6"
hyper = "0.14.14"
http = "0.2.4"
once_cell = "1.8.0"
tower = "0.4.12"
tower-http = { version = "0.4.1", features = ["full"] }
3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?