この記事はRust Advent Calendar 2021 5日目の記事です。
(12/26更新。色々と理解が間違っていた部分もあったので修正しました。)
なぜやろうと思ったか
最近メタバースで盛り上がってますよね。
メタバースが普及したら重要になるのは、そうリアルタイム通信。リアルタイム通信といえばWebSocket。
さらにリアルタイム通信といえば速度、速度といえばRust。さらにRustといえばActix。
というわけで、Rust + ActixでWebsocketを使ったチャットサーバーを勉強のために立ててみました。
とは言っても公式のサンプルを真似して書きながら理解していった感じですが。
参考にしたのはこのあたりです。
それでは、いきましょう〜
目次
概要
WebSocketとは
サーバーとクライアントが双方向通信を低コストで行うための通信プロトコル。
どうやってWebSocket通信をするかと言うと、一度Http通信を行い、その通信をWebSocketにアップグレードすると言った流れでWebSocket通信を確立することができます。ハンドシェイクって言ったりします。
Actorモデルとは
アクターモデル(英: actor model)とは、1973年、カール・ヒューイット、Peter Bishop、Richard Steiger が発表した並行計算の数学的モデルの一種[1]。
ざっくり言うと並行計算を効率的に行う数学的モデル。
Actix-webは内部的にこのアクターモデルを採用していて、Actixでチャットサーバーを作ろうとするとこのアクターについて理解しておく必要があります。
アクターモデルには登場人物が2人います。
- 主役のアクター
- パイプ役のメッセージ
アクターはシステムに複数人存在していて、そのアクター同士がメッセージを介してやりとりしあっているイメージです。
ちょうど最近Web3.0の分散システムに似ていますね(というか応用できたりするらしいのですが)。
Rustではメッセージは構造体としてあらかじめどんなデータをやり取りするのか、レスポンスの型は何かというのを決めることができます。
詳しい実装は次の章で説明します。
他にもActorモデルについて詳しく知りたい方はこちらの連載がとても勉強になります。
「Akkaで学ぶアクターモデル入門」連載一覧
実装
さて、実装についてですが、全体のファイル構成は以下の通りです。
.
├── chat_server.rs
├── config.rs
├── handlers.rs
├── lib.rs
├── main.rs
├── models
│ ├── messages.rs
│ ├── mod.rs
│ ├── rooms.rs
│ ├── subscribes.rs
│ └── users.rs
├── schema.rs
├── utils.rs
└── ws_chat_session.rs
特に肝となっているのが、ws_chat_session.rs
とchat_server.rs
です。
それぞれws_chat_session.rs
はWebSocketのセッションを管理する?アクターサーバーのコードがかかれていて、chat_server.rs
はチャットが投稿されたり、ユーザーがチャットルームに入った時に行う処理を担っています。
さて、そんなアクターモデルを使ったチャットサーバーをどのように実装するかと言うと、
めちゃくちゃはしょって次のコードのようになります。
pub struct WsChatSession {
pub id: usize, // Session ID (Unique)
pub hb: Instant, // Heart Beat
pub room: String, // Joined Room name
pub name: Option<String>, // Peer name ?
pub addr: Addr<ChatServer>, // Address of ChatServer (Actor)
}
impl Actor for WsChatSession {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, context: &mut Self::Context) {
self.hb(context);
let addr = context.address();
self.addr
.send(chat_server::Connect {
addr: addr.recipient()
})
.into_actor(self)
.then(|res, act, ctx| {
match res {
Ok(res) => act.id = res,
_ => ctx.stop(),
}
fut::ready(())
})
.wait(context);
}
fn stopping(&mut self, _: &mut Self::Context) -> actix::Running {
...
}
}
#[derive(Message)]
#[rtype(usize)]
pub struct Connect {
pub addr: Recipient<Message>
}
pub struct ChatServer {
sessions: HashMap<usize, Recipient<Message>>,
rooms: HashMap<String, HashSet<usize>>,
rng: ThreadRng,
visitor_count: Arc<AtomicUsize>,
}
impl Handler<Connect> for ChatServer {
type Result = usize;
fn handle(&mut self, actor_message: Connect, _: &mut Context<Self>) -> Self::Result {
...
}
}
ここではChatServer
というアクターサーバーを実装していて、Connect
型のメッセージキューを受け付けられるようにしています。Connect
型のメッセージキューを受け取った場合のハンドラをimpl Handler<Connect> for ChatServer
以降で実装しています。)
下半分はアクターにやってもらう処理(ハンドラ)を定義しています。
この辺りの実装は参考にしたRustのActixで、シンプルなWebSocket例を書いたが分かりやすかったので参考にするといいのかなと思います。
ルームの過去の会話を残せるようにする
さて、ここまでは既存のプロジェクトを写経しながら考察してきたのですが、
それだけでは面白くないということで、少し自分なりにアレンジしてみました。
他の記事では瞬間的なチャットやWebSocketのサーバーのみで、ルームの過去の会話を残したい!と思ったので、データベースに保存していこうと思い立ちました。
使用するのはちょうど仕事で使っていたDieselというRustのORMとMySQLです。
データ構造
本当はユーザーログインを実装して、ルームに登録(複数可能)して、ログインしたらセッションが走って...みたいなことがしたかったんですが時間が足りなかったので、最小構成でいきました。
Dieselのセットアップなどは色んないい記事がある+公式がわかりやすいので割愛します!
モデルとCRUD系処理を作成
全部書くとなかなかのコード量になるので一つだけ紹介すると、
use chrono::NaiveDateTime;
use diesel::prelude::*;
use uuid::Uuid;
use crate::{
schema::messages,
utils::establish_connection
};
use std::{error::Error, str::FromStr};
#[derive(Debug, Clone, Queryable, QueryableByName, Identifiable)]
#[table_name="messages"]
pub struct Message {
pub id: Vec<u8>,
pub user_id: Vec<u8>,
pub room_id: Vec<u8>,
pub message: String,
pub send_at: NaiveDateTime,
}
#[derive(Debug, Insertable)]
#[table_name="messages"]
pub struct NewMessage {
pub user_id: Vec<u8>,
pub room_id: Vec<u8>,
pub message: String,
}
impl Message {
pub fn get_messages(room_id: String)
-> Result<Vec<Message>, Box<dyn Error>>
{
let connection = establish_connection();
let room_id = Uuid::from_str(&room_id)?.as_bytes().to_vec();
Ok(messages::dsl::messages
.filter(messages::dsl::room_id.eq(&room_id))
.limit(50)
.load::<Message>(&connection)?)
}
pub fn send_message(message: NewMessage) -> Result<Message, Box<dyn Error>> {
let connection = establish_connection();
diesel::insert_into(messages::table)
.values(&message)
.execute(&connection)?;
Ok(messages::dsl::messages
.order(messages::send_at.desc())
.first::<Message>(&connection)?)
}
}
これをusersとroomsにも追加していきます。
WebSocketなやりとり中にデータを保存する
(どうしても間に合わず。後日書きます。。25日には間に合わせます。。多分。。。)
間に合わなかったのでゆっくりやっていきます。w