目的
バッチを起動して処理を行う時に、冗長性を確保して複数台で起動しバッチを行うのは1のプロセスだけにしてほしいことがあります。PostgreSQLとDyanamoDBで既にやったことありましたが、今回はRedisでやってみました。
以前DynamoDBで同じことをやった記事も紹介します。
コード
Redis
Redisではハッシュを使っています。ハッシュのフィールドを起動したバッチの名前を指定して、値はバッチを起動した時間を保存します。
先にそのフィールドに値があるか確認して、無ければそのまま書き込みます。
ある場合は開始してから一定期間経過しているか判定して、経過していれば書き込みますが、そうでないなら、そのバッチは実行できません。
上記の処理をアトミックにRustのコードから実行するのは大変なので、Redisの内部で実行できるLuaスクリプトで処理を行いました。
local key = KEYS[1]
local field = KEYS[2]
local start_utsms = tonumber(ARGV[1])
local lock_milli_second = tonumber(ARGV[2])
local value = redis.call('HGET', key, field)
if value then
local current_start_utsms = tonumber(value)
if current_start_utsms + lock_milli_second < start_utsms then
redis.call('HSET', key, field, start_utsms)
return "1"
end
else
redis.call('HSET', key, field, start_utsms)
return "1"
end
Rust
Rustでは現在時間を取得してLuaスクリプトを呼び出しているだけです。Luaからの戻り値を見てバッチを実行してよいかを判断するbool値を返しています。
RustでRedisのLuaスクリプトを呼び出すサンプルにはちょうど良いかもしれません。Luaスクリプトではkeyとargを複数渡すことでできます。Luaスクリプト無いではKEYSとARGVでアクセスできます。keyとargの差はほとんど無いので使い分けは特に意識する必要はなさそうです。
テストは呼び出し方のサンプルになっています。
[package]
name = "redis-batches"
version = "0.1.2"
edition = "2021"
[dependencies]
once_cell = "1.15.0"
redis = { version = "0.22", features = ["script", "aio"]}
[features]
default = ["tokio-comp"]
tokio-comp = ["redis/tokio-comp"]
async-std-comp = ["redis/async-std-comp"]
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
use once_cell::sync::Lazy;
use redis::Script;
use std::time::SystemTime;
static LOCK_SCRIPT: Lazy<Script> =
Lazy::new(|| Script::new(include_str!("./redis_scripts/batches_lock.lua")));
pub async fn lock(
key: &str,
field: &str,
lock_milli_second: u128,
redis_conn: &mut redis::aio::Connection,
) -> Result<bool, redis::RedisError> {
let start_utsms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let mut invocation = LOCK_SCRIPT.key(key);
invocation.key(field);
invocation.arg(start_utsms.to_string());
invocation.arg(lock_milli_second.to_string());
let res: Option<String> = invocation.invoke_async(redis_conn).await?;
Ok(res.is_some())
}
#[cfg(test)]
mod tests {
use crate::lock;
#[tokio::test]
async fn it_works() {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_async_connection().await.unwrap();
let res1 = lock("mutex", "test", 1000, &mut con).await.unwrap();
assert_eq!(true, res1);
let res2 = lock("mutex", "test", 1000, &mut con).await.unwrap();
assert_eq!(false, res2);
tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
let res3 = lock("mutex", "test", 1000, &mut con).await.unwrap();
assert_eq!(true, res3);
tokio::time::sleep(tokio::time::Duration::from_millis(1001)).await;
}
}
まとめ
処理が終わった後にロックを解除することも考えましたがRAIIを意識してDropの中でasyncを呼び出してみたんですがうまく動作しませんでした。
定期的で起動間隔がそれなりに離れていて実行時間が短いバッチなら、特にロックを解除しなくても問題無いと思います。