はじめに
Rustを用いてRedisのPub/Sub機能を使ってみました。Redisは高性能なデータストアですが、そのPub/Sub機能はリアルタイムメッセージングパターンの一つとして利用されています。簡単なテストプログラムを作成し、動作を確認してみました。
Pub/Subについて
RedisのPub/Sub機能は、パブリッシャー(メッセージの送信者)とサブスクライバー(メッセージの受信者)間でのリアルタイムなメッセージ交換を可能にするメッセージング機能です。この仕組みにより、複数のクライアントが同時にメッセージを送受信することができ、非同期に通信を行うことができます。
環境
- Windows 11
- Rust 1.81.0
- Redis cloud
使用したライブラリ
- redis = "0.27.5"
- dotenv = "0.15.0"
- bb8 = "0.8.6"
- bb8-redis = "0.17.0"
- tokio = { version = "1.20.1", features = ["full"] }
- futures = "0.3.21"
作成したテストコード
複数のスレッドを起動して、各スレッドからメッセージをPublishし、1つのSusbscriberでメッセージを受信しています。
use redis::{Client,AsyncCommands,RedisError};
use std::env;
use tokio::time::{sleep, Duration};
use bb8_redis::RedisConnectionManager;
use tokio;
use futures::future::join_all;
pub async fn create_pool(redis_url: &str) -> Result<bb8::Pool<RedisConnectionManager>, redis::RedisError> {
let manager = RedisConnectionManager::new(redis_url)?;
let pool = bb8::Pool::builder()
.connection_timeout(Duration::from_secs(60))
.max_size(15)
.build(manager)
.await
.map_err(|e| RedisError::from((redis::ErrorKind::IoError, "Failed to build pool", e.to_string())))?;
Ok(pool)
}
async fn connect_with_retry(client: &redis::Client, max_retries: u32) -> redis::RedisResult<redis::Connection> {
for attempt in 1..=max_retries {
match client.get_connection_with_timeout(Duration::from_secs(10)) {
Ok(con) => return Ok(con),
Err(e) => {
eprintln!("attempt {} faile: {}", attempt, e);
if attempt < max_retries {
sleep(Duration::from_secs(2)).await;
}
}
}
}
Err(redis::RedisError::from((redis::ErrorKind::IoError, "Over max retry count for redis connection")))
}
#[tokio::main]
async fn main() -> redis::RedisResult<()> {
dotenv::dotenv().ok();
let redis_url = env::var("REDIS_URL") .unwrap_or("redis://127.0.0.1:6379/".to_string());
// プールの作成
let pool = create_pool(&redis_url).await.unwrap();
// 先にsubscribe
let client = Client::open(redis_url)?;
let mut con = connect_with_retry(&client, 10).await.unwrap();
let mut pubsub = con.as_pubsub();
pubsub.subscribe("my_channel")?;
let l_max = 4;
let handles = (1..= l_max).map(|i| {
let message = format!("test{}", i);
let pool_clone = pool.clone();
tokio::spawn(async move {
println!("send message: {}", message);
let mut con = pool_clone.get().await.unwrap(); // publish
let _: () = con.publish("my_channel", message).await.unwrap();
})
}).collect::<Vec<_>>();
join_all(handles).await;
// メッセージを受信
let mut i =1;
loop {
let msg = pubsub.get_message()?;
let payload : String = msg.get_payload()?;
println!("channel '{}': {}", msg.get_channel_name(), payload);
// すべてのメッセージを受け取ったらbreak
i += 1;
if i > l_max {
break;
}
};
Ok(())
}
コードの説明
-
Redisへのコネクションプールの作成:Redisへのコネクションプールを作成しました
-
複数スレッドからのPublish:複数のスレッドを利用してメッセージをチャネル("my_channel")にたいしてPublishしました
-
Subscribe:単一のサブスクリプションで指定されたチャネル("my_channel")からのメッセージを受信するようにしました(サブスクライブ処理は同期処理です。非同期にしたかったのですが挫折。)
おわりに
Rustを用いてRedisのPub/Sub機能を実装し、その基本的な動作を確認してみました。このPub/Sub機能を使用して、チャットサーバを試作してみようと考えています。