株式会社MIXIの丸尾一真(X:@Taillook)です。
この記事はMIXI DEVELOPERS Advent Calendar 2023 10日目の記事です。(完全に忘れていて15日遅刻でした)
はじめに
RustのWebサーバーを運用していて、その中でredisのPub/Subを使っています。そのときに同じクライアントを接続状態で保持して使いまわしていると、どこかのタイミングでSubscriptionが切れる問題が発生しました。
これに対応するための対策をしたので書いていきます。
元々のコード
説明の為簡略化していますがtokio::task::JoinHandle<()>を返す関数を用意して使っていました。
このコードの問題点としてwhile let Some(sub) = subscribe_stream.next().await
のnext()
でredisのクライアントに何か問題が起きたとしても永遠に終わらないという問題がありました。
fn gen_r_task(redis_client: Arc<redis::Client>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut subscribe_conn = match redis_client.get_async_connection().await {
Ok(sub) => sub.into_pubsub(),
Err(e) => panic!("{:?}", e),
};
let mut subscribe_stream = match subscribe_conn.psubscribe("*").await {
Ok(_) => subscribe_conn.on_message(),
Err(e) => panic!("{:?}", e),
};
while let Some(sub) = subscribe_stream.next().await {
let msg = (match sub.get_payload::<String>() {
Ok(msg) => msg,
Err(e) => e.to_string(),
});
println!("{:?}", msg);
}
})
}
対処方法
上記の問題に対応するためにtokio::time::timeout
を使って特定の時間(ここではMESSAGE_TIMEOUT_DURATIONという環境変数で指定)経ってsubscribe_stream.next()
から情報が来ない場合、timeoutとして処理を終わらせるようにしました。処理が終わった場合はその後gen_r_task
のredis_subscribe(&redis_client).await
が再び呼ばれるようになります。
これで永遠に処理が終わらない状態が解消されます。
fn gen_r_task(redis_client: Arc<redis::Client>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut reconnect_count: u32 = 0;
let max_reconnects: u32 = env::var("REDIS_MAX_RECONNECT")
.unwrap_or_else(|_| "4".to_string())
.parse::<u32>()
.unwrap_or(4u32);
let mut delay = Duration::from_secs(1);
loop {
match redis_subscribe(&redis_client).await {
Ok(_) => {
reconnect_count = 0;
delay = Duration::from_secs(1);
}
Err(e) => {
println!("{:?}", e.to_string());
}
}
reconnect_count += 1;
if reconnect_count > max_reconnects {
break;
}
tokio::time::sleep(delay).await;
delay = std::cmp::min(
delay * 2,
Duration::from_secs(u64::from(2u32.pow(max_reconnects))),
);
}
})
}
async fn redis_subscribe(redis_client: &redis::Client) -> redis::RedisResult<()> {
let mut subscribe_conn = redis_client.get_async_connection().await?.into_pubsub();
subscribe_conn.psubscribe("*").await?;
let mut subscribe_stream = subscribe_conn.on_message();
let message_timeout_duration = Duration::from_secs(
env::var("MESSAGE_TIMEOUT_DURATION")
.unwrap_or_else(|_| "4".to_string())
.parse::<u64>()
.unwrap_or(4u64),
);
while let Ok(result) = tokio::time::timeout(message_timeout_duration, subscribe_stream.next()).await {
match result {
Some(sub) => {
let msg = (match sub.get_payload::<String>() {
Ok(msg) => msg,
Err(e) => e.to_string(),
});
println!("{:?}", msg);
}
None => {
break;
}
}
}
Ok(())
}
おわりに
本記事ではtokio::time::timeout
が終わることの保証されないタイプの処理に対して便利だという話として同じような問題を抱えた方の助けになればと思います!