1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RustでRedisのPub/Sub機能を利用する

Posted at

はじめに

Rustを用いてRedisのPub/Sub機能を使ってみました。Redisは高性能なデータストアですが、そのPub/Sub機能はリアルタイムメッセージングパターンの一つとして利用されています。簡単なテストプログラムを作成し、動作を確認してみました。

Pub/Subについて

RedisのPub/Sub機能は、パブリッシャー(メッセージの送信者)とサブスクライバー(メッセージの受信者)間でのリアルタイムなメッセージ交換を可能にするメッセージング機能です。この仕組みにより、複数のクライアントが同時にメッセージを送受信することができ、非同期に通信を行うことができます。

環境

  • Windows 11
  • Rust 1.81.0
  • Redis cloud

使用したライブラリ

  • redis = "0.27.5"
  • dotenv = "0.15.0"
  • bb8 = "0.8.6"
  • bb8-redis = "0.17.0"
  • tokio = { version = "1.20.1", features = ["full"] }
  • futures = "0.3.21"

作成したテストコード

複数のスレッドを起動して、各スレッドからメッセージをPublishし、1つのSusbscriberでメッセージを受信しています。

use redis::{Client,AsyncCommands,RedisError};
use std::env;
use tokio::time::{sleep, Duration};
use bb8_redis::RedisConnectionManager;
use tokio;
use futures::future::join_all;

pub async fn create_pool(redis_url: &str) -> Result<bb8::Pool<RedisConnectionManager>, redis::RedisError> {
    let manager = RedisConnectionManager::new(redis_url)?;

    let pool = bb8::Pool::builder()
        .connection_timeout(Duration::from_secs(60))
        .max_size(15)
        .build(manager)
        .await
        .map_err(|e| RedisError::from((redis::ErrorKind::IoError, "Failed to build pool", e.to_string())))?;

    Ok(pool)
}


async fn connect_with_retry(client: &redis::Client, max_retries: u32) -> redis::RedisResult<redis::Connection> {
    for attempt in 1..=max_retries {
        match client.get_connection_with_timeout(Duration::from_secs(10)) {
            Ok(con) => return Ok(con),
            Err(e) => {
                eprintln!("attempt {} faile: {}", attempt, e);
                if attempt < max_retries {
                    sleep(Duration::from_secs(2)).await;
                }
            }
        }
    }
    Err(redis::RedisError::from((redis::ErrorKind::IoError, "Over max retry count for redis connection")))
}

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    dotenv::dotenv().ok();

    let redis_url = env::var("REDIS_URL") .unwrap_or("redis://127.0.0.1:6379/".to_string());    
    // プールの作成
    let pool = create_pool(&redis_url).await.unwrap();

    // 先にsubscribe
    let client = Client::open(redis_url)?;
    let mut con = connect_with_retry(&client, 10).await.unwrap();
    let mut pubsub = con.as_pubsub();
    pubsub.subscribe("my_channel")?;
    let l_max = 4;

    let handles = (1..= l_max).map(|i| {
        let message = format!("test{}", i);
        let pool_clone = pool.clone();
        
        tokio::spawn(async move {
            println!("send message: {}", message);
            let mut con = pool_clone.get().await.unwrap();            // publish 
            let _: () = con.publish("my_channel", message).await.unwrap();
        })

    }).collect::<Vec<_>>();
    join_all(handles).await; 

    // メッセージを受信
    let mut i =1;
    loop {
        let msg = pubsub.get_message()?;
        let payload : String = msg.get_payload()?;
        println!("channel '{}': {}", msg.get_channel_name(), payload);
        
        // すべてのメッセージを受け取ったらbreak
        i += 1;
        if i > l_max {
            break;
        }
    };

    Ok(())
}

コードの説明

  1. Redisへのコネクションプールの作成:Redisへのコネクションプールを作成しました

  2. 複数スレッドからのPublish:複数のスレッドを利用してメッセージをチャネル("my_channel")にたいしてPublishしました

  3. Subscribe:単一のサブスクリプションで指定されたチャネル("my_channel")からのメッセージを受信するようにしました(サブスクライブ処理は同期処理です。非同期にしたかったのですが挫折。)

おわりに

Rustを用いてRedisのPub/Sub機能を実装し、その基本的な動作を確認してみました。このPub/Sub機能を使用して、チャットサーバを試作してみようと考えています。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?